Core Java
My Custom Thread Pool Executor in Java
ThreadPoolExecutor is a feature added by java concurrent api to maintain and reuse threads efficiently , so that our programs don’t have to worry about creating and destroying threads and focus on the core functionality. I have created a custom thread pool executor to get better understanding of how thread pool executor would work .
Functionality :
- It maintains a fixed thread pool ,and creates threads and start the threads even if no task is submitted whereas ThreadPoolExecutor creates threads on demand , i.e. whenever a runnable is submitted to pool and the number of threads are less than core pool size .
- In ThreadPoolExecutor, we provide a waiting queue ,where new runnable task waits when all threads are busy running existing task. Once the queue is filled , new threads will be created up to maximum pool size. In MyThreadPool , i am storing the runnable in a linked list , so every task will wait in the list and it is unbounded , so no usage of maxPoolSize in this .
- In ThreadPoolExecutor , we use Future Objects to get the result from task , future.get() method will block if the result is not available , or we use CompletionService . In MyThreadPoolExecutor , i have created a simple interface called ResultListener , user has to provide a implementation of this as to how he wants the output to be processed . After every task is completed , the ResultListener will get callback with the output of task or error method will be called in case of any exception.
- When shutdown method is called , MyThreadPoolExecutor will stop accepting new tasks and complete the remaining tasks .
- I have provided very basic functionality as compared to ThreadPoolExecutor , i have used simple thread mechanism like wait() , notify() , notifyAll(), and join().
- Performance wise it is similar to ThreadPoolExecutor , some times better in some cases. Do let me know if you find any interesting results or ways to improve it .
package com.util; import java.util.concurrent.Callable; /** * Run submitted task of {@link MyThreadPool} After running the task , It calls * on {@link ResultListener}object with {@link Output}which contains returned * result of {@link Callable}task. Waits if the pool is empty. * * @author abhishek * * @param */ import java.util.concurrent.Callable; /** * Run submitted task of {@link MyThreadPool} After running the task , It calls * on {@link ResultListener}object with {@link Output}which contains returned * result of {@link Callable}task. Waits if the pool is empty. * * @author abhishek * * @param <V> */ public class MyThread<V> extends Thread { /** * MyThreadPool object, from which the task to be run */ private MyThreadPool<V> pool; private boolean active = true; public boolean isActive() { return active; } public void setPool(MyThreadPool<V> p) { pool = p; } /** * Checks if there are any unfinished tasks left. if there are , then runs * the task and call back with output on resultListner Waits if there are no * tasks available to run If shutDown is called on MyThreadPool, all waiting * threads will exit and all running threads will exit after finishing the * task */ public void run() { ResultListener<V> result = pool.getResultListener(); Callable<V> task; while (true) { task = pool.removeFromQueue(); if (task != null) { try { V output = task.call(); result.finish(output); } catch (Exception e) { result.error(e); } } else { if (!isActive()) break; else { synchronized (pool.getWaitLock()) { try { pool.getWaitLock().wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } } } void shutdown() { active = false; } }
package com.util; import java.util.LinkedList; import java.util.concurrent.Callable; /** * This class is used to execute submitted {@link Callable} tasks. this class * creates and manages fixed number of threads User will provide a * {@link ResultListener}object in order to get the Result of submitted task * * @author abhishek * * */ public class MyThreadPool<V> { private Object waitLock = new Object(); public Object getWaitLock() { return waitLock; } /** * list of threads for completing submitted tasks */ private final LinkedList<MyThread<V>> threads; /** * submitted task will be kept in this list untill they run by one of * threads in pool */ private final LinkedList<Callable<V>> tasks; /** * shutDown flag to shut Down service */ private volatile boolean shutDown; /** * ResultListener to get back the result of submitted tasks */ private ResultListener<V> resultListener; /** * initializes the threadPool by starting the threads threads will wait till * tasks are not submitted * * @param size * Number of threads to be created and maintained in pool * @param myResultListener * ResultListener to get back result */ public MyThreadPool(int size, ResultListener<V> myResultListener) { tasks = new LinkedList<Callable<V>>(); threads = new LinkedList<MyThread<V>>(); shutDown = false; resultListener = myResultListener; for (int i = 0; i < size; i++) { MyThread<V> myThread = new MyThread<V>(); myThread.setPool(this); threads.add(myThread); myThread.start(); } } public ResultListener<V> getResultListener() { return resultListener; } public void setResultListener(ResultListener<V> resultListener) { this.resultListener = resultListener; } public boolean isShutDown() { return shutDown; } public int getThreadPoolSize() { return threads.size(); } public synchronized Callable<V> removeFromQueue() { return tasks.poll(); } public synchronized void addToTasks(Callable<V> callable) { tasks.add(callable); } /** * submits the task to threadPool. will not accept any new task if shutDown * is called Adds the task to the list and notify any waiting threads * * @param callable */ public void submit(Callable<V> callable) { if (!shutDown) { addToTasks(callable); synchronized (this.waitLock) { waitLock.notify(); } } else { System.out.println('task is rejected.. Pool shutDown executed'); } } /** * Initiates a shutdown in which previously submitted tasks are executed, * but no new tasks will be accepted. Waits if there are unfinished tasks * remaining * */ public void stop() { for (MyThread<V> mythread : threads) { mythread.shutdown(); } synchronized (this.waitLock) { waitLock.notifyAll(); } for (MyThread<V> mythread : threads) { try { mythread.join(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
package com.util; /** * This interface imposes finish method * which is used to get the {@link Output} object * of finished task * @author abhishek * * @param */ public interface ResultListener { public void finish(T obj); public void error(Exception ex); }
you can implement this class as you want to get back and process the result returned by tasks.
package com.util; public class DefaultResultListener implements ResultListener{ @Override public void finish(Object obj) { } @Override public void error(Exception ex) { ex.printStackTrace(); } }
For example this class will add the number returned by tasks .
package com.util; import java.util.concurrent.atomic.AtomicInteger; /** * ResultListener class to keep track of total matched count * @author abhishek * * @param */ public class MatchedCountResultListener implements ResultListener { /** * matchedCount to keep track of the number of matches returned by submitted * task */ AtomicInteger matchedCount = new AtomicInteger(); /** * this method is called by ThreadPool to give back the result of callable * task. if the task completed successfully then increment the matchedCount by * result count */ @Override public void finish(V obj) { //System.out.println('count is '+obj); matchedCount.addAndGet((Integer)obj); } /** * print exception thrown in running the task */ @Override public void error(Exception ex) { ex.printStackTrace(); } /** * returns the final matched count of all the finished tasks * * @return */ public int getFinalCount() { return matchedCount.get(); } }
This is a test class which runs simple for loop using CompletionService and MyThreadPoolExecutor
package test; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import com.util.DefaultResultListener; import com.util.MyThreadPool; public class TestClass { public static void main(String[] args) throws InterruptedException { CompletionService threadService; ExecutorService service = Executors.newFixedThreadPool(2); threadService = new ExecutorCompletionService (service); long b = System.currentTimeMillis(); for(int i =0;i<50000;i++){ threadService.submit(new MyRunable (i)); } service.shutdown(); System.out.println('time taken by Completion Service ' + (System.currentTimeMillis()-b)); DefaultResultListener result = new DefaultResultListener(); MyThreadPool newPool = new MyThreadPool (2,result); long a = System.currentTimeMillis(); int cc =0; for(int i =0;i<50000;i++) { cc = cc+i; } System.out.println('time taken without any pool ' + (System.currentTimeMillis()-a)); a= System.currentTimeMillis(); for(int i =0;i<5000;i++){ newPool.submit(new MyRunable (i)); } newPool.stop(); System.out.println('time taken by myThreadPool ' + (System.currentTimeMillis()-a)); } } class MyRunable implements Callable { int index = -1; public MyRunable(int index) { this.index = index; } @Override public Integer call() throws Exception { return index; } }
Reference: My Custom Thread Pool Executor in Java from our JCG partner Abhishek Somani at the Java , J2EE , Server blog.
In TestClass, your custom thread pool only runs 1/10 of the transactions as the other test cases. That would account for the increased performance.