Scala

Fault-tolerance primitives in Scala: links and traps

Over the last decade Actor Model is actively marketed as a simple, effective and save concept for building concurrency. Being popularized by Erlang in early nineties, and used as a primary concurrency construct in Scala, Actors offer share-nothing event-driven model, where creating and maintaining a single actor is cheap (so that you can keep running millions of them), and remote actors working at distributed nodes look and feel as the local ones.

However, being used unwisely, actors may bring even more pain and boilerplate, than thread-based concurrency or STM. Lack of the in-depth documentation on actor-based modeling leads to misconceptions and myths that can be broken working through best-practices and design guidelines of building fault-tolerant systems in Erlang.

Fault-tolerance primitives

Defensive programing forces you to put a huge effort to protect the system against all kinds of unexpected behavior you may think of, checking inputs and current state whenever possible, and defining ways to escalate the problem on a higher level. Being a defensive programmer you should always be proactive and look for a new potential problems to handle, because even the smallest one causing a failure in insignificant component may lead to a huge system crash.

Fault-tolerant systems as apposed don’t tend to predict all possible failure reasons (to prevent a single component from crash), but rather isolate the component from the rest of the system and restart it, keeping the system work consistently. If restart of the component doesn’t help, the problem may be propagated to the higher level, in order to [possibly] restart the parties that exchange messages with the component.

As Joe Armstrong says on his book “Programming Erlang”, when one actor dies in the room full of actors, others should probably notice that and start fixing the problem (cleaning out the bodies).

In Erlang and Scala this behavior is achieved by linking actors. In the most basic form, when two actors are linked, and one of them dies, it sends an exit signal to another actor to terminate it too.

Actors linking is bidirectional operation, so, when you link actor A to B, B is on the background linked to A, and death of any of them cause sending an exit signal to another. In Erlang it’s possible to create unidirectional linking with monitors (when monitored process dies, ‘DOWN’ message is sent to the handler). There’s no analogue to the monitors in Scala standard library, however implementing it on demand would be easy.

Linked actor can create an exit trap, so that exit signal will be processed as a normal message, not causing actor termination. Exit signal normally contains the reference to the failed actor (that can be used to restart it), and failure reason.

In “Programming Erlang” several link/trap exit scenarios are represented with a simple example:

 -module(edemo1).
 -export([start2]).
 
 start(Bool, M) ->
 
    A = spawn(fun() -> a() end),
    B = spawn(fun() -> b(A, Bool) end),
    C = spawn(fun() -> c(B, M) end),
 
    sleep(1000),
 
    status(b, B),
    status(c, C).

In the code above three processes are created, synchronized with sleep, in order to give them time to process the passed messages (ugly in general, but works for a simple example), and afterwards, their state is checked. Actors are defined as follows:

 a() ->
    process_flag(trap_exit, true),
    wait(a).
 
 b(A, Bool) ->
    process_flag(trap_exit, Bool),
    link(A),
    wait(b).
 
 c(B, M) ->
    link(B),
    case M of
       {die, Reason} ->
          exit(Reason);
       {divide, N} ->
          1N,
          wait(c);
       normal ->
          true
 end.

Process A always traps exits, process B is linked to A and traps exits depending on the function input, process C linked to B receives messages and either makes computation or fails.

 wait(Prog) ->
    receive
    Any ->
       io:format('Process ~p received ~p~n' ,[Prog, Any]),
       wait(Prog)
 end.

Method wait recursively receives messages printing out the message content.

Being translated into Scala, using standard Actors library, the example looks as follows:

 object LinkedActors {
 
  case class Die(reason:AnyRef)
  case class Divide(number: Int)
 
  def status(name: String, actor: Actor) = println('Actor %s is %s' 
 format(name, actor.getState))
  def printMessage(actorName: String):PartialFunction[Any, Unit] = 
 {case msg => println('Actor %s received %s' format(actorName, msg))}
 
  def start(isTrapExit: Boolean, message: Any) = {
 
     val A = actor{
      self.trapExit = true
      loop(react(printMessage('a')))
     }
 
     val B = actor{
      self.trapExit = isTrapExit
      self link A
      loop(react(printMessage('b')))
     }
    
     val C = actor{
      self link B
      loop{
        react{
          case Die(reason) => exit(reason)
          case Divide(number) => 1number
        }
      }
     }
 
     C! message
    
     Thread.sleep(1000)
 
     status('b', B)
     status('c', C)
  }
 
 }

Essentially, code is the same with the difference that messages accepted by actor C are classified with case classes and receive behavior of actors B and C is represented with partial functions.

Let’s pass some inputs to the start method in order to see, how will the chained actors behave, when some of them die:

 scala> start(false, Die('abc'))
 Actor a received Exit(scala.actors.Actor$$anon$1@dc8f6d,abc)
 Actor b is Terminated
 Actor c is Terminated

Actor C receives message Die and exists with the reason “abc”. Actor B linked to C doesn’t trap exits, therefore it terminates too. As long as A connected to B traps exits, when actor B terminates it sends A a message with the reason why it has failed (case class with the following signature):

 ** An `Exit` message (an instance of this class) is sent to an actor
 *  with `trapExit` set to `true` whenever one of its linked actors
 *  terminates.
 *
 *  @param from   the actor that terminated
 *  @param reason the reason that caused the actor to terminate
 *
 case class Exit(from: AbstractActor, reason: AnyRef)

