Core Java

Testing Concurrent Applications

This article is part of our Academy Course titled Java Concurrency Essentials.

In this course, you will dive into the magic of concurrency. You will be introduced to the fundamentals of concurrency and concurrent code and you will learn about concepts like atomicity, synchronization and thread safety. Check it out here!

1. Introduction

This article provides an introduction into the testing of multi-threaded applications. We implement a simple blocking queue and test its blocking behavior as well as its behavior and performance under stress test conditions. Finally we shed some light on available frameworks for unit testing of multi-threaded classes.

2. SimpleBlockingQueue

In this section we are going to implement a very basic and simple blocking Queue. This queue should do nothing more than hold the elements we have put into it and give them back when calling get(). The get() should block until a new element is available.

It is clear that the java.util.concurrent package already provides such functionality and that there is no need to implement this again, but for demonstration purposes we do it here in order to show how to test such a class.

As backing data structure for our queue we choose a standard LinkedList from the java.util package. This list is not synchronized and call its get() method does not block. Hence we have to synchronized access to the list and we have to add the blocking functionality. The latter can be implemented with a simple while() loop that calls the wait() method on the list, when the queue is empty. If the queue is not empty, it returns the first element:

public class SimpleBlockingQueue<T> {
	private List<T> queue = new LinkedList<T>();

	public int getSize() {
		synchronized(queue) {
			return queue.size();
		}
	}
	
	public void put(T obj) {
		synchronized(queue) {
			queue.add(obj);
			queue.notify();
		}
	}
	
	public T get() throws InterruptedException  {
		while(true) {
			synchronized(queue) {
				if(queue.isEmpty()) {
					queue.wait();
				} else {
					return queue.remove(0);
				}
			}
		}
	}
}

2.1. Test blocking operations

Although this implementation is very simple, it is not that easy to test all the functionality, especially the blocking feature. When we just call get() on an empty queue, the current thread is blocked until another threads inserts a new item into the queue. This means that we need at least two different threads in our unit test. While the one thread blocks, the other thread waits for some specific time. If during this time the other thread does not execute further code, we can assume that the blocking feature is working. One way to check that the blocking thread is not executing any further code is the addition of some boolean flags that are set, when either an exception was thrown or the line after the get() call was executed:

	private static class BlockingThread extends Thread {
		private SimpleBlockingQueue queue;
		private boolean wasInterrupted = false;
		private boolean reachedAfterGet = false;
		private boolean throwableThrown;

		public BlockingThread(SimpleBlockingQueue queue) {
			this.queue = queue;
		}
		
		public void run() {
			try {
				try {
					queue.get();
				} catch (InterruptedException e) {
					wasInterrupted = true;
				}
				reachedAfterGet = true;
			} catch (Throwable t) {
				throwableThrown = true;
			}
		}

		public boolean isWasInterrupted() {
			return wasInterrupted;
		}

		public boolean isReachedAfterGet() {
			return reachedAfterGet;
		}
		
		public boolean isThrowableThrown() {
			return throwableThrown;
		}
	}

The flag wasInterrupted indicates whether the blocking thread was interrupted, the flag reachedAfterGet shows that the line after the get has been executed and finally the throwableThrown would tell us that any kind of Throwable was thrown. With the getter methods for these flags we can now write a unit test, that first creates an empty queue, starts our BlockingThread, waits for some time and then inserts a new element into the queue.

	@Test
	public void testPutOnEmptyQueueBlocks() throws InterruptedException {
		final SimpleBlockingQueue queue = new SimpleBlockingQueue();
		BlockingThread blockingThread = new BlockingThread(queue);
		blockingThread.start();
		Thread.sleep(5000);
		assertThat(blockingThread.isReachedAfterGet(), is(false));
		assertThat(blockingThread.isWasInterrupted(), is(false));
		assertThat(blockingThread.isThrowableThrown(), is(false));
		queue.put(new Object());
		Thread.sleep(1000);
		assertThat(blockingThread.isReachedAfterGet(), is(true));
		assertThat(blockingThread.isWasInterrupted(), is(false));
		assertThat(blockingThread.isThrowableThrown(), is(false));
		blockingThread.join();
	}

Before the insertion all flags should be false. If this is the case, we put a new element into the queue and check that the flag reachedAfterGet is set to true. All other flags should still be false. Finally we can join() the blockingThread.

2.2. Testing for correctness

The previous test has just shown how to test a blocking operation. Even more interesting is the real multi-threading test case, where we have more than one thread adding elements to the queue and a bunch of worker threads that retrieve these values from the queue. Basically this means to create a few producer threads that insert new elements into the queue and to setup a bunch of worker threads that call get().

