
Reactive’s Looming Doom. Part I: Evolution

1. Introduction

Multithreading in Java has come a long evolutionary way from the original monitor concept and threads mapped to native OS threads to modern asynchronous libraries and implementation of lightweight threads (formerly “fibers”) as part of Project Loom. At the same time, while the language was evolving, the community was developing its own frameworks and practices. With the release of Java 8 and wide adoption of some elements of functional programming, frameworks based on principles of reactive systems described in the Reactive Manifesto and, at the API level, in Reactive Streams Specification, began to gain traction.

In this series of posts, we will discuss the practical advantages and disadvantages of reactive frameworks, what tasks they are best suited for and what future is looming ahead for the Reactive approach.

To gain a deep understanding of the issue, it is important to know the history of the decisions taken. This post will demonstrate the evolution of multithreaded APIs. The code in each example follows the multithreaded programming practices that match the API available at the time, so, for instance, the Java 5 code won’t use the ConcurrentHashMap#compute() methods.

 

2. Issues of the traditional approach to multithreading

Before describing the problems of monitor-based multithreading, it is necessary to clearly define which tasks multithreading solves in enterprise development. Such tasks can usually be divided into two categories:

  1. Parallelization of computations, when a task is divided into subtasks whose results are then aggregated
  2. Asynchronous code execution, for example, when implementing an event-driven architecture

In the traditional approach, both kinds of tasks require an intermediate state for exchanging data between threads. Since this is shared mutable state in a multithreaded environment, access to it must be carefully synchronized.
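
As a minimal sketch of what such synchronization means in practice, the hypothetical SharedCounter class below (not part of the original examples) lets several threads increment one shared field. The synchronized keyword makes each read-modify-write step atomic; without it, concurrent increments could be lost.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: a counter shared between several threads
class SharedCounter
{
    private long value; // the shared mutable state

    // synchronized makes the read-modify-write step atomic
    synchronized void increment()
    {
        value++;
    }

    synchronized long get()
    {
        return value;
    }

    // Starts the given number of threads; each increments the counter perThread times
    static long countWith(int threads, int perThread)
    {
        SharedCounter counter = new SharedCounter();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++)
        {
            Thread worker = new Thread(() -> {
                for (int j = 0; j < perThread; j++)
                {
                    counter.increment();
                }
            });
            workers.add(worker);
            worker.start();
        }
        try
        {
            // Wait until all workers have finished
            for (Thread worker : workers)
            {
                worker.join();
            }
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args)
    {
        System.out.println(countWith(4, 100_000)); // prints 400000
    }
}
```

Removing synchronized from increment() would still compile, but the final value could then be lower than expected, and differently so on each run.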

Let’s see this in practice with an example of a parallelization problem. Suppose you need to find the most frequent name in a list of a million names. A single-threaded implementation is quite trivial: it is enough to iterate over the list once and aggregate the count of each name encountered in a Map:

// Generate names
Random r = new Random();
var names = List.of("Joe", "Monica", "Chandler", "Phoebe", "Rachel", "Ross", "Janice");
var namesList = IntStream.range(0, 1000000)
    .mapToObj(__ -> names.get(r.nextInt(names.size())))
    .collect(Collectors.toList());

// Aggregate counts
Map<String, Long> counts = new HashMap<>();
for (String name: namesList)
{
    counts.compute(name, (n, c) -> c == null ? 1L : c + 1);
}

// Find the max count
String mostFrequentName = counts.entrySet()
    .stream()
    .max(Map.Entry.comparingByValue())
    .get()
    .getKey();

System.out.printf("The most frequent name is %s%n", mostFrequentName);

3. Paleozoic

Now let’s say we need to use all available cores. The most straightforward way to do it is to use the Thread class to create the required number of threads and distribute the work between them. In addition, access to the final counts (finalCounts) has to be synchronized, since the values there will be updated from different threads. Fortunately, Java has a rich selection of thread-safe collections (most available since version 1.5), so synchronization can be achieved by using a Hashtable and a synchronized block on it. With all this in mind, the original list can be divided into batches and processed in parallel in the following way:

public class Main
{
    public static void main(String[] args) throws InterruptedException, ExecutionException
    {
        // Generate names
        Random r = new Random();
        var names = List.of("Joe", "Monica", "Chandler", "Phoebe", "Rachel", "Ross", "Janice");
        var namesList = IntStream.range(0, 1000000)
            .mapToObj(__ -> names.get(r.nextInt(names.size())))
            .collect(Collectors.toList());

        // Aggregate counts
        int batchSize = 1000;
        ArrayList<Thread> tasks = new ArrayList<>();
        Map<String, Long> finalCounts = new Hashtable<>();
        for (int i = 0; i < namesList.size(); i += batchSize)
        {
            int batchStart = i;
            int batchEnd = Math.min((batchStart + batchSize), namesList.size());
            final List<String> batch = namesList.subList(batchStart, batchEnd);
            // Split into batches 
            final CountTask task = new CountTask(batch, finalCounts);
            tasks.add(task);
            task.setDaemon(true);
            task.start();
        }

        // Wait until the threads finished
        for (Thread thread: tasks)
        {
            thread.join();
        }

        // Find the max count
        String mostFrequentName = finalCounts.entrySet()
            .stream()
            .max(Map.Entry.comparingByValue())
            .get()
            .getKey();

        System.out.printf("The most frequent name is %s%n", mostFrequentName);
    }

    private static class CountTask extends Thread
    {
        private final List<String> batch;
        private final Map<String, Long> finalCounts;

        private CountTask(List<String> batch, Map<String, Long> finalCounts)
        {
            this.batch = batch;
            this.finalCounts = finalCounts;
        }

        @Override
        public void run()
        {
            Map<String, Long> localCounts = new Hashtable<>();
            System.out.printf("[%s] Processing batch... \n", Thread.currentThread().getName());
            for (String name: batch)
            {
                localCounts.compute(name, (n, c) -> c == null ? 1L : c + 1);
            }
            for (Map.Entry<String, Long> stringLongEntry: localCounts.entrySet())
            {
                synchronized (finalCounts)
                {
                    final Long existingCount = finalCounts.get(stringLongEntry.getKey());
                    final var newCount = stringLongEntry.getValue();
                    if (existingCount == null)
                    {
                        finalCounts.put(stringLongEntry.getKey(), newCount);
                    }
                    else
                    {
                        finalCounts.put(stringLongEntry.getKey(), existingCount + newCount);
                    }
                }
            }
        }
    }
}

There are several issues with this code:

  1. A dedicated expensive thread is created for each batch
  2. The Thread#join method does not provide an API to get the result of batch processing, only to await the computations. Results are gathered via the mutable state that is shared between threads
  3. Processing requires explicit synchronization of access to finalCounts
  4. The code is extremely verbose

4. Mesozoic

To deal with the first issue, it is enough to use thread pools, which were added to the standard library in Java 5 in the form of the ExecutorService interface and a set of standard implementations available through the Executors class. To address the second one, the same release added the Future interface, instances of which the ExecutorService produces. A Future represents an asynchronous computation, and the only way to obtain its result is to call the blocking Future#get method. With all those additions, the Java 5 solution looks something like this:

public class Main
{
    public static void main(String[] args) throws InterruptedException, ExecutionException
    {
        // Generate names
        Random r = new Random();
        var names = List.of("Joe", "Monica", "Chandler", "Phoebe", "Rachel", "Ross", "Janice");
        var namesList = IntStream.range(0, 1000000)
            .mapToObj(__ -> names.get(r.nextInt(names.size())))
            .toList();

        int batchSize = 1000;
        int parallelism = (int) Math.ceil(namesList.size() / (double) batchSize);
        System.out.printf("Parallelism is %s \n", parallelism);
        ExecutorService executorService =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
                new ThreadFactory()
                {
                    @Override
                    public Thread newThread(Runnable runnable)
                    {
                        Thread t = new Thread(runnable);
                        t.setDaemon(true);
                        return t;
                    }
                });

        // Split names into batches
        Map<String, Long> finalCounts = new ConcurrentHashMap<>();
        List<Callable<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < namesList.size(); i += batchSize)
        {
            int batchStart = i;
            int batchEnd = Math.min((batchStart + batchSize), namesList.size());
            final List<String> batch = namesList.subList(batchStart, batchEnd);
            tasks.add(new CountTask(batch, finalCounts));
        }

        // Wait until tasks are done
        executorService.invokeAll(tasks);


        // Find the max count
        String mostFrequentName = finalCounts.entrySet()
            .stream()
            .max(Map.Entry.comparingByValue())
            .get()
            .getKey();

        System.out.printf("The most frequent name is %s%n", mostFrequentName);
    }

    private static class CountTask implements Callable<Void>
    {
        private final List<String> batch;
        private final Map<String, Long> finalCounts;

        private CountTask(List<String> batch, Map<String, Long> finalCounts)
        {
            this.batch = batch;
            this.finalCounts = finalCounts;
        }

        @Override
        public Void call()
        {
            Map<String, Long> localCounts = new HashMap<>();
            System.out.printf("[%s] Processing batch... \n", Thread.currentThread().getName());
            for (String name: batch)
            {
                localCounts.compute(name, (n, c) -> c == null ? 1L : c + 1);
            }
            for (Map.Entry<String, Long> stringLongEntry: localCounts.entrySet())
            {
                synchronized (finalCounts)
                {
                    final Long existingCount = finalCounts.get(stringLongEntry.getKey());
                    final var newCount = stringLongEntry.getValue();
                    if (existingCount == null)
                    {
                        finalCounts.put(stringLongEntry.getKey(), newCount);
                    }
                    else
                    {
                        finalCounts.put(stringLongEntry.getKey(), existingCount + newCount);
                    }
                }
            }
            return null;
        }
    }
}

This code uses Runtime.getRuntime().availableProcessors() threads and thus consumes resources far more sensibly. Conceptually, however, it is almost identical to the Thread solution: the results are still gathered through shared mutable state. Moreover, the API is not optimized for this task: you have to resort to a workaround, implementing a Callable<Void> class and obtaining Future<Void> instances from the ExecutorService just to await the computations.
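
For comparison, a Future can also carry the batch result itself back to the caller. The following is only a sketch under stated assumptions: the FutureCounts class and its count method are hypothetical names, and Java 8 lambdas and Map#merge are used for brevity, so this is not period-accurate Java 5 code. Each task returns its own local map, and the merge happens on the calling thread, so no map is shared between worker threads.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration: results travel through Future#get instead of a shared map
class FutureCounts
{
    static Map<String, Long> count(List<String> names, int batchSize)
    {
        ExecutorService pool =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try
        {
            // One task per batch; each task returns its own local counts
            List<Callable<Map<String, Long>>> tasks = new ArrayList<>();
            for (int i = 0; i < names.size(); i += batchSize)
            {
                List<String> batch = names.subList(i, Math.min(i + batchSize, names.size()));
                tasks.add(() -> {
                    Map<String, Long> local = new HashMap<>();
                    for (String name : batch)
                    {
                        local.merge(name, 1L, Long::sum);
                    }
                    return local;
                });
            }

            // invokeAll blocks until every task has completed
            Map<String, Long> total = new HashMap<>();
            for (Future<Map<String, Long>> future : pool.invokeAll(tasks))
            {
                // The tasks are already done, so get() returns without waiting
                future.get().forEach((name, c) -> total.merge(name, c, Long::sum));
            }
            return total;
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        catch (ExecutionException e)
        {
            throw new IllegalStateException(e.getCause());
        }
        finally
        {
            pool.shutdown();
        }
    }

    public static void main(String[] args)
    {
        System.out.println(count(List.of("Joe", "Ross", "Joe", "Monica", "Joe"), 2));
    }
}
```

Even in this variant the caller has to block on invokeAll and Future#get, which is the limitation of the Future API that later additions address.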

5. Modern history

The code from the last example solved almost every issue but remained pretty cumbersome and required an explicit ExecutorService#invokeAll call to await the results. To address that, Java 8 introduced the CompletableFuture class, which works especially well with the newly introduced lambdas. The key API difference from Future is the ability to register an asynchronous callback to be invoked when the result is ready, instead of blocking on Future#get.

Also, the dependency on the ExecutorService was inverted for CompletableFuture: instead of producing CompletableFutures via the ExecutorService, an Executor is passed to the CompletableFuture on its creation. This makes it easy to configure which Executor to use for a specific CompletableFuture, or even to provide no Executor at all: by default, CompletableFuture tasks are executed on the common ForkJoinPool.
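
The two points above can be sketched in a few lines. This is only an illustration with hypothetical names (CallbackDemo, greet): the Executor is handed to supplyAsync, and thenApply registers a callback that runs once the value is available.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration of the inverted Executor dependency and of callbacks
class CallbackDemo
{
    static String greet(String name)
    {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try
        {
            CompletableFuture<String> greeting = CompletableFuture
                // The Executor is passed to the future, not the other way around
                .supplyAsync(() -> "Hello, " + name, executor)
                // Callback invoked when the value is ready; nothing blocks here
                .thenApply(String::toUpperCase);

            // join() is used only to hand the value back to this synchronous caller
            return greeting.join();
        }
        finally
        {
            executor.shutdown();
        }
    }

    public static void main(String[] args)
    {
        System.out.println(greet("Joe")); // prints HELLO, JOE
    }
}
```

Dropping the second argument of supplyAsync would run the supplier on the default pool instead of the dedicated executor.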

The following code illustrates a solution that employs the CompletableFuture:

public class Main
{
    public static void main(String[] args)
    {
        // Generate names
        Random r = new Random();
        var names = List.of("Joe", "Monica", "Chandler", "Phoebe", "Rachel", "Ross", "Janice");
        var namesList = IntStream.range(0, 1000000)
            .mapToObj(__ -> names.get(r.nextInt(names.size())))
            .toList();

        // Split into batches
        int batchSize = 1000;
        CompletableFuture<Map<String, Long>> finalCountsFuture =
            IntStream.iterate(0, batchStart -> batchStart < namesList.size(), batchStart -> batchStart + batchSize)
                .mapToObj(batchStart -> prepareBatch(namesList, batchStart, batchSize))
                .reduce(Main::combineFeatures)
                .get();

        // Wait for the result to be computed
        Map<String, Long> finalCounts = finalCountsFuture.join();

        // Find the max count
        String mostFrequentName = finalCounts.entrySet()
            .stream()
            .max(Map.Entry.comparingByValue())
            .get()
            .getKey();

        System.out.printf("The most frequent name is %s%n", mostFrequentName);
    }

    private static CompletableFuture<Map<String, Long>> combineFeatures(
        CompletableFuture<Map<String, Long>> firstFeature,
        CompletableFuture<Map<String, Long>> secondFeature)
    {
        return firstFeature.thenCombineAsync(secondFeature, Main::mergeCounts);
    }

    private static Map<String, Long> mergeCounts(Map<String, Long> stringLongMap, Map<String, Long> stringLongMap2)
    {
        System.out.printf("[%s] Merging counts... \n", Thread.currentThread().getName());
        Map<String, Long> accumulator = new HashMap<>(stringLongMap);
        stringLongMap2.forEach((key, value) -> accumulator.compute(key, (n, c) -> c == null ? value : c + value));
        return accumulator;
    }

    private static CompletableFuture<Map<String, Long>> prepareBatch(List<String> namesList, int batchStart,
                                                                     int batchSize)
    {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, Long> localCounts = new ConcurrentHashMap<>();
            int batchEnd = Math.min((batchStart + batchSize), namesList.size());
            System.out.printf("[%s] Processing batch... \n", Thread.currentThread().getName());
            for (String name: namesList.subList(batchStart, batchEnd))
            {
                localCounts.compute(name, (n, c) -> c == null ? 1L : c + 1);
            }
            return localCounts;
        });
    }
}

The main thing to note in this solution is that the finalCounts variable no longer represents shared mutable state. Instead, it is returned from a CompletableFuture#join call. This is possible because the CompletableFuture API allows combining the results of multiple futures asynchronously via a collection of then* methods (thenCombineAsync in this case). This approach effectively hides the shared mutable state from programmers, allowing them to express what they want to achieve instead of coding how to achieve it. Such traits are typical of a functional programming style and monadic APIs.

Also, it should be noted that no explicit Executor is defined. As mentioned earlier, CompletableFuture uses the common ForkJoinPool by default.

While the provided solution eliminates some drawbacks of the Mesozoic code, it is still very basic and inconvenient in some respects. As an obvious example, there is no built-in way to reduce a collection of CompletableFutures into a single result (CompletableFuture#allOf only signals completion and yields no value), so you have to additionally employ the Stream API.
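
To make that inconvenience concrete, here is a sketch of the kind of helper one typically ends up writing by hand. The names FutureReduce, reduceAll, squareAsync and sumOfSquares are hypothetical; only CompletableFuture#allOf, thenApply and join are standard API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper: reduces a list of futures into one future of the combined value
class FutureReduce
{
    static <T> CompletableFuture<T> reduceAll(List<CompletableFuture<T>> futures,
                                              T identity,
                                              BinaryOperator<T> combiner)
    {
        return CompletableFuture
            // allOf completes when every future is done, but carries no value (Void)
            .allOf(futures.toArray(new CompletableFuture<?>[0]))
            // All futures are complete at this point, so join() does not block
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .reduce(identity, combiner));
    }

    static CompletableFuture<Long> squareAsync(int i)
    {
        return CompletableFuture.supplyAsync(() -> (long) i * i);
    }

    static long sumOfSquares(int n)
    {
        List<CompletableFuture<Long>> futures = IntStream.rangeClosed(1, n)
            .mapToObj(FutureReduce::squareAsync)
            .collect(Collectors.toList());
        return reduceAll(futures, 0L, Long::sum).join();
    }

    public static void main(String[] args)
    {
        System.out.println(sumOfSquares(3)); // prints 14
    }
}
```

The Stream API still does the actual reduction here; allOf merely guarantees that every join() inside the callback returns immediately.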

6. Present time

Since the standard API did not provide the necessary level of convenience, the community took matters into their own hands and developed the Reactive Streams Specification, which is conceptually similar to the ideas behind the CompletableFuture. Since Java 9, this specification has also been represented in the standard library by the java.util.concurrent.Flow class, which contains, among others, the java.util.concurrent.Flow.Publisher and java.util.concurrent.Flow.Subscriber interfaces. However, it should be clarified that reactive streams are not just an improved CompletableFuture. They are a rethinking of the general approach to asynchronous computation, which has some similarity to the CompletableFuture API. This will be covered in more detail in the next post.
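
A rough feel for these interfaces can be had with the JDK alone. The sketch below uses the standard SubmissionPublisher as the Flow.Publisher; the FlowDemo class and its publishAndCollect method are hypothetical names introduced only for this illustration. The subscriber requests items one at a time, which is how the specification expresses backpressure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical illustration of Flow.Publisher and Flow.Subscriber
class FlowDemo
{
    static List<String> publishAndCollect(List<String> items)
    {
        List<String> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<>()
        {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription)
            {
                this.subscription = subscription;
                subscription.request(1); // ask for the first item only
            }

            @Override
            public void onNext(String item)
            {
                received.add(item);
                subscription.request(1); // ask for the next item when ready
            }

            @Override
            public void onError(Throwable throwable)
            {
                done.countDown();
            }

            @Override
            public void onComplete()
            {
                done.countDown();
            }
        };

        // Closing the publisher signals onComplete after all submitted items
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>())
        {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        }

        try
        {
            done.await(); // items are delivered asynchronously, so wait for completion
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args)
    {
        System.out.println(publishAndCollect(List.of("Joe", "Monica", "Ross")));
    }
}
```

Libraries such as Project Reactor build their operators on top of this same publisher, subscriber and subscription contract.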

There are two major implementations of the Reactive Streams Specification on the JVM: RxJava and the more popular Project Reactor. The latter will be used in the examples that follow.

Conceptually, Project Reactor provides two classes with a monadic interface:

  • Mono: processes 0 or 1 elements
  • Flux: processes 0 to N (potentially infinitely many) elements

Mono is functionally similar to the CompletableFuture, but it is aligned with the Reactive Streams Specification and has a far richer API.

Flux can be considered a java.util.stream.Stream where the execution of each chain method (which is called a reactive operator) can be offloaded to a different thread, similar to the CompletableFuture.supplyAsync(Supplier<U>, Executor) method. Flux also has a rich API for asynchronous processing.

With Project Reactor the solution to the original problem will look like this:

public class Main
{
    public static void main(String[] args) throws InterruptedException, ExecutionException
    {
        // Generate names
        Random r = new Random();
        var names = List.of("Joe", "Monica", "Chandler", "Phoebe", "Rachel", "Ross", "Janice");
        var namesList = IntStream.range(0, 1000000)
            .mapToObj(__ -> names.get(r.nextInt(names.size())))
            .collect(Collectors.toList());


        int batchSize = 1000;
        var finalCounts = Flux.fromIterable(namesList)
            // Split to batches
            .buffer(batchSize)
            // Aggregate intermediate counts asynchronously
            .flatMap(Main::processBatch)
            .reduce(new HashMap<String, Long>(), Main::mergeIntermediateCount)
            .flatMapIterable(HashMap::entrySet);
        String mostFrequentName = MathFlux.max(finalCounts, Map.Entry.comparingByValue())
            .map(Map.Entry::getKey)
            .block();


        System.out.printf("The most frequent name is %s%n", mostFrequentName);
    }

    private static HashMap<String, Long> mergeIntermediateCount(HashMap<String, Long> acc,
                                                                Map<String, Long> intermediateResult)
    {
        intermediateResult.forEach((name, intermediateCount) -> acc.merge(name, intermediateCount,
            Long::sum));
        return acc;
    }

    private static Mono<Map<String, Long>> processBatch(List<String> batch)
    {

        return Flux.fromIterable(batch)
            .groupBy(Function.identity())
            .flatMap(group -> group.count().map(count -> Tuples.of(group.key(), count)))
            .collectMap(Tuple2::getT1, Tuple2::getT2)
            .doOnSubscribe(__ -> System.out.printf("[%s] Processing batch... \n", Thread.currentThread().getName()))
            .subscribeOn(Schedulers.boundedElastic());
    }
}

Since the Project Reactor API is designed around common operations of asynchronous data processing, it provides all the necessary functionality out of the box, including splitting into batches, grouping within a batch, counting, etc. This keeps the code uniform and frees the developer from having to create temporary collections and other imperative constructs. Also, this version is multithreaded because of a single line, .subscribeOn(Schedulers.boundedElastic()), which switches the processing of each batch to a dedicated thread pool. Without this line, the code will still work, but on a single thread. This is an important trait of reactive code: it is concurrency-agnostic.

“Reactor, like RxJava, can be considered to be concurrency-agnostic. That is, it does not enforce a concurrency model. Rather, it leaves you, the developer, in command. However, that does not prevent the library from helping you with concurrency.”

Source: Reactor docs

All of the above suggests that the Reactor API is superior to the CompletableFuture API, but it comes with serious drawbacks, which will be discussed in later posts.

7. Near future

Project Loom was released as a preview feature in JDK 19. In addition to virtual threads, it provides an incubating API for so-called structured concurrency. Structured concurrency introduces the concept of a “scope” for asynchronous tasks, which allows you to group asynchronous computations within this scope and structure their execution in a way similar to the Java 5 Futures, but with more capabilities. Those scopes, naturally, use virtual threads to carry out the submitted tasks.

In practice, the Loom solution looks like this:

public class MainVirtualThreads
{

    public static void main(String[] args) throws InterruptedException
    {

        Random r = new Random();
        var names = List.of("Joe", "Monica", "Chandler", "Phoebe", "Rachel", "Ross", "Janice");
        var namesList = IntStream.range(0, 1000000)
            .mapToObj(__ -> names.get(r.nextInt(names.size())))
            .toList();


        try (var scope = new BatchScope())
        {
            int batchSize = 1000;

            IntStream.iterate(0, batchStart -> batchStart < namesList.size(), batchStart -> batchStart + batchSize)
                .mapToObj(batchStart -> prepareBatch(namesList, batchStart, batchSize))
                .forEach(scope::fork);

            scope.join();

            System.out.println("The most frequent name is " + scope.mostFrequentName());
        }
    }

    private static Callable<Map<String, Long>> prepareBatch(List<String> namesList, int batchStart, int batchSize)
    {
        return () -> {
            Map<String, Long> localCounts = new ConcurrentHashMap<>();
            int batchEnd = Math.min((batchStart + batchSize), namesList.size());
            System.out.printf("[virtual=%s] Processing batch... \n", Thread.currentThread().isVirtual());
            for (String name: namesList.subList(batchStart, batchEnd))
            {
                localCounts.compute(name, (n, c) -> c == null ? 1L : c + 1);
            }
            return localCounts;
        };
    }

    private static class BatchScope extends StructuredTaskScope<Map<String, Long>>
    {

        private final ConcurrentHashMap<String, Long> result = new ConcurrentHashMap<>();

        @Override
        protected void handleComplete(Future<Map<String, Long>> future)
        {
            final var intermediateResult = future.resultNow();
            for (var stringLongEntry: intermediateResult.entrySet())
            {
                result.compute(stringLongEntry.getKey(), (n, c) -> updateCount(stringLongEntry.getValue(), c));
            }
        }

        private long updateCount(Long newCount, Long existingCount)
        {
            return existingCount == null ? newCount : existingCount + newCount;
        }

        public String mostFrequentName()
        {
            return result.entrySet()
                .stream()
                .max(Map.Entry.comparingByValue())
                .get()
                .getKey();
        }
    }
}

As you can see, this approach is in some ways a return to the Paleozoic approach, but with modifications and improvements that are in line with contemporary trends. The new API ties asynchronous tasks neatly to the callbacks that collect their results and offers other features described in the JEP.

This iteration of API evolution clearly demonstrates that the Java architects decided to move in a way that is more natural to the JVM platform than Reactive: it relies on a familiar (and verbose) imperative approach, but with the performance of virtual threads and the possibility to conveniently structure asynchronous tasks.

But does this mean the unconditional demise of Reactive? Although some Java architects believe that the answer is yes, further posts will demonstrate that there are other possibilities.

8. Conclusion

In this post, the evolution of multithreaded APIs in Java was demonstrated using a simple computational problem as an example. In the next post, we will analyze what “rethinking of the general approach to asynchronous processing” actually stands for and how it can be applied in practice.

9. Download the Source Code

You can download the full source code of this example here:
Reactive’s Looming Doom. Part I: Evolution

Ivan Vyazmitinov

Tech Lead at Vizor Games, primarily focused on data-warehousing and JVM technologies, excited about the Open-Source community, and willing to share knowledge and experience.
4 Comments
Joseph
2 years ago

There is a big problem. Var doesn’t exist in earlier versions of Java. The var capability was added in Java 10. It’s also a bad idea to use it.

Darren
2 years ago

RxJava and the more popular Project Reactor

How do you figure “more popular”? Judging by stars, watches and forks on GitHub, RxJava is 10x more popular. Just curious.
