Playing With Java Concurrency
Recently I needed to transform some files, each containing a list (array) of objects in JSON format, into files that each contain the same data (objects) with one object per line. It was a simple, one-time task. I did the reading and writing using some features of Java NIO, and I used GSON in the simplest way. A single thread runs over the files, converts them and writes the output. The whole operation finished in a few seconds. However, I wanted to play a little bit with concurrency, so I enhanced the tool to work concurrently.
Threads
Runnable for reading files
The reader runnables are submitted to an ExecutorService. Each reader's output, an array of objects (User in the example), is put in a BlockingQueue.
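A minimal, runnable sketch of the reader side follows. To keep it free of Gson and of real files, each "file" is an in-memory array of strings standing in for the parsed User objects. ReaderHandoffSketch, Batch, Reader and readAll are hypothetical names; Batch plays the role of EntitiesData in the full listing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

final class ReaderHandoffSketch {

    // Hypothetical stand-in for EntitiesData: the parsed content of one input file.
    static final class Batch {
        final String fileName;
        final String[] records;

        Batch(String fileName, String[] records) {
            this.fileName = fileName;
            this.records = records;
        }
    }

    // Hypothetical stand-in for OriginalFileReader: "parses" one file and queues the result.
    static final class Reader implements Runnable {
        private final String fileName;
        private final String[] content;
        private final BlockingQueue<Batch> queue;

        Reader(String fileName, String[] content, BlockingQueue<Batch> queue) {
            this.fileName = fileName;
            this.content = content;
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                // The real reader would call gson.fromJson(...) here; this sketch just copies the array.
                queue.put(new Batch(fileName, content.clone()));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }
    }

    // Submits one reader task per "file" and takes exactly one batch per file off the queue.
    static List<Batch> readAll(Map<String, String[]> files) {
        BlockingQueue<Batch> queue = new LinkedBlockingQueue<>(); // unbounded, so put() never waits
        ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            for (Map.Entry<String, String[]> file : files.entrySet()) {
                pool.submit(new Reader(file.getKey(), file.getValue(), queue));
            }
            List<Batch> received = new ArrayList<>();
            for (int i = 0; i < files.size(); i++) {
                received.add(queue.take()); // blocks until some reader has queued a batch
            }
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The batches arrive in whatever order the pool finishes them, which is why the consumer side must not assume any file order.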
Runnable for writing files
Each writer runnable polls the blocking queue and writes the data to a file, one object per line. I don’t submit the writer runnables to the ExecutorService; instead, I just start a plain thread for each of them. The runnable follows a while (some boolean is true) {...} pattern. More about that below…
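The polling loop can be sketched in isolation like this. It is a simplified illustration, not the article's code: output goes to an in-memory map instead of files, there is a single writer thread, and the main thread waits for the queue to drain instead of using the Semaphore described in the next section. WriterLoopSketch, Batch, LineWriter and writeAll are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class WriterLoopSketch {

    // Hypothetical stand-in for EntitiesData.
    static final class Batch {
        final String fileName;
        final List<String> records;

        Batch(String fileName, List<String> records) {
            this.fileName = fileName;
            this.records = records;
        }
    }

    // Hypothetical stand-in for JsonElementsFileWriter.
    static final class LineWriter implements Runnable {
        private final BlockingQueue<Batch> queue;
        private final AtomicBoolean stillWorking;
        private final Map<String, List<String>> output;

        LineWriter(BlockingQueue<Batch> queue, AtomicBoolean stillWorking, Map<String, List<String>> output) {
            this.queue = queue;
            this.stillWorking = stillWorking;
            this.output = output;
        }

        @Override
        public void run() {
            while (stillWorking.get()) {
                try {
                    // A timed poll returns null every 100 ms on an empty queue,
                    // so the loop gets a chance to re-check the flag.
                    Batch batch = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (batch != null) {
                        // Each record becomes its own "line" of the output file.
                        output.put(batch.fileName, new ArrayList<>(batch.records));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    // Runs one writer thread over the given batches and returns what it "wrote".
    static Map<String, List<String>> writeAll(List<Batch> batches) {
        BlockingQueue<Batch> queue = new LinkedBlockingQueue<>(batches);
        AtomicBoolean stillWorking = new AtomicBoolean(true);
        Map<String, List<String>> output = new LinkedHashMap<>(); // touched only by the writer until join()
        Thread writer = new Thread(new LineWriter(queue, stillWorking, output));
        writer.start();
        try {
            // Simplification: all batches were queued before the writer started, so an empty queue
            // means every batch has been polled, and the writer finishes its current batch before
            // it re-checks the flag.
            while (!queue.isEmpty()) {
                Thread.sleep(10);
            }
            stillWorking.set(false);
            writer.join(); // waits for the last write and makes the map contents visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return output;
    }
}
```

The key design choice is poll(100, TimeUnit.MILLISECONDS) rather than take(): take() blocks indefinitely on an empty queue, so a writer waiting there would never notice that the flag had been cleared.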
Synchronizing Everything
The BlockingQueue is the shared interface between the two types of threads. Since the writer runnable (the consumer) runs in a while loop, I needed a way to stop it so that the tool would terminate. I used two objects for that:
Semaphore
The loop that traverses the input files increments a counter. Once I finished traversing the input files and submitting the reader tasks, the main thread blocks on a semaphore that was created with zero permits: semaphore.acquire(numberOfFiles);
Each writer runnable releases the semaphore once for every file it has written, in a finally block: semaphore.release();
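The counting trick can be shown on its own in a short sketch: a Semaphore created with zero permits, one release() per finished job, and a single acquire(n) that returns only after n releases. SemaphoreCountdownSketch and runAndWait are hypothetical names, and incrementing a counter stands in for writing a file.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class SemaphoreCountdownSketch {

    // Starts `tasks` jobs; each releases one permit when done. Returns how many jobs had
    // finished at the moment acquire(tasks) returned.
    static int runAndWait(int tasks) {
        final Semaphore done = new Semaphore(0);          // zero permits: acquire() blocks at first
        final AtomicInteger finished = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < tasks; i++) {
            pool.submit(new Runnable() {                  // anonymous class keeps it Java 7 friendly
                @Override
                public void run() {
                    try {
                        finished.incrementAndGet();       // stand-in for writing one file
                    } finally {
                        done.release();                   // one permit per job, even if the job fails
                    }
                }
            });
        }
        try {
            done.acquire(tasks);                          // blocks until `tasks` permits were released
            return finished.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Because actions before a release() happen-before the actions following a successful acquire() in another thread, the counter read right after acquire(tasks) already reflects every finished job.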
AtomicBoolean
The writers’ while loop checks an AtomicBoolean: as long as it is true, a writer keeps polling the queue. In the main thread, right after acquiring the semaphore (that is, once every file has been written), I set the AtomicBoolean to false. This lets the writer threads leave their loops and terminate.
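Putting the semaphore and the flag together, the shutdown sequence can be sketched as follows. This assumes that integers on the queue stand in for parsed files and that incrementing a counter stands in for writing one; StopFlagSketch and processAndStop are hypothetical names. The method returns the number of items written and the number of writer threads still alive after join().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class StopFlagSketch {

    // Returns {items written, writer threads still alive after join}.
    static int[] processAndStop(int items, int writers) {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        final AtomicBoolean stillWorking = new AtomicBoolean(true);
        final Semaphore written = new Semaphore(0);
        final AtomicInteger count = new AtomicInteger();

        Thread[] threads = new Thread[writers];
        for (int w = 0; w < writers; w++) {
            threads[w] = new Thread(new Runnable() {
                @Override
                public void run() {
                    // The flag is re-checked after every poll, i.e. at least every 100 ms.
                    while (stillWorking.get()) {
                        try {
                            Integer item = queue.poll(100, TimeUnit.MILLISECONDS);
                            if (item != null) {
                                try {
                                    count.incrementAndGet(); // stand-in for writing one output file
                                } finally {
                                    written.release();       // one permit per processed item
                                }
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            });
            threads[w].start();
        }

        try {
            for (int i = 0; i < items; i++) {
                queue.put(i);                  // stand-in for the reader tasks
            }
            written.acquire(items);            // returns only after every item has been processed
            stillWorking.set(false);           // writers leave their loops after the current poll
            int alive = 0;
            for (Thread t : threads) {
                t.join(5000);
                if (t.isAlive()) {
                    alive++;
                }
            }
            return new int[] {count.get(), alive};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The order matters: clearing the flag before acquire(items) returned could let a writer exit while items were still queued.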
Using Java NIO
To scan directories and to read and write files, I used some features of Java NIO:
Scanning for input files: Files.newDirectoryStream(inputFilesDirectory, "*.json");
Deleting the files in the output directory before starting: Files.walkFileTree...
BufferedReader: Files.newBufferedReader(filePath);
BufferedWriter: Files.newBufferedWriter(fileOutputPath, Charset.defaultCharset());
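A self-contained sketch of these NIO calls, meant to be run against a temporary directory. NioSketch and its method names are hypothetical; it uses StandardCharsets.UTF_8 instead of Charset.defaultCharset() so the round trip does not depend on the platform default, and try-with-resources so every stream and file handle is closed.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class NioSketch {

    // Names of the *.json entries directly inside `dir` (the glob does not recurse), sorted.
    static List<String> listJson(Path dir) throws IOException {
        List<String> names = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.json")) {
            for (Path p : stream) {
                names.add(p.getFileName().toString());
            }
        }
        Collections.sort(names);
        return names;
    }

    // Deletes every regular file under `dir` but keeps the directories themselves.
    static void deleteFiles(Path dir) throws IOException {
        Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    // Writes one line per element with a BufferedWriter, then reads the lines back.
    static List<String> writeAndReadBack(Path file, List<String> lines) throws IOException {
        try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
            for (String line : lines) {
                writer.write(line);
                writer.newLine();
            }
        } // close() flushes the buffer and releases the file handle
        List<String> read = new ArrayList<>();
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                read.add(line);
            }
        }
        return read;
    }
}
```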
One note: to generate random input files for this example, I used Apache Commons Lang: RandomStringUtils.randomAlphabetic
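If you would rather not add a Commons Lang dependency just for test data, a stdlib-only stand-in is easy to sketch. This is an illustration under assumptions, not what the article used: randomAlphabetic below only mimics RandomStringUtils.randomAlphabetic (Latin letters a-z and A-Z), and the single "name" field in the generated JSON is hypothetical, since the User class is not shown.

```java
import java.util.Random;

final class RandomDataSketch {
    private static final String LETTERS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";

    // `count` random ASCII letters; mimics RandomStringUtils.randomAlphabetic(count).
    static String randomAlphabetic(Random random, int count) {
        StringBuilder sb = new StringBuilder(count);
        for (int i = 0; i < count; i++) {
            sb.append(LETTERS.charAt(random.nextInt(LETTERS.length())));
        }
        return sb.toString();
    }

    // A JSON array of `size` objects, each with one hypothetical "name" field.
    // Letters never need escaping, so plain string concatenation is safe here.
    static String randomJsonArray(Random random, int size, int nameLength) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < size; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append("{\"name\":\"").append(randomAlphabetic(random, nameLength)).append("\"}");
        }
        return sb.append(']').toString();
    }
}
```

Passing a seeded Random makes the generated files reproducible between runs.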
All the code is on GitHub.
```java
import com.google.gson.Gson;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// User is the example's entity class; it is not part of this listing.
public class JsonArrayToJsonLines {

    // Built from path segments so the separators work on any OS
    private static final Path inputFilesDirectory = Paths.get("src", "main", "resources", "files");
    private static final Path outputDirectory = inputFilesDirectory.resolve("output");

    private static final Gson gson = new Gson(); // Gson instances are thread-safe

    // Hand-off point between the reader tasks (producers) and the writer threads (consumers)
    private final BlockingQueue<EntitiesData> entitiesQueue = new LinkedBlockingQueue<>();
    // Writers keep polling while this is true
    private final AtomicBoolean stillWorking = new AtomicBoolean(true);
    // Released once per written file; the main thread waits for numberOfFiles permits
    private final Semaphore semaphore = new Semaphore(0);
    private int numberOfFiles = 0;

    private JsonArrayToJsonLines() {
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        new JsonArrayToJsonLines().process();
    }

    private void process() throws IOException, InterruptedException {
        deleteFilesInOutputDir();
        final ExecutorService executorService = createExecutorService();

        // Start the two writer threads before any file is read
        for (int i = 0; i < 2; i++) {
            new Thread(new JsonElementsFileWriter(stillWorking, semaphore, entitiesQueue)).start();
        }

        // Submit one reader task per *.json file; try-with-resources closes the stream
        try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(inputFilesDirectory, "*.json")) {
            directoryStream.forEach(new Consumer<Path>() {
                @Override
                public void accept(Path filePath) {
                    numberOfFiles++;
                    executorService.submit(new OriginalFileReader(filePath, entitiesQueue));
                }
            });
        }

        // Block until every file has been written, then let the writers leave their loops.
        // Note: a reader that throws never queues a batch, so no permit is released for its
        // file and acquire() would block forever.
        semaphore.acquire(numberOfFiles);
        stillWorking.set(false);
        shutDownExecutor(executorService);
    }

    // Deletes the files (not the directories) left in the output directory by a previous run
    private void deleteFilesInOutputDir() throws IOException {
        Files.createDirectories(outputDirectory); // walkFileTree fails if the directory is missing
        Files.walkFileTree(outputDirectory, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }
        });
    }

    // One reader thread per available CPU
    private ExecutorService createExecutorService() {
        int numberOfCpus = Runtime.getRuntime().availableProcessors();
        return Executors.newFixedThreadPool(numberOfCpus);
    }

    // Two-phase shutdown: wait for running tasks, then cancel whatever is left
    private void shutDownExecutor(final ExecutorService executorService) {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(120, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                if (!executorService.awaitTermination(120, TimeUnit.SECONDS)) {
                    System.err.println("Executor did not terminate");
                }
            }
        } catch (InterruptedException ex) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    // Producer: parses one JSON-array file and puts its entities on the queue
    private static final class OriginalFileReader implements Runnable {
        private final Path filePath;
        private final BlockingQueue<EntitiesData> entitiesQueue;

        private OriginalFileReader(Path filePath, BlockingQueue<EntitiesData> entitiesQueue) {
            this.filePath = filePath;
            this.entitiesQueue = entitiesQueue;
        }

        @Override
        public void run() {
            Path fileName = filePath.getFileName();
            // try-with-resources closes the reader (the original never closed it)
            try (BufferedReader br = Files.newBufferedReader(filePath)) {
                User[] entities = gson.fromJson(br, User[].class);
                System.out.println("---> " + fileName);
                entitiesQueue.put(new EntitiesData(fileName.toString(), entities));
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(filePath.toString(), e);
            }
        }
    }

    // Consumer: polls the queue and writes one JSON object per line
    private static final class JsonElementsFileWriter implements Runnable {
        private final BlockingQueue<EntitiesData> entitiesQueue;
        private final AtomicBoolean stillWorking;
        private final Semaphore semaphore;

        private JsonElementsFileWriter(AtomicBoolean stillWorking, Semaphore semaphore,
                                       BlockingQueue<EntitiesData> entitiesQueue) {
            this.stillWorking = stillWorking;
            this.semaphore = semaphore;
            this.entitiesQueue = entitiesQueue;
        }

        @Override
        public void run() {
            while (stillWorking.get()) {
                try {
                    // Timed poll, so the stillWorking flag is re-checked at least every 100 ms
                    EntitiesData data = entitiesQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (data != null) {
                        Path fileOutputPath = outputDirectory.resolve(data.fileName);
                        // The writer is closed (and therefore flushed) before the finally block runs
                        try (BufferedWriter writer = Files.newBufferedWriter(fileOutputPath, Charset.defaultCharset())) {
                            for (User user : data.entities) {
                                writer.append(gson.toJson(user));
                                writer.newLine();
                            }
                            System.out.println("=======================================>>>>> " + data.fileName);
                        } catch (IOException e) {
                            throw new RuntimeException(data.fileName, e);
                        } finally {
                            semaphore.release(); // one permit per processed file, even on failure
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and stop this writer
                    return;
                }
            }
        }
    }

    // The parsed content of one input file
    private static final class EntitiesData {
        private final String fileName;
        private final User[] entities;

        private EntitiesData(String fileName, User[] entities) {
            this.fileName = fileName;
            this.entities = entities;
        }
    }
}
```
Reference: Playing With Java Concurrency from our JCG partner Eyal Golan at the Learning and Improving as a Craftsman Developer blog.
I’m wondering why you use consumers without lambdas.
But anyway, nice example.
I didn’t use lambdas because this code is a prototype for code that will run in a Java 7 environment.
Thanks for the comment.
Nice article.
I wanted to know why you used newFixedThreadPool rather than newCachedThreadPool.
Hi,
Thanks for reading :)
When I first created the tool, I thought that reading the files would take some time.
I also wanted to limit the number of reading threads running concurrently.
A cached thread pool is a good fit for short asynchronous operations.
I knew that I would have many reading tasks and didn’t want the number of threads to explode.
Having said all that, I will run the tool with a cached pool, just to see how it works…
Checked :)
It is much more efficient with the fixed pool.
My machine has 4 cores, and the concurrent processing worked better with a small, fixed number of threads.
I didn’t do an actual benchmark, just some System.out printing…
Great, thanks for the answer.
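The fixed-versus-cached trade-off discussed in these comments can be observed without a real benchmark by counting how many tasks run at the same moment. In this sketch (PoolConcurrencySketch and peakConcurrency are hypothetical names, and the 50 ms sleep stands in for reading a file), a fixed pool of 2 can never exceed a peak of 2, while a cached pool usually reaches a much higher peak because it creates a new thread whenever no idle one is available. It measures concurrency only, not throughput.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PoolConcurrencySketch {

    // Submits `tasks` short jobs to `pool`, shuts the pool down, and returns the peak number
    // of jobs that were running at the same moment.
    static int peakConcurrency(ExecutorService pool, int tasks) {
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    int now = running.incrementAndGet();
                    int prev = peak.get();
                    // Lock-free "max": retry if another task raised the peak in the meantime.
                    while (now > prev && !peak.compareAndSet(prev, now)) {
                        prev = peak.get();
                    }
                    try {
                        Thread.sleep(50); // stand-in for reading one file
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("fixed pool of 2: peak " + peakConcurrency(Executors.newFixedThreadPool(2), 20));
        System.out.println("cached pool:     peak " + peakConcurrency(Executors.newCachedThreadPool(), 20));
    }
}
```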