Using a ThreadPoolExecutor to Parallelize Independent Single-Threaded Tasks
In this blog post we’ll describe the power, flexibility and simplicity of the Java executor framework by walking through a simple use case.
The Basics
The executor framework introduces an interface to manage task execution: Executor. Executor is the interface you use to submit tasks, represented as Runnable instances. This interface also decouples task submission from task execution: executors with different execution policies all expose the same submission interface, so changing your execution policy doesn’t affect your submission logic.
If you want to submit a Runnable instance for execution, it’s as simple as:
    Executor exec = …;
    exec.execute(runnable);
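To make the decoupling concrete, here is a minimal sketch. The class name ExecutorBasics, the method submitAll and the "direct" executor (which simply runs each task on the caller's thread) are illustrative assumptions, not part of the original job. The point is only that the submission code depends on nothing but the Executor interface.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;

final class ExecutorBasics {

    /** Submission logic: it only knows about the Executor interface. */
    static void submitAll(Executor exec, Queue<String> log) {
        for (int i = 0; i < 3; i++) {
            final int id = i;
            exec.execute(() -> log.add("task " + id));
        }
    }

    public static void main(String[] args) {
        // Illustrative execution policy: run every task synchronously
        // on the thread that submits it.
        Executor direct = Runnable::run;

        Queue<String> log = new ConcurrentLinkedQueue<>();
        submitAll(direct, log);
        System.out.println(log);
    }
}
```

Passing a thread pool instead of the direct executor would change where and when the tasks run, while submitAll stays untouched.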
Thread Pools
As outlined in the previous section, how the executor is going to execute your runnable isn’t specified by the Executor contract: it depends on the specific type of executor you’re using. The framework provides some different types of executors, each one with a specific execution policy tailored for different use cases.
The most common executors you’ll be dealing with are thread pool executors, which are instances of the ThreadPoolExecutor class (and its subclasses). Thread pool executors manage a thread pool, that is, the pool of worker threads that execute the tasks, and a work queue.
You have surely seen the concept of a pool in other technologies. The primary advantage of using a pool is reducing the overhead of resource creation by reusing structures (in this case, threads) that have been released after use. Another implicit advantage of using a pool is the ability to size your resource usage: you can tune the thread pool size to achieve the load you desire without jeopardizing system resources.
The framework provides a factory class for thread pools called Executors. Using this factory you’ll be able to create thread pools with different characteristics. The underlying implementation is often the same (ThreadPoolExecutor), but the factory class helps you quickly configure a thread pool without using its more complex constructor. The factory methods are:
- newFixedThreadPool: this method returns a thread pool whose maximum size is fixed. It will create new threads as needed up to the maximum configured size. When the number of threads hits the maximum, the thread pool will maintain the size constant.
- newCachedThreadPool: this method returns an unbounded thread pool, that is a thread pool without a maximum size. However, this kind of thread pool will tear down unused threads when the load reduces.
- newSingleThreadExecutor: this method returns an executor that guarantees that tasks will be executed sequentially in a single thread.
- newScheduledThreadPool: this method returns a fixed size thread pool that supports delayed and timed task execution.
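The factory methods listed above can be tried out with a short sketch. The class FactoryMethodsDemo and its helper methods are hypothetical names used for illustration. Note that the instanceof check relies on what the factory happens to return, which the Executors contract does not promise.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class FactoryMethodsDemo {

    /** Describes the pool bounds when the executor is directly a ThreadPoolExecutor. */
    static String describe(ExecutorService pool) {
        if (pool instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
            return "core=" + tpe.getCorePoolSize() + ", max=" + tpe.getMaximumPoolSize();
        }
        return "not directly a ThreadPoolExecutor";
    }

    /** Runs one trivial task, then shuts the pool down; true if it terminated in time. */
    static boolean runAndShutdown(ExecutorService pool) {
        pool.execute(() -> { });
        pool.shutdown();
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService fixed = Executors.newFixedThreadPool(4);
        ExecutorService cached = Executors.newCachedThreadPool();
        ExecutorService single = Executors.newSingleThreadExecutor();
        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);

        System.out.println("fixed:     " + describe(fixed));
        System.out.println("cached:    " + describe(cached));
        System.out.println("single:    " + describe(single));
        System.out.println("scheduled: " + describe(scheduled));

        // Delayed execution is only available on the scheduled pool.
        scheduled.schedule(() -> System.out.println("ran after a 100 ms delay"),
                100, TimeUnit.MILLISECONDS);

        for (ExecutorService pool : new ExecutorService[] {fixed, cached, single, scheduled}) {
            runAndShutdown(pool);
        }
    }
}
```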
This is just the beginning. Executors also provide other facilities that are out of scope for this tutorial and that I strongly encourage you to study:
- Life cycle management methods, declared by the ExecutorService interface (such as shutdown() and awaitTermination()).
- Completion services to poll for a task status and retrieve its return value, if applicable.
The ExecutorService interface is particularly important since it provides a way to shut down a thread pool, which is something you almost surely want to be able to do cleanly. Fortunately, the ExecutorService interface is pretty simple and self-explanatory and I recommend you study its JavaDoc thoroughly.
Basically, you send a shutdown() message to an ExecutorService, after which it won’t accept newly submitted tasks, but will continue processing the already enqueued jobs. You can poll for an executor service’s termination status with isTerminated(), or wait until termination using the awaitTermination(…) method. The awaitTermination method won’t wait forever, though: you’ll have to pass the maximum wait timeout as a parameter.
Warning: a common source of errors and confusion is a JVM process that never exits. If you don’t shut down your executor services, thus tearing down the underlying threads, the JVM will never exit: a JVM exits when its last non-daemon thread exits.
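The life cycle described above can be sketched as follows. The class ShutdownDemo and the method drainAfterShutdown are hypothetical names. The sketch queues some trivial tasks, calls shutdown(), checks that a late submission is refused, and then waits for the already enqueued work to complete.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ShutdownDemo {

    /** Returns how many of the submitted tasks completed once the pool terminated. */
    static int drainAfterShutdown(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(done::incrementAndGet);
        }

        // No new tasks are accepted from here on; enqueued ones still run.
        pool.shutdown();
        try {
            pool.execute(() -> { });
            throw new IllegalStateException("submission should have been rejected");
        } catch (RejectedExecutionException expected) {
            // late submissions are refused after shutdown()
        }

        try {
            // Bounded wait: awaitTermination never blocks forever.
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + drainAfterShutdown(10));
    }
}
```

Without the shutdown() call, the two worker threads (non-daemon by default) would keep the JVM alive after main returns.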
Configuring a ThreadPoolExecutor
If you decide to create a ThreadPoolExecutor manually instead of using the Executors factory class, you will need to create and configure one using one of its constructors. One of the most complete constructors of this class (a further overload also accepts a ThreadFactory) is:
    public ThreadPoolExecutor(
        int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        RejectedExecutionHandler handler);
As you can see, you can configure:
- The core pool size (the size the thread pool will try to stick with).
- The maximum pool size.
- The keep alive time, which is a time after which an idle thread is eligible for being torn down.
- The work queue to hold tasks awaiting execution.
- The policy to apply when a task submission is rejected.
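As an illustration of the parameters listed above, the following sketch builds a pool by hand and reads the configuration back through the getters. The class name ManualPoolConfig and the chosen values (2 core threads, 4 maximum, 30 seconds keep-alive, a queue of 10) are arbitrary assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ManualPoolConfig {

    /** Builds a pool with arbitrary, illustrative settings. */
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                      // core pool size
                4,                                      // maximum pool size
                30, TimeUnit.SECONDS,                   // keep-alive for idle threads above the core size
                new ArrayBlockingQueue<Runnable>(10),   // bounded work queue
                new ThreadPoolExecutor.AbortPolicy());  // rejection policy (also the default)
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println("core size:      " + pool.getCorePoolSize());
        System.out.println("maximum size:   " + pool.getMaximumPoolSize());
        System.out.println("keep-alive (s): " + pool.getKeepAliveTime(TimeUnit.SECONDS));
        System.out.println("queue capacity: " + pool.getQueue().remainingCapacity());
        System.out.println("handler:        " + pool.getRejectedExecutionHandler().getClass().getSimpleName());
        pool.shutdown();
    }
}
```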
Limiting the Number of Queued Tasks
Limiting the number of tasks executing concurrently, by sizing your thread pool, is a huge benefit for your application and its execution environment in terms of predictability and stability: unbounded thread creation will eventually exhaust the runtime resources, and your application might experience serious performance problems as a consequence, possibly even leading to instability.
That’s a solution to just one part of the problem: you’re capping the number of tasks being executed but aren’t capping the number of jobs that can be submitted and enqueued for later execution. The application will experience resource shortage later, but it will eventually experience it if the submission rate consistently outgrows the execution rate.
The solution to this problem is:
- Providing a bounded blocking queue to the executor to hold the awaiting tasks. When the queue fills up, the submitted task will be “rejected”.
- Configuring a RejectedExecutionHandler, which is invoked when a task submission is rejected (that’s why the verb rejected was quoted in the previous item). You can implement your own rejection policy or use one of the built-in policies provided by the framework.
The default rejection policy has the executor throw a RejectedExecutionException. However, other built-in policies let you:
- Discard a job silently.
- Discard the oldest queued job and retry submitting the new one.
- Execute the rejected task on the caller’s thread.
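The behaviour of the built-in policies can be observed with a deliberately tiny pool. In this sketch (the class RejectionPolicies and the method thirdTaskOutcome are hypothetical names) the pool has one thread and a one-slot queue: the first task occupies the worker, the second fills the queue, and the third one is handed to the rejection handler.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class RejectionPolicies {

    /** Reports what happened to the third task: an exception, nothing, or the thread that ran it. */
    static String thirdTaskOutcome(RejectedExecutionHandler handler) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1), handler);
        AtomicReference<String> outcome = new AtomicReference<>("never ran");
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker);  // occupies the only worker thread
            pool.execute(blocker);  // fills the one-slot queue
            pool.execute(() -> outcome.set(Thread.currentThread().getName()));  // rejected
        } catch (RejectedExecutionException e) {
            outcome.set("rejected with exception");
        } finally {
            release.countDown();
            pool.shutdown();
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println("abort:          " + thirdTaskOutcome(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("discard:        " + thirdTaskOutcome(new ThreadPoolExecutor.DiscardPolicy()));
        System.out.println("discard oldest: " + thirdTaskOutcome(new ThreadPoolExecutor.DiscardOldestPolicy()));
        System.out.println("caller runs:    " + thirdTaskOutcome(new ThreadPoolExecutor.CallerRunsPolicy()));
    }
}
```

With the discard-oldest policy the queued task is dropped to make room, so the third task ends up running on a pool thread; with the caller-runs policy it runs on the submitting thread.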
When and why would one use such a thread pool configuration? Let’s see an example.
An Example: Parallelizing Independent Single-Threaded Tasks
Recently, I was called in to solve a problem with an old job my client had been running for a long time. Basically, the job is made up of a component that waits for file system events on a set of directory hierarchies. Whenever an event is fired, a file must be processed. The file processing is performed by a proprietary single-threaded process. Truth be told, given its nature, I don’t know whether I could parallelize it even if I were allowed to. The arrival rate of events is very high throughout part of the day and there’s no need to process files in real time: they just need to get processed before the next day.
The current implementation was a mix and match of technologies, including a UNIX shell script that was responsible for scanning huge directory hierarchies to detect where changes were applied. When that implementation was put in place, the execution environment had two cores at most. The rate of events was also much lower: nowadays they’re in the order of millions, for a total of between 1 and 2 terabytes of raw data to be processed.
The servers the client runs these processes on nowadays are twelve-core machines: a huge opportunity to parallelize those old single-threaded tasks. We’ve got basically all of the ingredients for the recipe; we just need to decide how to build and tune it. Some thought before writing any code was necessary to understand the nature of the load, and these are the constraints I detected:
- A really huge number of files is to be scanned periodically: each directory contains between one and two million files.
- The scanning algorithm is very quick and can be parallelized.
- Processing a file will take at least 1 second, with spikes of even 2 or 3 seconds.
- When processing a file, there is no other bottleneck than CPU.
- CPU usage must be tunable, in order to use a different load profile depending on the time of the day.
I’ll thus need a thread pool whose size is determined by the load profile active at the moment of invoking the process. I’m inclined, then, to create a fixed-size thread pool executor configured according to the load policy. Since a processing thread is purely CPU-bound (it uses 100% of a core and waits on no other resource), the load policy is very easy to calculate: just take the number of cores available in the processing environment and scale it down using the load factor that’s active at that moment (making sure that at least one thread is always used, even during peak hours):
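    int cpus = Runtime.getRuntime().availableProcessors();
    // scaleFactor is the load factor of the active profile;
    // the cast is required when it is a fractional value
    int maxThreads = (int) (cpus * scaleFactor);
    // always use at least one thread
    maxThreads = (maxThreads > 0 ? maxThreads : 1);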
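How the load factor is chosen is not detailed here, so the following is only a sketch under stated assumptions: the class LoadProfile, the method scaleFactorAt, the office-hours window (08:00 to 20:00) and the factors 0.25 and 1.0 are all hypothetical. Only the scaling and the one-thread minimum mirror the calculation above.

```java
import java.time.LocalTime;

final class LoadProfile {

    /** Hypothetical profile: throttle during office hours, use every core at night. */
    static double scaleFactorAt(LocalTime now) {
        boolean officeHours = !now.isBefore(LocalTime.of(8, 0))
                && now.isBefore(LocalTime.of(20, 0));
        return officeHours ? 0.25 : 1.0;
    }

    /** Scales the core count by the load factor, never going below one thread. */
    static int maxThreads(int cpus, double scaleFactor) {
        return Math.max(1, (int) (cpus * scaleFactor));
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        double scaleFactor = scaleFactorAt(LocalTime.now());
        System.out.println("threads to use: " + maxThreads(cpus, scaleFactor));
    }
}
```

On a twelve-core machine this hypothetical profile would yield 3 threads during the day and 12 at night.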
Then, I need to create a ThreadPoolExecutor using a bounded blocking queue to limit the number of submitted tasks. Why? Well, the directory scanning algorithms are very quick and will generate a huge number of files to process in a very short time. How huge? It’s hard to predict, and its variability is pretty high. I’m not going to let the internal queue of my executor fill up indiscriminately with the objects representing my tasks (which include a fairly large file descriptor). I’d rather let the executor reject the files when the queue fills up.
Also, I’ll use ThreadPoolExecutor.CallerRunsPolicy as the rejection policy. Why? Because when the queue is full and the threads in the pool are busy processing files, the thread that is submitting the task will execute it itself. This way, the scanning pauses to process a file and resumes as soon as it finishes executing the current task.
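The throttling effect of this configuration can be observed with a small sketch. The class BackPressureDemo, the method produce and the 2 ms sleep standing in for file processing are illustrative assumptions. It counts how many tasks ran on the submitting thread and how many ran on pool threads; how the work is split depends on timing, but no task is lost.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BackPressureDemo {

    /** Returns {tasks run by the producer thread, tasks run by pool threads}. */
    static int[] produce(int tasks, int maxThreads) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads, 1, TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(maxThreads, true),
                new ThreadPoolExecutor.CallerRunsPolicy());
        final Thread producer = Thread.currentThread();
        AtomicInteger onProducer = new AtomicInteger();
        AtomicInteger onPool = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            // When the queue is full, this call runs the task on the producer
            // thread, which pauses the submission loop until the task is done.
            pool.execute(() -> {
                simulateWork();
                if (Thread.currentThread() == producer) {
                    onProducer.incrementAndGet();
                } else {
                    onPool.incrementAndGet();
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return new int[] {onProducer.get(), onPool.get()};
    }

    /** Stand-in for the real file processing. */
    private static void simulateWork() {
        try {
            Thread.sleep(2);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] counts = produce(50, 2);
        System.out.println("run by producer: " + counts[0] + ", run by pool: " + counts[1]);
    }
}
```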
Here’s the code that creates the executor:
    ExecutorService executorService =
        new ThreadPoolExecutor(
            maxThreads,        // core thread pool size
            maxThreads,        // maximum thread pool size
            1,                 // keep-alive time for idle threads above the core size
            TimeUnit.MINUTES,  // keep-alive time unit
            new ArrayBlockingQueue<Runnable>(maxThreads, true),  // bounded, fair work queue
            new ThreadPoolExecutor.CallerRunsPolicy());
The skeleton of the code is the following (greatly simplified):
    // scanning loop: fake scanning
    while (!dirsToProcess.isEmpty()) {
        File currentDir = dirsToProcess.pop();

        // listing children (listFiles() returns null on I/O error)
        File[] children = currentDir.listFiles();
        if (children == null) {
            continue;
        }

        // processing children
        for (final File currentFile : children) {
            // if it's a directory, defer processing
            if (currentFile.isDirectory()) {
                dirsToProcess.add(currentFile);
                continue;
            }

            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        // if it's a file, process it
                        new ConvertTask(currentFile).perform();
                    } catch (Exception ex) {
                        // error management logic
                    }
                }
            });
        }
    }

    // ...

    // wait for all of the executor threads to finish
    executorService.shutdown();
    try {
        if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
            // pool didn't terminate after the first try
            executorService.shutdownNow();
        }

        if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
            // pool didn't terminate after the second try
        }
    } catch (InterruptedException ex) {
        executorService.shutdownNow();
        Thread.currentThread().interrupt();
    }
Conclusion
As you can see, the Java concurrency API is very easy to use, very flexible and extremely powerful. Some years ago, it would have taken me much more effort to write such a simple program. This way, I could quickly solve a scalability problem caused by a legacy single-threaded component in a matter of hours.
Reference: Using a ThreadPoolExecutor to Parallelize Independent Single-Threaded Tasks from our JCG partner Enrico Crisostomo at The Grey Blog.
Thanks for this, very helpful. One question, though… I am a little confused by your exit strategy. You call awaitTermination() and wait for 60 seconds and then get impatient and call shutdownNow() and awaitTermination() again. It seems to me as long as you don’t have an infinite loop or deadlock, you should really be more patient than 60 seconds for your queue to finish its work. I realize your queue is upper bounded by 12 threads that take at most 3 seconds apiece, but if you suddenly changed situations, maybe 60 seconds isn’t valid and now have your jobs…
Hi Kevin, This answer comes very late, but it may be useful to somebody else reading this post. To make a long story short, since the pool is bounded and the rejection policy is “caller runs”, when the shutdown() call is performed (which simply instructs the pool not to accept any more tasks and continue processing the existing ones) there will be at most n tasks left to be executed, where n is the maximum size of the pool. In this example, I’m waiting 60 seconds for them to complete, and since they execute at the same time and do…
One other consideration that occurred to me this morning. Because you have specified maxThreads as being the number of cores on the machine (assuming scaleFactor as 1) and there is the primary thread the programming is running on, and because you have selected CallerRunsPolicy, you should probably reduce maxThreads by 1. If you have a 4 core machine and start a pool of 4 threads, you will actually be running on 5 threads – the 4 in the pool and the one caller that is utilized by the policy. Correct?
Yes, it’s correct, but your CPU load will depend on how much the tasks are CPU-bound (those were not, and that’s why the scaleFactor was introduced in the first place). In certain cases, instead of having the CPUs waiting for some other resource, you can have more threads running and taking advantage of them.
Hi Enrico,
Thanks for the post. Could you tell me how to calculate the “scaleFactor” in the above code? You use Cores * scaleFactor to calculate the maximum number of threads. How would one arrive at a “scaleFactor”? I presume it’s calculated prior to configuring the thread pool executor, right?