Developing Modern Applications with Scala: Concurrency and parallelism with Akka

This article is part of our Academy Course titled Developing Modern Applications with Scala.

In this course, we provide a framework and toolset so that you can develop modern Scala applications. We cover a wide range of topics, from SBT build and reactive applications to testing and database access. With our straightforward tutorials, you will be able to get your own projects up and running in minimum time.

It would be fair to say that the era of computers with a single CPU (or just one core) is mostly forgotten history. Today most devices, no matter how small they are, are built on multi-core CPU architectures aimed at increasing the overall computational power.

Such advances and innovations on the hardware front have forced us to rethink how we develop and run software systems in order to utilize all available resources effectively.

1. Introduction

In this section of the tutorial we are going to talk about two core concepts that modern software systems rely upon: concurrency and parallelism. Although they are closely related, there is a subtle but important distinction. Concurrency means that multiple tasks make progress over the same period of time, possibly by interleaving on a single core, while parallelism means that multiple tasks are executed at the same instant, which requires multiple cores or CPUs.

2. Threads

Traditionally, most concurrent programming models were built around threads, and Java was no exception. A typical JVM application (run as a process) may spawn many threads in order to execute some work concurrently or, ideally, in parallel.

Fig. 1 Typical JVM process spawns a couple of threads

This model worked reasonably well for a while, but thread-based concurrency has at least one fundamental flaw: the sharing of state and resources. In most cases, threads need to exchange data or use other shared resources in order to accomplish their work. Without proper mechanisms in place, uncontrolled access to shared resources or modification of shared state by multiple threads (a situation known as a race condition) leads to data corruption and often causes the application to crash.

Fig. 2 Using synchronization to access shared state

Proper use of synchronization primitives (locks, mutexes, semaphores, monitors, …) solves the problem of accessing shared state (or resources), but the price is high. Not only does it complicate the programming model, the non-deterministic nature of multi-threaded execution also makes troubleshooting difficult and time-consuming. Moreover, a whole new class of problems arises: lock contention, thread starvation, deadlocks, livelocks, and more.

In addition, threads and thread management consume a lot of operating system resources, and at some point creating more threads actually has a negative impact on application performance and scalability.
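To make the idea of guarding shared state concrete, here is a minimal sketch that uses only the Scala and Java standard libraries. The names SynchronizedCounter, Counter and countWith are hypothetical and exist only for this illustration; the counter is protected by the JVM's built-in monitor lock.

```scala
object SynchronizedCounter {
  // Shared mutable state, guarded by the instance's intrinsic lock (monitor).
  final class Counter {
    private var value = 0
    def increment(): Unit = this.synchronized { value += 1 }
    def get: Int = this.synchronized { value }
  }

  // Starts `threads` threads; each one increments the same counter `perThread` times.
  def countWith(threads: Int, perThread: Int): Int = {
    val counter = new Counter
    val workers = (1 to threads).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = (1 to perThread).foreach(_ => counter.increment())
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join()) // wait until every worker has finished
    counter.get
  }
}
```

Without the synchronized blocks, increments coming from different threads could interleave and some updates could be lost, so the final value would no longer be predictable.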

3. Reactors and Event Loops

The flaws of thread-based concurrency models turned the industry’s attention to the search for alternatives that more adequately address the demands of modern software systems. The Reactor pattern and event handling, the foundations of modern asynchronous and non-blocking programming paradigms, are among the programming models that emerged to deal with concurrency at scale.

Fig. 3 The Reactor pattern (simplified)

Please note that this is a very simplified visualization of one possible implementation of the Reactor pattern, but it illustrates the key elements well. At the core of the Reactor is a single-threaded event loop. Under the hood, event handling may use one or more threads (often grouped in a pool) to do the actual work; however, the purpose and usage of threads in this model are quite different, and they are shielded from the application completely.
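The event loop at the heart of this pattern can be sketched in a few lines of plain Scala. This is a deliberately naive illustration rather than a production Reactor: the names MiniReactor, EventLoop, Data and Stop are hypothetical, handlers run directly on the loop thread, and handlers are assumed to be registered before the loop starts.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

object MiniReactor {
  sealed trait Event
  final case class Data(channel: String, payload: String) extends Event
  case object Stop extends Event

  final class EventLoop {
    private val queue = new LinkedBlockingQueue[Event]()
    private val handlers = mutable.Map.empty[String, String => Unit]

    // Associates a handler with a channel (call this before run()).
    def register(channel: String)(handler: String => Unit): Unit =
      handlers(channel) = handler

    // May be called from any thread: events are only queued here.
    def submit(event: Event): Unit = queue.put(event)

    // The single-threaded loop: takes events one by one and dispatches them
    // to the matching handler until a Stop event is dequeued.
    def run(): Unit = {
      var running = true
      while (running) queue.take() match {
        case Data(channel, payload) => handlers.get(channel).foreach(_(payload))
        case Stop                   => running = false
      }
    }
  }
}
```

Because every handler is invoked from the same loop thread, handlers never run concurrently with each other and need no locking of their own; the trade-off is that a slow handler delays all events queued behind it.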

4. Actors and Messages

Message passing, or to be more precise, asynchronous message passing, is another interesting and powerful concurrency model that has gained a lot of traction recently. The Actor Model, which originated back in 1973, is one of the best examples of asynchronous message-passing concurrency.

Actors are the core entities of the Actor Model. Each actor has its own message queue (called a mailbox) and a single-threaded message handler, and communicates with other actors only by means of asynchronous messages (actors may also create other actors).

Fig. 4 Actor in Akka

This is a typical share-nothing architecture: actors may have their own state, but they never share it with any other actor. Actors may live within the same JVM process, in multiple JVM processes on the same physical node, or even be spread over the network; it does not matter as long as they are able to reference each other and communicate via messages.

5. Meet Akka

We already met the Akka toolkit when we talked about reactive applications and learnt about Akka Streams. Going back in history a little, it is worth mentioning that Akka started as an Actor Model implementation on the JVM platform. Since then it has seen many releases (the most recent one being 2.4.10) and gained a lot of additional features and capabilities, but actors remain the backbone of Akka even today.

ActorSystem is the entry point into Akka actors’ universe. It is a single place in the application to create and manage actors.

import akka.actor.ActorSystem
implicit val system = ActorSystem("akka-actors")

With that, we are ready to create new actors! In Akka, every actor should subclass (or mix in) the Actor trait and implement the receive function, for example:

import akka.actor.Actor

class SampleActor extends Actor {
  def receive = {
    ...
  }
}

More often than not, you would also mix in the ActorLogging trait to get access to a dedicated logger through the log reference, for example:

import akka.actor.Actor
import akka.actor.ActorLogging

class SampleActor extends Actor with ActorLogging {
  def receive = {
    case _ => log.info("Received some message!")
  }
}

Although our SampleActor does not do much at the moment, we can instantiate it and send literally any message to it. As we already mentioned, actors are created only through an ActorSystem instance rather than with the new operator, for example:

import akka.actor.Props

val sampleActor = system.actorOf(Props[SampleActor], "sample-actor")
sampleActor ! "Message!"

If you think that the sampleActor variable is an instance of the SampleActor class, you are not quite right. It is actually a reference to the SampleActor actor, represented by the ActorRef class. This is the only mechanism to address a particular actor in Akka; direct access to the underlying actor class instance is not available.

What is going to happen when we run the application? Not much, except that we should see something like this in the console:

[INFO] [akka-actors-akka.actor.default-dispatcher-2] [akka://akka-actors/user/sample-actor] Received some message!

6. Supervision

An interesting and very important property of actors in Akka is that they are organized in hierarchies. Actors may spawn child actors in order to split the work into smaller pieces and, as such, form a parent/child hierarchy.

It sounds like a minor detail, but it is not: in Akka, parent actors oversee their children, a process known as supervision. Parent actors essentially become supervisors and may apply different strategies when child actors encounter failures (or, to be more specific, fail with an exception).

The official documentation discusses the defaults and the different supervision strategies in great detail, but let us have a look at a quick example. Our ChildActor is defined in such a way that it always throws an exception upon receiving any Message (a simple case class wrapping a String, defined in the Patterns section of this tutorial).

class ChildActor extends Actor with ActorLogging {
  def receive = {
    case Message(m) => 
      throw new IllegalStateException("Something unexpected happened")
  }
}

In turn, our ParentActor creates an instance of ChildActor and, whenever it receives any message, sends a Message to that ChildActor instance.

import akka.actor.{Actor, ActorLogging, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.{Escalate, Resume}

class ParentActor extends Actor with ActorLogging {
  val child = context.actorOf(Props[ChildActor])

  // Applied to each failing child individually
  override val supervisorStrategy = OneForOneStrategy() {
    case _: IllegalStateException => Resume
    case _: Exception => Escalate
  }

  def receive = {
    case _ => child ! Message("Message from parent")
  }
}

According to the default supervision strategy, an actor that throws an Exception is restarted (which is probably the desired behavior in most cases). In our example, however, we overrode this policy (using the supervisorStrategy property of the ParentActor) to resume normal message processing of the supervised actor (ChildActor) in case of IllegalStateException, and to escalate any other Exception to the supervisor of ParentActor itself.
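As a further illustration, the following sketch combines several supervision directives with a retry limit. The Worker and Supervisor actors are hypothetical and not part of the tutorial's sample code; the assumption is that a division by zero is harmless enough to resume, while other failures call for a restart, a stop, or escalation.

```scala
import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Escalate, Restart, Resume, Stop}
import scala.concurrent.duration._

// Hypothetical worker: replies with 100 / n and fails with
// ArithmeticException when n is 0.
class Worker extends Actor {
  def receive = {
    case n: Int => sender() ! (100 / n)
  }
}

class Supervisor extends Actor {
  private val worker = context.actorOf(Props[Worker], "worker")

  // At most 3 restarts per minute; beyond that the child is stopped.
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: ArithmeticException      => Resume   // keep state, carry on
      case _: IllegalStateException    => Restart  // fresh actor instance
      case _: IllegalArgumentException => Stop     // give up on the child
      case _: Exception                => Escalate // let our own parent decide
    }

  def receive = {
    case msg => worker forward msg // preserves the original sender
  }
}
```

With Resume, the message that caused the failure is dropped and the worker continues with the next message in its mailbox, so sending 0 followed by 4 to the Supervisor should still produce the reply 25 for the second message.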

7. Patterns

In their basic form, actors in Akka communicate via asynchronous, one-way messages. This certainly works, but many real-world scenarios require more complex interactions, for example request/reply communication, circuit breakers, piping messages between actors, and others. Luckily, the akka.pattern package provides a set of commonly used Akka patterns, ready to be applied. For example, let us slightly change the SampleActor implementation to handle messages of type Message and, once one is received, reply with MessageReply.

case class Message(message: String)
case class MessageReply(reply: String)

class SampleActor extends Actor with ActorLogging {
  def receive = {
    case Message(m) => sender ! MessageReply(s"Reply: $m")
  }
}

Now we can employ the ask pattern to send a message to the actor and obtain its reply as well, for example:

import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

implicit val timeout: Timeout = 1.second
val reply = (sampleActor ? Message("Please reply!")).mapTo[MessageReply]

In this case, the sender sends a message to an actor and gets back a Future that completes with the reply, or fails if no reply arrives within the 1-second timeout. Please note that typical Akka actors do not support any type safety semantics regarding messages: anything can be sent as well as received as a response. It becomes the responsibility of the sender to perform the proper type cast (for example, using the mapTo method). Similarly, if the sender sends a message that the receiving actor does not know how to handle, the message is treated as unhandled: Akka publishes an UnhandledMessage event to the event stream. Messages that cannot be delivered at all (for example, because the actor has been stopped) end up in dead letters.
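Since the ask pattern returns a Future, the caller has to decide what to do when the reply never arrives. The sketch below shows one possible way to handle this, recovering from akka.pattern.AskTimeoutException with a fallback value. The AskExample object and the askWithFallback helper are hypothetical; Message, MessageReply and SampleActor are repeated from the text only to keep the example self-contained.

```scala
import akka.actor.{Actor, ActorRef}
import akka.pattern.{ask, AskTimeoutException}
import akka.util.Timeout
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object AskExample {
  case class Message(message: String)
  case class MessageReply(reply: String)

  class SampleActor extends Actor {
    def receive = {
      case Message(m) => sender() ! MessageReply(s"Reply: $m")
    }
  }

  // Asks the actor and extracts the reply text; if no reply arrives
  // within the timeout, completes with a fallback string instead of failing.
  def askWithFallback(actor: ActorRef, text: String)
                     (implicit ec: ExecutionContext): Future[String] = {
    implicit val timeout: Timeout = 1.second
    (actor ? Message(text))
      .mapTo[MessageReply]
      .map(_.reply)
      .recover { case _: AskTimeoutException => "No reply within 1 second" }
  }
}
```

Keeping the result as a Future and composing on it with map and recover avoids blocking a thread while waiting, which is usually preferable to calling Await inside an application.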

8. Typed Actors

As mentioned before, actors in Akka do not offer any type safety regarding the messages they accept or reply with. For quite some time now, however, Akka has included experimental support for Typed Actors, where the contracts are explicit and enforced by the compiler.

The definition of Typed Actors is very different from that of regular Akka actors and closely resembles the way we used to build RPC-style systems. First of all, we have to start by defining the interface and its implementation, for example:

import scala.concurrent.Future

trait Typed {
  def send(message: String): Future[String]
}

class SampleTypedActor extends Typed {
  def send(message: String): Future[String] = Future.successful("Reply: " + message)
}

In turn, instantiating Typed Actors requires a bit more code, although it still uses ActorSystem under the hood.

import akka.actor.{ActorSystem, TypedActor, TypedProps}
import scala.util.Success

implicit val system = ActorSystem("typed-akka-actors")
import system.dispatcher // execution context required by andThen

val sampleTypedActor: Typed =
  TypedActor(system).typedActorOf(TypedProps[SampleTypedActor]())

val reply = sampleTypedActor
  .send("Hello Typed Actor!")
  .andThen { case Success(r) => println(r) }

At this point a logical question may come to mind: shouldn’t Typed Actors be used everywhere? Good point, but the short answer is: no, probably not. If you are curious, please take some time to go over this great discussion about pros and cons of using Typed Actors versus regular, untyped ones.

9. Scheduler

Beyond providing an excellent implementation of the Actor Model, Akka also offers quite a few helpful utilities. One of them is scheduling support, which provides the capability to send a message to a particular actor periodically or at some point in time.

import akka.actor.{ActorSystem, Props}
import scala.concurrent.duration._

implicit val system = ActorSystem("akka-utilities")
import system.dispatcher

val sampleActor = system.actorOf(Props[SampleActor], "sample-actor")
// Send "Wake up!" immediately and then every 100 milliseconds
system.scheduler.schedule(0.seconds, 100.milliseconds, sampleActor, "Wake up!")

Needless to say, the ability to schedule task executions is a requirement for most real-world applications, so it is quite handy to have this feature available out of the box.
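A periodic schedule keeps firing until it is cancelled, so it is usually worth holding on to the Cancellable that the scheduler returns. The following sketch, with the hypothetical helper names SchedulerSketch and tickFor, starts a periodic message and uses a one-off scheduled task to cancel it after a given lifetime.

```scala
import akka.actor.{ActorRef, ActorSystem, Cancellable}
import scala.concurrent.duration._

object SchedulerSketch {
  // Sends "Tick" to `target` every 100 milliseconds and cancels the
  // periodic schedule once `lifetime` has elapsed.
  def tickFor(system: ActorSystem, target: ActorRef, lifetime: FiniteDuration): Cancellable = {
    import system.dispatcher // execution context used by the scheduler
    val periodic = system.scheduler.schedule(0.seconds, 100.milliseconds, target, "Tick")
    system.scheduler.scheduleOnce(lifetime) {
      periodic.cancel()
    }
    periodic
  }
}
```

The returned Cancellable can also be cancelled earlier by the caller, for example from an actor's postStop hook, so that no messages are scheduled for an actor that no longer exists.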

10. Event Bus

Another very useful utility in Akka is the generic event bus concept and its particular implementation, provided by ActorSystem in the form of the event stream.

Whereas actor-to-actor communication assumes that the sender of a message knows who the recipient is, the event stream lets actors broadcast events (messages of some type) to any other actors without prior knowledge of who will receive them. The interested parties express their interest by subscribing to such events by type.

For example, assume we have an important message which we simply name Event.

case class Event(id: Int)

Our SampleEventActor is designated to handle this kind of message and logs the id of the event every time it receives one.

class SampleEventActor extends Actor with ActorLogging {
  def receive = {
    case Event(id) => log.info(s"Event with '$id' received")
  }
}

It looks easy, but there is nothing really exciting so far. Now, let us take a look at the event stream and the publish/subscribe communication pattern in action.

implicit val system = ActorSystem("akka-utilities")
  
val sampleEventActor = system.actorOf(Props[SampleEventActor])  
system.eventStream.subscribe(sampleEventActor, classOf[Event])
  
system.eventStream.publish(Event(1))
system.eventStream.publish(Event(2))
system.eventStream.publish(Event(3))

Our sampleEventActor expresses its interest in receiving messages of type Event by calling the system.eventStream.subscribe() method. From then on, every time an Event is published through system.eventStream.publish(), the sampleEventActor receives it, no matter who the publisher is. With logging turned on, we are going to see something like this in the console output:

[INFO] [akka-utilities-akka.actor.default-dispatcher-2] [akka://akka-utilities/user/$a] Event with '1' received
[INFO] [akka-utilities-akka.actor.default-dispatcher-2] [akka://akka-utilities/user/$a] Event with '2' received
[INFO] [akka-utilities-akka.actor.default-dispatcher-2] [akka://akka-utilities/user/$a] Event with '3' received
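The event stream is also the channel through which Akka itself publishes some of its built-in events. As a minimal sketch, the hypothetical DiagnosticsListener below subscribes to two of them, akka.actor.DeadLetter and akka.actor.UnhandledMessage, and logs whatever it receives; the DiagnosticsSketch object and its install helper are likewise illustrative names only.

```scala
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, DeadLetter, Props, UnhandledMessage}

// Logs messages that could not be delivered or were not handled.
class DiagnosticsListener extends Actor with ActorLogging {
  def receive = {
    case DeadLetter(message, from, to) =>
      log.warning(s"Dead letter '$message' from $from to $to")
    case UnhandledMessage(message, from, to) =>
      log.warning(s"Unhandled message '$message' from $from to $to")
  }
}

object DiagnosticsSketch {
  // Creates the listener and subscribes it to both event types.
  def install(system: ActorSystem): ActorRef = {
    val listener = system.actorOf(Props[DiagnosticsListener], "diagnostics-listener")
    system.eventStream.subscribe(listener, classOf[DeadLetter])
    system.eventStream.subscribe(listener, classOf[UnhandledMessage])
    listener
  }
}
```

A listener like this can be handy during development, because messages that silently disappear are often a sign of a wrong actor path or a missing case in a receive block.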

11. Remoting

All the examples we have seen so far dealt with just one ActorSystem, running in a single JVM within one node. But Akka’s networking extensions support multi-JVM / multi-node deployments so different ActorSystems may communicate with each other in a truly distributed environment.

To enable ActorSystem’s remote capabilities we need to change its default actor reference provider and enable a network transport. All of that is easy to accomplish in the application.conf configuration file:

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty {
      tcp {
        hostname = "localhost"
        port = ${port}
      }
    }
  }
}
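The ${port} placeholder in this configuration has to be resolved from somewhere. One option is to pass a JVM system property such as -Dport=12000, since the configuration library layers system properties on top of application.conf. Another option, sketched below under the assumption that the file above is available on the classpath as application.conf, is to supply the value programmatically; the RemoteConfigSketch object and its remoteSystem helper are hypothetical names.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object RemoteConfigSketch {
  // Builds an ActorSystem whose remoting port is provided in code
  // rather than through a system property.
  def remoteSystem(name: String, port: Int): ActorSystem = {
    val config = ConfigFactory
      .parseString(s"port = $port") // value for the ${port} substitution
      .withFallback(ConfigFactory.parseResources("application.conf"))
    // load(config) adds the library defaults and resolves substitutions
    ActorSystem(name, ConfigFactory.load(config))
  }
}
```

With such a helper, the two systems used in the exercise could be started as remoteSystem("akka-remote-1", 12000) and remoteSystem("akka-remote-2", 12001) from two separate JVM processes.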

As an exercise, we are going to run two ActorSystem instances, one with name akka-remote-1 on port 12000, and another one, akka-remote-2 on port 12001. We also are going to define one actor to communicate with, SampleRemoteActor.

class SampleRemoteActor extends Actor with ActorLogging {
  def receive = {
    case m: Any => log.info(s"Received: $m")
  }
}

On the first ActorSystem, akka-remote-1, we are going to create an instance of the SampleRemoteActor and send one message to it.

implicit val system = ActorSystem("akka-remote-1")
  
val sampleActor = system.actorOf(Props[SampleRemoteActor], "sample-actor")
sampleActor ! "Message from Actor System #1!"

However on the second one, akka-remote-2, we are going to send a message to the SampleRemoteActor instance using its remote reference, which among other things includes ActorSystem name (akka-remote-1), host (localhost), port (12000) and given actor name (which in our case is sample-actor):

implicit val system = ActorSystem("akka-remote-2")
  
val sampleActor = system.actorSelection(
  "akka.tcp://akka-remote-1@localhost:12000/user/sample-actor")
sampleActor ! "Message from Actor System #2!"

Quite straightforward, isn’t it? Running both ActorSystem instances side by side will produce the following output in the console of akka-remote-1 JVM process:

[INFO] [main] [akka.remote.Remoting] Starting remoting
[INFO] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://akka-remote-1@localhost:12000]
[INFO] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://akka-remote-1@localhost:12000]
[INFO] [akka-remote-1-akka.actor.default-dispatcher-2] [akka.tcp://akka-remote-1@localhost:12000/user/sample-actor] Received: Message from Actor System #1!
[INFO] [akka-remote-1-akka.actor.default-dispatcher-4] [akka.tcp://akka-remote-1@localhost:12000/user/sample-actor] Received: Message from Actor System #2!

Adding to what we have seen so far, not only may one actor system reference actors from other actor systems, it can also create new actor instances remotely.
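Remote creation can be requested in code by attaching a deployment scope to the actor's Props. The sketch below is illustrative only: RemoteDeploySketch and remoteProps are hypothetical names, and SampleRemoteActor is repeated from the text to keep the example self-contained.

```scala
import akka.actor.{Actor, ActorLogging, AddressFromURIString, Deploy, Props}
import akka.remote.RemoteScope

object RemoteDeploySketch {
  class SampleRemoteActor extends Actor with ActorLogging {
    def receive = {
      case m => log.info(s"Received: $m")
    }
  }

  // Props that ask Akka to instantiate the actor on the actor system
  // reachable at `systemUri` instead of the local one.
  def remoteProps(systemUri: String): Props = {
    val address = AddressFromURIString(systemUri)
    Props[SampleRemoteActor].withDeploy(Deploy(scope = RemoteScope(address)))
  }
}
```

From akka-remote-2, calling system.actorOf(RemoteDeploySketch.remoteProps("akka.tcp://akka-remote-1@localhost:12000"), "remotely-created") would then create the actor inside akka-remote-1. This assumes that remoting is enabled on both sides and that the actor class is present on the classpath of both JVM processes.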

12. Testing

Akka includes excellent testing support to ensure that every single aspect of actors’ behavior and interactions can be covered. In fact, Akka TestKit provides the appropriate scaffolding for writing traditional unit tests as well as integration tests.

Unit testing techniques revolve around TestActorRef, a simplified ActorRef implementation that processes messages synchronously on the calling thread and gives access to the underlying actor instance and its internal state. Let us start with this one and come up with a simple unit test for our SampleActor using the specs2 framework, which is already familiar to us.

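import akka.actor.ActorSystem
import akka.pattern.ask
import akka.testkit.TestActorRef
import akka.util.Timeout
import org.specs2.concurrent.ExecutionEnv
import org.specs2.mutable.Specification
import org.specs2.specification.AfterAll
import scala.concurrent.duration._

class SampleActorTest extends Specification with AfterAll {
  implicit val timeout: Timeout = 1.second
  implicit lazy val system = ActorSystem("test")

  "Sample actor" >> {
    "should reply on message" >> { implicit ee: ExecutionEnv =>
      val actorRef = TestActorRef(new SampleActor)
      actorRef ? Message("Hello") must be_==(MessageReply("Reply: Hello")).await
    }
  }

  // Shut the actor system down once all examples have run
  def afterAll() = {
    system.terminate()
  }
}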

Unit testing is certainly a very good start but at the same time, it is often quite limited as it relies on a simplified view of the system. However, again thanks to Akka TestKit, there are more powerful testing techniques at our disposal.

import akka.actor.{ActorSystem, Props}
import akka.testkit.{EventFilter, ImplicitSender, TestKit}
import org.scalatest.{BeforeAndAfterAll, WordSpecLike}

class SampleActorIntegrationTest extends TestKit(ActorSystem("test"))
    with ImplicitSender with WordSpecLike with BeforeAndAfterAll {

  "Sample actor" should {
    "reply on message" in {
      val actorRef = system.actorOf(Props[SampleActor])
      actorRef ! Message("Hello")
      expectMsg(MessageReply("Reply: Hello"))
    }
  }

  "Sample event actor" should {
    "log an event" in {
      // SampleEventActor is the actor that handles Event and logs it
      val actorRef = system.actorOf(Props[SampleEventActor])
      EventFilter.info(
        message = "Event with '100' received", occurrences = 1) intercept {
        actorRef ! Event(100)
      }
    }
  }

  override def afterAll() = {
    shutdown()
  }
}

This time we have used the ScalaTest framework and taken a slightly different approach, relying on the TestKit class, which offers a rich set of assertions over message expectations. Not only are we able to spy on messages, we can also make assertions over expected log records using the EventFilter class, which requires TestEventListener to be configured as a logger in the application.conf file.
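Message expectations are not limited to the test class itself. The TestKit module also ships a TestProbe, which can stand in for a collaborator of the actor under test. The sketch below assumes a hypothetical Forwarder actor that is not part of the tutorial's sample code, and checks that it passes messages on to its target.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.testkit.{TestKit, TestProbe}
import org.scalatest.{BeforeAndAfterAll, WordSpecLike}
import scala.concurrent.duration._

// Hypothetical actor under test: forwards everything to `target`.
class Forwarder(target: ActorRef) extends Actor {
  def receive = {
    case msg => target forward msg
  }
}

class ForwarderSpec extends TestKit(ActorSystem("probe-test"))
    with WordSpecLike with BeforeAndAfterAll {

  "Forwarder" should {
    "pass messages on to its target" in {
      val probe = TestProbe() // plays the role of the target actor
      val forwarder = system.actorOf(Props(new Forwarder(probe.ref)))
      forwarder ! "ping"
      probe.expectMsg(1.second, "ping")
    }
  }

  override def afterAll(): Unit = shutdown()
}
```

Using a probe keeps the test focused on a single actor: the real collaborator does not need to be started, and the assertion fails if the expected message does not arrive within the given time.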

akka {
  loggers = [
    akka.testkit.TestEventListener
  ]
}

Really nice: the test cases look simple, readable and maintainable. However, Akka’s testing capabilities do not stop here and are still evolving very fast. For example, it is worth mentioning the availability of experimental multi-node testing support.

13. Conclusions

Akka is a terrific toolkit and serves as a solid foundation for many other libraries and frameworks. As an Actor Model implementation, it offers yet another approach to concurrency and parallelism, using asynchronous message passing and promoting immutability.

It is worth mentioning that Akka is being actively developed and goes way beyond the Actor Model. With every new release it includes more and more tools (stable and experimental) for building highly concurrent and distributed systems. We have not touched upon many of its advanced features at all, such as clustering, persistence, finite state machines and routing, but the official Akka documentation is the best place to get familiar with all of them.

14. What’s next

In the next section of the tutorial we are going to talk about Play! Framework: a powerful, highly productive and feature-rich framework for building scalable, full-fledged web applications in Scala.

Andrey Redko

Andriy is a well-grounded software developer with more than 12 years of practical experience using Java/EE, C#/.NET, C++, Groovy, Ruby, functional programming (Scala), databases (MySQL, PostgreSQL, Oracle) and NoSQL solutions (MongoDB, Redis).