Small scale stream processing kata. Part 2: RxJava 1.x/2.x
In part 1: thread pools we designed and implemented a relatively simple system for processing events in real time. Make sure you read the previous part as it contains some classes that we’ll reuse. Just in case, here are the requirements:
A system delivers around one thousand events per second. Each Event has at least two attributes:
- clientId – we expect up to a few events per second for one client
- UUID – globally unique
Consuming one event takes about 10 milliseconds. Design a consumer of such a stream that:
- allows processing events in real time
- events related to one client should be processed sequentially and in order, i.e. you cannot parallelize events for the same clientId
- if a duplicated UUID appears within 10 seconds, drop it; assume duplicates will not appear after 10 seconds
What we came up with so far was a combination of thread pools and a shared cache. This time we will implement the solution using RxJava. First of all, I never revealed how EventStream is implemented, only giving the API:
interface EventStream {
    void consume(EventConsumer consumer);
}
In fact for manual testing I built a simple RxJava stream that behaves like the system from the requirements:
@Slf4j
class EventStream {

    void consume(EventConsumer consumer) {
        observe()
            .subscribe(
                consumer::consume,
                e -> log.error("Error emitting event", e)
            );
    }

    Observable<Event> observe() {
        return Observable
                .interval(1, TimeUnit.MILLISECONDS)
                .delay(x -> Observable.timer(RandomUtils.nextInt(0, 1_000), TimeUnit.MICROSECONDS))
                .map(x -> new Event(RandomUtils.nextInt(1_000, 1_100), UUID.randomUUID()))
                .flatMap(this::occasionallyDuplicate, 100)
                .observeOn(Schedulers.io());
    }

    private Observable<Event> occasionallyDuplicate(Event x) {
        final Observable<Event> event = Observable.just(x);
        if (Math.random() >= 0.01) {
            return event;
        }
        final Observable<Event> duplicated =
                event.delay(RandomUtils.nextInt(10, 5_000), TimeUnit.MILLISECONDS);
        return event.concatWith(duplicated);
    }
}
Understanding how this simulator works is not essential, but quite interesting. First we generate a steady stream of Long values (0, 1, 2…) every millisecond (a thousand events per second) using the interval() operator. Then we delay each event by a random amount of time between 0 and 1_000 microseconds with the delay() operator. This way events appear at less predictable moments in time, which is a bit more realistic. Finally we map (using, ahem, the map() operator) each Long value to a random Event with clientId somewhere between 1_000 and 1_100 (inclusive-exclusive).
The last bit is interesting. We would like to simulate occasional duplicates. In order to do so we map every event (using flatMap()) to itself (in 99% of the cases). However, in 1% of the cases we return this event twice, where the second occurrence happens between 10 milliseconds and 5 seconds later. In practice the duplicated instance of the event will appear after hundreds of other events, which makes the stream behave quite realistically.
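As a sanity check (this snippet is not part of the original solution, just a rough sketch assuming the same @Slf4j logger as the other examples), you can count total versus distinct UUIDs over a bounded sample and confirm that roughly 1% of events are duplicated:

// Hypothetical sanity check: compare total vs. distinct UUIDs in a sample of 10 000 events
EventStream es = new EventStream();
Observable<Event> sample = es.observe()
        .take(10_000)
        .cache();   // replay the same sample to both counters below

int duplicates = Observable
        .zip(
                sample.count(),
                sample.map(Event::getUuid).distinct().count(),
                (total, unique) -> total - unique)
        .toBlocking()
        .single();
log.info("Duplicates in sample: {}", duplicates);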
There are two ways to interact with EventStream – callback based via consume() and stream based via observe(). We can take advantage of Observable<Event> to quickly build a processing pipeline very similar in functionality to part 1, but much simpler.
Missing backpressure
The first naive approach to take advantage of RxJava falls short very quickly:
EventStream es = new EventStream();
EventConsumer clientProjection = new ClientProjection(
        new ProjectionMetrics(
                new MetricRegistry()));

es.observe()
        .subscribe(
                clientProjection::consume,
                e -> log.error("Fatal error", e)
        );
(ClientProjection, ProjectionMetrics et al. come from part 1). We get MissingBackpressureException almost instantaneously and that was expected. Remember how our first solution was lagging by handling events with more and more latency? RxJava tries to avoid that, as well as avoiding overflow of queues. MissingBackpressureException is thrown because the consumer (ClientProjection) is incapable of handling events in real time. This is fail-fast behavior. The quickest solution is to move consumption to a separate thread pool, just like before, but using RxJava’s facilities:
EventStream es = new EventStream();
EventConsumer clientProjection = new FailOnConcurrentModification(
        new ClientProjection(
                new ProjectionMetrics(
                        new MetricRegistry())));

es.observe()
        .flatMap(e -> clientProjection.consume(e, Schedulers.io()))
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(
                c -> log.info("Processed {} events/s", c),
                e -> log.error("Fatal error", e)
        );
The EventConsumer interface has a helper method that can consume events asynchronously on a supplied Scheduler:
@FunctionalInterface
interface EventConsumer {

    Event consume(Event event);

    default Observable<Event> consume(Event event, Scheduler scheduler) {
        return Observable
                .fromCallable(() -> this.consume(event))
                .subscribeOn(scheduler);
    }
}
By consuming events using flatMap() on a separate Schedulers.io() each consumption is invoked asynchronously. This time events are processed in near real time, but there is a bigger problem. I decorated ClientProjection with FailOnConcurrentModification (sketched below) for a reason. Events are consumed independently from each other, so it may happen that two events for the same clientId are processed concurrently. Not good.
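For reference, FailOnConcurrentModification was introduced in part 1; the sketch below is a simplified reconstruction of the idea (not necessarily the exact code from that article). It blows up whenever two events for the same clientId are being consumed at the same time:

// Simplified sketch of the FailOnConcurrentModification decorator from part 1
class FailOnConcurrentModification implements EventConsumer {

    // clients currently being processed; value is the UUID of the in-flight event
    private final ConcurrentMap<Integer, UUID> clientsInProgress = new ConcurrentHashMap<>();
    private final EventConsumer downstream;

    FailOnConcurrentModification(EventConsumer downstream) {
        this.downstream = downstream;
    }

    @Override
    public Event consume(Event event) {
        // if this clientId is already registered, two events are being processed concurrently
        final UUID inFlight = clientsInProgress.putIfAbsent(event.getClientId(), event.getUuid());
        if (inFlight != null) {
            throw new IllegalStateException("Concurrent processing of client " + event.getClientId());
        }
        try {
            return downstream.consume(event);
        } finally {
            clientsInProgress.remove(event.getClientId());
        }
    }
}

Luckily, in RxJava solving this problem is much easier than with plain threads: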
es.observe()
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume))
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(
                c -> log.info("Processed {} events/s", c),
                e -> log.error("Fatal error", e)
        );
A little bit has changed. First of all we group events by clientId. This splits a single Observable stream into a stream of streams. Each substream named byClient represents all events related to the same clientId. Now if we map over this substream we can be sure that events related to the same clientId are never processed concurrently. The outer stream is lazy so we must subscribe to it. Rather than subscribing to every event separately we collect events every second and count them. This way we receive a single event of type Integer every second, representing the number of events consumed per second.
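If groupBy() is new to you, this toy snippet (not part of the solution, purely an illustration) shows how a single stream is split into keyed substreams that can then be processed independently:

// Toy illustration of groupBy(): split integers into an "even" and an "odd" substream
Observable.just(1, 2, 3, 4, 5, 6)
        .groupBy(x -> x % 2 == 0 ? "even" : "odd")
        .flatMap(group -> group
                .map(x -> group.getKey() + " -> " + x))
        .subscribe(System.out::println);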
Impure, non-idiomatic, error-prone, unsafe deduplication using global state
Now we must drop duplicate UUIDs. The simplest, yet very foolish, way of discarding duplicates is by taking advantage of global state. We can simply filter out duplicates by looking them up in a cache available outside of the filter() operator:
final Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder()
        .expireAfterWrite(10, TimeUnit.SECONDS)
        .build();

es.observe()
        .filter(e -> seenUuids.getIfPresent(e.getUuid()) == null)
        .doOnNext(e -> seenUuids.put(e.getUuid(), e.getUuid()))
        .subscribe(
                clientProjection::consume,
                e -> log.error("Fatal error", e)
        );
If you want to monitor the usage of this mechanism, simply add a metric:
Meter duplicates = metricRegistry.meter("duplicates");

es.observe()
        .filter(e -> {
            if (seenUuids.getIfPresent(e.getUuid()) != null) {
                duplicates.mark();
                return false;
            } else {
                return true;
            }
        })
Accessing global, and especially mutable, state from inside operators is very dangerous and undermines the sole purpose of RxJava – simplifying concurrency. Obviously we use a thread-safe Cache from Guava, but in many cases it’s easy to miss places where shared global mutable state is accessed from multiple threads. If you find yourself mutating some variable outside of the operator chain, be very careful.
Custom distinct() operator in RxJava 1.x
RxJava 1.x has a distinct() operator that presumably does the job:
es.observe()
        .distinct(Event::getUuid)
        .groupBy(Event::getClientId)
Unfortunately distinct() stores all keys (UUIDs) internally in an ever-growing HashSet. But we only care about duplicates within the last 10 seconds! By copy-pasting the implementation of DistinctOperator I created a DistinctEvent operator that takes advantage of Guava’s cache to store only the last 10 seconds’ worth of UUIDs. I intentionally hard-coded Event in this operator rather than making it more generic to keep the code easier to understand:
class DistinctEvent implements Observable.Operator<Event, Event> {

    private final Duration duration;

    DistinctEvent(Duration duration) {
        this.duration = duration;
    }

    @Override
    public Subscriber<? super Event> call(Subscriber<? super Event> child) {
        return new Subscriber<Event>(child) {

            final Map<UUID, Boolean> keyMemory = CacheBuilder.newBuilder()
                    .expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS)
                    .<UUID, Boolean>build().asMap();

            @Override
            public void onNext(Event event) {
                if (keyMemory.put(event.getUuid(), true) == null) {
                    child.onNext(event);
                } else {
                    request(1);
                }
            }

            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }

            @Override
            public void onCompleted() {
                child.onCompleted();
            }
        };
    }
}
The usage is fairly simple and the whole implementation (plus the custom operator) is as short as:
es.observe()
        .lift(new DistinctEvent(Duration.ofSeconds(10)))
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume)
        )
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(
                c -> log.info("Processed {} events/s", c),
                e -> log.error("Fatal error", e)
        );
Actually it can be even shorter if you skip logging every second:
es.observe()
        .lift(new DistinctEvent(Duration.ofSeconds(10)))
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume)
        )
        .subscribe(
                e -> {},
                e -> log.error("Fatal error", e)
        );
This solution is much shorter than the previous one based on thread pools and decorators. The only awkward part is the custom operator that avoids a memory leak from storing too many historical UUIDs. Luckily, RxJava 2 to the rescue!
RxJava 2.x and more powerful built-in distinct()
I was actually this close to submitting a PR to RxJava with a more powerful implementation of the distinct() operator. But before that I checked the 2.x branch and there it was: a distinct() that allows providing a custom Collection as opposed to a hard-coded HashSet. Believe it or not, dependency inversion is not only about the Spring framework or Java EE. When a library allows you to provide a custom implementation of its internal data structure, this is also DI. First I create a helper method that can build a Set<UUID> backed by a Map<UUID, Boolean> backed by a Cache<UUID, Boolean>. We sure like delegation!
private Set<UUID> recentUuids() {
    return Collections.newSetFromMap(
            CacheBuilder.newBuilder()
                    .expireAfterWrite(10, TimeUnit.SECONDS)
                    .<UUID, Boolean>build()
                    .asMap()
    );
}
Having this method we can implement the whole task using this expression:
es.observe()
        .distinct(Event::getUuid, this::recentUuids)
        .groupBy(Event::getClientId)
        .flatMap(byClient -> byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume)
        )
        .subscribe(
                e -> {},
                e -> log.error("Fatal error", e)
        );
The elegance, the simplicity, the clarity! It reads almost like the problem statement:
- observe a stream of events
- take only distinct UUIDs into account
- group events by client
- for each client consume them (sequentially)
I hope you enjoyed all these solutions and that you find them useful in your daily work.
See also:
- Small scale stream processing kata. Part 1: thread pools
- Small scale stream processing kata. Part 2: RxJava 1.x/2.x
Reference: Small scale stream processing kata. Part 2: RxJava 1.x/2.x from our JCG partner Tomasz Nurkiewicz at the Java and neighbourhood blog.