Core Java

My Custom Thread Pool Executor in Java

ThreadPoolExecutor is a feature added by java concurrent api to maintain and reuse threads efficiently , so that our programs don’t have to worry about creating and destroying threads and focus on the core functionality. I have created a custom thread pool executor to get better understanding of how thread pool executor would work .

Functionality :

  • It maintains a fixed thread pool ,and creates threads and start the threads even if no task is submitted whereas ThreadPoolExecutor creates threads on demand , i.e. whenever a runnable is submitted to pool and the number of threads are less than core pool size .
  • In ThreadPoolExecutor, we provide a waiting queue ,where new runnable task waits when all threads are busy running existing task. Once the queue is filled , new threads will be created up to maximum pool size. In MyThreadPool , i am storing the runnable in a linked list , so every task will wait in the list and it is unbounded , so no usage of maxPoolSize in this .
  • In ThreadPoolExecutor , we use Future Objects to get the result from task , future.get() method will block if the result is not available , or we use CompletionService . In MyThreadPoolExecutor , i have created a simple interface called ResultListener , user has to provide a implementation of this as to how he wants the output to be processed . After every task is completed , the ResultListener will get callback with the output of task or error method will be called in case of any exception.
  • When shutdown method is called , MyThreadPoolExecutor will stop accepting new tasks and complete the remaining tasks .
  • I have provided very basic functionality as compared to ThreadPoolExecutor , i have used simple thread mechanism like wait() , notify() , notifyAll(), and join().
  • Performance wise it is similar to ThreadPoolExecutor , some times better in some cases. Do let me know if you find any interesting results or ways to improve it .
package com.util;

import java.util.concurrent.Callable;

/**
 * Run submitted task of {@link MyThreadPool} After running the task , It calls
 * on {@link ResultListener}object with {@link Output}which contains returned
 * result of {@link Callable}task. Waits if the pool is empty.
 * 
 * @author abhishek
 * 
 * @param 

 */

