Fault-tolerance primitives in Scala: links and traps
However, used unwisely, actors may bring even more pain and boilerplate than thread-based concurrency or STM. The lack of in-depth documentation on actor-based modeling leads to misconceptions and myths. These can be dispelled by working through the best practices and design guidelines for building fault-tolerant systems in Erlang.
Defensive programming forces you to put a huge effort into protecting the system against every kind of unexpected behavior you can think of. You check inputs and the current state wherever possible, and you define ways to escalate problems to a higher level. As a defensive programmer, you must always be proactive and look for new potential problems to handle. Even the smallest one, causing a failure in an insignificant component, may lead to a huge system crash.
Fault-tolerant systems, by contrast, don't try to predict every possible cause of failure in order to prevent a single component from crashing. Instead, they isolate the component from the rest of the system and restart it, keeping the system working consistently. If restarting the component doesn't help, the problem may be propagated to a higher level, in order to (possibly) restart the parties that exchange messages with the component.
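The difference can be illustrated with a small, library-free sketch of the "restart, then escalate" idea. It assumes that a component can be modeled as a function that may throw, and that "restarting" means running it again from scratch. `runSupervised`, `Recovered` and `Escalated` are hypothetical names, not part of any Scala library.

```scala
import scala.util.{Failure, Success, Try}

object RestartOrEscalate {
  sealed trait Outcome[+A]
  final case class Recovered[A](value: A, restarts: Int) extends Outcome[A]
  final case class Escalated(lastError: Throwable, restarts: Int) extends Outcome[Nothing]

  /** Runs `component`. On failure it is restarted (re-run from scratch) up to
    * `maxRestarts` times. After that, the last error is handed to the caller,
    * which plays the role of the higher-level component. */
  def runSupervised[A](maxRestarts: Int)(component: () => A): Outcome[A] = {
    @annotation.tailrec
    def attempt(restarts: Int): Outcome[A] =
      Try(component()) match {
        case Success(v)                           => Recovered(v, restarts)
        case Failure(_) if restarts < maxRestarts => attempt(restarts + 1)
        case Failure(e)                           => Escalated(e, restarts)
      }
    attempt(0)
  }
}
```

The component itself contains no defensive checks for the transient failure. The wrapper only decides whether another restart is allowed or the problem must go up a level.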
As Joe Armstrong says in his book “Programming Erlang”, when one actor dies in a room full of actors, the others should probably notice and start fixing the problem (cleaning out the bodies).
In Erlang and Scala, this behavior is achieved by linking actors. In its most basic form, when two actors are linked and one of them dies, it sends an exit signal to the other, terminating it too.
Linking is a bidirectional operation. When you link actor A to B, B is implicitly linked to A as well, and the death of either one sends an exit signal to the other. In Erlang, it's possible to create a unidirectional link with monitors: when a monitored process dies, a 'DOWN' message is sent to the monitoring process. There's no analogue of monitors in the Scala standard library, but implementing one on demand would be easy.
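To illustrate the difference, here is a hedged, library-free model, not the scala.actors API. A link is recorded in both directions, while a monitor is recorded only from the target to the watcher. `Registry`, `monitor` and `Down` are hypothetical names. To keep the model small, it has no trap-exit or normal-exit handling, so a linked actor always dies with the same reason.

```scala
import scala.collection.mutable

object LinksVsMonitors {
  final case class Down(target: String, reason: String)

  class Registry {
    val alive = mutable.Set.empty[String]
    val inboxes = mutable.Map.empty[String, List[Any]]
    private val links = mutable.Map.empty[String, Set[String]]    // actor -> actors linked to it
    private val watchers = mutable.Map.empty[String, Set[String]] // target -> actors monitoring it

    private def add(m: mutable.Map[String, Set[String]], key: String, value: String): Unit =
      m(key) = m.getOrElse(key, Set.empty[String]) + value

    def spawn(name: String): Unit = { alive += name; inboxes(name) = Nil }

    // Bidirectional: recorded on both sides, so the death of either affects the other.
    def link(a: String, b: String): Unit = { add(links, a, b); add(links, b, a) }

    // Unidirectional: only the target's death is reported, and only to the watcher.
    def monitor(watcher: String, target: String): Unit = add(watchers, target, watcher)

    def kill(name: String, reason: String): Unit =
      if (alive.remove(name)) { // false if already dead, which stops the cascade
        for (w <- watchers.getOrElse(name, Set.empty[String]) if alive(w))
          inboxes(w) = inboxes(w) :+ Down(name, reason) // watcher is informed and keeps running
        for (other <- links.getOrElse(name, Set.empty[String]))
          kill(other, reason) // linked actor dies with the same reason
      }
  }
}
```

In this model, killing a watcher leaves the monitored actor untouched, while killing either end of a link takes both down.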
A linked actor can set up an exit trap, so that an exit signal is processed as a normal message and doesn't terminate the actor. An exit signal normally contains a reference to the failed actor (which can be used to restart it) and the failure reason.
In “Programming Erlang”, several link/trap-exit scenarios are illustrated with a simple example:
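```erlang
-module(edemo1).
-export([start/2]).

start(Bool, M) ->
    A = spawn(fun() -> a() end),
    B = spawn(fun() -> b(A, Bool) end),
    C = spawn(fun() -> c(B, M) end),
    sleep(1000),        % crude synchronization: let the processes run first
    status(b, B),
    status(c, C).

%% Helpers used by start/2: pause the caller, report whether a process is alive
sleep(T) -> receive after T -> true end.
status(Name, Pid) ->
    io:format("process ~p (~p) alive: ~p~n", [Name, Pid, erlang:is_process_alive(Pid)]).
```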
In the code above, three processes are created and synchronized with sleep, which gives them time to process the passed messages (ugly in general, but it works for a simple example). Afterwards, their state is checked. The processes are defined as follows:
```erlang
a() ->
    process_flag(trap_exit, true),
    wait(a).

b(A, Bool) ->
    process_flag(trap_exit, Bool),
    link(A),
    wait(b).

c(B, M) ->
    link(B),
    case M of
        {die, Reason} ->
            exit(Reason);
        {divide, N} ->
            1/N,          % fails with badarith when N is 0
            wait(c);
        normal ->
            true
    end.
```
Process A always traps exits. Process B is linked to A and traps exits depending on the function input. Process C is linked to B and, depending on the command M it is given, either exits with the given reason, performs a division (which may fail), or terminates normally.
```erlang
wait(Prog) ->
    receive
        Any ->
            io:format("Process ~p received ~p~n", [Prog, Any]),
            wait(Prog)
    end.
```
The function wait recursively receives messages, printing out their content.
Translated into Scala with the standard Actors library, the example looks as follows:
```scala
import scala.actors.Actor
import scala.actors.Actor._

object LinkedActors {
  case class Die(reason: AnyRef)
  case class Divide(number: Int)

  def status(name: String, actor: Actor) =
    println("Actor %s is %s" format(name, actor.getState))

  // Receive behavior of A and B: print every message
  def printMessage(actorName: String): PartialFunction[Any, Unit] = {
    case msg => println("Actor %s received %s" format(actorName, msg))
  }

  def start(isTrapExit: Boolean, message: Any) = {
    val A = actor {
      self.trapExit = true          // A always traps exits
      loop(react(printMessage("a")))
    }
    val B = actor {
      self.trapExit = isTrapExit    // B traps exits depending on the input
      self link A
      loop(react(printMessage("b")))
    }
    val C = actor {
      self link B
      loop {
        react {
          case Die(reason)    => exit(reason)
          case Divide(number) => 1 / number   // throws ArithmeticException for 0
        }
      }
    }
    C ! message
    Thread.sleep(1000)   // crude synchronization, as in the Erlang version
    status("b", B)
    status("c", C)
  }
}
```
Essentially, the code is the same. The only differences are that the messages accepted by actor C are modeled as case classes, and that the receive behavior of the actors is represented with partial functions.
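Because `react` takes a `PartialFunction[Any, Unit]`, receive behavior can be assembled from smaller handlers. The library-free sketch below shows a specific handler, shaped like actor C's arithmetic branch, combined with a catch-all in the spirit of `printMessage` using the standard `orElse`. It also shows how `isDefinedAt` tells which messages a handler accepts. The handler names are hypothetical. The handlers return strings instead of printing, so the results can be checked.

```scala
object ReceiveHandlers {
  case class Divide(number: Int)

  // Only defined for Divide messages, like the arithmetic branch of actor C.
  val divide: PartialFunction[Any, String] = {
    case Divide(n) => s"result ${1 / n}"
  }

  // Catch-all, in the spirit of printMessage: defined for every message.
  def logAll(actorName: String): PartialFunction[Any, String] = {
    case msg => s"Actor $actorName received $msg"
  }

  // Try the specific handler first, then fall back to the catch-all.
  val behaviour: PartialFunction[Any, String] = divide orElse logAll("c")
}
```

Note that `isDefinedAt` only inspects the pattern. `Divide(0)` is therefore accepted and throws only when the handler is applied, which is exactly the uncaught exception that kills actor C in the runs below.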
Let’s pass some inputs to the start method to see how the chained actors behave when some of them die:
```
scala> start(false, Die("abc"))
Actor a received Exit(scala.actors.Actor$$anon$1@dc8f6d,abc)
Actor b is Terminated
Actor c is Terminated
```
Actor C receives the Die message and exits with the reason "abc". Actor B, linked to C, doesn't trap exits, so it terminates too. Since A, linked to B, traps exits, the termination of B reaches A as a message containing the reason for the failure (an instance of the following case class):
```scala
/** An `Exit` message (an instance of this class) is sent to an actor
 *  with `trapExit` set to `true` whenever one of its linked actors
 *  terminates.
 *
 *  @param from   the actor that terminated
 *  @param reason the reason that caused the actor to terminate
 */
case class Exit(from: AbstractActor, reason: AnyRef)
```
At the same time, when an actor exits normally (with the reason 'normal rather than an error), linked actors that don't trap exits are left untouched:
```
scala> start(false, Die('normal))
Actor b is Suspended
Actor c is Terminated
```
In the snippet below, an unhandled division-by-zero exception causes C and B to die:
```
scala> start(false, Divide(0))
Actor a received Exit(scala.actors.Actor$$anon$1@113eb9c,UncaughtException(scala.actors.Actor$$anon$1@1a1446d,Some(Divide(0)),Some(scala.actors.ActorProxy@14f83d1),java.lang.ArithmeticException: by zero))
Actor b is Terminated
Actor c is Terminated
```
If B traps exits, it stays alive in all the scenarios described above:
```
scala> start(true, Die("abc"))
Actor b received Exit(scala.actors.Actor$$anon$1@13e49a8,abc)
Actor b is Suspended
Actor c is Terminated
```
Compared with the first snippet, B now receives the Exit message from C, and A receives nothing because B stays alive.
Unhandled exceptions are not propagated to A either:
```
scala> start(true, Divide(0))
Actor b received Exit(scala.actors.Actor$$anon$1@119f779,UncaughtException(scala.actors.Actor$$anon$1@119f779,Some(Divide(0)),Some(scala.actors.ActorProxy@14f83d1),java.lang.ArithmeticException: by zero))
Actor b is Suspended
Actor c is Terminated
```
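The runs above follow one rule. It can be restated as a small, library-free model (a sketch, not the scala.actors implementation). When an actor terminates, each linked actor that traps exits receives an Exit message, as in Erlang. A linked actor that does not trap exits ignores a normal exit and otherwise terminates with the same reason. The names are hypothetical: `Exit` here is a simplified stand-in that carries names instead of actor references, "normal" stands for the 'normal symbol, and an uncaught exception is represented only by its message.

```scala
object ExitPropagation {
  // Simplified stand-in for scala.actors.Exit: names instead of actor references.
  final case class Exit(from: String, reason: String)

  final class Node(val name: String, val trapExit: Boolean) {
    var terminated = false
    var links: List[Node] = Nil
    var inbox: Vector[Exit] = Vector.empty
  }

  // Links are bidirectional, so record them on both nodes.
  def link(a: Node, b: Node): Unit = {
    a.links = b :: a.links
    b.links = a :: b.links
  }

  // Terminate `node` and send an exit signal to every node linked to it.
  def terminate(node: Node, reason: String): Unit =
    if (!node.terminated) {
      node.terminated = true
      node.links.foreach(target => deliverExitSignal(target, node, reason))
    }

  private def deliverExitSignal(target: Node, from: Node, reason: String): Unit =
    if (target.terminated) ()                    // dead nodes receive nothing
    else if (target.trapExit)
      target.inbox = target.inbox :+ Exit(from.name, reason) // trapped: a plain message
    else if (reason == "normal") ()              // untrapped normal exit: ignored
    else terminate(target, reason)               // untrapped abnormal exit: die with the same reason

  // Build the A <- B <- C chain from the example and terminate C with `cReason`.
  def run(bTrapsExit: Boolean, cReason: String): (Node, Node, Node) = {
    val a = new Node("a", trapExit = true)
    val b = new Node("b", trapExit = bTrapsExit)
    val c = new Node("c", trapExit = false)
    link(b, a)
    link(c, b)
    terminate(c, cReason)
    (a, b, c)
  }
}
```

For example, `run(bTrapsExit = false, cReason = "abc")` mirrors the first snippet: A holds one Exit from B, and both B and C are terminated.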
The Exit message contains a reference to the failed actor, which can be used to restart it. This makes it possible to implement a very simple actor supervisor, by analogy with supervisor behavior in Erlang:
```scala
case class ChildSpecification(worker: Worker,
                              restartBehavior: ChildRestartBehavior.Value = permanent)

case class OneForOne(maxRestarts: Long = 3, maxTime: Long = 3000) extends RestartStrategy
case class OneForAll(maxRestarts: Long = 3, maxTime: Long = 3000) extends RestartStrategy

class Supervisor(supervisorId: String, strategy: RestartStrategy,
                 childSpecs: List[ChildSpecification]) extends Worker {
  ...
  override def act = {
    self.trapExit = true   // worker failures arrive as Exit messages
    linkWorkers
    loop {
      react {
        case Exit(worker: Worker, reason) =>
          println("Worker [%s] has failed due to [%s]" format(worker.id, reason))
          // Restarting too often: give up and let a higher-level supervisor handle it
          if (worker.restartsInPeriod(strategy.maxTime) >= strategy.maxRestarts)
            exit("Maximum restart intensity for %s is reached!" format(worker.id))
          strategy match {
            case str: OneForOne => restartWorker(worker)                           // only the failed worker
            case str: OneForAll => childSpecs.foreach { spec => restartWorker(spec.worker) } // all workers
          }
        case Terminate(reason) =>
          println("Supervisor terminated with reason [%s]" format(reason))
          exit(reason)
      }
    }
  }
  ...
}
```
The supervisor itself is a normal Scala actor. It traps exits from the actors linked to it (workers, in supervision terminology) and restarts either only the failed actor (one-for-one) or all supervised actors (one-for-all). When the restart frequency reaches the limit specified by the restart strategy, the supervisor terminates, so that a supervisor higher in the hierarchy may try to handle the problem.
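The restart-intensity check (`restartsInPeriod(maxTime) >= maxRestarts`) and the choice between one-for-one and one-for-all can also be sketched without actors. This hypothetical version is not the article's `Worker` API, whose code is elided above. It assumes that workers are identified by name and that timestamps in milliseconds are passed in explicitly, which keeps the logic deterministic. `RestartTracker` and `onWorkerExit` are illustrative names.

```scala
import scala.collection.mutable

object RestartPolicy {
  sealed trait RestartStrategy { def maxRestarts: Long; def maxTime: Long }
  final case class OneForOne(maxRestarts: Long = 3, maxTime: Long = 3000) extends RestartStrategy
  final case class OneForAll(maxRestarts: Long = 3, maxTime: Long = 3000) extends RestartStrategy

  /** Remembers when each worker was restarted, so restarts can be counted in a sliding window. */
  final class RestartTracker {
    private val history = mutable.Map.empty[String, List[Long]]

    def restartsInPeriod(worker: String, periodMs: Long, now: Long): Int =
      history.getOrElse(worker, Nil).count(t => now - t < periodMs)

    def recordRestart(worker: String, now: Long): Unit =
      history(worker) = now :: history.getOrElse(worker, Nil)
  }

  /** Workers to restart after `failed` crashes, or None if the supervisor should give up. */
  def onWorkerExit(strategy: RestartStrategy, tracker: RestartTracker,
                   failed: String, allWorkers: List[String], now: Long): Option[List[String]] =
    if (tracker.restartsInPeriod(failed, strategy.maxTime, now) >= strategy.maxRestarts) None
    else {
      val toRestart = strategy match {
        case _: OneForOne => List(failed)  // only the failed worker
        case _: OneForAll => allWorkers    // every supervised worker
      }
      toRestart.foreach(tracker.recordRestart(_, now))
      Some(toRestart)
    }
}
```

Returning `None` corresponds to the supervisor calling `exit` in the code above. Old restarts fall out of the window, so a worker that has been stable for `maxTime` milliseconds can be restarted again.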
In the simplest scenario, the supervisor restarts an actor that terminated due to an uncaught exception:
```scala
"Actor terminated due to uncaught exception is restarted by the supervisor" in {
  val worker = new SimpleWorker("simple_worker")
  Supervisor("basic_supervisor", OneForOne(), List(ChildSpecification(worker))).start
  worker !? (1000, Divide(0))   // crashes the worker with an ArithmeticException
  // the restarted worker answers again
  (worker !? (1000, Divide(1))).asInstanceOf[Option[Int]] must be equalTo Some(1)
}
```
The output of the specification is:
```
Starting worker simple_worker
Worker [simple_worker] has failed due to [UncaughtException(com.vasilrem.linked.SupervisorSpec$SimpleWorker@fd54ec,Some(Divide(0)),Some(scala.actors.Channel@16167ab),java.lang.ArithmeticException: by zero)]
Restarting worker [simple_worker]...
[info] + Actor terminated due to uncaught exception is restarted by the supervisor
```
In a more complex scenario, supervisors are linked in a tree. When a low-level supervisor dies, the high-level supervisor restarts it, which in turn restarts the workers linked to it:
```scala
"High-level supervisor restarts low-level supervisor and the workers linked to it" in {
  val worker = new SimpleWorker("simple_worker")
  val lowLevel = Supervisor("lowlevel_supervisor", OneForOne(),
    List(ChildSpecification(worker)))
  val highLevel = Supervisor("lowlevel_supervisor", OneForOne(),
    List(ChildSpecification(lowLevel))).start
  worker.getState must not be equalTo(State.Terminated)
  lowLevel ! Terminate("Kill lowlevel")   // make the low-level supervisor exit
  Thread.sleep(1000)                      // give the high-level supervisor time to restart it
  worker.getState must not be equalTo(State.Terminated)
}
```
The test output is as follows:
```
Starting worker lowlevel_supervisor
Starting worker simple_worker
Supervisor terminated with reason [Kill lowlevel]
Worker [lowlevel_supervisor] has failed due to [Kill lowlevel]
Restarting worker [lowlevel_supervisor]...
Starting worker simple_worker
[info] + High-level supervisor restart low-level supervisor
```
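The cascade in this test can be modeled without actors as a small tree walk. The sketch rests on simplifying assumptions: a crashing supervisor takes all its linked children down with it, and the parent immediately restarts the failed child with one-for-one semantics. `Node`, `start`, `crash` and `onChildExit` are hypothetical names, not the supervisor code from the article.

```scala
import scala.collection.mutable

object SupervisionTree {
  /** A worker (no children) or a supervisor of child nodes. */
  final class Node(val id: String, val children: List[Node] = Nil) {
    var alive = false
  }

  /** Starting a node also starts its whole subtree; the log records the start order. */
  def start(node: Node, log: mutable.Buffer[String]): Unit = {
    node.alive = true
    log += s"start ${node.id}"
    node.children.foreach(start(_, log))
  }

  /** A dying supervisor takes its linked, non-trapping children down with it. */
  def crash(node: Node): Unit = {
    node.alive = false
    node.children.foreach(crash)
  }

  /** The parent traps the child's exit and restarts it, which restarts the child's own workers. */
  def onChildExit(parent: Node, failed: Node, log: mutable.Buffer[String]): Unit =
    if (parent.alive && parent.children.contains(failed)) start(failed, log)
}
```

Restarting the low-level supervisor is enough to bring its worker back, which is what the specification checks with `worker.getState`.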
You can find more specifications and the code of the supervisor here.
Reference: Fault-tolerance primitives in Scala: links and traps from our JCG partner Vasil Remeniuk at the Vasil Remeniuk blog.