But how do we know that the worker threads have got exactly the same elements from the queue, which the producer threads have inserted before? One possible solution would be to have a second queue where we add and remove the elements based on some unique id (e.g. a UUID). But as we are in a multi-threaded environment, we also have to synchronize access to this second queue and the creation of some unique ID also enforces some kind of synchronization.

A better solution would be some mathematical means that can be implemented without any additional synchronization. The easiest means is here to use integer values as queue elements, which are retrieved from a thread local random generator. The special class ThreadLocalRandom manages a random generator for each thread; hence we don’t have any synchronization overhead. While the producer threads compute the sum over the elements inserted by them, the worker threads also compute their local sum. At the end sum over all producer threads is compared to the sum of all consumer threads. If it is the same, we can assume that with high probability we have retrieved all tasks that have been inserted before.

The following unit test implements these ideas by submitting the consumer and producer threads as tasks to a fixed thread pool:

	@Test
	public void testParallelInsertionAndConsumption() throws InterruptedException, ExecutionException {
		final SimpleBlockingQueue<Integer> queue = new SimpleBlockingQueue<Integer>();
		ExecutorService threadPool = Executors.newFixedThreadPool(NUM_THREADS);
		final CountDownLatch latch = new CountDownLatch(NUM_THREADS);
		List<Future<Integer>> futuresPut = new ArrayList<Future<Integer>>();
		for (int i = 0; i < 3; i++) {
			Future<Integer> submit = threadPool.submit(new Callable<Integer>() {
				public Integer call() {
					int sum = 0;
					for (int i = 0; i < 1000; i++) {
						int nextInt = ThreadLocalRandom.current().nextInt(100);
						queue.put(nextInt);
						sum += nextInt;
					}
					latch.countDown();
					return sum;
				}
			});
			futuresPut.add(submit);
		}
		List<Future<Integer>> futuresGet = new ArrayList<Future<Integer>>();
		for (int i = 0; i < 3; i++) {
			Future<Integer> submit = threadPool.submit(new Callable<Integer>() {
				public Integer call() {
					int count = 0;
					try {
						for (int i = 0; i < 1000; i++) {
							Integer got = queue.get();
							count += got;
						}
					} catch (InterruptedException e) {

					}
					latch.countDown();
					return count;
				}
			});
			futuresGet.add(submit);
		}
		latch.await();
		int sumPut = 0;
		for (Future<Integer> future : futuresPut) {
			sumPut += future.get();
		}
		int sumGet = 0;
		for (Future<Integer> future : futuresGet) {
			sumGet += future.get();
		}
		assertThat(sumPut, is(sumGet));
	}

We use a CountDownLatch in order to wait until all threads have finished. Finally we can compute the sum over all submitted and retrieved integers and assert that both are equal.

The order in which the different threads are executed is hard to predict. It depends on many dynamic factors like interrupts handled by the operating system and how the scheduler chooses the next thread to execute. In order to achieve more context switches, one can call the method Thread.yield(). This gives the scheduler a hint that the current thread is willing to yield the CPU in favor of another thread. As the javadoc states, this is only a hint, i.e. the JVM can completely ignore this hint and execute the current thread further. But for testing purposes one may use this method to introduce more context switches and therefore provoke race conditions, etc.


 

2.3. Testing performance

Another aspect next to the correct behavior of a class is its performance. In many real world applications performance is an important requirement and therefore has to be tested.

We can utilize an ExecutorService to setup a varying number of threads. Each thread is supposed to insert an element into our queue and take it afterwards from it. Within an outer loop we increment the number of threads in order to see how the number of threads impacts the throughput.

	@Test
	public void testPerformance() throws InterruptedException {
		for (int numThreads = 1; numThreads < THREADS_MAX; numThreads++) {
			long startMillis = System.currentTimeMillis();
			final SimpleBlockingQueue<Integer> queue = new SimpleBlockingQueue<Integer>();
			ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
			for (int i = 0; i < numThreads; i++) {
				threadPool.submit(new Runnable() {
					public void run() {
						for (long i = 0; i < ITERATIONS; i++) {
							int nextInt = ThreadLocalRandom.current().nextInt(100);
							try {
								queue.put(nextInt);
								nextInt = queue.get();
							} catch (InterruptedException e) {
								e.printStackTrace();
							}
						}
					}
				});
			}
			threadPool.shutdown();
			threadPool.awaitTermination(5, TimeUnit.MINUTES);
			long totalMillis = System.currentTimeMillis() - startMillis;
			double throughput = (double)(numThreads * ITERATIONS * 2) / (double) totalMillis;
			System.out.println(String.format("%s with %d threads: %dms (throughput: %.1f ops/s)", LinkedBlockingQueue.class.getSimpleName(), numThreads, totalMillis, throughput));
		}
	}

