Consuming java.util.concurrent.BlockingQueue as rx.Observable
The classical producer-consumer pattern is relatively simple in Java since we have java.util.concurrent.BlockingQueue. To avoid busy waiting and error-prone manual locking we simply take advantage of put() and take(). They block if the queue is full or empty, respectively. All we need is a bunch of threads sharing a reference to the same queue: some producing and others consuming. And of course the queue has to have a limited capacity, otherwise we will soon run out of memory if producers outperform consumers. Greg Young couldn’t emphasize this rule enough during Devoxx Poland:
Never, ever create an unbounded queue
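To make the rule concrete, here is a minimal sketch of how a bounded queue pushes back on a fast producer. It uses only the JDK; the class and method names (BoundedQueueDemo, extraOfferAccepted, timedOfferOnFullQueue) are made up for this illustration and do not appear elsewhere in the article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
class BoundedQueueDemo {

    /** Fills a queue of the given capacity, then reports whether one more offer() was accepted. */
    static boolean extraOfferAccepted(int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer(i);               // accepted while there is free space
        }
        return queue.offer(capacity);     // queue is full: returns false instead of growing
    }

    /** Timed offer() on a full queue: waits for space, then gives up and returns false. */
    static boolean timedOfferOnFullQueue(long timeoutMillis) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.offer("first");
        try {
            return queue.offer("second", timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(extraOfferAccepted(3));
        System.out.println(timedOfferOnFullQueue(50));
    }
}
```

put() would block in the same situation instead of returning false. Either way the producer is slowed down or told to back off, rather than the queue silently eating memory.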
Producer-consumer using BlockingQueue
Here is the simplest example. First we need a producer that puts objects in a shared queue:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    import lombok.Value;
    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    @Value
    class Producer implements Runnable {

        private final BlockingQueue<User> queue;

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    final User user = new User("User " + System.currentTimeMillis());
                    log.info("Producing {}", user);
                    queue.put(user);              // blocks while the queue is full
                    TimeUnit.SECONDS.sleep(1);
                }
            } catch (Exception e) {
                log.error("Interrupted", e);
            }
        }
    }

Producer simply publishes an instance of the User class (whatever it is) to a given queue every second. Obviously, in real life placing a User in a queue would be the result of some action within the system, like a user login. Similarly, the consumer takes new items from the queue and processes them:

    @Slf4j
    @Value
    class Consumer implements Runnable {

        private final BlockingQueue<User> queue;

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    final User user = queue.take();   // blocks while the queue is empty
                    log.info("Consuming: {}", user);
                }
            } catch (Exception e) {
                log.error("Interrupted", e);
            }
        }
    }

Again, in real life processing would mean storing in a database or running some fraud detection on a user. We use a queue to decouple the producing thread from the consuming thread, e.g. to reduce latency. To run a simple test let’s spin up a few producer and consumer threads:

    BlockingQueue<User> queue = new ArrayBlockingQueue<>(1_000);
    final List<Runnable> runnables = Arrays.asList(
            new Producer(queue),
            new Producer(queue),
            new Consumer(queue),
            new Consumer(queue),
            new Consumer(queue)
    );

    final List<Thread> threads = runnables
            .stream()
            .map(runnable -> new Thread(runnable, threadName(runnable)))
            .peek(Thread::start)
            .collect(toList());

    TimeUnit.SECONDS.sleep(5);
    threads.forEach(Thread::interrupt);

    //...

    private static String threadName(Runnable runnable) {
        return runnable.getClass().getSimpleName() + "-" + System.identityHashCode(runnable);
    }

We have 2 producers and 3 consumers, and everything seems to be working. In real life you would probably have some implicit producer threads, like HTTP request handling threads. On the consumer side you would most likely use a thread pool. This pattern works well, but the consuming side in particular is quite low-level.
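For comparison, the thread-pool variant of the consuming side might look roughly like the sketch below. It uses plain strings instead of User, and the POISON_PILL marker is a hypothetical shutdown convention chosen for this example (the test above stops its threads with interrupts instead).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original article.
class PooledConsumers {

    // Assumed marker that tells a worker to stop; any value never used as real data would do.
    private static final String POISON_PILL = "__STOP__";

    /** Pushes `items` strings through a bounded queue and returns how many the pool consumed. */
    static int consumeWithPool(int items, int workers) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
        AtomicInteger consumed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);

        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                try {
                    // take() blocks while the queue is empty
                    for (String item = queue.take(); !item.equals(POISON_PILL); item = queue.take()) {
                        consumed.incrementAndGet();   // stand-in for real processing
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        try {
            for (int i = 0; i < items; i++) {
                queue.put("User " + i);               // blocks while the queue is full
            }
            for (int w = 0; w < workers; w++) {
                queue.put(POISON_PILL);               // one pill per worker
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println(consumeWithPool(100, 3));
    }
}
```

Even in this small form the consumer has to deal with a loop, blocking calls, interruption and a shutdown protocol, which is the low-level noise the rest of the article tries to get rid of.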
Introducing ObservableQueue<T>
The purpose of this article is to introduce an abstraction that behaves like a queue on the producer side but like an Observable from RxJava on the consumer side. In other words, we can treat objects added to a queue as a stream that we can map, filter, compose, etc. on the client side. Interestingly, this is no longer a queue under the hood. ObservableQueue<T> simply forwards all new objects straight to subscribed consumers and doesn’t buffer events when no one is listening (a “hot” observable). ObservableQueue<T> is not a queue per se, it’s just a bridge between one API and the other. It’s similar to java.util.concurrent.SynchronousQueue, but if no one is interested in consuming, the object is simply discarded.
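The SynchronousQueue half of that comparison can be checked with the JDK alone. In the sketch below (SynchronousQueueDemo and its methods are names invented for this example) a plain offer() with no waiting consumer is rejected, while an offer() made while a consumer is blocked in take() is handed over directly.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original article.
class SynchronousQueueDemo {

    /** offer() with nobody waiting in take(): the element is not accepted and not stored. */
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("A");
    }

    /** Hands a value to a consumer thread; returns what the consumer received, or null. */
    static String handOff(String value) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        AtomicReference<String> received = new AtomicReference<>();
        Thread consumer = new Thread(() -> {
            try {
                received.set(queue.take());           // blocks until a producer arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            // The timed offer waits up to 5 seconds for the consumer to reach take()
            boolean accepted = queue.offer(value, 5, TimeUnit.SECONDS);
            if (!accepted) {
                consumer.interrupt();
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer());
        System.out.println(handOff("B"));
    }
}
```

The difference is in what the producer learns: SynchronousQueue returns false (or blocks in put()) when nobody is there to take the element, whereas ObservableQueue<T> as described here reports success and drops it.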
Here is a first experimental implementation. It’s just toy code, do not consider it production ready. Also, we’ll greatly simplify it later:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.Set;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;

    import rx.Observable;
    import rx.Subscriber;
    import rx.Subscription;

    public class ObservableQueue<T> implements BlockingQueue<T>, Closeable {

        // Thread-safe set of everyone currently subscribed
        private final Set<Subscriber<? super T>> subscribers =
                Collections.newSetFromMap(new ConcurrentHashMap<>());

        private final Observable<T> observable = Observable.create(subscriber -> {
            subscriber.add(new Subscription() {
                @Override
                public void unsubscribe() {
                    subscribers.remove(subscriber);
                }

                @Override
                public boolean isUnsubscribed() {
                    return false;
                }
            });
            subscribers.add(subscriber);
        });

        public Observable<T> observe() {
            return observable;
        }

        @Override
        public boolean add(T t) {
            return offer(t);
        }

        @Override
        public boolean offer(T t) {
            // Forward to current subscribers; nothing is stored
            subscribers.forEach(subscriber -> subscriber.onNext(t));
            return true;
        }

        @Override
        public T remove() {
            return noSuchElement();
        }

        @Override
        public T poll() {
            return null;
        }

        @Override
        public T element() {
            return noSuchElement();
        }

        private T noSuchElement() {
            throw new NoSuchElementException();
        }

        @Override
        public T peek() {
            return null;
        }

        @Override
        public void put(T t) throws InterruptedException {
            offer(t);
        }

        @Override
        public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
            return offer(t);
        }

        @Override
        public T take() throws InterruptedException {
            throw new UnsupportedOperationException("Use observe() instead");
        }

        @Override
        public T poll(long timeout, TimeUnit unit) throws InterruptedException {
            return null;
        }

        @Override
        public int remainingCapacity() {
            return 0;
        }

        @Override
        public boolean remove(Object o) {
            return false;
        }

        @Override
        public boolean containsAll(Collection<?> c) {
            return false;
        }

        @Override
        public boolean addAll(Collection<? extends T> c) {
            c.forEach(this::offer);
            return true;
        }

        @Override
        public boolean removeAll(Collection<?> c) {
            return false;
        }

        @Override
        public boolean retainAll(Collection<?> c) {
            return false;
        }

        @Override
        public void clear() {
        }

        @Override
        public int size() {
            return 0;
        }

        @Override
        public boolean isEmpty() {
            return true;
        }

        @Override
        public boolean contains(Object o) {
            return false;
        }

        @Override
        public Iterator<T> iterator() {
            return Collections.emptyIterator();
        }

        @Override
        public Object[] toArray() {
            return new Object[0];
        }

        @Override
        public <T> T[] toArray(T[] a) {
            return a;
        }

        @Override
        public int drainTo(Collection<? super T> c) {
            return 0;
        }

        @Override
        public int drainTo(Collection<? super T> c, int maxElements) {
            return 0;
        }

        @Override
        public void close() throws IOException {
            subscribers.forEach(rx.Observer::onCompleted);
        }
    }

There are a couple of interesting facts about it:
- We must keep track of all subscribers, i.e. consumers that are willing to receive new items. If one of the subscribers is no longer interested, we must remove it, otherwise a memory leak will occur (keep reading!)
- This queue behaves as if it were always empty. It never holds any items: when you put something into this queue, it is automatically passed to subscribers and forgotten
- Technically this queue is unbounded (!), meaning you can put as many items as you want. However, since items are passed to all subscribers (if any) and immediately discarded, this queue is actually always empty (see above)
- Still, it’s possible that the producer generates too many events and consumers can’t keep up. RxJava now has back pressure support, which is not covered in this article.
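The first three points can be illustrated without RxJava at all. The class below is a JDK-only stand-in written for this purpose (FanOut and its methods are hypothetical names, and java.util.function.Consumer plays the role of the subscriber); it is not the ObservableQueue from this article, only a sketch of the same bookkeeping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical JDK-only analogue of the subscriber bookkeeping; not the RxJava-based class.
class FanOut<T> {

    private final Set<Consumer<? super T>> listeners = ConcurrentHashMap.newKeySet();

    /** Registers a listener and returns a handle that removes it again. */
    Runnable subscribe(Consumer<? super T> listener) {
        listeners.add(listener);
        return () -> listeners.remove(listener);
    }

    /** Forwards the item to whoever is listening right now and keeps no copy of it. */
    boolean offer(T item) {
        listeners.forEach(listener -> listener.accept(item));
        return true;                              // never "full", hence technically unbounded
    }

    int listenerCount() {
        return listeners.size();
    }

    /** Returns the items a listener saw when it was subscribed only around "B". */
    static List<String> demo() {
        FanOut<String> users = new FanOut<>();
        List<String> seen = new ArrayList<>();
        users.offer("A");                         // nobody listening: dropped
        Runnable unsubscribe = users.subscribe(seen::add);
        users.offer("B");                         // delivered
        unsubscribe.run();                        // skipping this would keep the listener referenced
        users.offer("C");                         // dropped again
        return seen;
    }
}
```

If the unsubscribe handle is never run, the listener (and everything it references) stays in the set for as long as the queue lives, which is the memory leak the first bullet warns about.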
A producer can use ObservableQueue<T> just like any other BlockingQueue<T>, assuming I implemented the queue contract correctly. However, the consumer looks much lighter and smarter:

    final ObservableQueue<User> users = new ObservableQueue<>();
    final Observable<User> observable = users.observe();

    users.offer(new User("A"));
    observable.subscribe(user -> log.info("User logged in: {}", user));

    users.offer(new User("B"));
    users.offer(new User("C"));

The code above prints "B" and "C" only. "A" is lost by design since ObservableQueue drops items when no one is listening. Obviously the Producer class now uses the users queue. Everything works fine: you can call users.observe() at any point and apply any of the dozens of Observable operators. However, there is one caveat: by default RxJava doesn’t enforce any threading, so consuming happens in the same thread as producing! We lost the most important feature of the producer-consumer pattern, i.e. thread decoupling. Luckily everything is declarative in RxJava, thread scheduling included:

    users
            .observe()
            .observeOn(Schedulers.computation())
            .forEach(user ->
                    log.info("User logged in: {}", user)
            );

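What the scheduler buys us can be mimicked with a plain executor. This is only a JDK analogy, not how RxJava implements observeOn(); ThreadHandOff, on() and the "consumer-thread" name are assumptions made for this sketch. A callback invoked directly runs on the producer's thread, while a callback wrapped with on() runs on the executor's thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical JDK-only analogy for thread hand-off; not RxJava's implementation.
class ThreadHandOff {

    /** Wraps a callback so each item is processed on the executor, not on the caller's thread. */
    static <T> Consumer<T> on(ExecutorService executor, Consumer<T> callback) {
        return item -> executor.execute(() -> callback.accept(item));
    }

    /** Delivers one item and returns the name of the thread that ran the callback. */
    static String threadThatHandled(boolean handOff) {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer-thread"));
        AtomicReference<String> handledBy = new AtomicReference<>();
        Consumer<String> callback = user -> handledBy.set(Thread.currentThread().getName());
        Consumer<String> target = handOff ? on(executor, callback) : callback;

        target.accept("User A");                  // invoked from the producing (current) thread

        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handledBy.get();
    }

    public static void main(String[] args) {
        System.out.println(threadThatHandled(false));
        System.out.println(threadThatHandled(true));
    }
}
```

A single-threaded executor also keeps items in submission order, so the consumer still sees events in the order they were produced.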
Now let’s see some real RxJava power. Imagine you want to count how many users log in per second, where each login is placed as an event into a queue:

    users
            .observe()
            .map(User::getName)
            .filter(name -> !name.isEmpty())
            .window(1, TimeUnit.SECONDS)
            .flatMap(Observable::count)
            .doOnCompleted(() -> log.info("System shuts down"))
            .forEach(c -> log.info("Logins in last second: {}", c));

The performance is also acceptable: such a queue can accept around 3 million objects per second on my laptop with one subscriber. Treat this class as an adapter from legacy systems using queues to the modern reactive world. But wait! Using ObservableQueue<T> is easy, but the implementation with the subscribers synchronized set seems too low-level. Luckily there is Subject<T, T>. Subject is “the other side” of Observable: you can push events to a Subject, but it still implements Observable, so you can easily create an arbitrary Observable. Look how beautiful ObservableQueue looks with one of the Subject implementations:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    import rx.Observable;
    import rx.subjects.PublishSubject;
    import rx.subjects.Subject;

    public class ObservableQueue<T> implements BlockingQueue<T>, Closeable {

        // The subject keeps track of subscribers for us
        private final Subject<T, T> subject = PublishSubject.create();

        public Observable<T> observe() {
            return subject;
        }

        @Override
        public boolean add(T t) {
            return offer(t);
        }

        @Override
        public boolean offer(T t) {
            subject.onNext(t);
            return true;
        }

        @Override
        public void close() throws IOException {
            subject.onCompleted();
        }

        @Override
        public T remove() {
            return noSuchElement();
        }

        @Override
        public T poll() {
            return null;
        }

        @Override
        public T element() {
            return noSuchElement();
        }

        private T noSuchElement() {
            throw new NoSuchElementException();
        }

        @Override
        public T peek() {
            return null;
        }

        @Override
        public void put(T t) throws InterruptedException {
            offer(t);
        }

        @Override
        public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
            return offer(t);
        }

        @Override
        public T take() throws InterruptedException {
            throw new UnsupportedOperationException("Use observe() instead");
        }

        @Override
        public T poll(long timeout, TimeUnit unit) throws InterruptedException {
            return null;
        }

        @Override
        public int remainingCapacity() {
            return 0;
        }

        @Override
        public boolean remove(Object o) {
            return false;
        }

        @Override
        public boolean containsAll(Collection<?> c) {
            return false;
        }

        @Override
        public boolean addAll(Collection<? extends T> c) {
            c.forEach(this::offer);
            return true;
        }

        @Override
        public boolean removeAll(Collection<?> c) {
            return false;
        }

        @Override
        public boolean retainAll(Collection<?> c) {
            return false;
        }

        @Override
        public void clear() {
        }

        @Override
        public int size() {
            return 0;
        }

        @Override
        public boolean isEmpty() {
            return true;
        }

        @Override
        public boolean contains(Object o) {
            return false;
        }

        @Override
        public Iterator<T> iterator() {
            return Collections.emptyIterator();
        }

        @Override
        public Object[] toArray() {
            return new Object[0];
        }

        @Override
        public <T> T[] toArray(T[] a) {
            return a;
        }

        @Override
        public int drainTo(Collection<? super T> c) {
            return 0;
        }

        @Override
        public int drainTo(Collection<? super T> c, int maxElements) {
            return 0;
        }
    }

The implementation above is much cleaner and we don’t have to worry about thread synchronization at all.
Reference: Consuming java.util.concurrent.BlockingQueue as rx.Observable from our JCG partner Tomasz Nurkiewicz at the Java and neighbourhood blog.
I have 3 comments:
1) You should call subscribers.add() before you set up the Subscription on the Subscriber. In the current order, an asynchronous unsubscribe() call won’t remove the Subscriber if said call happens just before the subscribers.add().
2) You can use Subscriptions.create(() -> subscribers.remove(subscriber)) instead of implementing the Subscription interface directly.
3) Adding values to a BlockingQueue can happen from multiple threads but PublishSubject requires serialized access to its onXXX methods. Use PublishSubject.create().toSerialized() instead.
Thank you very much for this valuable comment. I will fix all the issues found in the original article, since I can’t edit this one.