Playing With Java Concurrency

Recently I needed to transform some files, each containing a list (array) of objects in JSON format, into files that hold the same objects one per line. It was a simple, one-time task. I did the reading and writing with some features of Java NIO, and I used GSON in the simplest way. A single thread ran over the files, converting and writing. The whole operation finished in a few seconds. However, I wanted to play a little with concurrency, so I enhanced the tool to work concurrently.

Threads

Runnable for reading a file.

The reader runnables are submitted to an ExecutorService. The output of each one, which is a list of objects (User in the example), is put in a BlockingQueue.

Runnable for writing a file.

Each writer runnable polls the blocking queue and writes lines of data to a file. I don't add the writer Runnable to the ExecutorService; instead, I just start a thread with it. The runnable follows a while (some boolean is true) {...} pattern. More about that below…
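
The hand-off between the two kinds of runnables can be sketched in isolation. This is a minimal illustration, not the tool's code: the class name QueueHandOffSketch, the method handOff and the "file-N" strings are made up for the example, it uses lambdas (so it needs Java 8 or later), and the consumer stops after a known number of items instead of using the stop flag described below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class QueueHandOffSketch {

    /** Runs `producers` producer tasks and one consumer thread; returns what the consumer received. */
    static List<String> handOff(int producers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>(); // touched only by the consumer until join()

        // Consumer: a plain thread that polls with a timeout, like the writer runnable.
        Thread consumer = new Thread(() -> {
            try {
                while (received.size() < producers) {
                    String item = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (item != null) {
                        received.add(item);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Producers: submitted to a fixed pool, like the reader runnables.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < producers; i++) {
            String item = "file-" + i; // hypothetical payload standing in for parsed data
            pool.submit(() -> {
                try {
                    queue.put(item);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();

        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the consumer", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(handOff(5).size() + " items handed off");
    }
}
```

The timed poll matters here: a plain take() would block forever once the producers are done, so the loop would never get the chance to re-check its exit condition.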

Synchronizing Everything

The BlockingQueue is the interface between the two types of threads. Because the writer runnable (the consumer) runs in a while loop, I needed a way to make it stop so that the tool terminates. I used two objects for that:

Semaphore

The loop that traverses the input files increments a counter. The semaphore is created with zero permits. Once I had finished traversing the input files and submitting the readers, I waited on the semaphore in the main thread: semaphore.acquire(numberOfFiles);

In each writer runnable, I released the semaphore once a file had been handled: semaphore.release();
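
As a rough, stand-alone sketch of this counting trick: a semaphore that starts with zero permits lets one thread block until a given number of jobs have each released a permit. The class name SemaphoreCountdownSketch and the method runAndWait are invented for the illustration, and it calls acquireUninterruptibly(n) rather than acquire(n) only to avoid the checked InterruptedException in a short example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class SemaphoreCountdownSketch {

    /** Submits `tasks` jobs and blocks until each one has released a permit; returns how many ran. */
    static int runAndWait(int tasks) {
        Semaphore done = new Semaphore(0);             // no permits available at the start
        AtomicInteger completed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(2);

        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                try {
                    completed.incrementAndGet();       // stand-in for handling one file
                } finally {
                    done.release();                    // one permit per finished job, even on failure
                }
            });
        }

        // Blocks until `tasks` permits have been released in total.
        done.acquireUninterruptibly(tasks);
        pool.shutdown();
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(4) + " jobs finished");
    }
}
```

Releasing in a finally block keeps the waiting thread from hanging when a job throws. A job that never runs at all still leaves the waiter blocked, so the count passed to the acquire call has to match the number of jobs that will actually release.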

AtomicBoolean

The while loop of the writers checks an AtomicBoolean. As long as its value is true, the writer continues. In the main thread, just after the semaphore is acquired, I set the AtomicBoolean to false, which lets the writer threads terminate.
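
The flag-controlled loop can be tried out on its own with something like the sketch below. It is only an illustration under assumptions: StopFlagSketch and consumeThenStop are hypothetical names, the queue carries plain integers, and the two-second join timeout is an arbitrary choice for the demo.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class StopFlagSketch {

    /** Feeds `items` elements to a polling worker, then flips the flag; returns true if the worker exited. */
    static boolean consumeThenStop(int items) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicBoolean stillWorking = new AtomicBoolean(true);
        Semaphore processed = new Semaphore(0);

        Thread worker = new Thread(() -> {
            while (stillWorking.get()) {
                try {
                    // The timeout lets the loop re-check the flag even when the queue is empty.
                    Integer item = queue.poll(50, TimeUnit.MILLISECONDS);
                    if (item != null) {
                        processed.release();           // report one handled item
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        worker.start();

        for (int i = 0; i < items; i++) {
            queue.add(i);
        }

        processed.acquireUninterruptibly(items);       // wait until everything was handled
        stillWorking.set(false);                       // only now tell the worker to stop

        try {
            worker.join(2_000);                        // arbitrary upper bound for the demo
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + consumeThenStop(3));
    }
}
```

The order of the last two steps is the important part: flipping the flag before all items are accounted for could let the worker exit while data is still sitting in the queue.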

Using Java NIO

In order to scan, read and write the file system, I used some features of Java NIO.

Scanning: Files.newDirectoryStream(inputFilesDirectory, "*.json");
Deleting output directory before starting: Files.walkFileTree...
BufferedReader and BufferedWriter: Files.newBufferedReader(filePath); Files.newBufferedWriter(fileOutputPath, Charset.defaultCharset());
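
The three calls listed above can be exercised together against a temporary directory. This is a small sketch, not the tool's code: the class NioSketch, its method names and the file names are invented for the example, and it writes UTF-8 explicitly instead of using the platform default charset.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class NioSketch {

    /** Lists the names of the entries directly inside `dir` that match `glob`, sorted. */
    static List<String> listMatching(Path dir, String glob) throws IOException {
        List<String> names = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
            for (Path entry : stream) {
                names.add(entry.getFileName().toString());
            }
        }
        Collections.sort(names); // directory iteration order is not guaranteed
        return names;
    }

    /** Deletes every regular file under `dir`, leaving the directories in place. */
    static void deleteFilesIn(Path dir) throws IOException {
        Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    /** Creates a few files, scans for *.json, reads one back, then cleans up. */
    static List<String> demo() {
        try {
            Path dir = Files.createTempDirectory("nio-sketch");
            for (String name : new String[] {"a.json", "b.json", "notes.txt"}) {
                try (BufferedWriter writer = Files.newBufferedWriter(dir.resolve(name), StandardCharsets.UTF_8)) {
                    writer.write("[]");
                    writer.newLine();
                }
            }
            List<String> jsonFiles = listMatching(dir, "*.json");
            try (BufferedReader reader = Files.newBufferedReader(dir.resolve("a.json"), StandardCharsets.UTF_8)) {
                if (!"[]".equals(reader.readLine())) {
                    throw new IOException("unexpected content in a.json");
                }
            }
            deleteFilesIn(dir);
            Files.delete(dir); // the directory is empty now, so a plain delete works
            return jsonFiles;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Both the directory stream and the buffered reader and writer hold operating-system resources, which is why each is opened in a try-with-resources statement here.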

One note: to generate random files for this example, I used Apache Commons Lang: RandomStringUtils.randomAlphabetic
All the code is on GitHub.

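import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import com.google.gson.Gson;

// Note: the User class is not shown in this listing.
public class JsonArrayToJsonLines {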
	// Build the paths from segments so they are not tied to the Windows separator.
	private final static Path inputFilesDirectory = Paths.get("src", "main", "resources", "files");
	private final static Path outputDirectory = Paths
			.get("src", "main", "resources", "files", "output");
	private final static Gson gson = new Gson();
	
	private final BlockingQueue<EntitiesData> entitiesQueue = new LinkedBlockingQueue<>();
	
	private final AtomicBoolean stillWorking = new AtomicBoolean(true);
	private final Semaphore semaphore = new Semaphore(0);
	private int numberOfFiles = 0;

	private JsonArrayToJsonLines() {
	}

	public static void main(String[] args) throws IOException, InterruptedException {
		new JsonArrayToJsonLines().process();
	}

	private void process() throws IOException, InterruptedException {
		deleteFilesInOutputDir();
		final ExecutorService executorService = createExecutorService();
		// Start the two writer (consumer) threads before any reader produces data.
		for (int i = 0; i < 2; i++) {
			new Thread(new JsonElementsFileWriter(stillWorking, semaphore, entitiesQueue)).start();
		}

		// try-with-resources closes the directory handle once all readers are submitted.
		try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(inputFilesDirectory, "*.json")) {
			directoryStream.forEach(new Consumer<Path>() {
				@Override
				public void accept(Path filePath) {
					numberOfFiles++;
					executorService.submit(new OriginalFileReader(filePath, entitiesQueue));
				}
			});
		}
		
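		// Block until the writers have released one permit per input file.
		semaphore.acquire(numberOfFiles);
		// Every file has been handled, so let the writer loops exit.
		stillWorking.set(false);
		shutDownExecutor(executorService);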
	}

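	private void deleteFilesInOutputDir() throws IOException {
		// walkFileTree fails if the directory is missing, so create it on a first run.
		if (Files.notExists(outputDirectory)) {
			Files.createDirectories(outputDirectory);
			return;
		}
		Files.walkFileTree(outputDirectory, new SimpleFileVisitor<Path>() {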
			@Override
			public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
				Files.delete(file);
				return FileVisitResult.CONTINUE;
			}
		});
	}

	private ExecutorService createExecutorService() {
		int numberOfCpus = Runtime.getRuntime().availableProcessors();
		return Executors.newFixedThreadPool(numberOfCpus);
	}

	private void shutDownExecutor(final ExecutorService executorService) {
		executorService.shutdown();
		try {
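			if (!executorService.awaitTermination(120, TimeUnit.SECONDS)) {
				executorService.shutdownNow();
				// Give the cancelled tasks a chance to respond before giving up.
				if (!executorService.awaitTermination(120, TimeUnit.SECONDS)) {
					System.err.println("Executor did not terminate");
				}
			}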
		} catch (InterruptedException ex) {
			executorService.shutdownNow();
			Thread.currentThread().interrupt();
		}
	}


	private static final class OriginalFileReader implements Runnable {
		private final Path filePath;
		private final BlockingQueue<EntitiesData> entitiesQueue;

		private OriginalFileReader(Path filePath, BlockingQueue<EntitiesData> entitiesQueue) {
			this.filePath = filePath;
			this.entitiesQueue = entitiesQueue;
		}

		@Override
		public void run() {
			Path fileName = filePath.getFileName();
			try {
				User[] entities;
				// try-with-resources closes the reader even if parsing fails.
				try (BufferedReader br = Files.newBufferedReader(filePath)) {
					entities = gson.fromJson(br, User[].class);
				}
				System.out.println("---> " + fileName);
				entitiesQueue.put(new EntitiesData(fileName.toString(), entities));
			} catch (IOException | InterruptedException e) {
				throw new RuntimeException(filePath.toString(), e);
			}
		}
	}

	private static final class JsonElementsFileWriter implements Runnable {
		private final BlockingQueue<EntitiesData> entitiesQueue;
		private final AtomicBoolean stillWorking;
		private final Semaphore semaphore;

		private JsonElementsFileWriter(AtomicBoolean stillWorking, Semaphore semaphore,
				BlockingQueue<EntitiesData> entitiesQueue) {
			this.stillWorking = stillWorking;
			this.semaphore = semaphore;
			this.entitiesQueue = entitiesQueue;
		}

		@Override
		public void run() {
			while (stillWorking.get()) {
				try {
					EntitiesData data = entitiesQueue.poll(100, TimeUnit.MILLISECONDS);
					if (data != null) {
						try {
							String fileOutput = outputDirectory.toString() + File.separator + data.fileName;
							Path fileOutputPath = Paths.get(fileOutput);
							// try-with-resources flushes and closes the writer even if a write fails.
							try (BufferedWriter writer = Files.newBufferedWriter(fileOutputPath, Charset.defaultCharset())) {
								for (User user : data.entities) {
									writer.append(gson.toJson(user));
									writer.newLine();
								}
							}
							System.out.println("=======================================>>>>> " + data.fileName);
						} catch (IOException e) {
							throw new RuntimeException(data.fileName, e);
						} finally {
							semaphore.release();
						}
					}
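				} catch (InterruptedException e1) {
					// Restore the interrupt flag and stop polling instead of swallowing the interrupt.
					Thread.currentThread().interrupt();
					return;
				}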
			}
		}
	}

	private static final class EntitiesData {
		private final String fileName;
		private final User[] entities;

		private EntitiesData(String fileName, User[] entities) {
			this.fileName = fileName;
			this.entities = entities;
		}
	}
}

Eyal Golan

Eyal is a professional software engineer and an architect. He is a developer and leader of highly sophisticated systems in different areas, such as networking, security, commerce and more.

6 Comments
Dunczyk
9 years ago

I’m wondering why you use consumers without lambdas.

But anyway, nice example.

Eyal Golan
9 years ago
Reply to  Dunczyk

I didn’t use lambdas because this code is a prototype for code that will be used in a Java 7 environment.

Thanks for the comment.

Igor
9 years ago

Nice article.
I wanted to know why you used newFixedThreadPool rather than newCachedThreadPool.

Eyal Golan
9 years ago
Reply to  Igor

Hi,
Thanks for reading :)

When I first created the tool, I thought that reading the files would take some time.
I also wanted to limit the number of concurrent threads used for reading.

A cached thread pool should be good for short async operations.

I knew that I would have many reading tasks and didn’t want an explosion of threads.

Having said all that, I will run the tool with a cached pool, just to see how it works…

Eyal Golan
9 years ago
Reply to  Eyal Golan

Checked :)
It is much more efficient with the fixed pool.
My machine has 4 cores, and the concurrent processing worked better with a small, fixed number of threads.

I didn’t do an actual benchmark, just some System.out printing…

Igor
9 years ago
Reply to  Eyal Golan

Great, thanks for the answer.