import java.util.concurrent.Callable;
/**
* Run submitted task of {@link MyThreadPool} After running the task , It calls
* on {@link ResultListener}object with {@link Output}which contains returned
* result of {@link Callable}task. Waits if the pool is empty.
*
* @author abhishek
*
* @param <V>
*/
public class MyThread<V> extends Thread {
    /**
    * MyThreadPool object, from which the task to be run
    */
    private MyThreadPool<V> pool;
    private boolean active = true;
    public boolean isActive() {
        return active;
    }
    public void setPool(MyThreadPool<V> p) {
        pool = p;
    }
    /**
    * Checks if there are any unfinished tasks left. if there are , then runs
    * the task and call back with output on resultListner Waits if there are no
    * tasks available to run If shutDown is called on MyThreadPool, all waiting
    * threads will exit and all running threads will exit after finishing the
    * task
    */
    public void run() {
        ResultListener<V> result = pool.getResultListener();
        Callable<V> task;
        while (true)
        {
            task = pool.removeFromQueue();
            if (task != null)
            {
                try
                {
                    V output = task.call();
                    result.finish(output);
                } catch (Exception e)
                {
                    result.error(e);
                }
            } else
            {
                if (!isActive())
                break;
                else
                {
                    synchronized (pool.getWaitLock())
                    {
                        try
                        {
                            pool.getWaitLock().wait();
                        } catch (InterruptedException e)
                        {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
    void shutdown() {
        active = false;
    }
}
package com.util;
import java.util.LinkedList;
import java.util.concurrent.Callable;
/**
* This class is used to execute submitted {@link Callable} tasks. this class
* creates and manages fixed number of threads User will provide a
* {@link ResultListener}object in order to get the Result of submitted task
*
* @author abhishek
*
*
*/
public class MyThreadPool<V> {
    private Object waitLock = new Object();
    public Object getWaitLock() {
        return waitLock;
    }
    /**
    * list of threads for completing submitted tasks
    */
    private final LinkedList<MyThread<V>> threads;
    /**
    * submitted task will be kept in this list untill they run by one of
    * threads in pool
    */
    private final LinkedList<Callable<V>> tasks;
    /**
    * shutDown flag to shut Down service
    */
    private volatile boolean shutDown;
    /**
    * ResultListener to get back the result of submitted tasks
    */
    private ResultListener<V> resultListener;
    /**
    * initializes the threadPool by starting the threads threads will wait till
    * tasks are not submitted
    *
    * @param size
    * Number of threads to be created and maintained in pool
    * @param myResultListener
    * ResultListener to get back result
    */
    public MyThreadPool(int size, ResultListener<V> myResultListener) {
        tasks = new LinkedList<Callable<V>>();
        threads = new LinkedList<MyThread<V>>();
        shutDown = false;
        resultListener = myResultListener;
        for (int i = 0; i < size; i++) {
            MyThread<V> myThread = new MyThread<V>();
            myThread.setPool(this);
            threads.add(myThread);
            myThread.start();
        }
    }
    public ResultListener<V> getResultListener() {
        return resultListener;
    }
    public void setResultListener(ResultListener<V> resultListener) {
        this.resultListener = resultListener;
    }
    public boolean isShutDown() {
        return shutDown;
    }
    public int getThreadPoolSize() {
        return threads.size();
    }
    public synchronized Callable<V> removeFromQueue() {
        return tasks.poll();
    }
    public synchronized void addToTasks(Callable<V> callable) {
        tasks.add(callable);
    }
    /**
    * submits the task to threadPool. will not accept any new task if shutDown
    * is called Adds the task to the list and notify any waiting threads
    *
    * @param callable
    */
    public void submit(Callable<V> callable) {
        if (!shutDown) {
            addToTasks(callable);
            synchronized (this.waitLock) {
                waitLock.notify();
            }
            } else {
            System.out.println('task is rejected.. Pool shutDown executed');
        }
    }
    /**
    * Initiates a shutdown in which previously submitted tasks are executed,
    * but no new tasks will be accepted. Waits if there are unfinished tasks
    * remaining
    *
    */
    public void stop() {
        for (MyThread<V> mythread : threads) {
            mythread.shutdown();
        }
        synchronized (this.waitLock) {
            waitLock.notifyAll();
        }
        for (MyThread<V> mythread : threads) {
            try {
                mythread.join();
                } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}
package com.util;

/**
 * This interface imposes finish method 
 * which is used to get the {@link Output} object 
 * of finished task
 * @author abhishek
 *
 * @param 

 */

public interface ResultListener 

                        {

 public void finish(T obj);
 public void error(Exception ex);

}

you can implement this class as you want to get back and process the result returned by tasks.

package com.util;

public class DefaultResultListener implements ResultListener{

 @Override
 public void finish(Object obj) {

 }

 @Override
 public void error(Exception ex) {
  ex.printStackTrace();
 }

}

For example this class will add the number returned by tasks .

package com.util;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * ResultListener class to keep track of total matched count
 * @author abhishek
 * 
 * @param 

 */
public class MatchedCountResultListener

                        implements ResultListener

                         {

	/**
	 * matchedCount to keep track of the number of matches returned by submitted
	 * task
	 */
	AtomicInteger matchedCount = new AtomicInteger();

	/**
	 * this method is called by ThreadPool to give back the result of callable
	 * task. if the task completed successfully then increment the matchedCount by
	 * result count
	 */
	@Override
	public void finish(V obj) {
		//System.out.println('count is '+obj);
		matchedCount.addAndGet((Integer)obj);
	}

	/**
	 * print exception thrown in running the task
	 */
	@Override
	public void error(Exception ex) {
		ex.printStackTrace();
	}

	/**
	 * returns the final matched count of all the finished tasks
	 * 
	 * @return
	 */
	public int getFinalCount() {
		return matchedCount.get();
	}
}

This is a test class which runs simple for loop using CompletionService and MyThreadPoolExecutor

package test;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.util.DefaultResultListener;
import com.util.MyThreadPool;

public class TestClass {

	public static void main(String[] args) throws InterruptedException {
		CompletionService

                       threadService;
		ExecutorService service = Executors.newFixedThreadPool(2);
		threadService = new ExecutorCompletionService

                       (service);

		long b = System.currentTimeMillis();
		for(int i =0;i<50000;i++){
			threadService.submit(new MyRunable (i));
		}

		service.shutdown();
		System.out.println('time taken by Completion Service ' + (System.currentTimeMillis()-b));

		DefaultResultListener result = new DefaultResultListener();
		MyThreadPool

                         newPool = new MyThreadPool

                         (2,result);
		long a = System.currentTimeMillis();

		int cc =0;
		for(int i =0;i<50000;i++)
		{
			cc = cc+i;
		}
		System.out.println('time taken without any pool ' + (System.currentTimeMillis()-a));
		a= System.currentTimeMillis();

		for(int i =0;i<5000;i++){
			newPool.submit(new MyRunable (i));
		}

		newPool.stop();
		System.out.println('time taken by myThreadPool ' + (System.currentTimeMillis()-a));
	}

}

class MyRunable implements Callable

{
	int index = -1;
	public MyRunable(int index)
	{
		this.index = index;
	}
	@Override
	public Integer call() throws Exception {
		return index;
	}

}

 

Reference: My Custom Thread Pool Executor in Java from our JCG partner Abhishek Somani at the Java , J2EE , Server blog.

Abhishek Somani

Abhishek is working as a senior java developer in a product start up .He has worked on various java related enterprise applications and frameworks. He loves to explore new technologies
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

1 Comment
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Bill Williams
Bill Williams
11 years ago

In TestClass, your custom thread pool only runs 1/10 of the transactions as the other test cases. That would account for the increased performance.

Back to top button