To get a feeling on how our simple queue implementation performs, we can compare it to an implementation from the JDK. A candidate for this is the LinkedBlockingQueue. Its two methods put() and take() work similar to our implementation, expect the circumstance that LinkedBlockingQueue is optionally bounded and therefore has to keep track of the number of inserted elements and lets the current thread sleep if the queue is full. This functionality needs additional bookkeeping and checking on insert operations. On the other hand the JDK implementation does not use synchronized blocks and has been implemented with tedious performance measurements.

When we implement the same test case as above using the LinkedBlockingQueue, we get the following output for both test cases:

Figure 1
Figure 1

The figure clearly shows that the throughput rate, i.e. the number of operations performed per time unit, is nearly twice as good for the LinkedBlockingQueue implementation. But it also shows that with only one thread both implementations make about the same number of operations per second. Adding more threads improves the throughput, although the curve converges very soon against its saturation value. Adding more threads does not improve the performance of the application any more.

3. Testing frameworks

Instead of writing your own framework for implementing multi-threaded test cases for your application, you can have look at the available test framework. This section puts a light on two of them: JMock and Grobo Utils.

3.1. JMock

For stress testing purposes the mocking framework JMock provides the class Blitzer. This class implements functionality similar to what we have done in section “Testing for correctness” as it internally set ups a ThreadPool to which tasks are submitted that execute some specific action. You provide the number of tasks/actions to perform as well as the number of threads to the constructor:

Blitzer blitzer = new Blitzer(25000, 6);

This instance has a method blitz() to which you just provide an implementation of the Runnable interface:

	@Test
	public void stressTest() throws InterruptedException {
		final SimpleBlockingQueue<Integer> queue = new SimpleBlockingQueue<Integer>();
		blitzer.blitz(new Runnable() {
			public void run() {
				try {
					queue.put(42);
					queue.get();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		});
		assertThat(queue.getSize(), is(0));
	}

The Blitzer class therefore makes the implementation of stress tests even simpler than it is with an ExecutorService.

3.2. Grobo Utils

Grobo Utils is a framework providing support for testing multi-threaded applications. The ideas behind the framework are described in article.

Similar to the previous example we have the class MultiThreadedTestRunner that internally constructs a thread pool and executes a given number of Runnable implementations as separate threads. The Runnable instances have to implement a special interface called TestRunnable. It is worth mentioning that its only method runTest() throws an exception. This way, exceptions thrown within the threads have an influence on the test result. This is not the case when we use a normal ExecutorService. Here the tasks have to implement Runnable and its only method run() does not throw any exception. Exceptions thrown within these tasks are swallowed and don’t break the test.

After having constructed the MultiThreadedTestRunner we can call its runTestRunnables() method and provide the number of milliseconds it should wait until the test should fail. Finally the assertThat() call verifies that the queue is empty again, as all test threads have removed the previously added element.

public class SimpleBlockingQueueGroboUtilTest {

	private static class MyTestRunnable extends TestRunnable {
		private SimpleBlockingQueue<Integer> queue;

		public MyTestRunnable(SimpleBlockingQueue<Integer> queue) {
			this.queue = queue;
		}

		@Override
		public void runTest() throws Throwable {
			for (int i = 0; i < 1000000; i++) {
				this.queue.put(42);
				this.queue.get();
			}
		}
	}

	@Test
	public void stressTest() throws Throwable {
		SimpleBlockingQueue<Integer> queue = new SimpleBlockingQueue<Integer>();
		TestRunnable[] testRunnables = new TestRunnable[6];
		for (int i = 0; i < testRunnables.length; i++) {
			testRunnables[i] = new MyTestRunnable(queue);
		}
		MultiThreadedTestRunner mttr = new MultiThreadedTestRunner(testRunnables);
		mttr.runTestRunnables(2 * 60 * 1000);
		assertThat(queue.getSize(), is(0));
	}
}

Martin Mois

Martin is a Java EE enthusiast and works for an international operating company. He is interested in clean code and the software craftsmanship approach. He also strongly believes in automated testing and continuous integration.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

4 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Jacob Zimmerman
9 years ago

It looks like you put the wrong article under this heading…

Eleftheria Kiourtzoglou

Updated! Thank you Jacob.

Xiaolu
Xiaolu
9 years ago

In the 2.2 example code, is NUM_THREADS equals to 3?

Martin Mois
Martin Mois
9 years ago
Reply to  Xiaolu

NUM_THREADS is 6, as we submit six instances to the pool.

Back to top button