
Playing With Java Concurrency

Recently I needed to transform a set of files, each holding a JSON array of objects, into files holding the same objects as separate lines. It was a simple, one-time task. I did the reading and writing using some features of Java NIO, and I used GSON in the simplest way. One thread ran over the files, converted, and wrote; the whole operation finished in a few seconds. However, I wanted to play a little bit with concurrency, so I enhanced the tool to work concurrently.


Threads

Runnable for reading a file

The reader tasks are submitted to an ExecutorService. The output, an array of objects (User in the example), is put on a BlockingQueue.
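Condensed, the reader side is the pattern below. This is a sketch, not the exact code: inputFiles stands in for the directory scan, and gson, User, and EntitiesData come from the full listing at the end of the post.

for (final Path filePath : inputFiles) {
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            // parse the whole JSON array, then hand it to the writers via the queue
            try (BufferedReader br = Files.newBufferedReader(filePath)) {
                User[] entities = gson.fromJson(br, User[].class);
                entitiesQueue.put(new EntitiesData(filePath.getFileName().toString(), entities));
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(filePath.toString(), e);
            }
        }
    });
}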

Runnable for writing a file

Each writer runnable polls from the blocking queue and writes lines of data to a file. I don't submit the writer Runnable to the ExecutorService; instead, I just start a thread with it. The runnable has a while (someBoolean) {...} pattern, sketched below. More about that stop flag below…
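The shape of that loop, with a hypothetical writeLines helper standing in for the file-writing code shown in the full listing:

while (stillWorking.get()) {
    try {
        // poll with a timeout instead of take(), so the thread periodically
        // re-checks the stop flag and can terminate
        EntitiesData data = entitiesQueue.poll(100, TimeUnit.MILLISECONDS);
        if (data != null) {
            writeLines(data);    // hypothetical helper: one JSON object per line
            semaphore.release(); // signal the main thread: one file done
        }
    } catch (InterruptedException e) {
        // ignore and re-check the stop flag
    }
}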

Synchronizing Everything

The BlockingQueue is the interface between the two types of threads. Since the writer runnable (the consumer) runs in a while loop, I needed a way to make it stop so the tool could terminate. I used two objects for that:

Semaphore

The loop that reads the input files increments a counter (numberOfFiles). The semaphore is created with zero permits. Once I finished traversing the input files and submitting the readers, the main thread tries to acquire one permit per file: semaphore.acquire(numberOfFiles); Since no permits exist yet, this blocks until all files have been written.

Each time a writer finishes writing a file, it releases one permit: semaphore.release(); (the release sits in a finally block, so a failed file doesn't hang the main thread).

AtomicBoolean

The while loop of the writers checks an AtomicBoolean. As long as the flag is true, the writer keeps polling the queue. In the main thread, just after acquiring the semaphore, I set the AtomicBoolean to false, which lets the writer threads terminate.
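Here is a minimal, self-contained sketch of that handshake, using toy string payloads instead of files (the names here are mine, not from the original tool):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownHandshake {
    public static void main(String[] args) throws InterruptedException {
        final Semaphore semaphore = new Semaphore(0);          // zero permits to start
        final AtomicBoolean stillWorking = new AtomicBoolean(true);
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        final int numberOfItems = 5;

        // consumer: same while-loop pattern as the writer runnable
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (stillWorking.get()) {
                    try {
                        String item = queue.poll(100, TimeUnit.MILLISECONDS);
                        if (item != null) {
                            System.out.println("wrote " + item);
                            semaphore.release(); // one item done
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }).start();

        for (int i = 0; i < numberOfItems; i++) {
            queue.put("item-" + i);
        }
        semaphore.acquire(numberOfItems); // blocks until every item was consumed
        stillWorking.set(false);          // lets the consumer thread exit
    }
}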

Using Java NIO

In order to scan, read and write the file system, I used some features of Java NIO.

Scanning: Files.newDirectoryStream(inputFilesDirectory, "*.json");
Deleting output directory before starting: Files.walkFileTree...
BufferedReader and BufferedWriter: Files.newBufferedReader(filePath); Files.newBufferedWriter(fileOutputPath, Charset.defaultCharset());
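Put together, the NIO pieces look roughly like this. This is a condensed illustration of the calls above, not the tool's actual flow (the real reader parses JSON instead of copying lines):

try (DirectoryStream<Path> jsonFiles = Files.newDirectoryStream(inputFilesDirectory, "*.json")) {
    for (Path filePath : jsonFiles) {
        try (BufferedReader reader = Files.newBufferedReader(filePath);
             BufferedWriter writer = Files.newBufferedWriter(
                     outputDirectory.resolve(filePath.getFileName()), Charset.defaultCharset())) {
            String line;
            while ((line = reader.readLine()) != null) {
                writer.write(line);
                writer.newLine();
            }
        }
    }
}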

One note: to generate the random input files for this example, I used Apache Commons Lang: RandomStringUtils.randomAlphabetic.
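The generator itself isn't shown in this post; a hypothetical sketch (the User constructor and file name are my assumptions, with commons-lang3 on the classpath):

List<User> users = new ArrayList<>();
for (int i = 0; i < 100; i++) {
    // random alphabetic strings for the user's fields
    users.add(new User(RandomStringUtils.randomAlphabetic(8),
                       RandomStringUtils.randomAlphabetic(12)));
}
Files.write(inputFilesDirectory.resolve("users-1.json"),
        new Gson().toJson(users).getBytes(StandardCharsets.UTF_8));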
All the code is on GitHub.

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.DirectoryStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import com.google.gson.Gson;

public class JsonArrayToJsonLines {
    private final static Path inputFilesDirectory = Paths.get("src\\main\\resources\\files");
    private final static Path outputDirectory = Paths
            .get("src\\main\\resources\\files\\output");
    private final static Gson gson = new Gson();
     
    private final BlockingQueue<EntitiesData> entitiesQueue = new LinkedBlockingQueue<>();
     
    private AtomicBoolean stillWorking = new AtomicBoolean(true);
    private Semaphore semaphore = new Semaphore(0);
    int numberOfFiles = 0;
 
    private JsonArrayToJsonLines() {
    }
 
    public static void main(String[] args) throws IOException, InterruptedException {
        new JsonArrayToJsonLines().process();
    }
 
    private void process() throws IOException, InterruptedException {
        deleteFilesInOutputDir();
        final ExecutorService executorService = createExecutorService();
        for (int i = 0; i < 2; i++) {
            new Thread(new JsonElementsFileWriter(stillWorking, semaphore, entitiesQueue)).start();
        }

        // DirectoryStream is AutoCloseable; close it once all readers are submitted
        try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(inputFilesDirectory, "*.json")) {
            directoryStream.forEach(new Consumer<Path>() {
                @Override
                public void accept(Path filePath) {
                    numberOfFiles++;
                    executorService.submit(new OriginalFileReader(filePath, entitiesQueue));
                }
            });
        }
         
        semaphore.acquire(numberOfFiles);
        stillWorking.set(false);
        shutDownExecutor(executorService);
    }
 
    private void deleteFilesInOutputDir() throws IOException {
        Files.walkFileTree(outputDirectory, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }
        });
    }
 
    private ExecutorService createExecutorService() {
        int numberOfCpus = Runtime.getRuntime().availableProcessors();
        return Executors.newFixedThreadPool(numberOfCpus);
    }
 
    private void shutDownExecutor(final ExecutorService executorService) {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(120, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
 
            if (!executorService.awaitTermination(120, TimeUnit.SECONDS)) {
                System.err.println("Executor did not terminate");
            }
        } catch (InterruptedException ex) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
 
 
    private static final class OriginalFileReader implements Runnable {
        private final Path filePath;
        private final BlockingQueue<EntitiesData> entitiesQueue;
 
        private OriginalFileReader(Path filePath, BlockingQueue<EntitiesData> entitiesQueue) {
            this.filePath = filePath;
            this.entitiesQueue = entitiesQueue;
        }
 
        @Override
        public void run() {
            Path fileName = filePath.getFileName();
            // try-with-resources ensures the reader is closed even on failure
            try (BufferedReader br = Files.newBufferedReader(filePath)) {
                User[] entities = gson.fromJson(br, User[].class);
                System.out.println("---> " + fileName);
                entitiesQueue.put(new EntitiesData(fileName.toString(), entities));
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(filePath.toString(), e);
            }
        }
    }
 
    private static final class JsonElementsFileWriter implements Runnable {
        private final BlockingQueue<EntitiesData> entitiesQueue;
        private final AtomicBoolean stillWorking;
        private final Semaphore semaphore;
 
        private JsonElementsFileWriter(AtomicBoolean stillWorking, Semaphore semaphore,
                BlockingQueue<EntitiesData> entitiesQueue) {
            this.stillWorking = stillWorking;
            this.semaphore = semaphore;
            this.entitiesQueue = entitiesQueue;
        }
 
        @Override
        public void run() {
            while (stillWorking.get()) {
                try {
                    EntitiesData data = entitiesQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (data != null) {
                        String fileOutput = outputDirectory.toString() + File.separator + data.fileName;
                        Path fileOutputPath = Paths.get(fileOutput);
                        // try-with-resources flushes and closes the writer automatically
                        try (BufferedWriter writer = Files.newBufferedWriter(fileOutputPath, Charset.defaultCharset())) {
                            for (User user : data.entities) {
                                writer.append(gson.toJson(user));
                                writer.newLine();
                            }
                            System.out.println("=======================================>>>>> " + data.fileName);
                        } catch (IOException e) {
                            throw new RuntimeException(data.fileName, e);
                        } finally {
                            semaphore.release();
                        }
                    }
                } catch (InterruptedException e1) {
                    // ignore the interrupt and re-check the stop flag
                }
            }
        }
    }
 
    private static final class EntitiesData {
        private final String fileName;
        private final User[] entities;
 
        private EntitiesData(String fileName, User[] entities) {
            this.fileName = fileName;
            this.entities = entities;
        }
    }
}

Eyal Golan

Eyal is a professional software engineer and an architect. He is a developer and leader of highly sophisticated systems in different areas, such as networking, security, commerce and more.

6 Comments
Dunczyk
10 years ago

I’m wondering why you use consumers without lambdas.

But anyway, nice example.

Eyal Golan
10 years ago
Reply to  Dunczyk

I didn’t use lambdas, as this code is a prototype for code that will be used in a Java 7 environment.

Thanks for the comment.

Igor
10 years ago

Nice article.
I wanted to know why you used newFixedThreadPool rather than newCachedThreadPool?

Eyal Golan
10 years ago
Reply to  Igor

Hi,
Thanks for reading :)

When I first created the tool, I thought that reading the files would take some time.
I also wanted to limit the number of concurrent threads running for reading.

Using a cached thread pool should be good for short async operations.

I knew I would have many reading tasks and didn’t want an explosion of threads.

Having said all that, I will run the tool with a cached pool, just to see how it works…

Eyal Golan
10 years ago
Reply to  Eyal Golan

Checked :)
It is much more efficient using the fixed pool.
My machine has 4 cores, and the concurrent processing worked better with a small, fixed number of threads.

I didn’t do an actual benchmark, just some System.out…

Igor
10 years ago
Reply to  Eyal Golan

Great, thanks for the answer.
