Reactive’s Looming Doom. Part I: Evolution
1. Introduction
Multithreading in Java has come a long evolutionary way from the original monitor concept and threads mapped to native OS threads to modern asynchronous libraries and the implementation of lightweight threads (formerly “fibers”) as part of Project Loom. At the same time, while the language was evolving, the community was developing its own frameworks and practices. With the release of Java 8 and the wide adoption of some elements of functional programming, frameworks based on the principles of reactive systems, described in the Reactive Manifesto and, at the API level, in the Reactive Streams Specification, began to gain traction.
In this series of posts, we will discuss the practical advantages and disadvantages of reactive frameworks, what tasks they are best suited for and what future is looming ahead for the Reactive approach.
To gain a deep understanding of the issue, it is important to know the history of the decisions taken. This post will demonstrate the evolution of multithreaded APIs in Java. In the provided examples, the code reflects the multithreaded programming practices aligned with the API available at the time, so it won’t use the ConcurrentHashMap#compute() method in Java 5 code, etc.
2. Issues of the traditional approach to multithreading
Before describing the problems of monitor-based multithreading, it is necessary to clearly define which tasks are solved using multithreading in enterprise development. Such tasks can usually be divided into two categories:
- Parallelization of computations, when a task is divided into subtasks whose results are then aggregated
- Asynchronous code execution, for example, when implementing an event-driven architecture
In the traditional approach, to work with each of these tasks, the developer has to use an intermediate state for data exchange between threads. And since this is shared mutable state in a multithreaded environment, the need for careful synchronization of access to that shared state naturally arises.
Let’s see this in practice with an example of a parallelization problem. Suppose you have a task to find the most frequent name in a list of a million names. A single-threaded implementation is quite trivial: it is enough to perform one iteration over the list and aggregate the count of each encountered name in a Map:
// Generate names
Random r = new Random();
var names = List.of("Joe", "Monica", "Chandler", "Phoebe", "Rachel", "Ross", "Janice");
var namesList = IntStream.range(0, 1000000)
    .mapToObj(__ -> names.get(r.nextInt(names.size())))
    .collect(Collectors.toList());

// Aggregate counts
Map<String, Long> counts = new HashMap<>();
for (String name : namesList) {
    counts.compute(name, (n, c) -> c == null ? 1L : c + 1);
}

// Find the max count
String mostFrequentName = counts.entrySet()
    .stream()
    .max(Map.Entry.comparingByValue())
    .get()
    .getKey();

System.out.printf("The most frequent name is %s%n", mostFrequentName);
3. Paleozoic
Now let’s say we need to use all available cores. The most straightforward way to do it is to use the Thread class to create the required number of threads and distribute the work between them. In addition, access to the final counts map has to be synchronized, since its values will be updated from different threads. Fortunately, Java has a rich selection of thread-safe collections (most available since version 1.5), so synchronization can be achieved using a Hashtable and a proper synchronized block on it. With all this in mind, the original list can be divided into batches and processed in parallel in the following way:
public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Generate names
        Random r = new Random();
        var names = List.of("Joe", "Monica", "Chandler", "Phoebe", "Rachel", "Ross", "Janice");
        var namesList = IntStream.range(0, 1000000)
            .mapToObj(__ -> names.get(r.nextInt(names.size())))
            .collect(Collectors.toList());

        // Aggregate counts
        int batchSize = 1000;
        ArrayList<Thread> tasks = new ArrayList<>();
        Map<String, Long> finalCounts = new Hashtable<>();
        for (int i = 0; i < namesList.size(); i += batchSize) {
            int batchStart = i;
            int batchEnd = Math.min((batchStart + batchSize), namesList.size());
            final List<String> batch = namesList.subList(batchStart, batchEnd); // Split into batches
            final CountTask task = new CountTask(batch, finalCounts);
            tasks.add(task);
            task.setDaemon(true);
            task.start();
        }

        // Wait until the threads finished
        for (Thread thread : tasks) {
            thread.join();
        }

        // Find the max count
        String mostFrequentName = finalCounts.entrySet()
            .stream()
            .max(Map.Entry.comparingByValue())
            .get()
            .getKey();

        System.out.printf("The most frequent name is %s%n", mostFrequentName);
    }

    private static class CountTask extends Thread {

        private final List<String> batch;
        private final Map<String, Long> finalCounts;

        private CountTask(List<String> batch, Map<String, Long> finalCounts) {
            this.batch = batch;
            this.finalCounts = finalCounts;
        }

        @Override
        public void run() {
            Map<String, Long> localCounts = new Hashtable<>();
            System.out.printf("[%s] Processing batch... \n", Thread.currentThread().getName());
            for (String name : batch) {
                localCounts.compute(name, (n, c) -> c == null ? 1L : c + 1);
            }
            for (Map.Entry<String, Long> stringLongEntry : localCounts.entrySet()) {
                synchronized (finalCounts) {
                    final Long existingCount = finalCounts.get(stringLongEntry.getKey());
                    final var newCount = stringLongEntry.getValue();
                    if (existingCount == null) {
                        finalCounts.put(stringLongEntry.getKey(), newCount);
                    } else {
                        finalCounts.put(stringLongEntry.getKey(), existingCount + newCount);
                    }
                }
            }
        }
    }
}
There are several issues with this code:
- A dedicated, expensive thread is created for each batch
- The Thread#join method does not provide an API to get the result of batch processing, only to await the computations. Results are gathered via the mutable state that is shared between threads
- Processing requires explicit synchronization of access to finalCounts
- The code is extremely verbose
4. Mesozoic
To deal with the first issue, it is enough to use thread pools, which were added to the standard library in Java 5 in the form of the ExecutorService interface and a set of its standard implementations in the Executors class. To solve the second one, the same release added the Future interface, whose instances are produced by the ExecutorService. A Future represents an asynchronous computation whose result can only be obtained by calling the blocking Future#get method. With all those additions, the Java 5 solution looks something like this:
public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Generate names
        Random r = new Random();
        var names = List.of("Joe", "Monica", "Chandler", "Phoebe", "Rachel", "Ross", "Janice");
        var namesList = IntStream.range(0, 1000000)
            .mapToObj(__ -> names.get(r.nextInt(names.size())))
            .toList();

        int batchSize = 1000;
        int parallelism = (int) Math.ceil(namesList.size() / (double) batchSize);
        System.out.printf("Parallelism is %s \n", parallelism);

        ExecutorService executorService =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable runnable) {
                    Thread t = new Thread(runnable);
                    t.setDaemon(true);
                    return t;
                }
            });

        // Split names into batches
        Map<String, Long> finalCounts = new ConcurrentHashMap<>();
        List<Callable<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < namesList.size(); i += batchSize) {
            int batchStart = i;
            int batchEnd = Math.min((batchStart + batchSize), namesList.size());
            final List<String> batch = namesList.subList(batchStart, batchEnd);
            tasks.add(new CountTask(batch, finalCounts));
        }

        // Wait until tasks are done
        executorService.invokeAll(tasks);

        // Find the max count
        String mostFrequentName = finalCounts.entrySet()
            .stream()
            .max(Map.Entry.comparingByValue())
            .get()
            .getKey();

        System.out.printf("The most frequent name is %s%n", mostFrequentName);
    }

    private static class CountTask implements Callable<Void> {

        private final List<String> batch;
        private final Map<String, Long> finalCounts;

        private CountTask(List<String> batch, Map<String, Long> finalCounts) {
            this.batch = batch;
            this.finalCounts = finalCounts;
        }

        @Override
        public Void call() {
            Map<String, Long> localCounts = new HashMap<>();
            System.out.printf("[%s] Processing batch... \n", Thread.currentThread().getName());
            for (String name : batch) {
                localCounts.compute(name, (n, c) -> c == null ? 1L : c + 1);
            }
            for (Map.Entry<String, Long> stringLongEntry : localCounts.entrySet()) {
                synchronized (finalCounts) {
                    final Long existingCount = finalCounts.get(stringLongEntry.getKey());
                    final var newCount = stringLongEntry.getValue();
                    if (existingCount == null) {
                        finalCounts.put(stringLongEntry.getKey(), newCount);
                    } else {
                        finalCounts.put(stringLongEntry.getKey(), existingCount + newCount);
                    }
                }
            }
            return null;
        }
    }
}
This code uses Runtime.getRuntime().availableProcessors() threads, thus consuming resources optimally; however, conceptually it is almost identical to the Thread solution. Moreover, the API is not optimized for this specific task: you have to implement a workaround in the form of a Callable<Void> implementation and produce Future<Void>s from the ExecutorService in order to await the computations.
5. Modern history
The code from the last example solved almost every issue but remained pretty cumbersome and required an explicit ExecutorService#invokeAll call to await the results. In order to address that, Java 8 provided the CompletableFuture class, which works especially well with the newly introduced lambdas. The key API difference from the Future is the possibility to provide an asynchronous callback that is invoked when the result is ready, instead of blocking on Future#get.
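To make the difference concrete, here is a minimal sketch (not from the original example; the class name and values are illustrative) contrasting blocking on a Future with registering a callback on a CompletableFuture:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallbackVsBlocking {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Java 5 style: submit a task and block on Future#get until the value is ready
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<Integer> future = pool.submit(() -> 21 * 2);
        System.out.println("Blocking result: " + future.get()); // the calling thread waits here
        pool.shutdown();

        // Java 8 style: register a callback that is invoked once the value is ready
        CompletableFuture<Void> pipeline = CompletableFuture.supplyAsync(() -> 21 * 2)
            .thenAccept(result -> System.out.println("Callback result: " + result));
        pipeline.join(); // only needed here so the demo does not exit before the callback runs
    }
}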
Also, for the CompletableFuture the dependency on the ExecutorService was inverted: instead of producing futures via the ExecutorService, an Executor is injected into the CompletableFuture on its creation. This makes it easy to configure which Executor to use for a specific CompletableFuture, or not to provide one at all: by default, CompletableFutures are executed on the common ForkJoinPool.
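A small illustrative sketch of both options (the class name and pool size are arbitrary):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorInjection {

    public static void main(String[] args) {
        ExecutorService customPool = Executors.newFixedThreadPool(2);

        // Executor passed explicitly: the supplier runs on customPool
        CompletableFuture<String> onCustomPool = CompletableFuture.supplyAsync(
            () -> Thread.currentThread().getName(), customPool);

        // No executor passed: the supplier runs on ForkJoinPool.commonPool()
        CompletableFuture<String> onCommonPool = CompletableFuture.supplyAsync(
            () -> Thread.currentThread().getName());

        System.out.println("Custom pool thread: " + onCustomPool.join());
        System.out.println("Common pool thread: " + onCommonPool.join());

        customPool.shutdown();
    }
}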
The following code illustrates a solution that employs the CompletableFuture:
public class Main {

    public static void main(String[] args) {
        // Generate names
        Random r = new Random();
        var names = List.of("Joe", "Monica", "Chandler", "Phoebe", "Rachel", "Ross", "Janice");
        var namesList = IntStream.range(0, 1000000)
            .mapToObj(__ -> names.get(r.nextInt(names.size())))
            .toList();

        // Split into batches
        int batchSize = 1000;
        CompletableFuture<Map<String, Long>> finalCountsFuture =
            IntStream.iterate(0, batchStart -> batchStart < namesList.size(), batchStart -> batchStart + batchSize)
                .mapToObj(batchStart -> prepareBatch(namesList, batchStart, batchSize))
                .reduce(Main::combineFeatures)
                .get();

        // Wait for the result to be computed
        Map<String, Long> finalCounts = finalCountsFuture.join();

        // Find the max count
        String mostFrequentName = finalCounts.entrySet()
            .stream()
            .max(Map.Entry.comparingByValue())
            .get()
            .getKey();

        System.out.printf("The most frequent name is %s%n", mostFrequentName);
    }

    private static CompletableFuture<Map<String, Long>> combineFeatures(
            CompletableFuture<Map<String, Long>> firstFeature,
            CompletableFuture<Map<String, Long>> secondFeature) {
        return firstFeature.thenCombineAsync(secondFeature, Main::mergeCounts);
    }

    private static Map<String, Long> mergeCounts(Map<String, Long> stringLongMap, Map<String, Long> stringLongMap2) {
        System.out.printf("[%s] Merging counts... \n", Thread.currentThread().getName());
        Map<String, Long> accumulator = new HashMap<>(stringLongMap);
        stringLongMap2.forEach((key, value) -> accumulator.compute(key, (n, c) -> c == null ? value : c + value));
        return accumulator;
    }

    private static CompletableFuture<Map<String, Long>> prepareBatch(List<String> namesList, int batchStart, int batchSize) {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, Long> localCounts = new ConcurrentHashMap<>();
            int batchEnd = Math.min((batchStart + batchSize), namesList.size());
            System.out.printf("[%s] Processing batch... \n", Thread.currentThread().getName());
            for (String name : namesList.subList(batchStart, batchEnd)) {
                localCounts.compute(name, (n, c) -> c == null ? 1L : c + 1);
            }
            return localCounts;
        });
    }
}
The main thing to note in this solution is that the finalCounts variable no longer represents shared mutable state. Instead, it is returned from a CompletableFuture#join call. This is possible because the CompletableFuture API allows combining the results of multiple futures asynchronously via a collection of then* methods (thenCombineAsync in this case). This approach effectively abstracts the shared mutable state away from the programmer, allowing them to express what they want to achieve instead of coding how to achieve it. Such traits are typical of a functional style of programming and monadic APIs.
Also, it should be noted that no explicit Executor is defined. As was said earlier, CompletableFutures employ the ForkJoinPool by default.
While the provided solution eliminated some drawbacks of the Mesozoic code, it is still very basic and inconvenient in some aspects. As an obvious example, there is no way to reduce a collection of CompletableFutures directly, so you have to additionally employ the Stream API, as shown below.
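For illustration, here is a rough sketch (not from the original solution; the names and values are made up) of folding a list of CompletableFutures through the Stream API:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ReduceFutures {

    public static void main(String[] args) {
        // Two already-started asynchronous computations producing partial counts
        List<CompletableFuture<Map<String, Long>>> partials = List.of(
            CompletableFuture.supplyAsync(() -> Map.of("Joe", 2L, "Ross", 1L)),
            CompletableFuture.supplyAsync(() -> Map.of("Joe", 1L, "Rachel", 3L)));

        // There is no CompletableFuture.reduce(...), so the Stream API does the folding
        CompletableFuture<Map<String, Long>> total = partials.stream()
            .reduce((left, right) -> left.thenCombine(right, ReduceFutures::merge))
            .orElse(CompletableFuture.completedFuture(Map.of()));

        System.out.println(total.join()); // e.g. {Joe=3, Rachel=3, Ross=1} (order may vary)
    }

    private static Map<String, Long> merge(Map<String, Long> a, Map<String, Long> b) {
        Map<String, Long> acc = new HashMap<>(a);
        b.forEach((k, v) -> acc.merge(k, v, Long::sum));
        return acc;
    }
}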
6. Present time
Since the standard API did not provide the necessary level of convenience, the community took matters into its own hands and developed the Reactive Streams Specification, which is ideologically similar to the ideas behind the CompletableFuture. Since Java 9, this specification is also represented in the standard library by the java.util.concurrent.Flow class, which contains the java.util.concurrent.Flow.Publisher and java.util.concurrent.Flow.Subscriber interfaces. However, it should be clarified that reactive streams are not just an improved CompletableFuture. They are a rethinking of the general approach to asynchronous computation, which merely has some similarity to the CompletableFuture API. This will be covered in more detail in the next post.
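For reference, this is roughly what the Flow API looks like in use. The sketch below is not part of the original post; it relies on the JDK’s own SubmissionPublisher as the Flow.Publisher implementation, and the subscriber explicitly requests items one at a time:

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FlowDemo {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);

        // SubmissionPublisher is the JDK's ready-made Flow.Publisher implementation
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1); // ask the publisher for the first item
                }

                @Override
                public void onNext(String item) {
                    System.out.println("Received: " + item);
                    subscription.request(1); // ask for the next one
                }

                @Override
                public void onError(Throwable throwable) {
                    throwable.printStackTrace();
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });

            List.of("Joe", "Monica", "Chandler").forEach(publisher::submit);
        } // closing the publisher eventually triggers onComplete

        done.await();
    }
}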
There are two major implementations of the Reactive Streams Specification on the JVM: RxJava and the more popular Project Reactor. The latter will be used in future examples.
Conceptually, Project Reactor provides two classes with a monadic interface:
- Mono — to process 0 to 1 elements
- Flux — to process from 0 to infinity elements
Mono’s functionality is similar to the CompletableFuture’s, but with an inclination towards the Reactive Streams Specification and a far richer API.
Flux can be considered a java.util.stream.Stream where the execution of each chained method (which is called a reactive operator) can be offloaded to a different thread, similar to the CompletableFuture.supplyAsync(Supplier<U>, Executor) method. Flux also has a rich API for asynchronous processing.
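As a minimal sketch of the two types (not from the original post, assuming reactor-core is on the classpath):

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ReactorBasics {

    public static void main(String[] args) {
        // Mono: at most one value, conceptually close to a CompletableFuture
        Mono<String> greeting = Mono.fromSupplier(() -> "Hello from " + Thread.currentThread().getName())
            .subscribeOn(Schedulers.boundedElastic()); // the supplier runs on a worker thread

        // Flux: a stream of values whose operators can be shifted between threads
        Flux<Integer> squares = Flux.range(1, 5)
            .publishOn(Schedulers.parallel()) // operators below run on the parallel scheduler
            .map(i -> i * i);

        System.out.println(greeting.block());
        System.out.println(squares.collectList().block()); // [1, 4, 9, 16, 25]
    }
}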
With Project Reactor the solution to the original problem will look like this:
public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Generate names
        Random r = new Random();
        var names = List.of("Joe", "Monica", "Chandler", "Phoebe", "Rachel", "Ross", "Janice");
        var namesList = IntStream.range(0, 1000000)
            .mapToObj(__ -> names.get(r.nextInt(names.size())))
            .collect(Collectors.toList());

        int batchSize = 1000;
        var finalCounts = Flux.fromIterable(namesList)
            // Split into batches
            .buffer(batchSize)
            // Aggregate intermediate counts asynchronously
            .flatMap(Main::processBatch)
            .reduce(new HashMap<String, Long>(), Main::mergeIntermediateCount)
            .flatMapIterable(HashMap::entrySet);

        String mostFrequentName = MathFlux.max(finalCounts, Map.Entry.comparingByValue())
            .map(Map.Entry::getKey)
            .block();

        System.out.printf("The most frequent name is %s%n", mostFrequentName);
    }

    private static HashMap<String, Long> mergeIntermediateCount(HashMap<String, Long> acc,
                                                                Map<String, Long> intermediateResult) {
        intermediateResult.forEach((name, intermediateCount) -> acc.merge(name, intermediateCount, Long::sum));
        return acc;
    }

    private static Mono<Map<String, Long>> processBatch(List<String> batch) {
        return Flux.fromIterable(batch)
            .groupBy(Function.identity())
            .flatMap(group -> group.count().map(count -> Tuples.of(group.key(), count)))
            .collectMap(Tuple2::getT1, Tuple2::getT2)
            .doOnSubscribe(__ -> System.out.printf("[%s] Processing batch... \n", Thread.currentThread().getName()))
            .subscribeOn(Schedulers.boundedElastic());
    }
}
Since the Project Reactor API is designed around common operations of asynchronous data processing, it provides all the necessary functionality out of the box, including splitting into batches, grouping within a batch, counting, etc. It keeps the code uniform and frees the developer from the need to create temporary collections and other imperative constructs. Also, this version is multithreaded because of a single line, .subscribeOn(Schedulers.boundedElastic()): it switches the processing of each batch to a dedicated thread pool. Without this line, the code would still work, but on a single thread. This is an important trait of reactive code: it is concurrency-agnostic.
Reactor, like RxJava, can be considered to be concurrency-agnostic. That is, it does not enforce a concurrency model. Rather, it leaves you, the developer, in command. However, that does not prevent the library from helping you with concurrency.
Reactor docs
All of the above allows us to state that the Reactor API is superior to the CompletableFuture, but it comes with serious drawbacks, which will be discussed in later posts.
7. Near future
Project Loom was released in a preview state in JDK 19. In addition to virtual threads, it provides an API for so-called structured concurrency in an incubator state. Structured concurrency introduces the concept of a “scope” for asynchronous tasks, which allows you to group asynchronous computations within this “scope” and structure their execution in a way similar to the Java 5 Futures, but with more capabilities. Those scopes, naturally, use virtual threads to carry out submitted tasks.
In practice, the Loom solution looks like this:
public class MainVirtualThreads {

    public static void main(String[] args) throws InterruptedException {
        Random r = new Random();
        var names = List.of("Joe", "Monica", "Chandler", "Phoebe", "Rachel", "Ross", "Janice");
        var namesList = IntStream.range(0, 1000000)
            .mapToObj(__ -> names.get(r.nextInt(names.size())))
            .toList();

        try (var scope = new BatchScope()) {
            int batchSize = 1000;
            IntStream.iterate(0, batchStart -> batchStart < namesList.size(), batchStart -> batchStart + batchSize)
                .mapToObj(batchStart -> prepareBatch(namesList, batchStart, batchSize))
                .forEach(scope::fork);
            scope.join();
            System.out.println("The most frequent name is " + scope.mostFrequentName());
        }
    }

    private static Callable<Map<String, Long>> prepareBatch(List<String> namesList, int batchStart, int batchSize) {
        return () -> {
            Map<String, Long> localCounts = new ConcurrentHashMap<>();
            int batchEnd = Math.min((batchStart + batchSize), namesList.size());
            System.out.printf("[virtual=%s] Processing batch... \n", Thread.currentThread().isVirtual());
            for (String name : namesList.subList(batchStart, batchEnd)) {
                localCounts.compute(name, (n, c) -> c == null ? 1L : c + 1);
            }
            return localCounts;
        };
    }

    private static class BatchScope extends StructuredTaskScope<Map<String, Long>> {

        private final ConcurrentHashMap<String, Long> result = new ConcurrentHashMap<>();

        @Override
        protected void handleComplete(Future<Map<String, Long>> future) {
            final var intermediateResult = future.resultNow();
            for (var stringLongEntry : intermediateResult.entrySet()) {
                result.compute(stringLongEntry.getKey(), (n, c) -> updateCount(stringLongEntry.getValue(), c));
            }
        }

        private long updateCount(Long newCount, Long existingCount) {
            return existingCount == null ? newCount : existingCount + newCount;
        }

        public String mostFrequentName() {
            return result.entrySet()
                .stream()
                .max(Map.Entry.comparingByValue())
                .get()
                .getKey();
        }
    }
}
As you can see, this approach is in some ways a return to the Paleozoic approach, but with modifications and improvements that are in line with contemporary trends. The new API provides a harmonious relationship between asynchronous tasks and the callbacks that collect the results of their work, as well as other features described in the JEP.
This iteration of API evolution clearly demonstrates that the Java architects decided to move in a way that is more natural to the JVM platform than Reactive: it relies on a familiar (and verbose) imperative approach, but with the performance of virtual threads and the possibility to conveniently structure asynchronous tasks.
But does this mean the unconditional demise of Reactive? Although some Java architects believe that the answer is yes, further posts will demonstrate that there are other possibilities.
8. Conclusion
In this post, the evolution of multithreaded APIs in Java was demonstrated using a simple computational problem as an example. In the next post, we will analyze what “rethinking of the general approach to asynchronous processing” actually stands for and how it can be applied in practice.
9. Download the Source Code
You can download the full source code of this example here:
Reactive’s Looming Doom. Part I: Evolution
There is a big problem. Var doesn’t exist in earlier versions of Java. The var capability was added in Java 10. It’s also a bad idea to use it.
I wouldn’t call this a big problem, it is more of a minor issue :)
I tried to employ era-appropriate code features only in the multithreading code, while keeping the test data generation the same everywhere. Sorry for the confusion it caused.
And I think it is too broad to declare all vars a bad idea. They are fine if used carefully :)
How do you figure “more popular”? Judging by stars, watches and forks on GitHub, RxJava is 10x more popular. Just curious.
That’s a good point, thanks for noticing: since I come from back-end development with a Spring-ish stack, I was under the impression that RxJava was the pioneer of the reactive approach that was later surpassed by Project Reactor. That’s clearly not the case if Android development is considered, and in general, RxJava is far from dead.
I apologize for my bias :)