Threading stories: about robust thread pools
To be useful in a wide range of contexts, ThreadPoolExecutor provides some adjustable parameters. That is nice, but it also leaves it to us, the developers, to choose the right settings for our concrete cases. Here is the ThreadPoolExecutor constructor that takes the most parameters.
```java
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) { ... }
```
Thread pool types
Some of the parameters shown in the constructor above are very sensitive in terms of resource consumption and resulting system stability. Based on the different parameter settings of the constructor, it’s possible to distinguish some fundamental categories of thread pools. Here are some default thread pool settings offered by the Executors class.
```java
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
```
In the “cached thread pool” the number of threads is unbounded. This is caused by a maximumPoolSize of Integer.MAX_VALUE in conjunction with a SynchronousQueue. If you submit tasks in a burst to that thread pool, it will likely create a thread for each single task. In this scenario, created threads terminate once they have been idle for 60 seconds. The second example shows a “fixed thread pool”, where maximumPoolSize is set to a specific fixed value. The pool’s thread count will never exceed this value. If tasks arrive in a burst and all threads are busy, they queue up in the work queue (here a LinkedBlockingQueue, which has no capacity limit in this configuration). Threads in this fixed thread pool never die. The drawback of unbounded pools is obvious: both settings can cause JVM memory trouble, the cached pool through too many threads and the fixed pool through too many queued tasks (you get OutOfMemoryErrors, if you’re lucky).
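To make the difference tangible, here is a minimal sketch that submits a burst of blocking tasks to both default pools and reports how many threads were created and how many tasks were queued. The class name BurstDemo, the helper submitBurst and the burst size of 20 are made up for this illustration; the tasks simply wait on a latch to simulate work that does not finish during the burst.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
class BurstDemo {

    /** Submits a burst of blocking tasks; returns {largest pool size, queued tasks}. */
    static int[] submitBurst(ThreadPoolExecutor pool, int burstSize) {
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < burstSize; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // simulate a task that is still busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Snapshot taken while every submitted task is still blocked.
            return new int[] { pool.getLargestPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
            try {
                pool.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        int[] cached = submitBurst((ThreadPoolExecutor) Executors.newCachedThreadPool(), 20);
        int[] fixed = submitBurst((ThreadPoolExecutor) Executors.newFixedThreadPool(4), 20);
        System.out.println("cached: threads=" + cached[0] + ", queued=" + cached[1]);
        System.out.println("fixed:  threads=" + fixed[0] + ", queued=" + fixed[1]);
    }
}
```

With these numbers the cached pool should end up with one thread per task and nothing queued, while the fixed pool should stay at 4 threads and park the remaining 16 tasks in its queue. Scale the burst up far enough and either the thread count or the queue becomes the memory problem.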
Let’s look at some bounded thread pool settings:
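```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// First snippet: bounded "cached" pool, at most 50 threads, idle threads die after 60 seconds
ThreadPoolExecutor cachedPool = new ThreadPoolExecutor(
        0, 50,
        60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
cachedPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

// Second snippet: bounded "fixed" pool, 50 threads and at most 100000 queued tasks
ThreadPoolExecutor fixedPool = new ThreadPoolExecutor(
        50, 50,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(100000));
fixedPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
```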
The first snippet creates a cached thread pool with the number of threads bounded to a value of 50. If tasks arrive in a burst and all the threads are busy, a call to the ThreadPoolExecutor.execute() method would, under the default saturation policy (AbortPolicy), be rejected with a RejectedExecutionException. That usually isn’t what I want, therefore I change the saturation policy by setting the rejected-execution-handler to CallerRunsPolicy. This policy pushes the work back to the caller. That is, the client thread that issued the task for asynchronous execution will now run the task synchronously. You can develop your own saturation policy by implementing your own RejectedExecutionHandler. The second snippet creates a fixed thread pool with 50 threads and a work queue that is bounded to a value of 100000 tasks. If the work queue is full, the saturation policy pushes the work back to the client. The cached pool creates threads on demand and terminates them if they idle for 60 seconds. The fixed pool keeps the threads alive.
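The effect of CallerRunsPolicy is easy to observe. The following sketch saturates a deliberately tiny pool (a single thread and a SynchronousQueue, values chosen only for the demo) and records which thread ends up running the overflow task. The class CallerRunsDemo and its method are hypothetical names for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original article.
class CallerRunsDemo {

    /** Returns the name of the thread that ran the task submitted while the pool was saturated. */
    static String threadRunningOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1,
                60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> runner = new AtomicReference<>();
        try {
            // Occupies the only pool thread until the latch is released.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // The pool is saturated now, so the handler runs this task in the submitting thread.
            pool.execute(() -> runner.set(Thread.currentThread().getName()));
            return runner.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("ran task: " + threadRunningOverflowTask());
    }
}
```

Both lines should print the same thread name. That is the trade-off of this policy: nothing is lost, but the submitting thread is blocked for the duration of the task, which throttles further submissions.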
Thread pool boundaries
As shown above, there are two basic approaches to define thread pools: bounded and unbounded thread pools. Unbounded thread pools, like the default ones of the Executors class, work fine as long as you don’t submit too many tasks in a burst. If that happens, unbounded thread pools can harm your system’s stability. Either too many threads are created by a cached thread pool, or too many tasks are queued in a fixed thread pool. The latter is more difficult to achieve, but still possible. For production use it may be better to set the boundaries to some meaningful values, as in the last two thread pool settings. Because it can be tricky to define those “meaningful boundaries”, I have developed a little program that does that work for me.
```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;

/**
 * A class that calculates the optimal thread pool boundaries. It takes the desired target utilization and the desired
 * work queue memory consumption as input and returns thread count and work queue capacity.
 *
 * @author Niklas Schlimm
 *
 */
public abstract class PoolSizeCalculator {

    /**
     * The sample queue size to calculate the size of a single {@link Runnable} element.
     */
    private final int SAMPLE_QUEUE_SIZE = 1000;

    /**
     * Accuracy of test run. It must finish within 20ms of the testTime otherwise we retry the test. This could be
     * configurable.
     */
    private final int EPSYLON = 20;

    /**
     * Control variable for the CPU time investigation.
     */
    private volatile boolean expired;

    /**
     * Time (millis) of the test run in the CPU time calculation.
     */
    private final long testtime = 3000;

    /**
     * Calculates the boundaries of a thread pool for a given {@link Runnable}.
     *
     * @param targetUtilization
     *            the desired utilization of the CPUs (0 <= targetUtilization <= 1)
     * @param targetQueueSizeBytes
     *            the desired maximum work queue size of the thread pool (bytes)
     */
    protected void calculateBoundaries(BigDecimal targetUtilization, BigDecimal targetQueueSizeBytes) {
        calculateOptimalCapacity(targetQueueSizeBytes);
        Runnable task = creatTask();
        start(task);
        start(task); // warm up phase
        long cputime = getCurrentThreadCPUTime();
        start(task); // test interval
        cputime = getCurrentThreadCPUTime() - cputime;
        long waittime = (testtime * 1000000) - cputime;
        calculateOptimalThreadCount(cputime, waittime, targetUtilization);
    }

    private void calculateOptimalCapacity(BigDecimal targetQueueSizeBytes) {
        long mem = calculateMemoryUsage();
        BigDecimal queueCapacity = targetQueueSizeBytes.divide(new BigDecimal(mem), RoundingMode.HALF_UP);
        System.out.println("Target queue memory usage (bytes): " + targetQueueSizeBytes);
        System.out.println("createTask() produced " + creatTask().getClass().getName() + " which took " + mem
                + " bytes in a queue");
        System.out.println("Formula: " + targetQueueSizeBytes + " / " + mem);
        System.out.println("* Recommended queue capacity (bytes): " + queueCapacity);
    }

    /**
     * Brian Goetz' optimal thread count formula, see 'Java Concurrency in Practice' (chapter 8.2)
     *
     * @param cpu
     *            cpu time consumed by considered task
     * @param wait
     *            wait time of considered task
     * @param targetUtilization
     *            target utilization of the system
     */
    private void calculateOptimalThreadCount(long cpu, long wait, BigDecimal targetUtilization) {
        BigDecimal waitTime = new BigDecimal(wait);
        BigDecimal computeTime = new BigDecimal(cpu);
        BigDecimal numberOfCPU = new BigDecimal(Runtime.getRuntime().availableProcessors());
        // Note: the wait/compute ratio is rounded to a whole number (scale 0, HALF_UP).
        BigDecimal optimalthreadcount = numberOfCPU.multiply(targetUtilization).multiply(
                new BigDecimal(1).add(waitTime.divide(computeTime, RoundingMode.HALF_UP)));
        System.out.println("Number of CPU: " + numberOfCPU);
        System.out.println("Target utilization: " + targetUtilization);
        System.out.println("Elapsed time (nanos): " + (testtime * 1000000));
        System.out.println("Compute time (nanos): " + cpu);
        System.out.println("Wait time (nanos): " + wait);
        System.out.println("Formula: " + numberOfCPU + " * " + targetUtilization + " * (1 + " + waitTime + " / "
                + computeTime + ")");
        System.out.println("* Optimal thread count: " + optimalthreadcount);
    }

    /**
     * Runs the {@link Runnable} over a period defined in {@link #testtime}. Based on Heinz Kabutz' ideas
     *
     * @param task
     *            the runnable under investigation
     */
    public void start(Runnable task) {
        long start = 0;
        int runs = 0;
        do {
            if (++runs > 5) {
                throw new IllegalStateException("Test not accurate");
            }
            expired = false;
            start = System.currentTimeMillis();
            Timer timer = new Timer();
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    expired = true;
                }
            }, testtime);
            while (!expired) {
                task.run();
            }
            start = System.currentTimeMillis() - start;
            timer.cancel();
        } while (Math.abs(start - testtime) > EPSYLON);
        collectGarbage(3);
    }

    private void collectGarbage(int times) {
        for (int i = 0; i < times; i++) {
            System.gc();
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Calculates the memory usage of a single element in a work queue. Based on Heinz Kabutz' ideas
     *
     * @return memory usage of a single {@link Runnable} element in the thread pool's work queue
     */
    public long calculateMemoryUsage() {
        // Warm-up pass: fill a queue once so that class loading does not distort the measurement.
        BlockingQueue<Runnable> queue = createWorkQueue();
        for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {
            queue.add(creatTask());
        }
        long mem0 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
        long mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
        queue = null;
        collectGarbage(15);
        // Measured pass: heap usage before and after filling a fresh queue.
        mem0 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
        queue = createWorkQueue();
        for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {
            queue.add(creatTask());
        }
        collectGarbage(15);
        mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
        return (mem1 - mem0) / SAMPLE_QUEUE_SIZE;
    }

    /**
     * Create your runnable task here.
     *
     * @return an instance of your runnable task under investigation
     */
    protected abstract Runnable creatTask();

    /**
     * Return an instance of the queue used in the thread pool.
     *
     * @return queue instance
     */
    protected abstract BlockingQueue<Runnable> createWorkQueue();

    /**
     * Calculate current cpu time. Various frameworks may be used here, depending on the operating system in use. (e.g.
     * http://www.hyperic.com/products/sigar). The more accurate the CPU time measurement, the more accurate the results
     * for thread count boundaries.
     *
     * @return current cpu time of current thread
     */
    protected abstract long getCurrentThreadCPUTime();

}
```
The program will find ideal thread pool boundaries for the maximum capacity of your work queue and the required thread count. The algorithms are based on the work of Brian Goetz and Dr. Heinz Kabutz; you can find the references in the Javadoc. Calculating the capacity required by your work queue in a fixed thread pool is relatively simple. All you need is the desired target size of the work queue in bytes divided by the average size of your submitted tasks in bytes. Unfortunately, calculating the maximum thread count isn’t an exact science. However, if you use the formulas in the program, you avoid the harmful extremes of work queues that are too large and thread counts that are too high. The ideal pool size depends on the wait time to compute time ratio of your task. The more wait time, the more threads are required to achieve a given utilization. The PoolSizeCalculator requires the desired target utilization and the desired maximum work queue memory consumption as input. Based on an investigation of object sizes and CPU time, it returns the ideal settings for maximum thread count and work queue capacity in the thread pool.
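Stripped of the measurement code, the two calculations boil down to a few lines. The sketch below is a simplified illustration with plain double arithmetic, not the program itself: the names PoolBounds, threadCount and queueCapacity are made up, the input values are hypothetical, and rounding the thread count up is my own assumption (the program above uses BigDecimal and rounds the ratio instead).

```java
// Hypothetical helper class, not part of the original article.
class PoolBounds {

    /**
     * Thread count = number of CPUs * target utilization * (1 + wait time / compute time).
     * Rounded up to a whole thread, and never less than one (an assumption of this sketch).
     */
    static int threadCount(int cpus, double targetUtilization, long waitNanos, long computeNanos) {
        if (computeNanos <= 0) {
            throw new IllegalArgumentException("computeNanos must be positive");
        }
        double ratio = (double) waitNanos / computeNanos;
        return Math.max(1, (int) Math.ceil(cpus * targetUtilization * (1 + ratio)));
    }

    /** Queue capacity = target queue size in bytes / size of one queued task in bytes. */
    static long queueCapacity(long targetQueueBytes, long bytesPerTask) {
        if (bytesPerTask <= 0) {
            throw new IllegalArgumentException("bytesPerTask must be positive");
        }
        return targetQueueBytes / bytesPerTask;
    }

    public static void main(String[] args) {
        // 4 CPUs, 50% target utilization, task waits three times as long as it computes.
        System.out.println(threadCount(4, 0.5, 300, 100));   // prints 8
        // 1,000,000 bytes of queue budget, 50 bytes per queued task.
        System.out.println(queueCapacity(1_000_000, 50));    // prints 20000
    }
}
```

A purely compute-bound task (wait time of zero) yields the number of CPUs times the target utilization, which matches the intuition that extra threads only help when threads spend time waiting.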
Let’s go through an example. The following snippet shows how to use the PoolSizeCalculator in a scenario of 1.0 (=100%) desired utilization and 100000 bytes maximum work queue size.
```java
public class MyPoolSizeCalculator extends PoolSizeCalculator {

    public static void main(String[] args) throws InterruptedException,
            InstantiationException,
            IllegalAccessException,
            ClassNotFoundException {
        MyPoolSizeCalculator calculator = new MyPoolSizeCalculator();
        calculator.calculateBoundaries(new BigDecimal(1.0),
                new BigDecimal(100000));
    }

    protected long getCurrentThreadCPUTime() {
        return ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();
    }

    protected Runnable creatTask() {
        return new AsynchronousTask(0, "IO", 1000000);
    }

    protected BlockingQueue<Runnable> createWorkQueue() {
        return new LinkedBlockingQueue<>();
    }

}
```
MyPoolSizeCalculator extends the abstract PoolSizeCalculator. You need to implement three template methods: getCurrentThreadCPUTime, creatTask and createWorkQueue. The snippet uses the standard Java Management Extensions (JMX) for the CPU time measurement (line 13, in getCurrentThreadCPUTime). If JMX isn’t accurate enough, other frameworks can be considered (e.g. the SIGAR API). Thread pools work best when tasks are homogeneous and independent. Therefore the creatTask method creates an instance of a single type of Runnable task (line 17). This task will be investigated to calculate the wait time to CPU time ratio. Finally, I need to create an instance of the work queue to calculate the memory usage of submitted tasks (line 21). The output of that program shows the ideal settings for the work queue capacity and the maximum pool size (number of threads). These are the results for my exemplary I/O-intensive AsynchronousTask on a dual core machine.
```
Target queue memory usage (bytes): 100000
createTask() produced com.schlimm.java7.nio.threadpools.AsynchronousTask which took 40 bytes in a queue
Formula: 100000 / 40
* Recommended queue capacity (bytes): 2500
Number of CPU: 2
Target utilization: 1.0
Elapsed time (nanos): 3000000000
Compute time (nanos): 906250000
Wait time (nanos): 2093750000
Formula: 2 * 1.0 * (1 + 2093750000 / 906250000)
* Optimal thread count: 6.0
```
The ‘recommended queue capacity’ and the ‘optimal thread count’ are the important values. An ideal setting for my AsynchronousTask would be as follows:
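```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        6, 6,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(2500));
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
```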
Using those settings, your work queue cannot grow larger than the desired 100000 bytes. And since the desired utilization is 1.0 (100%), it does not make sense to make the pool larger than 6 threads. The measured wait time to compute time ratio is 2093750000 / 906250000, roughly 2.3, which the program rounds to 2, so the formula yields 2 * 1.0 * (1 + 2) = 6. The results of the program largely depend on the type of tasks you process. If the tasks are homogeneous and compute-intensive, the program will likely recommend setting the pool size to the number of available CPUs. But if the task has wait time, as I/O-intensive tasks do, the program will recommend increasing the thread count to achieve 100% utilization. Also notice that some tasks change their wait time to compute time ratio after some time of processing, e.g. if the file size of an I/O operation increases. This suggests developing a self-tuning thread pool (one of my follow-up blogs). In any case, you should make the thread pool sizes configurable, so that you can adjust them at runtime.
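Adjusting the size at runtime is possible because ThreadPoolExecutor exposes setCorePoolSize and setMaximumPoolSize. Here is a minimal sketch of a resize helper for a fixed-style pool; PoolResizer and resize are hypothetical names, and where the new size comes from (a config file, JMX, an admin endpoint) is left open. The order of the two setter calls matters, because newer JDKs reject a core size above the maximum and a maximum below the core size with an IllegalArgumentException.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the original article.
class PoolResizer {

    /** Sets core and maximum pool size of a running pool to the same new value. */
    static void resize(ThreadPoolExecutor pool, int newSize) {
        if (newSize < 1) {
            throw new IllegalArgumentException("newSize must be >= 1");
        }
        if (newSize > pool.getMaximumPoolSize()) {
            // Growing: raise the maximum first so the core size never exceeds it.
            pool.setMaximumPoolSize(newSize);
            pool.setCorePoolSize(newSize);
        } else {
            // Shrinking or unchanged: lower the core size first so the maximum never drops below it.
            pool.setCorePoolSize(newSize);
            pool.setMaximumPoolSize(newSize);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                6, 6,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(2500));
        resize(pool, 12);
        System.out.println(pool.getCorePoolSize() + " / " + pool.getMaximumPoolSize());
        resize(pool, 3);
        System.out.println(pool.getCorePoolSize() + " / " + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

Note that the capacity of a LinkedBlockingQueue cannot be changed after construction, so only the thread count is adjustable this way; a different queue bound requires a new pool.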
OK, that’s all in terms of robust thread pools for now. I hope you enjoyed it a little. And don’t blame me if the formula isn’t 100% accurate in terms of maximum pool size. As I said, it’s not an exact science, it’s about getting an idea of the ideal pool size.
Reference: “Threading stories: about robust thread pools” from our JCG partner Niklas.