In the same time, when exit is expected (not caused with computational exception), linked actors are untouched:

 scala> start(false, Die('normal))
 Actor b is Suspended
 Actor c is Terminated

In the snippet below, unhandled division by zero exception causes C and B to die:

 scala> start(false, Divide(0))
 Actor a received Exit(scala.actors.Actor$$anon$1@113eb9c,UncaughtException
 (scala.actors.Actor$$anon$1@1a1446d,Some(Divide(0)),Some(scala.act
 ors.ActorProxy@14f83d1),java.lang.ArithmeticException:  by zero))
 Actor b is Terminated
 Actor c is Terminated

If we force B to trap exit, the actor stays alive in all the scenarios described above:

 scala> start(true, Die('abc'))
 Actor b received Exit(scala.actors.Actor$$anon$1@13e49a8,abc)
 Actor b is Suspended
 Actor c is Terminated

Compared to the first snippet, now B receives exit message from C.

Unhandled errors are also not propagated to A:

 scala> start(true, Divide(0))
 Actor b received Exit(scala.actors.Actor$$anon$1@119f779,UncaughtException
 (scala.actors.Actor$$anon$1@119f779,Some(Divide(0)),Some(scala.act
 ors.ActorProxy@14f83d1),java.lang.ArithmeticException:  by zero))
 Actor b is Suspended
 Actor c is Terminated

Basic Supervisor

Exit message contains the reference to the failed actor that can be used to restart it. It’s possible to implement a very simple actor supervisor, by analogy with supervisor behavior in Erlang.

 case class ChildSpecification(worker: Worker, restartBehavior: Child
 RestartBehavior.Value = permanent)
 case class OneForOne(maxRestarts: Long = 3, maxTime: Long = 3000) 
 extends RestartStrategy
 case class OneForAll(maxRestarts: Long = 3, maxTime: Long = 3000) 
 extends RestartStrategy
 
 class Supervisor(supervisorId: String, strategy: RestartStrategy, 
 childSpecs: List[ChildSpecification]) extends Worker {
 
 ...
 
  override def act = {
     self.trapExit = true
     linkWorkers
     loop {
      react{
        case Exit(worker: Worker, reason) =>
          println('Worker [%s] has failed due to [%s]' format(worker.id, 
 reason))
          if(worker.restartsInPeriod(strategy.maxTime) >= strategy
 .maxRestarts) exit('Maximum restart intensity for %s is reached!' format(worker.id))
          strategy match {
            case str:OneForOne => restartWorker(worker)
            case str:OneForAll => childSpecs.foreach{spec => 
 restartWorker(spec.worker)}
          }
        case Terminate(reason) => println('Supervisor terminated with 
 reason [%s]' format(reason))
          exit(reason)
      }
     }
  }
 
 ...
 }

Supervisor itself is a normal Scala Actor that traps messages from the other actors linked to it (workers, in terms of supervision terminology), and restarts either only one failed actor or all supervised actors. When restart frequency reaches the limit specified by the restart strategy, supervisor terminates, so that supervisor at the higher hierarchy position may try to handle the problem.

In the simplest scenario, supervisor restarts the actor terminated due to uncaught exception:

   'Actor terminated due to uncaught exception is restarted by the supervisor' in {
     val worker = new SimpleWorker('simple_worker')
     Supervisor('basic_supervisor', OneForOne(),
               List(ChildSpecification(worker))).start
     worker !? (1000, Divide(0))
     (worker !? (1000, Divide(1))).asInstanceOf[Option[Int]] must be equalTo Some(1)
  }

Output of the specification is:

 Starting worker simple_worker
 Worker [simple_worker] has failed due to [UncaughtException(com.vasilrem.linked.
 SupervisorSpec$SimpleWorker@fd54ec,Some(Divide(0)),Some(scal
 a.actors.Channel@16167ab),java.lang.ArithmeticException:  by zero)]
 Restarting worker [simple_worker]...
 [info]   + Actor terminated due to uncaught exception is restarted by the supervisor

In a more complex scenario, when supervisors are linked in a tree, high-level supervisor restarts low-level supervisor, when it dies, that causes restart of the workers linked to it:

   'High-level supervisor restarts low-level supervisor and the wrokers linked to it' in{
     val worker = new SimpleWorker('simple_worker')
     val lowLevel = Supervisor('lowlevel_supervisor', OneForOne(),
                              List(ChildSpecification(worker)))
     val highLevel = Supervisor('lowlevel_supervisor', OneForOne(),
                               List(ChildSpecification(lowLevel))).start
     worker.getState must not be equalTo(State.Terminated)
     lowLevel ! Terminate('Kill lowlevel')
     Thread.sleep(1000)
     worker.getState must not be equalTo(State.Terminated)
  }

Test output is following:

 Starting worker lowlevel_supervisor
 Starting worker simple_worker
 Supervisor terminated with reason [Kill lowlevel]
 Worker [lowlevel_supervisor] has failed due to [Kill lowlevel]
 Restarting worker [lowlevel_supervisor]...
 Starting worker simple_worker
 [info]   + High-level supervisor restart low-level supervisor

You can find more specifications and code of the supervisor here.

Happy coding and don’t forget to share!

Reference: Fault-tolerance primitives in Scala: links and traps from our JCG partner Vasil Remeniuk at the Vasil Remeniuk blog blog.

Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Back to top button