Fork/Join Framework
This article is part of our Academy Course titled Java Concurrency Essentials.
In this course, you will dive into the magic of concurrency. You will be introduced to the fundamentals of concurrency and concurrent code and you will learn about concepts like atomicity, synchronization and thread safety. Check it out here!
1. Introduction
This article gives an introduction into the Fork/Join Framework that has been part of the JDK since version 1.7. It describes the basic features of the framework and walks through some examples in order to provide practical experience.
2. Fork/Join
The base class of the Fork/Join Framework is java.util.concurrent.ForkJoinPool. This class implements the two interfaces Executor and ExecutorService and subclasses AbstractExecutorService. Hence the ForkJoinPool is basically a thread pool that takes a special kind of task, namely the ForkJoinTask. This class implements the already known interface Future and therewith methods like get(), cancel() and isDone(). Beyond that it also provides the two methods that gave the whole framework its name: fork() and join().
While a call of fork() starts an asynchronous execution of the task, a call of join() waits until the task has finished and retrieves its result. Hence we can split a given task into multiple smaller tasks, fork each of them and finally wait for all of them to finish. This makes the implementation of complex problems easier.
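Because a ForkJoinTask is also a Future, the Future-style methods and join() can be combined freely from the submitting thread; fork() itself, however, is intended to be called from within an already running task, as the pseudo code below and the FindMin example in section 2.1 do. The following minimal sketch (the SquareTask class and all names in it are made up purely for illustration) shows a task being started asynchronously and its result being retrieved:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinApiDemo {

    // deliberately trivial task used only to demonstrate the API
    static class SquareTask extends RecursiveTask<Integer> {
        private static final long serialVersionUID = 1L;
        private final int value;

        SquareTask(int value) {
            this.value = value;
        }

        @Override
        protected Integer compute() {
            return value * value;
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool();
        SquareTask task = new SquareTask(7);

        pool.execute(task);                     // start the task asynchronously
        System.out.println(task.isDone());      // Future-style status check (may still print false)
        System.out.println(task.join());        // waits for completion and returns 49
        System.out.println(task.get());         // the same result, obtained via the Future interface
        System.out.println(task.cancel(true));  // cancel() is available too; returns false once the task is done

        pool.shutdown();
    }
}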
In computer science this approach is also known as the divide-and-conquer approach. Every time a problem is too complex to solve at once, it is divided into multiple smaller and easier to solve problems. This can be written in pseudo code like this:
if (problem.getSize() > THRESHOLD) {
    SmallerProblem smallerProblem1 = new SmallerProblem();
    smallerProblem1.fork();
    SmallerProblem smallerProblem2 = new SmallerProblem();
    smallerProblem2.fork();
    return problem.solve(smallerProblem1.join(), smallerProblem2.join());
} else {
    return problem.solve();
}
First we check if the current size of the problem is bigger than a given threshold. If this is the case, we divide the problem into smaller problems, fork() each new task and then wait for the results by calling join(). As join() returns the result of each subtask, we have to pick the best solution among the smaller problems and return it as our overall solution. These steps are repeated until the problem falls below the given threshold, i.e. it is small enough to be computed directly without further division.
2.1. RecursiveTask
To grasp this procedure a little bit better, we implement an algorithm that finds the smallest number within an array of integer values. This is not a problem you would solve with a ForkJoinPool in your day-to-day work, but the following implementation shows the basic principles very clearly. In the main() method we set up an integer array with random values and create a new ForkJoinPool.
The first parameter passed to its constructor indicates the desired level of parallelism. Here we query the Runtime for the number of available CPU cores. Then we call the invoke() method and pass an instance of FindMin. FindMin extends the class RecursiveTask, which itself is a subclass of the ForkJoinTask mentioned before. The class ForkJoinTask actually has two subclasses: one designed for tasks that return a value (RecursiveTask) and one designed for tasks without a return value (RecursiveAction). The superclass forces us to implement compute(). Here we take a look at the given slice of the integer array and decide whether the current problem is too big to be solved immediately or not.
When finding the smallest number within an array, the smallest problem size that can be solved directly is the comparison of two elements, returning the smaller of the two. If we currently have more than two elements, we divide the array into two parts and again find the smallest number within each part. This is done by creating two new instances of FindMin.
The constructor is fed with the array and the start and end index. Then we start the execution of these two tasks asynchronously by calling fork(). This call puts the two tasks into the work queue of the thread pool. The thread pool implements a strategy called work-stealing: a thread that has nothing left to do steals pending tasks from the work queue of another thread. This makes sure that tasks get executed as quickly as possible.
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class FindMin extends RecursiveTask<Integer> {
    private static final long serialVersionUID = 1L;
    private int[] numbers;
    private int startIndex;
    private int endIndex;

    public FindMin(int[] numbers, int startIndex, int endIndex) {
        this.numbers = numbers;
        this.startIndex = startIndex;
        this.endIndex = endIndex;
    }

    @Override
    protected Integer compute() {
        int sliceLength = (endIndex - startIndex) + 1;
        if (sliceLength > 2) {
            // divide the slice into two halves and solve each half as a new task
            FindMin lowerFindMin = new FindMin(numbers, startIndex, startIndex + (sliceLength / 2) - 1);
            lowerFindMin.fork();
            FindMin upperFindMin = new FindMin(numbers, startIndex + (sliceLength / 2), endIndex);
            upperFindMin.fork();
            // wait for both halves and combine their results
            return Math.min(lowerFindMin.join(), upperFindMin.join());
        } else {
            // base case: at most two elements left, compare them directly
            return Math.min(numbers[startIndex], numbers[endIndex]);
        }
    }

    public static void main(String[] args) {
        int[] numbers = new int[100];
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = random.nextInt(100);
        }
        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        Integer min = pool.invoke(new FindMin(numbers, 0, numbers.length - 1));
        System.out.println(min);
    }
}
2.2. RecursiveAction
As mentioned above, next to RecursiveTask there is also the class RecursiveAction. In contrast to RecursiveTask it does not return a value, hence it can be used for asynchronous computations that are performed directly on a given data structure. One such example is the computation of a grayscale image out of a colored image. All we have to do is iterate over each pixel of the image and compute the grayscale value from its RGB value using the following formula:
gray = 0.2126 * red + 0.7152 * green + 0.0722 * blue
The floating point numbers represent how much each color channel contributes to our human perception of gray. As the highest weight is assigned to green, we can conclude that nearly three quarters of the grayscale value comes from the green channel alone. A basic implementation would look like this, assuming that image is the object holding the actual pixel data and that the methods setRGB() and getRGB() are used to set and retrieve the RGB value of a pixel:
for (int row = 0; row < image.getHeight(); row++) {
    for (int column = 0; column < image.getWidth(); column++) {
        int grayscale = computeGrayscale(image.getRGB(column, row));
        image.setRGB(column, row, grayscale);
    }
}
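The snippet above delegates the per-pixel conversion to a computeGrayscale() helper that is not shown in the original listing. A minimal sketch of such a helper, assuming the packed RGB layout used by BufferedImage and the weights from the formula above (the method itself and its exact rounding are assumptions made here for illustration), could look like this:

// hypothetical helper: converts a packed RGB value into a packed grayscale RGB value
private static int computeGrayscale(int rgb) {
    int r = (rgb >> 16) & 0xFF;                               // red channel
    int g = (rgb >> 8) & 0xFF;                                // green channel
    int b = rgb & 0xFF;                                       // blue channel
    int gray = (int) (0.2126 * r + 0.7152 * g + 0.0722 * b);  // weighted sum of the channels
    return (gray << 16) | (gray << 8) | gray;                 // same value in all three channels
}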
The above implementation works fine on a single CPU machine. But if we have more than one CPU available, we might want to distribute this work over the available cores. So instead of iterating over all pixels in two nested for loops, we can use a ForkJoinPool and submit a new task for each row (or column) of the image. Once one row has been converted to grayscale, the current thread can work on another row.
This principle is implemented in the following example:
import java.awt.image.BufferedImage;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

import javax.imageio.ImageIO;

public class GrayscaleImageAction extends RecursiveAction {
    private static final long serialVersionUID = 1L;
    private int row;
    private BufferedImage bufferedImage;

    public GrayscaleImageAction(int row, BufferedImage bufferedImage) {
        this.row = row;
        this.bufferedImage = bufferedImage;
    }

    @Override
    protected void compute() {
        // convert every pixel of this row to its grayscale value
        for (int column = 0; column < bufferedImage.getWidth(); column++) {
            int rgb = bufferedImage.getRGB(column, row);
            int r = (rgb >> 16) & 0xFF;
            int g = (rgb >> 8) & 0xFF;
            int b = (rgb & 0xFF);
            int gray = (int) (0.2126 * (float) r + 0.7152 * (float) g + 0.0722 * (float) b);
            gray = (gray << 16) + (gray << 8) + gray;
            bufferedImage.setRGB(column, row, gray);
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        BufferedImage bufferedImage = ImageIO.read(new File(args[0]));
        for (int row = 0; row < bufferedImage.getHeight(); row++) {
            GrayscaleImageAction action = new GrayscaleImageAction(row, bufferedImage);
            pool.execute(action);
        }
        // wait until all submitted tasks have finished before writing the result
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        ImageIO.write(bufferedImage, "jpg", new File(args[1]));
    }
}
Within our main() method we read the image using Java’s ImageIO class. The returned instance of BufferedImage has all the methods we need: we can query the number of rows and columns and retrieve and set the RGB value for each pixel. So all we do is iterate over all rows and submit a new GrayscaleImageAction to our ForkJoinPool. The latter has received a hint about the number of available processors as a parameter to its constructor.
The ForkJoinPool now starts the tasks asynchronously by invoking their compute() method. In this method we iterate over the pixels of the assigned row and replace their RGB values with the computed grayscale values. After having submitted all tasks to the pool, we wait in the main thread for the whole pool to shut down and then write the updated BufferedImage back to disk using the ImageIO.write() method.

Astonishingly we need only a few lines of code more than we would have needed without using the available processors. This again shows how much work we can save by using the available resources of the java.util.concurrent package.
The ForkJoinPool offers three different methods for submitting a task (a short sketch illustrating the three variants follows this list):

- execute(ForkJoinTask): This method executes the given task asynchronously. It has no return value.
- invoke(ForkJoinTask): This method executes the given task and waits for its result, which is returned to the caller.
- submit(ForkJoinTask): This method executes the given task asynchronously. It returns a reference to the task itself, so the task reference can be used to query the result later (as ForkJoinTask implements the interface Future).
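A minimal sketch of these three submission styles, reusing the FindMin task from section 2.1 (the class SubmissionDemo and all variable names in it are made up for illustration), might look like this:

import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class SubmissionDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int[] numbers = new int[100];
        Random random = new Random();
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = random.nextInt(100);
        }
        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());

        // invoke(): submits the task and blocks until the result is available
        Integer min = pool.invoke(new FindMin(numbers, 0, numbers.length - 1));
        System.out.println("invoke(): " + min);

        // execute(): fire-and-forget, we get no handle to the result
        pool.execute(new FindMin(numbers, 0, numbers.length - 1));

        // submit(): asynchronous, but returns the task itself so the result can be queried later
        ForkJoinTask<Integer> submitted = pool.submit(new FindMin(numbers, 0, numbers.length - 1));
        System.out.println("submit(): " + submitted.get()); // get() works because ForkJoinTask implements Future

        pool.shutdown();
    }
}

Note that each FindMin instance is submitted only once: a ForkJoinTask must not be reused after it has been executed.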
With this knowledge it is clear why we submitted the GrayscaleImageAction above using the method execute(). If we had used invoke() instead, the main thread would have waited for each task to finish and we would not have utilized the available level of parallelism.
We find the same distinction when we take a closer look at the ForkJoinTask API (a small sketch combining both variants follows this list):

- ForkJoinTask.fork(): The ForkJoinTask is scheduled for asynchronous execution; the call returns immediately without waiting for the result.
- ForkJoinTask.invoke(): The ForkJoinTask is executed immediately in the calling thread and its result is returned after completion.
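Inside a task's compute() method these two variants are often combined: fork one half so another worker can pick it up, invoke the other half directly in the current thread, and then join the forked half. A minimal sketch of this idiom, written as an alternative body for the compute() method of the FindMin class above (this variant is an illustration, not the implementation shown earlier), could look like this:

@Override
protected Integer compute() {
    int sliceLength = (endIndex - startIndex) + 1;
    if (sliceLength > 2) {
        FindMin lower = new FindMin(numbers, startIndex, startIndex + (sliceLength / 2) - 1);
        FindMin upper = new FindMin(numbers, startIndex + (sliceLength / 2), endIndex);
        lower.fork();                   // fork(): schedule the lower half asynchronously
        int upperMin = upper.invoke();  // invoke(): compute the upper half in the current thread
        int lowerMin = lower.join();    // join(): wait for the forked lower half
        return Math.min(lowerMin, upperMin);
    } else {
        return Math.min(numbers[startIndex], numbers[endIndex]);
    }
}

Forking one half and invoking the other keeps the current worker busy instead of letting it wait on two joins.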
2.3. ForkJoinPool and ExecutorService
Now that we know both the ExecutorService and the ForkJoinPool, you may ask yourself why we should use the ForkJoinPool and not the ExecutorService. The difference between the two is not that big. Both have execute() and submit() methods and take tasks that implement common interfaces like Runnable and Callable; the ForkJoinPool additionally accepts subclasses of ForkJoinTask such as RecursiveAction and RecursiveTask.
To understand the difference a little bit better, let’s try to implement the FindMin class from above using an ExecutorService:
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FindMinTask implements Callable<Integer> {
    private int[] numbers;
    private int startIndex;
    private int endIndex;
    private ExecutorService executorService;

    public FindMinTask(ExecutorService executorService, int[] numbers, int startIndex, int endIndex) {
        this.executorService = executorService;
        this.numbers = numbers;
        this.startIndex = startIndex;
        this.endIndex = endIndex;
    }

    public Integer call() throws Exception {
        int sliceLength = (endIndex - startIndex) + 1;
        if (sliceLength > 2) {
            // divide the slice into two halves and submit each half as a new task
            FindMinTask lowerFindMin = new FindMinTask(executorService, numbers, startIndex, startIndex + (sliceLength / 2) - 1);
            Future<Integer> futureLowerFindMin = executorService.submit(lowerFindMin);
            FindMinTask upperFindMin = new FindMinTask(executorService, numbers, startIndex + (sliceLength / 2), endIndex);
            Future<Integer> futureUpperFindMin = executorService.submit(upperFindMin);
            // get() blocks the current thread until the result is available
            return Math.min(futureLowerFindMin.get(), futureUpperFindMin.get());
        } else {
            // base case: at most two elements left, compare them directly
            return Math.min(numbers[startIndex], numbers[endIndex]);
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int[] numbers = new int[100];
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = random.nextInt(100);
        }
        ExecutorService executorService = Executors.newFixedThreadPool(64);
        Future<Integer> futureResult = executorService.submit(new FindMinTask(executorService, numbers, 0, numbers.length - 1));
        System.out.println(futureResult.get());
        executorService.shutdown();
    }
}
The code looks very similar, except for the fact that we submit() our tasks to the ExecutorService and then use the returned Future instances to get() the results. The main difference between the two implementations can be found at the point where the thread pool is constructed. In the example above, we create a fixed thread pool with 64(!) threads. Why did I choose such a big number? The reason is that calling get() on each returned Future blocks the current thread until the result is available. If we provided the pool with only as many threads as there are CPUs available, the program would run out of free threads and hang indefinitely: every pooled thread would be blocked waiting for subtasks that never get a chance to run.
The ForkJoinPool implements the already mentioned work-stealing strategy: every time a running thread has to wait for the result of a subtask, the thread does not simply block but executes another task that is ready to run. This way the current worker thread stays busy and can be used to execute other tasks. Once the result for the originally suspended task has been computed, the suspended task resumes and the join() method returns the result. This is an important difference to the normal ExecutorService, where waiting for a result blocks the current thread.