How to implement Thread pool in Java
A thread is an independent path of execution within a program. In Java, every thread is represented by an instance of java.lang.Thread; its work is defined either by extending Thread or by supplying a java.lang.Runnable.
Multithreading refers to the concurrent execution of two or more threads within a single program. Each program can have many threads, and these threads can run concurrently, either asynchronously or synchronously. You can find more information about threads and multithreading in another tutorial that I wrote about multithreading here.
1. What is Thread pool
The implementation below uses a LinkedBlockingQueue for adding and removing tasks, and wait() and notify() to signal waiting threads that new work has arrived. The following example shows a simple work queue, which is a queue of Runnable objects. This is a common convention for schedulers and work queues, although the Thread API imposes no particular need to use the Runnable interface.

package tutorials;

import java.util.concurrent.LinkedBlockingQueue;

public class ThreadPool {
    private final int nThreads;
    private final PoolWorker[] threads;
    // Typed queue, so that poll() returns a Runnable rather than an Object
    private final LinkedBlockingQueue<Runnable> queue;

    public ThreadPool(int nThreads) {
        this.nThreads = nThreads;
        queue = new LinkedBlockingQueue<>();
        threads = new PoolWorker[nThreads];

        // Create and start all worker threads up front
        for (int i = 0; i < nThreads; i++) {
            threads[i] = new PoolWorker();
            threads[i].start();
        }
    }

    public void execute(Runnable task) {
        synchronized (queue) {
            queue.add(task);
            queue.notify(); // wake up one waiting worker
        }
    }

    private class PoolWorker extends Thread {
        @Override
        public void run() {
            Runnable task;

            while (true) {
                synchronized (queue) {
                    // Wait until there is work in the queue
                    while (queue.isEmpty()) {
                        try {
                            queue.wait();
                        } catch (InterruptedException e) {
                            System.out.println("An error occurred while queue is waiting: " + e.getMessage());
                        }
                    }
                    task = queue.poll();
                }

                // If we don't catch RuntimeException,
                // the pool could leak threads
                try {
                    task.run();
                } catch (RuntimeException e) {
                    System.out.println("Thread pool is interrupted due to an issue: " + e.getMessage());
                }
            }
        }
    }
}
It is important to use a synchronized block while working with the queue, in order to control the threads' access to it.
package tutorials;

public class Task implements Runnable {
    private int num;

    public Task(int n) {
        num = n;
    }

    public void run() {
        System.out.println("Task " + num + " is running.");
    }
}
import tutorials.Task;
import tutorials.ThreadPool;

public class Main {
    public static void main(String[] args) {
        ThreadPool pool = new ThreadPool(7);

        // Submit five tasks to a pool of seven worker threads
        for (int i = 0; i < 5; i++) {
            Task task = new Task(i);
            pool.execute(task);
        }
    }
}
In the above example, we used notify() instead of notifyAll() because notify() has more desirable performance characteristics than notifyAll(); in particular, notify() causes many fewer context switches, which is important in a server application. However, take care when using notify() in other situations: there are subtle risks associated with it, and it is only appropriate under certain specific conditions.
The following figure demonstrates the thread pool design in the above example.
2. Effective use of thread pools
A thread pool is a powerful mechanism for structuring multithreaded applications, but it is not without risk. Applications built with thread pools are subject to the same concurrency risks as any other multithreaded application, such as deadlock, resource thrashing, synchronization or concurrency errors, thread leakage and request overload.
Here are some points:
- Do not queue tasks which wait synchronously for other tasks as this can cause a deadlock.
- If a task needs to wait for a resource such as I/O, specify a maximum wait time and then fail or requeue the task. This guarantees that some progress will be made, because the thread is freed for another task that might complete successfully.
- Tune the thread pool size carefully, and understand that both too few and too many threads can cause problems. The optimum size of a thread pool depends on the number of available processors and the nature of the tasks on the work queue.
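The last two points can be sketched with the standard java.util.concurrent classes. This is only an illustration and not part of the original example: the class name PoolGuidelines and both helper methods are hypothetical, and the sizing formula (cores × (1 + wait/compute ratio)) is a commonly cited rule of thumb assumed here, not a guarantee of optimal throughput.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PoolGuidelines {

    // Rule-of-thumb sizing (an assumption): CPU-bound work gets roughly one
    // thread per core; tasks that spend time waiting get proportionally more.
    public static int suggestedPoolSize(int cores, double waitToComputeRatio) {
        return Math.max(1, (int) Math.round(cores * (1 + waitToComputeRatio)));
    }

    // Runs a task, but gives up after timeoutMillis and returns the fallback.
    public static <T> T callWithTimeout(ExecutorService pool, Callable<T> task,
                                        long timeoutMillis, T fallback)
            throws InterruptedException, ExecutionException {
        Future<T> future = pool.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker so its thread is freed
            return fallback;
        }
    }

    public static void main(String[] args) throws Exception {
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(suggestedPoolSize(cores, 0.0));
        try {
            String fast = callWithTimeout(pool, () -> "done", 1000, "timed out");
            String slow = callWithTimeout(pool, () -> {
                Thread.sleep(5000); // simulates a slow resource
                return "done";
            }, 100, "timed out");
            System.out.println(fast + " / " + slow);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Run as written, main should print "done / timed out", since the second task sleeps far longer than its 100 ms limit. Whether to fail or requeue after a timeout depends on the application; the sketch simply returns a fallback value.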
3. Conclusion
The thread pool is useful for organizing server applications, and it is really important to implement it properly to prevent issues such as deadlock and the complexity of using wait() or notify(). It is therefore recommended to consider using one of the Executor classes from java.util.concurrent, such as ThreadPoolExecutor, rather than writing a thread pool from scratch. If the requirement is to create threads to handle short-lived tasks, you could consider using a thread pool instead.
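As a sketch of that recommendation, the hand-written pool from section 1 could be replaced with the ThreadPoolExecutor that Executors.newFixedThreadPool returns. The class name ExecutorExample and the runTasks helper are hypothetical names used only for illustration; the pool size of 7 and the five tasks mirror the Main class shown earlier.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorExample {

    // Submits taskCount tasks to a fixed-size pool and returns how many completed.
    public static int runTasks(int nThreads, int taskCount) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < taskCount; i++) {
            final int num = i;
            pool.execute(() -> {
                System.out.println("Task " + num + " is running.");
                completed.incrementAndGet();
            });
        }

        pool.shutdown(); // stop accepting new tasks; queued tasks still run
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            pool.shutdownNow(); // give up on tasks that are still running
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks(7, 5) + " tasks completed");
    }
}
```

Unlike the hand-written pool, the executor can be shut down, so its worker threads are released once the queued tasks have finished. The order in which the "Task n is running." lines appear is not deterministic.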
4. Download the Source Code
This was a tutorial on thread pools. To download the source code, click here.
Reference: How to implement Thread pool in Java from our JCG partner Ima Miri at the Technology. Career blog.
If you do not recommend writing your own ThreadPool, why did you do it?
To understand how a thread pool works, so that you can implement it yourself in lower-level languages, I think.
For students to understand what happens behind the scenes
Nice and easy explanation. Liked it!
I am wondering why we need a Runnable task to be submitted to the thread pool, because in this example we explicitly call the run() method on the task that is passed.
@Stimpy, even for scenarios where numerous short-lived tasks might require serving, CachedThreadPool is available. If one wants the best of both worlds, i.e. FixedSizeThreadPool and CachedThreadPool, custom thread pools can be created using the Executors class by supplying N core threads, N max threads and a max idle time. The knowledge of thread pool internals as discussed here can be very useful for defining metrics and identifying test cases in projects. Furthermore, it makes for an interesting interview question ;-) @Vinayak: The run() method is called on the task instead of start() because calling start() would spawn a new thread.…
By using task = queue.take() instead of queue.poll(), we could have avoided the synchronized while-loop check on queue.isEmpty() altogether.
while (true) {
    Runnable task = null;
    try {
        synchronized (object) { // object is just a global Object used here for to-and-fro signalling
            object.wait();
        }
        task = workQueue.take();
        task.run();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
We can go further and remove the wait/signal through object entirely by relying on BlockingQueue’s take and put:
public void enqueueTask(Runnable runnable) {
    try {
        workQueue.put(runnable);
        /*synchronized (object){
            object.notify();
        }*/
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

private class Worker extends Thread {
    private final String workerID;

    Worker(String id) {
        super();
        this.workerID = id;
    }

    @Override
    public void run() {
        while (true) {
            Runnable task = null;
            try {
                /* synchronized (object){
                    object.wait();
                }*/
                task = workQueue.take(); // blocks until a task is available
                task.run();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
When using LinkedBlockingQueue, there is no need for synchronized, wait, notify, etc.; queue.take and queue.add should suffice. I think this implementation has some flaws, but the overall idea of how to create a thread pool is effective.
What about reclaiming the threads after execution? You have missed the most important part of ThreadPoolExecutors.
Since we are using LinkedBlockingQueue in this example, there is no need for wait and notify, as this is already handled by the blocking queue via its put() and take() methods.
Even though we say thread pool, it's a pool of thread objects. A thread is an action, and no one can ever create a pool of actions.
In an interview where I was rejected, one of the questions was how to create a custom thread pool in Java. My answer was “it's good practice to use tested and well-known implementations rather than creating your own”; I was not really aware of how to create a thread pool on my own.