
Developing Modern Applications with Scala: Reactive Applications

This article is part of our Academy Course titled Developing Modern Applications with Scala.

In this course, we provide a framework and toolset so that you can develop modern Scala applications. We cover a wide range of topics, from SBT builds and reactive applications to testing and database access. With our straightforward tutorials, you will be able to get your own projects up and running in minimum time. Check it out here!

1. Introduction

In the last couple of years, many software systems used by millions and even billions of people every day have started to face unprecedented scalability requirements. In many regards, traditional software architectures were pushed to their limits, unveiling the urgent need to come up with other architectural styles which better suit the demands of the modern world.

This was the moment when another paradigm, reactive programming, started to emerge and spread very fast. Along with functional programming, reactive programming has shaken the industry and uncovered a whole new class of applications, which we call reactive applications these days.

2. Being Reactive

Building applications in accordance with the reactive programming paradigm implies following a different architectural style and design principles, which are best described in The Reactive Manifesto, published by Jonas Bonér around 2013.

  • Responsive: the system responds in a timely manner if at all possible.
  • Resilient: the system stays responsive in the face of failure.
  • Elastic: the system stays responsive under varying workload.
  • Message Driven: reactive systems rely on asynchronous message-passing to establish a boundary between components that ensures loose coupling, isolation and location transparency.

Needless to say, each and every one of those principles makes a lot of sense, and together they represent a perfect recipe for building modern software applications. But certainly, there is no silver bullet in there. There is no magic which will suddenly make any application or system reactive. It is a combination of asynchronous and non-blocking programming and message-passing concurrency, complemented by immutability and back pressure, to name a few key ingredients.

Throughout this tutorial we are going to learn about all the necessary building blocks for developing truly reactive applications using the Scala programming language and ecosystem, focusing in this section on the first one of them: reactive streams.

3. Reactive Streams Specification

The purpose of reactive streams is to serve as a solid foundation for asynchronous stream processing with non-blocking back pressure. In this regard, the reactive streams specification stands out as an ongoing effort to provide an interoperable standard which conforming implementations are going to follow.

The reactive streams specification is written strictly following the principles outlined by The Reactive Manifesto, and the first official version 1.0.0 for the JVM platform has already been released.
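For reference, the JVM flavor of the specification boils down to just four interfaces, published in the org.reactivestreams package. Here is an illustrative Scala rendering of them (the real artifacts are Java interfaces):

trait Publisher[T] {
  def subscribe(subscriber: Subscriber[_ >: T]): Unit
}

trait Subscriber[T] {
  def onSubscribe(subscription: Subscription): Unit
  def onNext(element: T): Unit
  def onError(throwable: Throwable): Unit
  def onComplete(): Unit
}

trait Subscription {
  // the demand signaling which makes non-blocking back pressure possible
  def request(n: Long): Unit
  def cancel(): Unit
}

trait Processor[T, R] extends Subscriber[T] with Publisher[R]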

4. Reactive Streams in the Wild

Once the official reactive streams API for the JVM platform went public, a number of very popular open source projects announced the immediate availability of compliant implementations. Although the complete list includes a dozen of them, RxJava and Akka Streams are among the best-known ones.

It is worth mentioning that RxJava is one of the first and most advanced JVM libraries to have introduced the powerful principles of the reactive programming paradigm to Java developers. Although it also has a port to the Scala language, called RxScala, we are going to focus on Akka Streams, a pure Scala-based implementation of the reactive streams specification.

5. Akka Streams: Reactive Streams Implementation

As we already briefly mentioned, Akka Streams is just one part of the more comprehensive Akka Toolkit distribution. The latest released version of the Akka Toolkit at the moment of writing is 2.4.8, and this is the one we are going to use in the rest of the section.

However, do not panic if your version of the Akka Toolkit is not the latest one: Akka Streams has been providing an implementation of the reactive streams specification for quite a long time.

5.1. Basic Concepts

Akka Streams is built on top of just a few basic pieces, which interoperate with each other and allow us to describe very complex stream processing pipelines.

  • Source: something with exactly one output stream (conceptually, represents a Publisher)
  • Sink: something with exactly one input stream (conceptually, represents a Subscriber)
  • Flow: something with exactly one input and one output stream (conceptually, represents a Processor)
  • BidiFlow: something with exactly two input streams and two output streams
  • Graph: a stream processing topology that accepts certain inputs and exposes certain outputs

For the curious readers: Akka Streams fully implements the reactive streams specification, but hides this fact behind more concise user-facing API abstractions, introducing its own basic primitives. That is why, if you look at the reactive streams API for the JVM, you may not find straightforward matches to the Akka Streams ones, although the respective transformations are supported.

One of the key design goals of Akka Streams is reusability. All the building blocks described above can be shared and/or composed into more complex stream processing topologies, as we are going to see pretty soon.
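For example, here is a minimal sketch of that reusability (assuming the same implicit setup as in the snippets below): a single Flow shared by two independent stream blueprints.

// one reusable processing piece...
val doubler: Flow[Int, Int, NotUsed] = Flow[Int].map { _ * 2 }

// ...composed into two independent blueprints, neither of which runs yet
val fromRange = Source(1 to 5).via(doubler).to(Sink.foreach { println _ })
val fromList = Source(List(10, 20, 30)).via(doubler).to(Sink.foreach { println _ })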

5.2. Materialization

Akka Streams makes a very clear separation between stream definition and actual stream execution. The mechanism of taking an arbitrary stream definition and providing all the necessary resources it needs to run is called materialization in Akka Streams terminology.

Essentially, the materializer implementation could be anything, but by default Akka Streams provides the ActorMaterializer, which maps the different processing stages onto Actors. This also implies that, in general, stream processing is going to be fully asynchronous and message-driven.
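A quick sketch to make the distinction concrete (using the Source and Sink pieces introduced just below, and assuming an implicit ActorMaterializer is in scope): the blueprint is defined once, but every call to run() materializes it anew.

// the blueprint: a reusable stream description, nothing is executed yet
val blueprint = Source(1 to 3).to(Sink.foreach { println _ })

blueprint.run() // materializes a fresh set of actors under the hood
blueprint.run() // a second, completely independent execution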

In the upcoming section of the tutorial, “Concurrency and parallelism: Akka”, we are going to talk about Actors and the Actor Model in great detail. Luckily, Akka Streams does a really great job of hiding the unnecessary abstractions from us, so actors will not pop up anywhere else besides the ActorMaterializer.

5.3. Sources and Sinks

The input data is the starting point of any stream processing. It could be anything: a file, a collection, a network socket, a stream, a future, you name it. In the Akka Streams API, this input is represented by the parameterized Source[+Out, +Mat] class, where:

  • Out is the type of the elements which the source outputs
  • Mat is the type of some additional value which the source may produce (often set to NotUsed, but more about that later)

For convenience, the Source object has a lot of factory methods which simplify the wrapping of typical inputs into respective Source class instances, for example:

val source: Source[Int, NotUsed] = Source(1 to 10)

val source: Source[Int, NotUsed] = Source(Set(1, 2, 3, 4, 5))

val source: Source[String, NotUsed] = Source.single("Reactive Streams")

val source: Source[ByteString, _] = FileIO.fromPath(file)

val source: Source[Int, _] = Source.tick(1 second, 10 seconds, Random.nextInt())

Having the input is already enough to start simple stream processing. But as we already know, defining a Source will not actually do anything until the moment of materialization. The Source class (and some others, like Flow for example) has a family of so-called terminal functions: run() and runWith(). A call to any of these functions triggers the materialization process and requires a materializer to be provided, implicitly or explicitly. For example:

// imports are shown here once and elided in the rest of the snippets
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

implicit val system = ActorSystem("reactive-streams")
implicit val materializer = ActorMaterializer()
import system.dispatcher // the ExecutionContext required by onComplete

val numbers = List(100, 200, 300, 400, 500)
val source: Source[Int, NotUsed] = Source(numbers)

source
  .runForeach { println _ }
  .onComplete { _ => system.terminate() }

Once the execution of the stream processing terminates, each number is going to be printed out on the console:

100
200
300
400
500

Interestingly, under the hood the call to runForeach in the code snippet above is using another Akka Streams abstraction, Sink, which is essentially a consumer of the stream. So our example could be rewritten like this:

source
  .runWith { Sink.foreach { println _ } }
  .onComplete { _ => system.terminate() }

And as you would certainly expect, the Source class supports a wide range of functions to transform or process the stream elements, which are called processing stages in Akka Streams, for example:

source
  .map { _ * 2 }
  .runForeach { println _ }
  .onComplete { _ => system.terminate() }

Please notice that processing stages never modify the current stream definition but rather return a new processing stage. And last, but not least: Akka Streams does not allow null to flow through the stream as an element, something people coming from a Java background should take extreme care about.
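A common way to deal with the latter, sketched below, is to model the absence of a value with Option instead of null:

// nulls must not be used as stream elements; model absence with Option
Source(List(Some(100), None, Some(300)))
  .collect { case Some(value) => value }
  .runForeach { println _ }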

5.4. Flows

If Source and Sink are just abstractions over outputs and inputs, Flow is the kind of glue which essentially hooks them together. Let us get back to the example with the numbers, but this time we are going to read them from a file.

val file: Path = Paths.get(getClass.getResource("/numbers.txt").toURI())
val source: Source[ByteString, _] = FileIO.fromPath(file)

numbers.txt is just a plain old text file, where every line contains some arbitrary number, for example:

100
200
300
400
500

It might sound trivial, but let us take a look at the Out type of the Source: it is actually ByteString (more precisely, the file is going to be read in ByteString chunks). This is not really what we want: we would like to read the file line by line, number by number. But how can we do that? Luckily, Akka Streams has out-of-the-box support for exactly that in the form of Framing, and we only need to define the transformation from a stream of ByteStrings to a stream of regular integer numbers:

val flow: Flow[ByteString, Int, _] = Flow[ByteString]
  .via(Framing.delimiter(ByteString("\r\n"), maximumFrameLength = 10, allowTruncation = true))
  .map { _.utf8String.toInt }

Here, we have just defined a Flow! It is attached neither to an input nor to an output. However, it can be reused by any stream processing definition, for example:

source
  .via(flow)
  .filter { _ > 200 }
  .runForeach { println _ }

Or we can define yet another stream processing pipeline just to count how many lines have been processed overall. This is where the Mat type comes in handy, as we are going to use the value materialized by the Sink as the final result, for example:

val future: Future[Int] = source
  .via(flow)
  .toMat(Sink.fold[Int, Int](0){(acc, _) => acc + 1 })(Keep.right)
  .run
  
future.onSuccess { case count => 
  println(s"Lines processed: $count") 
}

Pretty simple, isn’t it? It is worth making one important note regarding element ordering guarantees: Akka Streams preserves the input order of elements in most cases (but some processing stages may not do that).
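As a side note on materialized values: which Mat value survives when two pieces are combined is controlled by the Keep combinators, as the following quick sketch illustrates.

val src: Source[Int, NotUsed] = Source(1 to 5)
val snk: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0) { (acc, e) => acc + e }

val keepLeft: RunnableGraph[NotUsed] = src.toMat(snk)(Keep.left)
val keepRight: RunnableGraph[Future[Int]] = src.toMat(snk)(Keep.right)
val keepBoth: RunnableGraph[(NotUsed, Future[Int])] = src.toMat(snk)(Keep.both)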

5.5. Graphs and BidiFlows

While Flow is a really powerful abstraction, it is limited to exactly one input and one output. That could be enough for many use cases, but complex stream processing scenarios require more flexibility. Let us get introduced to Graphs and BidiFlows.

Graphs may use an arbitrary number of inputs and outputs and may form really complex topologies, but in a nutshell they are composed out of simple Flows. To illustrate Graphs in action, let us consider the following example: an organization is ready to pay its employees an annual bonus, but all management positions get a 10k bonus on top of the base salary, while everyone else gets only 5k. The complete stream processing could be illustrated by the image below.

(Figure: the stream processing graph for the bonus example)

With its expressive Graph DSL, Akka Streams makes it very simple to build really complex scenarios, although ours is pretty naïve.

case class Employee(name: String, position: String, salary: Int)

val employees = List(
  Employee("Tom", "manager", 50000),
  Employee("Bob", "employee", 20000),
  Employee("Mark", "employee", 20000),
  Employee("John", "manager", 55000),
  Employee("Dilan", "employee", 35000)
)

val graph = GraphDSL.create() { implicit builder => 
  import GraphDSL.Implicits._
   
  val in = Source(employees)
  val out = Sink.foreach { println _ }

  val broadcast = builder.add(Broadcast[Employee](2))
  val merge = builder.add(Merge[Employee](2))

  val manager = Flow[Employee]
    .filter { _.position == "manager" }
    .map { e => e.copy(salary = e.salary + 10000) } 
    
  val employee = Flow[Employee]
    .filter { _.position != "manager" }
    .map { e => e.copy(salary = e.salary + 5000) }

  in ~> broadcast ~> manager  ~> merge ~> out
        broadcast ~> employee ~> merge
          
  ClosedShape
}

The ClosedShape at the end of the definition means that we have defined a fully connected graph, where all inputs and outputs are plugged in. A fully connected graph can be converted to a RunnableGraph and actually executed, for example:

RunnableGraph
  .fromGraph(graph)
  .run

We will see the expected output in the console once the graph execution completes. Everyone has the respective bonus added to their salary:

Employee(Tom,manager,60000)
Employee(Bob,employee,25000)
Employee(Mark,employee,25000)
Employee(John,manager,65000)
Employee(Dilan,employee,40000)

Like most of the other basic pieces, Graph and RunnableGraph are freely shareable. One of the consequences of that is the ability to construct partial graphs and to combine different Sources, Sinks, and Flows together.
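For instance, here is a minimal sketch of combining two Sources into a single one, using the Source.combine convenience together with the Merge fan-in stage:

// merge the elements of two sources into one (relative ordering is not guaranteed)
val merged: Source[Int, NotUsed] =
  Source.combine(Source(1 to 3), Source(10 to 12))(Merge(_))

merged.runForeach { println _ }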

In the examples we have seen so far, the data flows through the stream in one direction only. BidiFlow is a special case of the graph where there are two flows going in opposite directions. The best illustration of a BidiFlow is a typical request/response communication, for example:

case class Request(payload: ByteString)
case class Response(payload: ByteString)
  
val server = GraphDSL.create() { implicit builder => 
  import GraphDSL.Implicits._
   
  val out = builder.add(Flow[Request].map { _.payload.utf8String })
  val in = builder.add(Flow[String].map {s => Response(ByteString(s.reverse))})

  BidiShape.fromFlows(out, in)
}

In this simplistic example, the request’s payload is just reversed and wrapped up as the response payload. The server is actually a graph, and to create a BidiFlow instance out of it we have to use the fromGraph factory method, like this:

val bidiFlow = BidiFlow.fromGraph(server)

We are now ready to materialize and run the BidiFlow instance by supplying a simple request and wiring both of the bidiFlow’s flows together directly.

Source
  .single(Request(ByteString("BidiFlow Example")))
  .via(bidiFlow.join(Flow[String]))
  .map(_.payload.utf8String)
  .runWith(Sink.foreach { println _ })
  .onComplete { _ => system.terminate() }

Not surprisingly, the reversed version of the “BidiFlow Example” string is going to be printed out in the console:

elpmaxE wolFidiB

5.6. Back-Pressure

The concept of back-pressure is one of the foundational ones in the reactive streams philosophy. Akka Streams tries very hard to keep stream processing healthy by guaranteeing that the publisher will never emit more elements than the subscriber is able to handle. In addition to controlling the demand and propagating back-pressure from downstream flows to upstream ones, Akka Streams supports the usage of buffering and throttling for more fine-grained and advanced back-pressure management.
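To give a flavor of those knobs, here is a minimal sketch of explicit buffering and throttling (the numbers are arbitrary):

Source(1 to 1000)
  .buffer(64, OverflowStrategy.dropHead)            // bounded buffer, dropping the oldest element on overflow
  .throttle(10, 1.second, 10, ThrottleMode.shaping) // emit at most 10 elements per second
  .runForeach { println _ }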

5.7. Handling Errors

In any more or less real-life scenario, the stream processing pipelines built with Akka Streams will invoke application-specific logic, including database access or external service calls. Errors may and will happen, resulting in premature stream processing termination.

Luckily, Akka Streams provides at least three strategies to handle exceptions raised as part of the application code execution:

  • Stop: the stream is completed with failure (the default strategy)
  • Resume: the element is dropped and the stream continues
  • Restart: the element is dropped and the stream continues after restarting the stage

Those strategies are heavily influenced by the Actor Model, which Akka Streams uses to materialize the stream processing flows, and they are also referred to as supervision strategies. To illustrate how this works, let us consider a simple stream which uses numbers as a source and raises an exception every time it encounters an even number.

Source
  .unfold(0) { e => Some((e + 1, e + 1)) } // an infinite source of natural numbers
  .map { e => if (e % 2 != 0) e else throw new IllegalArgumentException("Only odd numbers are allowed") }
  .withAttributes(ActorAttributes.supervisionStrategy(_ => Supervision.Resume))
  .take(10)
  .runForeach { println _ }
  .onComplete { _ => system.terminate() }

Without supervision, the stream would end once the number 2 is encountered. But with the resuming strategy in place, the stream continues execution from where it left off, skipping the problematic element. As expected, we should see only odd numbers in the console:

1
3
5
7
9
11
13
15
17
19

5.8. Testing

As we are going to see, most of the frameworks and libraries developed by the Scala community have been built with superior testing support. Akka Streams is certainly not an exception and provides a dedicated test kit for developing comprehensive testing scenarios. To showcase that, let us get back to the sample flow which we defined in the Flows section.

val flow: Flow[ByteString, Int, _] = Flow[ByteString]
  .via(Framing.delimiter(ByteString("\r\n"), maximumFrameLength = 10, allowTruncation = true))
  .map { _.utf8String.toInt }

It sounds like a good idea to have a test case which verifies that the framing works exactly as we expect, so let us create one.

class FlowsSpec extends TestKit(ActorSystem("reactive-streams-test")) 
    with SpecificationLike with FutureMatchers with AfterAll {
  
  implicit val materializer = ActorMaterializer()
  def afterAll = system.terminate()

  "Stream" >> {
    "should return an expected value" >> {   
      val (publisher, subscriber) = TestSource.probe[ByteString]
        .via(flow)
        .toMat(TestSink.probe[Int])(Keep.both)
        .run()
        
      subscriber.request(2)
      publisher.sendNext(ByteString("20"))  
      publisher.sendNext(ByteString("0\r\n"))
      publisher.sendComplete()
      
      subscriber.expectNext() must be_==(200)
    }
  }
}

Classes like TestSource and TestSink (and many more) give complete control over the stream processing, making it possible to test very sophisticated pipelines. Also, Akka Streams does not mandate a particular testing framework, so the example we came up with is a typical Specs2 specification.

6. Conclusions

This section was just a shallow introduction to the world of reactive programming and reactive streams in particular. Despite the fact that we have talked about Akka Streams quite a lot, we have only touched a tiny part of it, barely scratching the surface: a long list of features and intrinsic details stayed uncovered. The official Akka Streams documentation is a great source of comprehensive knowledge about the subject, full of examples. Please do not hesitate to go through it.

7. What’s next

In the next section of the tutorial we are going to talk about accessing relational databases from within your Scala applications. The topics of this section are going to be of great use, as we will see the importance of reactive streams in backing certain database access patterns.

The complete projects are available for download.

Andriy Redko

Andriy is a well-grounded software developer with more than 12 years of practical experience using Java/EE, C#/.NET, C++, Groovy, Ruby, functional programming (Scala), databases (MySQL, PostgreSQL, Oracle) and NoSQL solutions (MongoDB, Redis).