Use reactive streams API to combine akka-streams with rxJava
Just a quick article this time, since I’m still experimenting with this stuff. There is a lot of talk around reactive programming: in Java 8 we’ve got the Stream API, we’ve got rxJava, we’ve got ratpack, and Akka has akka-streams.
The main issue with these implementations is that they aren’t compatible with each other: you can’t connect the subscriber of one implementation to the publisher of another. Luckily, an initiative has started to provide a way for these different implementations to work together:
“It is the intention of this specification to allow the creation of many conforming implementations, which by virtue of abiding by the rules will be able to interoperate smoothly, preserving the aforementioned benefits and characteristics across the whole processing graph of a stream application.”
From – http://www.reactive-streams.org/
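To make that contract a bit more concrete, here is a minimal sketch of the publisher/subscriber handshake the specification describes. It assumes JDK 9 or later and uses the java.util.concurrent.Flow interfaces, which mirror the org.reactivestreams ones; ListPublisher, CollectingSubscriber and ContractSketch are hypothetical names made up for this illustration, and the publisher is deliberately simplistic (single-threaded, one subscriber at a time).

```scala
import java.util.concurrent.Flow.{Publisher, Subscriber, Subscription}
import scala.collection.mutable.ListBuffer

// Hypothetical publisher: emits the elements of a fixed list, but never more
// than the subscriber has asked for through request(n).
final class ListPublisher[T](items: List[T]) extends Publisher[T] {
  override def subscribe(subscriber: Subscriber[_ >: T]): Unit =
    subscriber.onSubscribe(new Subscription {
      private var remaining = items
      private var demand = 0L
      private var emitting = false
      private var done = false

      override def request(n: Long): Unit =
        if (!done) {
          if (n <= 0) {
            done = true
            subscriber.onError(new IllegalArgumentException("request must be positive"))
          } else {
            demand += n
            // A subscriber may call request() from inside onNext; in that case
            // the loop that is already running picks up the new demand.
            if (!emitting) {
              emitting = true
              while (!done && demand > 0 && remaining.nonEmpty) {
                val next = remaining.head
                remaining = remaining.tail
                demand -= 1
                subscriber.onNext(next)
              }
              emitting = false
              if (!done && remaining.isEmpty) {
                done = true
                subscriber.onComplete()
              }
            }
          }
        }

      override def cancel(): Unit = { done = true }
    })
}

// Hypothetical subscriber: asks for one element at a time and stores it.
final class CollectingSubscriber[T] extends Subscriber[T] {
  val received = ListBuffer.empty[T]
  var completed = false
  private var subscription: Subscription = null

  override def onSubscribe(s: Subscription): Unit = { subscription = s; s.request(1) }
  override def onNext(item: T): Unit = { received += item; subscription.request(1) }
  override def onError(t: Throwable): Unit = ()
  override def onComplete(): Unit = { completed = true }
}

object ContractSketch {
  def main(args: Array[String]): Unit = {
    val subscriber = new CollectingSubscriber[String]
    new ListPublisher(List("reactive", "streams", "api")).subscribe(subscriber)
    println(subscriber.received.mkString(", "))
  }
}
```

The point of the sketch is that the two classes only know each other through the shared interfaces, which is what lets libraries written by different teams be plugged together.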
How does this work
Now how do we do this? Let’s look at a quick example based on the examples provided with akka-streams:
package sample.stream

import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{SubscriberSink, PublisherSource, Source}
import com.google.common.collect.{DiscreteDomain, ContiguousSet}
import rx.RxReactiveStreams
import rx.Observable;
import scala.collection.JavaConverters._

object BasicTransformation {

  def main(args: Array[String]): Unit = {

    // define an implicit actorsystem and import the implicit dispatcher
    implicit val system = ActorSystem("Sys")
    import system.dispatcher

    // flow materializer determines how the stream is realized.
    // this time as a flow between actors.
    implicit val materializer = FlowMaterializer()

    // input text for the stream.
    val text =
      """|Lorem Ipsum is simply dummy text of the printing and typesetting industry.
         |Lorem Ipsum has been the industry's standard dummy text ever since the 1500s,
         |when an unknown printer took a galley of type and scrambled it to make a type
         |specimen book.""".stripMargin

    // create an observable from a simple list (this is in rxjava style)
    val first = Observable.from(text.split("\\s").toList.asJava);

    // convert the rxJava observable to a publisher
    val publisher = RxReactiveStreams.toPublisher(first);

    // based on the publisher create an akka source
    val source = PublisherSource(publisher);

    // now use the akka style syntax to stream the data from the source
    // to the sink (in this case this is println)
    source.
      map(_.toUpperCase).                  // executed as actors
      filter(_.length > 3).
      foreach { el =>                      // the sink/consumer
        println(el)
      }.
      onComplete(_ => system.shutdown())   // lifecycle event
  }
}
The code comments in this example explain pretty much what is happening. We create an rxJava-based Observable, convert this Observable to a “reactive streams” publisher, and use this publisher to create an akka-streams source. For the rest of the code we can use the akka-streams flow API to model the stream. In this case we just convert each word to upper case, keep the words longer than three characters, and print out the result.
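If you want to play with the same idea without pulling in rxJava or akka-streams, the following is a rough sketch that needs only the JDK (9 or later). As an assumption for illustration, java.util.concurrent.SubmissionPublisher stands in for the rxJava side, and UpperCaseSink and InteropSketch are hypothetical names standing in for the akka-streams side; neither is part of either library. The sink applies the same upper-case and length steps as the listing, and it only ever talks to the producer through the Subscriber and Subscription interfaces.

```scala
import java.util.concurrent.{CountDownLatch, SubmissionPublisher, TimeUnit}
import java.util.concurrent.Flow.{Subscriber, Subscription}
import scala.collection.mutable.ListBuffer

// Hypothetical consumer: it knows nothing about the producer except the
// Subscriber/Subscription contract.
final class UpperCaseSink extends Subscriber[String] {
  val out = ListBuffer.empty[String]      // collected results
  val finished = new CountDownLatch(1)    // released on completion or error
  private var subscription: Subscription = null

  override def onSubscribe(s: Subscription): Unit = {
    subscription = s
    s.request(1)                          // signal demand for the first word
  }

  override def onNext(word: String): Unit = {
    val upper = word.toUpperCase          // same as map(_.toUpperCase)
    if (upper.length > 3) out += upper    // same as filter(_.length > 3)
    subscription.request(1)               // ask for the next word
  }

  override def onError(t: Throwable): Unit = finished.countDown()
  override def onComplete(): Unit = finished.countDown()
}

object InteropSketch {
  // Pushes the words of `text` through a JDK publisher into the sink and
  // returns what the sink collected.
  def run(text: String): List[String] = {
    val publisher = new SubmissionPublisher[String]()  // stand-in producer
    val sink = new UpperCaseSink
    publisher.subscribe(sink)
    text.split("\\s+").foreach(word => publisher.submit(word))
    publisher.close()                                  // triggers onComplete
    sink.finished.await(5, TimeUnit.SECONDS)           // delivery is asynchronous
    sink.out.toList
  }

  def main(args: Array[String]): Unit =
    run("Lorem Ipsum is simply dummy text").foreach(println)
}
```

Delivery happens on another thread here, which is why the sketch waits on a latch before reading the results.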
Reference: Use reactive streams API to combine akka-streams with rxJava, from our JCG partner Jos Dirksen at the Smart Java blog.