Reactive’s Looming Doom. Part II: Fundamentals of Reactive
1. Introduction
The last post provided a comparison of multi-threading APIs throughout Java’s lifespan. In this post, we will dive into the Reactive philosophy to see how it differs from the CompletableFuture API.
2. Specs
According to the Reactive Streams Specification:
The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary—think passing elements on to another thread or thread-pool—while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, back pressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded.
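From the API standpoint, the specification defines 4 interfaces, available both as a dedicated library (org.reactivestreams) and, starting from Java 9, as nested interfaces of java.util.concurrent.Flow in the JDK (the declarations below are the JDK ones):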
- Publisher represents the source of data in the stream. Its primary purpose is to provide the ability to subscribe to it:

```java
@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}
```
- Subscriber represents the receiving side of the stream. Its main purpose is to request data from a Publisher via a Subscription and to provide callbacks for asynchronous processing of the requested data:

```java
public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();
}
```
- Subscription represents the communication interface between a Publisher and its Subscriber. It is passed to the Subscriber on subscription via a callback (onSubscribe) and used by the Subscriber to asynchronously request a finite amount of data from the Publisher:

```java
public static interface Subscription {
    public void request(long n);
    public void cancel();
}
```
- Processor is simply a Publisher and a Subscriber combined (in the JDK, Flow.Processor<T,R> extends Flow.Subscriber<T> and Flow.Publisher<R>).
Requested items are not returned synchronously from the Subscription#request(long n) invocation. Instead, the Publisher invokes the Subscriber#onNext(T item) callback for each of the requested items. Once all the requested items have been processed, the Subscriber may request more via Subscription#request(long n). This is how back-pressure is addressed in the specification: a Subscriber requests additional data only when it is ready to handle it.
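To make the request/onNext handshake concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow interfaces and java.util.concurrent.SubmissionPublisher (no Reactor involved), assuming Java 11 or later. The names BackPressureDemo, OneAtATimeSubscriber and publish are made up for illustration. The subscriber requests one item in onSubscribe and one more after each onNext, so it never receives more than it asked for; on the producing side, SubmissionPublisher keeps a bounded per-subscriber buffer, and submit() blocks the producer while that buffer is full.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BackPressureDemo {
    // Keeps at most one item in flight: the next item is requested only
    // after the current one has been processed.
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        private final List<Integer> received = new ArrayList<>(); // onNext calls are serialized
        private final CompletableFuture<List<Integer>> done = new CompletableFuture<>();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand: a single item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);      // "process" the item
            subscription.request(1); // ready for the next one
        }

        @Override
        public void onError(Throwable throwable) {
            done.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            done.complete(List.copyOf(received));
        }
    }

    static List<Integer> publish(int count) {
        var subscriber = new OneAtATimeSubscriber();
        // maxBufferCapacity = 4: when the subscriber's buffer is full because it
        // has not requested more yet, submit() blocks the producing thread.
        try (var publisher = new SubmissionPublisher<Integer>(ForkJoinPool.commonPool(), 4)) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < count; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete after the already submitted items
        return subscriber.done.orTimeout(5, TimeUnit.SECONDS).join();
    }

    public static void main(String[] args) {
        System.out.println(publish(10)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

Requesting one item at a time is the most conservative strategy; real subscribers usually request in larger chunks to reduce signalling overhead.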
It should be noted that the interfaces above describe only an abstract protocol for asynchronous interaction, while real implementations may well be single-threaded. Accordingly, the example implementations that accompany the specification demonstrate two types of Subscriber: SyncSubscriber, whose onNext method is executed on the Publisher’s thread, and AsyncSubscriber, whose onNext is scheduled on a dedicated Executor. This is why Reactive code may be considered concurrency-agnostic: instead of enforcing a specific concurrency model, it leaves the choice to the developer.
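The concurrency-agnostic point can be illustrated with a small JDK-only sketch (java.util.concurrent.Flow, Java 11+; ListPublisher, recordThreads and recordThreadsOn are hypothetical names, not part of any library). ListPublisher owns no threads: it emits items synchronously on whichever thread calls request(), as in the SyncSubscriber scenario. The very same publisher and subscriber code then runs on a different thread simply because subscribe() is invoked from an executor, which is conceptually close to what Reactor's .subscribeOn() does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;

class ConcurrencyAgnosticDemo {
    // A single-threaded Publisher: it owns no threads and emits items
    // synchronously on whichever thread calls Subscription#request.
    static final class ListPublisher<T> implements Flow.Publisher<T> {
        private final List<T> items;

        ListPublisher(List<T> items) {
            this.items = List.copyOf(items);
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int index;
                private long demand;
                private boolean emitting; // guards against re-entrant request() calls from onNext
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
                        return;
                    }
                    demand = Long.MAX_VALUE - demand < n ? Long.MAX_VALUE : demand + n; // saturating add
                    if (emitting) return; // the loop below picks up the extra demand
                    emitting = true;
                    while (demand > 0 && index < items.size() && !done) {
                        demand--;
                        subscriber.onNext(items.get(index++));
                    }
                    emitting = false;
                    if (index == items.size() && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Subscribes with unbounded demand and records the thread running each onNext.
    static List<String> recordThreads(Flow.Publisher<Integer> publisher) {
        List<String> threads = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { threads.add(Thread.currentThread().getName()); }
            @Override public void onError(Throwable t) { t.printStackTrace(); }
            @Override public void onComplete() { }
        });
        return threads;
    }

    // Runs exactly the same subscription from a dedicated, named thread.
    static List<String> recordThreadsOn(String threadName, Flow.Publisher<Integer> publisher) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        try {
            return executor.submit(() -> recordThreads(publisher)).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        var publisher = new ListPublisher<>(List.of(1, 2, 3));
        System.out.println(recordThreads(publisher));             // [main, main, main]
        System.out.println(recordThreadsOn("worker", publisher)); // [worker, worker, worker]
    }
}
```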
3. Implementation
Every method of Flux and Mono, the main API of Project Reactor, is based on the fundamental principles described in the specification. Now let’s see how exactly those principles are applied.
3.1 Basics
In Project Reactor, both Flux and Mono implement Publisher, and their APIs are very similar. Therefore, a reference to a Flux method (or operator, in reactive terms) is assumed to apply to Mono as well, unless stated otherwise.
Since Flux and Mono are Publishers, each subsequent operator subscribes to the upstream Publisher under the hood, processes the elements according to its implementation, and passes the results downstream. For example, in the following chain .map() subscribes to Flux#just, forwards the demand it receives from downstream (a lambda-based .subscribe() requests an unbounded number of elements by default), applies the passed lambda to each element, and passes the result down to .subscribe():
```java
Flux.just(1, 2, 3)
        .map(integer -> integer + 1)
        .subscribe(incrementedInteger -> log.info(String.valueOf(incrementedInteger)));
```
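The way .map() sits between its upstream and downstream can be sketched with the JDK's Flow interfaces. This is a simplified illustration, not Reactor's actual implementation: MapPublisher and incrementAll are hypothetical names, a SubmissionPublisher stands in for Flux.just, Java 11+ is assumed, and exceptions thrown by the mapper are not handled. Because MapPublisher hands the upstream Subscription to its downstream unchanged, whatever the final subscriber requests is exactly what the source sees.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class MapOperatorDemo {
    // Hypothetical map operator: subscribes to the upstream, passes the upstream
    // Subscription to the downstream as-is (so request(n) and cancel() go straight
    // through) and applies the mapper to every element on its way down.
    static final class MapPublisher<T, R> implements Flow.Publisher<R> {
        private final Flow.Publisher<T> upstream;
        private final Function<? super T, ? extends R> mapper;

        MapPublisher(Flow.Publisher<T> upstream, Function<? super T, ? extends R> mapper) {
            this.upstream = upstream;
            this.mapper = mapper;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super R> downstream) {
            upstream.subscribe(new Flow.Subscriber<T>() {
                @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
                @Override public void onNext(T item) { downstream.onNext(mapper.apply(item)); }
                @Override public void onError(Throwable t) { downstream.onError(t); }
                @Override public void onComplete() { downstream.onComplete(); }
            });
        }
    }

    static List<Integer> incrementAll(List<Integer> input) {
        var result = new CompletableFuture<List<Integer>>();
        try (var source = new SubmissionPublisher<Integer>()) {
            new MapPublisher<Integer, Integer>(source, i -> i + 1).subscribe(new Flow.Subscriber<Integer>() {
                private final List<Integer> collected = new ArrayList<>();

                // Unbounded demand, similar to what a lambda-based subscribe() signals
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(Integer item) { collected.add(item); }
                @Override public void onError(Throwable t) { result.completeExceptionally(t); }
                @Override public void onComplete() { result.complete(collected); }
            });
            input.forEach(source::submit);
        }
        return result.orTimeout(5, TimeUnit.SECONDS).join();
    }

    public static void main(String[] args) {
        System.out.println(incrementAll(List.of(1, 2, 3))); // [2, 3, 4]
    }
}
```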
In the solution to the original problem:
- The .flatMap() requests elements from the .buffer() up to its concurrency limit (256 by default)
- The .buffer() multiplies that demand by batchSize, requests the resulting number of elements from Flux.fromIterable(namesList), collects them into lists, and passes the lists down to the .flatMap()
- The .flatMap() invokes Main::processBatch for each list, subscribes to the returned Mono, and passes its result downstream
```java
var finalCounts = Flux.fromIterable(namesList)
        // Split to batches
        .buffer(batchSize)
        // Aggregate intermediate counts asynchronously
        .flatMap(Main::processBatch)
        // ...
```
Neither .map() nor .buffer() switches threads: both are executed on the thread that runs the Publisher code. This behavior can, however, be changed by a developer via a dedicated operator.
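The demand translation performed by .buffer() can be sketched in the same JDK-only style. BufferPublisher and batches are hypothetical names, Java 11+ is assumed, and this is a simplification of the idea rather than Reactor's implementation. A request for n lists is forwarded upstream as a request for n * size elements (saturating at Long.MAX_VALUE), a trailing partial batch is emitted on completion, and, like .map(), the operator never switches threads: onNext runs on whatever thread the upstream uses to deliver items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BufferOperatorDemo {
    // Hypothetical buffer(size) operator: a downstream request for n lists
    // becomes an upstream request for n * size elements.
    static final class BufferPublisher<T> implements Flow.Publisher<List<T>> {
        private final Flow.Publisher<T> upstream;
        private final int size;

        BufferPublisher(Flow.Publisher<T> upstream, int size) {
            this.upstream = upstream;
            this.size = size;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super List<T>> downstream) {
            upstream.subscribe(new Flow.Subscriber<T>() {
                private List<T> current = new ArrayList<>();

                @Override
                public void onSubscribe(Flow.Subscription upstreamSubscription) {
                    downstream.onSubscribe(new Flow.Subscription() {
                        @Override
                        public void request(long n) {
                            // n lists need n * size elements; saturate at Long.MAX_VALUE on overflow
                            upstreamSubscription.request(n > Long.MAX_VALUE / size ? Long.MAX_VALUE : n * size);
                        }

                        @Override
                        public void cancel() {
                            upstreamSubscription.cancel();
                        }
                    });
                }

                @Override
                public void onNext(T item) {
                    current.add(item);
                    if (current.size() == size) { // batch is full: emit it and start a new one
                        List<T> full = current;
                        current = new ArrayList<>();
                        downstream.onNext(full);
                    }
                }

                @Override
                public void onError(Throwable t) {
                    downstream.onError(t);
                }

                @Override
                public void onComplete() {
                    if (!current.isEmpty()) {
                        downstream.onNext(current); // last, possibly smaller batch
                    }
                    downstream.onComplete();
                }
            });
        }
    }

    // Requests one list at a time, i.e. `size` upstream elements per request.
    static List<List<Integer>> batches(List<Integer> input, int size) {
        var result = new CompletableFuture<List<List<Integer>>>();
        try (var source = new SubmissionPublisher<Integer>()) {
            new BufferPublisher<>(source, size).subscribe(new Flow.Subscriber<List<Integer>>() {
                private final List<List<Integer>> received = new ArrayList<>();
                private Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
                @Override public void onNext(List<Integer> batch) { received.add(batch); subscription.request(1); }
                @Override public void onError(Throwable t) { result.completeExceptionally(t); }
                @Override public void onComplete() { result.complete(received); }
            });
            input.forEach(source::submit);
        }
        return result.orTimeout(5, TimeUnit.SECONDS).join();
    }

    public static void main(String[] args) {
        System.out.println(batches(List.of(1, 2, 3, 4, 5, 6, 7), 3)); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```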
3.2 Operators .subscribe() and .subscribeOn()
As with the Java 8 Stream API, any chain of operators in Reactor must end with a terminal operation. In Reactor, such an operation is generally .subscribe(). This operator accepts a Subscriber and has a number of overloads, including ones that accept lambdas: .subscribe(elem -> log.info(elem)). Conceptually, this method is non-blocking: the thread where .subscribe() is invoked should not be blocked by it, but in reality it is a bit more tricky. The execution context of a Reactive chain is governed by the Flux#subscribeOn operator. This operator allows a developer to set the thread pool where the operators of the corresponding chain will be executed. In Project Reactor, thread pools are represented by the Scheduler interface, with a set of standard general-purpose implementations supplied by the Schedulers class, for example Schedulers.boundedElastic().
By default, the chain is executed by the thread where .subscribe() was invoked:
```java
Flux.just(1, 2, 3)
        .map(integer -> {
            System.out.printf("Incrementing on thread: %s \n", Thread.currentThread().getName());
            return integer + 1;
        })
        .subscribe(integer -> {
            System.out.printf("Got %s int on thread: %s \n", integer, Thread.currentThread().getName());
        });
System.out.printf("I am after the Flux! on thread: %s \n", Thread.currentThread().getName());

// Output:
// ---
// Incrementing on thread: main
// Got 2 int on thread: main
// Incrementing on thread: main
// Got 3 int on thread: main
// Incrementing on thread: main
// Got 4 int on thread: main
// I am after the Flux! on thread: main
```
As you can see from the output, the .subscribe() call behaves as a blocking one, since the chain is executed by the main thread. However, if you add .subscribeOn(Schedulers.boundedElastic()) to the chain:
```java
Flux.just(1, 2, 3)
        .map(integer -> {
            System.out.printf("Incrementing on thread: %s \n", Thread.currentThread().getName());
            return integer + 1;
        })
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe(integer -> {
            System.out.printf("Got %s int on thread: %s \n", integer, Thread.currentThread().getName());
        });
System.out.printf("I am after the Flux! on thread: %s \n", Thread.currentThread().getName());
Thread.sleep(Long.MAX_VALUE);

// Output:
// ---
// I am after the Flux! on thread: main
// Incrementing on thread: boundedElastic-1
// Got 2 int on thread: boundedElastic-1
// Incrementing on thread: boundedElastic-1
// Got 3 int on thread: boundedElastic-1
// Incrementing on thread: boundedElastic-1
// Got 4 int on thread: boundedElastic-1
```
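The output shows that the chain is now executed by a different thread. Moreover, since the main thread no longer executes the chain itself, it reaches the end of the program before the elements are processed, so Thread.sleep() had to be added at the end to keep the program from exiting prematurely.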
Also, similar to the Stream API, the operators of a chain are not executed until .subscribe() is called, unlike CompletableFuture#supplyAsync, which schedules the passed code for execution immediately.
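The difference in eagerness can be checked with the JDK alone. In this sketch (LazinessDemo and its methods are hypothetical names; Java 11+ assumed), CompletableFuture.supplyAsync runs its supplier even though nobody ever consumes the result, while a Flow.Publisher written as a lambda does no work until it is subscribed to and data is requested.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LazinessDemo {
    // supplyAsync schedules the supplier immediately: it runs even though
    // nobody calls join(), get() or thenApply() on the returned future.
    static boolean eagerRunsWithoutConsumer() {
        CountDownLatch ran = new CountDownLatch(1);
        CompletableFuture.supplyAsync(() -> {
            ran.countDown();
            return "result nobody asked for";
        });
        try {
            return ran.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // A Publisher merely describes the work (Flow.Publisher is a functional
    // interface, so a lambda is enough); nothing runs until someone subscribes
    // and requests data. Returns {runs before subscribe, runs after subscribe}.
    static int[] lazyRunsBeforeAndAfterSubscribe() {
        AtomicInteger runs = new AtomicInteger();
        Flow.Publisher<String> lazy = subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                done = true;
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
                    return;
                }
                runs.incrementAndGet(); // the "work"
                subscriber.onNext("computed lazily");
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });

        int before = runs.get();
        lazy.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(String item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return new int[] {before, runs.get()};
    }

    public static void main(String[] args) {
        System.out.println(eagerRunsWithoutConsumer());                        // true
        System.out.println(Arrays.toString(lazyRunsBeforeAndAfterSubscribe())); // [0, 1]
    }
}
```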
An attentive reader might wonder what the .block() that was used in the solution to the original problem instead of .subscribe() is. It’s simple: Mono#block() subscribes to the Mono with a Subscriber implementation that blocks the calling thread until the Mono completes, and then returns the element it produced. There is a similar method for Flux: Flux#blockLast(). These methods serve as a bridge between blocking and non-blocking APIs, and their overuse is discouraged.
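A blocking bridge of this kind can be sketched on top of the JDK's Flow interfaces. This only illustrates the idea and is not Reactor's implementation: fromFuture and blockLast are hypothetical helpers, with fromFuture playing the role of a single-value, Mono-like source (Java 11+ assumed). blockLast subscribes with unbounded demand and then parks the calling thread on a CompletableFuture until onComplete or onError arrives.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

class BlockingBridgeDemo {
    // Hypothetical single-value source in the spirit of Mono: emits the
    // future's value (or its error) once something has been requested.
    static <T> Flow.Publisher<T> fromFuture(CompletableFuture<T> future) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final AtomicBoolean started = new AtomicBoolean();

            @Override
            public void request(long n) {
                // Sketch: non-positive requests are ignored instead of signalling onError
                if (n > 0 && started.compareAndSet(false, true)) {
                    future.whenComplete((value, error) -> {
                        if (error != null) {
                            subscriber.onError(error);
                        } else {
                            subscriber.onNext(value);
                            subscriber.onComplete();
                        }
                    });
                }
            }

            @Override
            public void cancel() {
                started.set(true); // only prevents emission if nothing was requested yet
            }
        });
    }

    // Hypothetical blockLast(): subscribes with unbounded demand and parks the
    // calling thread until the publisher terminates, then returns the last item.
    static <T> Optional<T> blockLast(Flow.Publisher<T> publisher) {
        var result = new CompletableFuture<Optional<T>>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            private T last;

            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { last = item; }
            @Override public void onError(Throwable t) { result.completeExceptionally(t); }
            @Override public void onComplete() { result.complete(Optional.ofNullable(last)); }
        });
        return result.join(); // blocks; an upstream error is rethrown wrapped in CompletionException
    }

    public static void main(String[] args) {
        var future = CompletableFuture.supplyAsync(() -> 40 + 2); // computed on another thread
        System.out.println(blockLast(fromFuture(future)).orElseThrow()); // 42
    }
}
```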
3.3 Operator .flatMap()
This operator, essential for every monad, has a special meaning in Project Reactor. It accepts a function that returns a Publisher for each element of the stream, subscribes to each such Publisher, and passes the items they produce downstream. Unlike the .map() operator, which simply transforms an accepted element, .flatMap() gives a developer full control over creating the inner Publisher, including its execution context via subscribeOn()!
To demonstrate this on the existing solution, let’s increase the number of names to 100000000, reduce the number of batches to 10, and add some debug output:
```java
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.math.MathFlux; // from the reactor-extra module
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class Main {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Generate names
        Random r = new Random();
        var names = List.of("Joe", "Monica", "Chandler", "Phoebe", "Rachel", "Ross", "Janice");
        var namesList = IntStream.range(0, 100000000)
                .mapToObj(__ -> names.get(r.nextInt(names.size())))
                .collect(Collectors.toList());
        int batchSize = namesList.size() / 10;

        var finalCounts = Flux.fromIterable(namesList)
                // Split to batches
                .buffer(batchSize)
                // Aggregate intermediate counts asynchronously
                .flatMap(Main::processBatch)
                .reduce(new HashMap<>(), Main::mergeIntermediateCount)
                .flatMapIterable(HashMap::entrySet);

        String mostFrequentName = MathFlux.max(finalCounts, Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .block();

        System.out.printf("The most frequent name is %s%n", mostFrequentName);
    }

    private static HashMap<String, Long> mergeIntermediateCount(HashMap<String, Long> acc,
                                                               Map<String, Long> intermediateResult) {
        intermediateResult.forEach((name, intermediateCount) -> acc.merge(name, intermediateCount, Long::sum));
        return acc;
    }

    private static Mono<Map<String, Long>> processBatch(List<String> batch) {
        System.out.printf("[%s][%s] Subscribing to batch processing \n", LocalDateTime.now(), Thread.currentThread().getName());
        return Flux.fromIterable(batch)
                .groupBy(Function.identity())
                .flatMap(group -> group.count().map(count -> Tuples.of(group.key(), count)))
                .collectMap(Tuple2::getT1, Tuple2::getT2)
                .doOnSubscribe(__ -> System.out.printf("[%s][%s] Processing batch... \n", LocalDateTime.now(), Thread.currentThread().getName()))
                .subscribeOn(Schedulers.boundedElastic());
    }
}

// Output:
// ---
// [2022-09-29T16:17:07.199396810][main] Subscribing to batch processing
// [2022-09-29T16:17:07.292379575][boundedElastic-1] Processing batch...
// [2022-09-29T16:17:08.047585945][main] Subscribing to batch processing
// [2022-09-29T16:17:08.061301797][boundedElastic-2] Processing batch...
// [2022-09-29T16:17:09.287728886][main] Subscribing to batch processing
// [2022-09-29T16:17:09.305248432][boundedElastic-3] Processing batch...
// [2022-09-29T16:17:10.202591054][main] Subscribing to batch processing
// [2022-09-29T16:17:10.203799927][boundedElastic-1] Processing batch...
// [2022-09-29T16:17:11.066984735][main] Subscribing to batch processing
// [2022-09-29T16:17:11.067669551][boundedElastic-2] Processing batch...
// [2022-09-29T16:17:11.385716328][main] Subscribing to batch processing
// [2022-09-29T16:17:11.386001934][boundedElastic-3] Processing batch...
// [2022-09-29T16:17:11.678548510][main] Subscribing to batch processing
// [2022-09-29T16:17:11.678978961][boundedElastic-1] Processing batch...
// [2022-09-29T16:17:11.962963502][main] Subscribing to batch processing
// [2022-09-29T16:17:11.963282064][boundedElastic-3] Processing batch...
// [2022-09-29T16:17:12.263382545][main] Subscribing to batch processing
// [2022-09-29T16:17:12.263699716][boundedElastic-1] Processing batch...
// [2022-09-29T16:17:13.662411349][main] Subscribing to batch processing
// [2022-09-29T16:17:13.662926817][boundedElastic-3] Processing batch...
// The most frequent name is Joe
```
As expected, the outer Flux chain is executed by the main thread, since .subscribeOn() was not called on it. As a result, the call to Main::processBatch made by .flatMap() and the subscription to the Mono it returns are initiated by the main thread as well. At the same time, subscribeOn(Schedulers.boundedElastic()) was called on the inner chain, so its operators are executed on the threads provided by Schedulers.boundedElastic(). Essentially, in this code the main thread prepares the batches and offloads their processing to other threads, which is confirmed by the thread names and timestamps in the output.
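For comparison with the CompletableFuture API, a rough analogue of this batch offloading might look as follows. This is a hypothetical sketch (BatchOffloadingDemo, countNames and this processBatch are made-up names, Java 11+ is assumed, and counting uses java.util.stream instead of groupBy): the calling thread slices the list and submits each batch to a pool via supplyAsync, roughly the role played by .flatMap() together with the inner subscribeOn(). One difference stands out: supplyAsync submits every batch immediately, whereas flatMap subscribes to inner publishers on demand and limits how many are active at once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

class BatchOffloadingDemo {
    // Counts the names of a single batch on whichever thread runs it.
    static Map<String, Long> processBatch(List<String> batch) {
        return batch.stream().collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    // The calling thread slices the list into batches and hands each batch to
    // the pool; pool threads count names, the caller merges the partial counts.
    static Map<String, Long> countNames(List<String> names, int batchSize, ExecutorService pool) {
        List<CompletableFuture<Map<String, Long>>> futures = new ArrayList<>();
        for (int from = 0; from < names.size(); from += batchSize) {
            List<String> batch = names.subList(from, Math.min(from + batchSize, names.size()));
            // Submitted right away; only the pool size limits how many batches run at once
            futures.add(CompletableFuture.supplyAsync(() -> processBatch(batch), pool));
        }
        Map<String, Long> total = new HashMap<>();
        for (CompletableFuture<Map<String, Long>> future : futures) {
            future.join().forEach((name, count) -> total.merge(name, count, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            var names = List.of("Joe", "Ross", "Joe", "Monica", "Joe", "Ross", "Rachel");
            System.out.println(countNames(names, 2, pool).get("Joe")); // 3
        } finally {
            pool.shutdown();
        }
    }
}
```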
To improve the performance of this code, a developer may tweak the batchSize parameter to increase the number of batches, as well as add another subscribeOn() inside processBatch():
```java
.flatMap(group -> group.count()
        .map(count -> Tuples.of(group.key(), count))
        .subscribeOn(Schedulers.boundedElastic()))
```
Without the .subscribeOn() in processBatch(), all of the code is executed by the main thread, as expected:
```java
public class Main {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Generate names
        Random r = new Random();
        var names = List.of("Joe", "Monica", "Chandler", "Phoebe", "Rachel", "Ross", "Janice");
        var namesList = IntStream.range(0, 100000000)
                .mapToObj(__ -> names.get(r.nextInt(names.size())))
                .collect(Collectors.toList());
        int batchSize = namesList.size() / 10;

        var finalCounts = Flux.fromIterable(namesList)
                // Split to batches
                .buffer(batchSize)
                // Aggregate intermediate counts asynchronously
                .flatMap(Main::processBatch)
                .reduce(new HashMap<>(), Main::mergeIntermediateCount)
                .flatMapIterable(HashMap::entrySet);

        String mostFrequentName = MathFlux.max(finalCounts, Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .block();

        System.out.printf("The most frequent name is %s%n", mostFrequentName);
    }

    private static HashMap<String, Long> mergeIntermediateCount(HashMap<String, Long> acc,
                                                               Map<String, Long> intermediateResult) {
        intermediateResult.forEach((name, intermediateCount) -> acc.merge(name, intermediateCount, Long::sum));
        return acc;
    }

    private static Mono<Map<String, Long>> processBatch(List<String> batch) {
        System.out.printf("[%s][%s] Subscribing to batch processing \n", LocalDateTime.now(), Thread.currentThread().getName());
        return Flux.fromIterable(batch)
                .groupBy(Function.identity())
                .flatMap(group -> group.count().map(count -> Tuples.of(group.key(), count)))
                .collectMap(Tuple2::getT1, Tuple2::getT2)
                .doOnSubscribe(__ -> System.out.printf("[%s][%s] Processing batch... \n", LocalDateTime.now(), Thread.currentThread().getName()));
    }
}

// Output:
// ---
// [2022-09-29T16:20:26.834341489][main] Subscribing to batch processing
// [2022-09-29T16:20:26.858483034][main] Processing batch...
// [2022-09-29T16:20:28.732505692][main] Subscribing to batch processing
// [2022-09-29T16:20:28.733251231][main] Processing batch...
// [2022-09-29T16:20:30.245662536][main] Subscribing to batch processing
// [2022-09-29T16:20:30.246063404][main] Processing batch...
// [2022-09-29T16:20:30.791314849][main] Subscribing to batch processing
// [2022-09-29T16:20:30.791522434][main] Processing batch...
// [2022-09-29T16:20:31.367503970][main] Subscribing to batch processing
// [2022-09-29T16:20:31.367729165][main] Processing batch...
// [2022-09-29T16:20:31.998805328][main] Subscribing to batch processing
// [2022-09-29T16:20:31.999009391][main] Processing batch...
// [2022-09-29T16:20:32.593334820][main] Subscribing to batch processing
// [2022-09-29T16:20:32.593585871][main] Processing batch...
// [2022-09-29T16:20:33.186949718][main] Subscribing to batch processing
// [2022-09-29T16:20:33.187191706][main] Processing batch...
// [2022-09-29T16:20:36.217136910][main] Subscribing to batch processing
// [2022-09-29T16:20:36.217389675][main] Processing batch...
// [2022-09-29T16:20:36.833071426][main] Subscribing to batch processing
// [2022-09-29T16:20:36.833353321][main] Processing batch...
// The most frequent name is Monica
```
4. Conclusion
As was demonstrated, the Reactive API has the following traits that differentiate it from the CompletableFuture API:
- Execution of Reactive streams is deferred until subscription (more on that later)
- Reactive streams are built with back-pressure in mind
- The execution context of the code can be switched without modifying the code, i.e. the code is concurrency-agnostic
And, specific to Project Reactor: it offers a lot of functionality in the form of reactive operators that greatly simplify the implementation of multithreading problems. This vast and mature API will be covered in the next post.