Parallel execution of blocking tasks with RxJava and Completable
How parallel execution of blocking “side-effect only” (aka void) tasks became easier with the Completable abstraction introduced in RxJava 1.1.1.
As you may have noticed reading my blog, I primarily specialize in Software Craftsmanship and automatic code testing. However, I am also an enthusiast of Continuous Delivery and broadly defined concurrency. The latter ranges from pure threads and semaphores in C to higher-level solutions such as ReactiveX and the actor model. This time, a use case for a very convenient (in specific cases) feature introduced in the brand new RxJava 1.1.1: rx.Completable. Like many of my blog entries, this one also reflects an actual situation I encountered while working on real tasks and use cases.
A task to do
Imagine a system with quite complex processing of asynchronous events coming from different sources: filtering, merging, transforming, grouping, enriching and more. RxJava fits very well here, especially if we want to be reactive. Let’s assume we have already implemented it (it looks and works nicely) and there is only one more thing left. Before we start processing, we have to tell 3 external systems that we are ready to receive messages. That means 3 synchronous calls to legacy systems (via RMI, JMX or SOAP). Each of them can last for a number of seconds and we need to wait for all of them before we start. Luckily, they are already implemented and we treat them as black boxes which may succeed (or fail with an exception). We just need to call them (preferably concurrently) and wait for them to finish.
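The examples in this post call job.execute() without showing the Job type. As a minimal sketch, assuming Job is a single-method interface wrapping one blocking call (Job, SequentialBaseline and runAll are placeholder names here, not the real legacy API), this is the naive baseline we want to avoid: every call blocks the calling thread, so the total wait is the sum of all call durations.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for one blocking call to a legacy system.
interface Job {
    void execute();
}

class SequentialBaseline {

    // Runs the jobs one after another on the calling thread.
    // If each call takes a few seconds, the waits simply add up.
    static void runAll(List<Job> jobs) {
        for (Job job : jobs) {
            job.execute();
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        runAll(Arrays.asList(
                () -> log.add("system A notified"),
                () -> log.add("system B notified"),
                () -> log.add("system C notified")));
        System.out.println(log);
    }
}
```

The rest of the post is about replacing that loop with something that starts all calls at once and still waits for every one of them.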
rx.Observable – approach 1
With RxJava at our fingertips, it looks like the obvious approach. First, the job execution can be wrapped in an Observable:
    private Observable<Void> rxJobExecute(Job job) {
        return Observable.fromCallable(() -> {
            job.execute();
            return null;
        });
    }
Unfortunately (in our case), Observable expects some element(s) to be emitted. We need to use Void and an awkward return null (instead of just the method reference job::execute).
Next, we can use the subscribeOn() method to execute our job on another thread (and not block the main/current thread; we don’t want to execute our jobs sequentially). Schedulers.io() provides a scheduler with a set of threads intended for IO-bound work.
    Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
    Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());
Finally, we need to wait for all of them to finish (all Observables to complete). To do that, the zip operator can be adapted. It combines the items emitted by the zipped Observables by their sequence number. In our case we are interested only in the first pseudo-item from each job Observable (we emit only null to satisfy the API) and we wait for them in a blocking way. The zip function passed to the zip operator needs to return something, hence we need to repeat the workaround with null.
    Observable.zip(run1, run2, (r1, r2) -> null)
            .toBlocking()
            .single();
It is clearly visible that Observable was designed to work with streams of values, and some additional work is required to adapt it to side-effect only (returning nothing) operations. The situation gets even worse when we need to combine (e.g. merge) our side-effect only operation with others that return some value(s): an even uglier cast is required. See the real use case from the RxNetty API.
    public void execute() {
        Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
        Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());

        Observable.zip(run1, run2, (r1, r2) -> null)
                .toBlocking()
                .single();
    }

    private Observable<Void> rxJobExecute(Job job) {
        return Observable.fromCallable(() -> {
            job.execute();
            return null;
        });
    }
rx.Observable – approach 2
Another approach could be used. Instead of generating an artificial item, an empty Observable can be created with our task executed as an onCompleted action. This forces us to switch from the zip operator to merge. As a result we need to provide an onNext action (which is never executed for an empty Observable), which only strengthens the conviction that we are trying to hack the system.
    public void execute() {
        Observable<Object> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
        Observable<Object> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());

        Observable.merge(run1, run2)
                .toBlocking()
                .subscribe(next -> {});
    }

    private Observable<Object> rxJobExecute(Job job) {
        return Observable.empty()
                .doOnCompleted(job::execute);
    }
rx.Completable
RxJava 1.1.1 adds better support for Observables that do not emit any value. Completable can be thought of as a stripped-down version of Observable which can either finish successfully (an onCompleted event is emitted) or fail (onError). The easiest way to create a Completable instance is the fromAction method, which takes an Action0 that returns no value (like Runnable).
    Completable completable1 = Completable.fromAction(job1::execute)
            .subscribeOn(Schedulers.io());
    Completable completable2 = Completable.fromAction(job2::execute)
            .subscribeOn(Schedulers.io());
Next, we can use the merge() method, which returns a Completable instance that subscribes to all the given Completables at once and completes when all of them complete (or fails when one of them fails). As we used the subscribeOn method with an external scheduler, all jobs are executed in parallel (in different threads).
    Completable.merge(completable1, completable2)
            .await();
The await() method blocks until all jobs finish (in case of an error, the exception is rethrown). Pure and simple.
    public void execute() {
        Completable completable1 = Completable.fromAction(job1::execute)
                .subscribeOn(Schedulers.io());
        Completable completable2 = Completable.fromAction(job2::execute)
                .subscribeOn(Schedulers.io());

        Completable.merge(completable1, completable2)
                .await();
    }
java.util.concurrent.CompletableFuture
Some of you may ask: why not just use CompletableFuture? It would be a good question. Whereas the plain Future introduced in Java 5 could require additional work on our side, ListenableFuture (from Guava) and CompletableFuture (from Java 8) make it quite trivial.
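To illustrate the “additional work” a plain Future involves, here is a minimal sketch using only the JDK. Job, PlainFutureExample and runAll are illustrative names (Job is assumed to be a single-method interface, as in the examples above), and the pool sizing is just one possible choice: we have to manage the executor ourselves, collect the futures, and wait for them one by one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for one blocking call to a legacy system.
interface Job {
    void execute();
}

class PlainFutureExample {

    static void runAll(List<Job> jobs) {
        // One thread per job, so that all blocking calls can run at the same time.
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, jobs.size()));
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (Job job : jobs) {
                Runnable task = job::execute;
                futures.add(executor.submit(task));
            }
            // There is no "all of" combinator for plain futures: wait for each in turn.
            for (Future<?> future : futures) {
                future.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for jobs", e);
        } catch (ExecutionException e) {
            // Unwrap so the caller sees the job's own exception as the cause.
            throw new RuntimeException("Jobs execution failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }
}
```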
First, we need to run/schedule the execution of our jobs. Next, using the CompletableFuture.allOf() method, we can create a new CompletableFuture which is completed the moment all jobs complete (haven’t we seen that concept before?). The get() method just blocks waiting for that.
    public void execute() {
        try {
            CompletableFuture<Void> run1 = CompletableFuture.runAsync(job1::execute);
            CompletableFuture<Void> run2 = CompletableFuture.runAsync(job2::execute);

            CompletableFuture.allOf(run1, run2)
                    .get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("Jobs execution failed", e);
        }
    }
We need to do something with the checked exceptions (very often we don’t want to pollute our API with them), but in general it looks sensible. However, it is worth remembering that CompletableFuture falls short when more complex chain processing is required. In addition, with RxJava already used in our project, it is often useful to use the same (or similar) API instead of introducing something completely new.
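Two details of the CompletableFuture variant may be worth adjusting for blocking calls. The one-argument runAsync() runs tasks on ForkJoinPool.commonPool(), which is shared with other code in the JVM, and join() waits like get() but throws the unchecked CompletionException instead of checked exceptions. The sketch below shows one possible variation, not a recommendation from the original example; Job, CompletableFutureExample and runAll are illustrative names and the pool sizing is an assumption.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for one blocking call to a legacy system.
interface Job {
    void execute();
}

class CompletableFutureExample {

    static void runAll(List<Job> jobs) {
        // Dedicated pool, so slow blocking calls do not occupy the shared common pool.
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, jobs.size()));
        try {
            CompletableFuture<?>[] futures = jobs.stream()
                    .map(job -> CompletableFuture.runAsync(job::execute, executor))
                    .toArray(CompletableFuture[]::new);
            // join() blocks until every job is done; a failed job surfaces
            // as an unchecked CompletionException wrapping the original cause.
            CompletableFuture.allOf(futures).join();
        } finally {
            executor.shutdown();
        }
    }
}
```

With this shape the caller no longer needs a try/catch for InterruptedException and ExecutionException, at the cost of having to remember that failures arrive wrapped in CompletionException.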
Summary
Thanks to rx.Completable, executing side-effect only (returning nothing) tasks with RxJava is much more comfortable. In a codebase already using RxJava it could be preferred over CompletableFuture even for simple cases. Beyond that, Completable provides many more advanced operators and techniques and can be easily mixed with Observable, which makes it even more powerful.
To read more about Completable you may want to see the release notes. For those who want a deeper insight into the topic, there is a very detailed introduction to the Completable API on the Advanced RxJava blog (part 1 and 2).
- The source code for code examples is available from GitHub.
Btw, if you are interested in RxJava in general, I can with a clear conscience recommend you a book which is currently being written by Tomasz Nurkiewicz and Ben Christensen – Reactive Programming with RxJava.
Reference: Parallel execution of blocking tasks with RxJava and Completable from our JCG partner Marcin Zajaczkowski at the Solid Soft blog.