Playing With Java Concurrency
Recently I needed to transform some files, each containing a list (array) of objects in JSON format, into files that hold the same objects one per line. It was a simple, one-time task. I did the reading and writing with some features of Java NIO, and I used GSON in the simplest way. One thread ran over the files, converting and writing. The whole operation finished in a few seconds. However, I wanted to play a little with concurrency, so I enhanced the tool to work concurrently.
Threads
Runnable for reading a file.
The reader runnables are submitted to an ExecutorService. Their output, which is a list of objects (User in the example), is put in a BlockingQueue.
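As a rough illustration of that hand-off (not the tool's actual code), here is a minimal sketch. The class `ReaderSketch`, the method `readAll`, and the comma-separated strings that stand in for JSON files are all hypothetical; only `ExecutorService` and `BlockingQueue` come from the description above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: reader tasks run on a pool and hand their results to a queue.
final class ReaderSketch {

    // Submits one reader task per fake "file" and returns how many results reached the queue.
    static int readAll(List<String> fakeFiles) {
        final BlockingQueue<String[]> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (final String content : fakeFiles) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Stand-in for parsing a JSON array: split on commas.
                        queue.put(content.split(","));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        pool.shutdown();
        try {
            // Wait for the submitted readers to finish before inspecting the queue.
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(readAll(Arrays.asList("a,b", "c", "d,e,f")));
    }
}
```

The queue is unbounded here, so `put` never blocks; with a bounded queue the readers would wait whenever the consumers fall behind.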
Runnable for writing a file.
Each writer runnable polls the blocking queue and writes lines of data to a file. I don't add the writer Runnable to the ExecutorService, but instead just start a thread with it. The runnable has a while(some boolean is true) {...} pattern. More about that below…
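The loop pattern can be sketched roughly like this. It is an illustration under assumptions, not the tool's code: `WriterLoopSketch`, `stopsWhenFlagCleared`, and the 50 ms timeout are made up for the example. The point it shows is that a timed `poll` lets the loop re-check the flag even when the queue is empty.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a consumer loop guarded by a boolean flag.
final class WriterLoopSketch {

    // Starts a consumer thread, clears the flag, and reports whether the thread stopped.
    static boolean stopsWhenFlagCleared() {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        final AtomicBoolean keepRunning = new AtomicBoolean(true);

        Thread writer = new Thread(new Runnable() {
            @Override
            public void run() {
                while (keepRunning.get()) {
                    try {
                        // Timed poll: returns null after 50 ms if nothing arrived.
                        String line = queue.poll(50, TimeUnit.MILLISECONDS);
                        if (line != null) {
                            System.out.println("writing " + line); // stand-in for file output
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
        writer.start();
        queue.add("first line");

        // The flag is cleared right away, so the queued line may or may not be written.
        // A real tool has to wait for the pending work before doing this.
        keepRunning.set(false);
        try {
            writer.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !writer.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(stopsWhenFlagCleared());
    }
}
```

A blocking `take()` would not work with this pattern, because a consumer parked on an empty queue would never look at the flag again.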
Synchronizing Everything
The BlockingQueue is the interface between the two types of threads. Because the writer runnable (the consumer) runs in a while loop, I needed a way to make it stop so the tool can terminate. I used two objects for that:
Semaphore
The loop that traverses the input files increments a counter. The semaphore is created with zero permits. Once I have finished traversing the input files and submitted the readers, the main thread waits on the semaphore: semaphore.acquire(numberOfFiles);
In each writer runnable, after a file has been handled, I release the semaphore: semaphore.release();
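The counting idea can be shown in isolation with a small sketch. The names `SemaphoreCountdownSketch` and `runAndWait` are hypothetical, and the sketch uses `acquireUninterruptibly` instead of `acquire` only to avoid the checked exception in a short example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: the main thread waits until every job has released one permit.
final class SemaphoreCountdownSketch {

    // Runs `tasks` jobs and returns how many had finished when the wait ended.
    static int runAndWait(int tasks) {
        final Semaphore done = new Semaphore(0);            // starts with no permits
        final AtomicInteger finished = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < tasks; i++) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        finished.incrementAndGet();         // stand-in for real work
                    } finally {
                        done.release();                     // one permit per finished job
                    }
                }
            });
        }
        // Blocks until `tasks` permits are available, i.e. every job has released.
        done.acquireUninterruptibly(tasks);
        pool.shutdown();
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(5));
    }
}
```

Releasing in a `finally` block matters: a job that fails without releasing would leave the waiting thread blocked forever.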
AtomicBoolean
The while loop of the writers checks an AtomicBoolean. As long as its value is true, the writer continues. In the main thread, right after the semaphore's acquire returns, I set the AtomicBoolean to false. This lets the writer threads terminate.
Using Java NIO
In order to scan, read and write the file system, I used some features of Java NIO.
Scanning: Files.newDirectoryStream(inputFilesDirectory, "*.json");
Deleting output directory before starting: Files.walkFileTree...
BufferedReader and BufferedWriter: Files.newBufferedReader(filePath); and Files.newBufferedWriter(fileOutputPath, Charset.defaultCharset());
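To see these calls working together, here is a minimal, self-contained sketch. It is not the tool itself: `NioSketch`, `copyJsonFiles`, `deleteFiles`, and `demo` are hypothetical names, the files are copied line by line instead of being converted, and the demo works in temporary directories instead of the project's resource folders.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

// Hypothetical sketch of the NIO calls: scan with a glob, read, write, delete.
final class NioSketch {

    // Copies every *.json file in inputDir to outputDir and returns the number of files copied.
    static int copyJsonFiles(Path inputDir, Path outputDir) throws IOException {
        Files.createDirectories(outputDir);
        int copied = 0;
        // The glob is matched against file names, so only *.json entries are returned.
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(inputDir, "*.json")) {
            for (Path in : stream) {
                Path out = outputDir.resolve(in.getFileName());
                try (BufferedReader reader = Files.newBufferedReader(in, StandardCharsets.UTF_8);
                     BufferedWriter writer = Files.newBufferedWriter(out, StandardCharsets.UTF_8)) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        writer.write(line);
                        writer.newLine();
                    }
                }
                copied++;
            }
        }
        return copied;
    }

    // Deletes every regular file below dir and leaves the directories in place.
    static void deleteFiles(Path dir) throws IOException {
        Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    // Builds two temporary directories, copies the JSON files, then removes the files again.
    static int demo() {
        try {
            Path in = Files.createTempDirectory("nio-in");
            Path out = Files.createTempDirectory("nio-out");
            Files.write(in.resolve("a.json"), "[1, 2]".getBytes(StandardCharsets.UTF_8));
            Files.write(in.resolve("b.json"), "[3]".getBytes(StandardCharsets.UTF_8));
            Files.write(in.resolve("notes.txt"), "ignored".getBytes(StandardCharsets.UTF_8));
            int copied = copyJsonFiles(in, out);
            deleteFiles(in);   // the empty temp directories themselves are left behind
            deleteFiles(out);
            return copied;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The try-with-resources blocks close the directory stream, reader, and writer even when an exception is thrown, which also flushes the writer.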
One note: in order to generate random files for this example, I used Apache Commons Lang: RandomStringUtils.randomAlphabetic
All the code is on GitHub.
import com.google.gson.Gson;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// User is the example's entity class; its definition is not part of this listing.
public class JsonArrayToJsonLines {
    // Built from path segments so the tool does not depend on the Windows separator.
    private static final Path inputFilesDirectory = Paths.get("src", "main", "resources", "files");
    private static final Path outputDirectory = Paths.get("src", "main", "resources", "files", "output");
    private static final Gson gson = new Gson();

    private final BlockingQueue<EntitiesData> entitiesQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean stillWorking = new AtomicBoolean(true);
    private final Semaphore semaphore = new Semaphore(0);
    private int numberOfFiles = 0;

    private JsonArrayToJsonLines() {
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        new JsonArrayToJsonLines().process();
    }

    private void process() throws IOException, InterruptedException {
        deleteFilesInOutputDir();
        final ExecutorService executorService = createExecutorService();
        // Start two writer threads; they poll the queue until stillWorking is cleared.
        for (int i = 0; i < 2; i++) {
            new Thread(new JsonElementsFileWriter(stillWorking, semaphore, entitiesQueue)).start();
        }
        // Submit one reader task per input file; the stream is closed when the scan ends.
        try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(inputFilesDirectory, "*.json")) {
            directoryStream.forEach(new Consumer<Path>() {
                @Override
                public void accept(Path filePath) {
                    numberOfFiles++;
                    executorService.submit(new OriginalFileReader(filePath, entitiesQueue));
                }
            });
        }
        // Block until the writers have released one permit per file.
        // A reader that fails never enqueues its file, so this call would then block forever.
        semaphore.acquire(numberOfFiles);
        stillWorking.set(false);
        shutDownExecutor(executorService);
    }

    private void deleteFilesInOutputDir() throws IOException {
        // Make sure the output directory exists before walking it and writing into it.
        Files.createDirectories(outputDirectory);
        Files.walkFileTree(outputDirectory, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    private ExecutorService createExecutorService() {
        int numberOfCpus = Runtime.getRuntime().availableProcessors();
        return Executors.newFixedThreadPool(numberOfCpus);
    }

    private void shutDownExecutor(final ExecutorService executorService) {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(120, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                if (!executorService.awaitTermination(120, TimeUnit.SECONDS)) {
                    System.err.println("Executor did not terminate");
                }
            }
        } catch (InterruptedException ex) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    private static final class OriginalFileReader implements Runnable {
        private final Path filePath;
        private final BlockingQueue<EntitiesData> entitiesQueue;

        private OriginalFileReader(Path filePath, BlockingQueue<EntitiesData> entitiesQueue) {
            this.filePath = filePath;
            this.entitiesQueue = entitiesQueue;
        }

        @Override
        public void run() {
            Path fileName = filePath.getFileName();
            // try-with-resources closes the reader even if parsing fails.
            try (BufferedReader br = Files.newBufferedReader(filePath)) {
                User[] entities = gson.fromJson(br, User[].class);
                System.out.println("---> " + fileName);
                entitiesQueue.put(new EntitiesData(fileName.toString(), entities));
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(filePath.toString(), e);
            }
        }
    }

    private static final class JsonElementsFileWriter implements Runnable {
        private final BlockingQueue<EntitiesData> entitiesQueue;
        private final AtomicBoolean stillWorking;
        private final Semaphore semaphore;

        private JsonElementsFileWriter(AtomicBoolean stillWorking, Semaphore semaphore,
                BlockingQueue<EntitiesData> entitiesQueue) {
            this.stillWorking = stillWorking;
            this.semaphore = semaphore;
            this.entitiesQueue = entitiesQueue;
        }

        @Override
        public void run() {
            while (stillWorking.get()) {
                EntitiesData data;
                try {
                    // Timed poll so the flag is re-checked even when the queue is empty.
                    data = entitiesQueue.poll(100, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Restore the interrupt status and stop instead of swallowing it.
                    Thread.currentThread().interrupt();
                    return;
                }
                if (data == null) {
                    continue;
                }
                Path fileOutputPath = outputDirectory.resolve(data.fileName);
                // try-with-resources flushes and closes the writer.
                try (BufferedWriter writer = Files.newBufferedWriter(fileOutputPath, Charset.defaultCharset())) {
                    for (User user : data.entities) {
                        writer.append(gson.toJson(user));
                        writer.newLine();
                    }
                    System.out.println("=======================================>>>>> " + data.fileName);
                } catch (IOException e) {
                    throw new RuntimeException(data.fileName, e);
                } finally {
                    // One permit per handled file, whether or not the write succeeded.
                    semaphore.release();
                }
            }
        }
    }

    private static final class EntitiesData {
        private final String fileName;
        private final User[] entities;

        private EntitiesData(String fileName, User[] entities) {
            this.fileName = fileName;
            this.entities = entities;
        }
    }
}
Reference: Playing With Java Concurrency from our JCG partner Eyal Golan at the Learning and Improving as a Craftsman Developer blog.
I’m wondering why you use consumers without lambdas.
But anyway, nice example.
I didn't use lambdas because this code is a prototype for code that will be used in a Java 7 environment.
Thanks for the comment.
Nice article.
I wanted to know why you used newFixedThreadPool rather than newCachedThreadPool.
Hi,
Thanks for reading :)
When I first created the tool I thought that reading the files would take some time.
I also wanted to limit the number of concurrent threads running for reading.
Using a cached thread pool should be good for short async operations.
I knew that I would have many reading tasks and didn't want an explosion of threads.
Having said all that, I will run the tool with a cached pool, just to see how it works…
Checked :)
The fixed pool is much more efficient.
My machine has 4 cores, and the concurrent processing worked better with a small, fixed number of threads.
I didn't do an actual benchmark, just some System.out prints…
Great, thanks for the answer.