Even simpler scalability with Akka through RegistryActor
Ideally, when you add a new node to an existing infrastructure, you should neither have to create proxies to the remote actors on the original node manually, nor change the configuration of any node to let it know that a new element was added. In other words, when a node is added to the system, all other nodes should automatically learn which actors the new node has, and use them as if they were standard local actors.
Akka’s Actor Registry
If you are tied to standard Scala Actors (some production environments still have to use Java 5, which is unfortunately not supported by Akka) and are betting on concurrency through actors, there are two things you will find yourself [re-]implementing: a Supervision Tree (from Erlang, related to Supervisors in Akka) and a Registry of Actors.
Although Akka’s Actor Registry has a fairly concise implementation under the hood (a kind of smart singleton wrapper around concurrent HashMaps that keeps references to all the actors running on the node), it is a powerful abstraction that is hard to do without when you use Actors in the real world. For example, the registry significantly simplifies building load balancers: you no longer have to specify explicitly which workers share the load; instead, the balancer itself looks up the actors by type or ID in the registry, at start-up or during its lifetime.
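To make the idea concrete, here is a minimal sketch of such a registry in plain Scala, assuming Scala 2.13 or later. It is not Akka’s ActorRegistry: `RegisteredActor`, `ImageResizer`, `ReportBuilder` and `MiniRegistry` are hypothetical names, and the “actors” are plain objects that only carry an `id` and a `uuid`. The point is the two lookups a balancer relies on: by ID and by type.

```scala
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag

// Hypothetical stand-in for an actor reference: an id (shared by actors with the
// same role) and a uuid (unique per instance).
trait RegisteredActor {
  def id: String
  val uuid: UUID = UUID.randomUUID()
}
final class ImageResizer(val id: String) extends RegisteredActor
final class ReportBuilder(val id: String) extends RegisteredActor

// A singleton wrapper around a concurrent map keyed by uuid (hypothetical, not Akka's API).
object MiniRegistry {
  private val byUuid = new ConcurrentHashMap[UUID, RegisteredActor]()

  def register(actor: RegisteredActor): Unit = byUuid.put(actor.uuid, actor)
  def unregister(actor: RegisteredActor): Unit = byUuid.remove(actor.uuid)
  def all: List[RegisteredActor] = byUuid.values.asScala.toList

  // Lookup by logical id.
  def byId(id: String): List[RegisteredActor] = all.filter(_.id == id)

  // Lookup by type: keeps every registered actor that is an instance of T.
  def byType[T <: RegisteredActor](implicit tag: ClassTag[T]): List[T] =
    all.collect { case actor: T => actor }
}
```

A balancer built on top of it would call something like `MiniRegistry.byType[ImageResizer]` at start-up or before each dispatch, instead of being handed a fixed list of workers.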
The only thing Akka’s actor registry currently lacks is an interface to access it remotely. Adding such an interface makes the problem stated above easy to solve.
Registry Actor
Living in a world of Actors, the first idea that comes to mind when you need a remote interface to something is an actor that is accessible remotely (aka a RemoteActor). As a first approximation, there should be an actor that handles messages carrying links to the actors on remote nodes, creates proxies for them, and registers the proxies in the local actor registry:
    class RegistryActor extends Actor{
      ...
      def defaultMessageHandler: PartialFunction[Any, Unit] = {
        case RegisterActor(actor) =>
          log.debug("Registering remote actor [%s]" format(actor))
          if(!isActorInRegistry(actor.uuid) && !isLinkToLocal(actor))
            ActorRegistry.register( // Hack for 0.10, 1.0-M1
              RemoteClient.actorFor(actor.uuid.toString, actor.className, actor.hostname, actor.port)
            ) // RemoteActorRefs will register themselves in 1.0-M1+
        case UnregisterActor(actor) => {
          log.debug("Unregistering remote actor [%s]" format(actor))
          ActorRegistry.foreach{act =>
            if(act.uuid == actor.uuid){
              ActorRegistry.unregister(act)
            }
          }
          Option(linkedRegistries.get(RegistryLink(actor.hostname, actor.port))) match{
            case Some(_) => removeLinkToRegistry(RegistryLink(actor.hostname, actor.port))
            case None => log.debug("[%s] is not a registry link" format(actor.uuid))
          }
        }
        ...
      }
      ...
    }
As a prerequisite for future extensions, there should also be a map of references to the actor registries running on other nodes (and a way to add and exchange links to registries at runtime). When a registry actor receives and resolves a new reference to another registry actor, it sends back a link to itself and to all other registries it knows about, so that both registry actors end up with the same consistent set of links:
    /**
     * RegistryActors located on the other hosts
     */
    protected[easyscale] val linkedRegistries = new ConcurrentHashMap[RegistryLink, ActorRef]()

    def defaultMessageHandler: PartialFunction[Any, Unit] = {
      ...
      case AddRegistryLink(link) =>
        if(!linkedRegistries.containsKey(link)) addRegistryLink(link)
        else log.debug("Link to registry [%s] is already present" format(link))
      case RemoveRegistryLink(link) =>
        log.debug("Unlinking from registry [%s]" format(link))
        linkedRegistries.remove(link)
    }
Registering Actors on startup
The second step towards solving the problem is to automatically register local actors at a remote registry when the link to it is added:
    /**
     * Publishes all local actors as remote references to the linked registry
     * when the registry link is added
     */
    trait StartupActorRefsDistribution extends RegistryActor{

      /**
       * Adds a link to the remote registry, and registers all local actors there
       */
      protected override def addRegistryLink(link: RegistryLink) = {
        super.addRegistryLink(link)
        registerActorsAt(linkedRegistries.get(link))
      }

      /**
       * Registers all local actors at the remote node
       */
      private def registerActorsAt(remoteRegistry: ActorRef) = {
        ActorRegistry.filter(actor => actor.id == REGISTRY_ACTOR && isActorLocal(actor))
          .foreach{actor => remoteRegistry ! RegisterActor(actor)}
        remoteRegistry
      }
    }
Every new node should initially know about at least one node running in the cluster (a neighboring node):
    neighbour {
      # One of the hosts in the group that has a started RegistryActor
      hostname = "localhost"
      port = 9999
    }
Thus, when a new actor registry lets its “neighbor” know about itself, it starts a chain reaction: all other actor registries populate the new registry with references to their local actors and vice versa, so that in the end every registry is aware of all the actors running in the cluster and can access them either through the local interface or through a proxy (RemoteActorRef).
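The chain reaction can be illustrated with a single-process simulation in which sending a message is modelled as a direct method call. `Node`, `ActorId`, `ChainReaction.link` and `ChainReaction.join` are hypothetical names, not part of the RegistryActor code. `link(from, to)` plays the role of `to` receiving `AddRegistryLink(from)`: it stores the link, publishes its own local actors to `from` (the job of StartupActorRefsDistribution), sends back a link to itself, and passes on every other link it knows about.

```scala
import scala.collection.mutable

// Hypothetical model: an actor reference is just (host, name).
final case class ActorId(host: String, name: String)

// A node's registry: its own local actors plus proxies to remote ones,
// and the set of registries it is linked to.
final class Node(val host: String, localActors: Seq[String]) {
  val links = mutable.Set.empty[Node]
  val known = mutable.Set.empty[ActorId] ++= localActors.map(ActorId(host, _))
  def local: Set[ActorId] = known.filter(_.host == host).toSet
}

object ChainReaction {
  // `to` receives AddRegistryLink(from); an already known link stops the exchange,
  // like the containsKey check in the AddRegistryLink handler.
  def link(from: Node, to: Node): Unit =
    if (from != to && !to.links.contains(from)) {
      to.links += from
      from.known ++= to.local  // publish local actors to the newly linked registry
      link(to, from)           // send back the link to self
      // ...and the links to all other known registries
      to.links.toList.filterNot(_ == from).foreach(other => link(other, from))
    }

  // A newcomer only needs to know one neighboring node.
  def join(newcomer: Node, neighbour: Node): Unit = link(newcomer, neighbour)
}
```

For example, after `join(b, a)` and then `join(c, a)`, node C ends up linked to B and knows B's actors even though it was only configured with A. Each step adds at least one new link, so the exchange always terminates; the real implementation does the same over asynchronous remote messages.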
Registering Actors started during the lifetime
Akka’s ActorRegistry has a simple notification mechanism that lets you handle the events raised when an actor is registered in or unregistered from the system (by default, all actors register themselves in the registry on start and unregister on shutdown; in Akka 1.0-M1, RemoteActorRefs are the exception). It can be used to propagate links to new actors across the system:
    /**
     * Publishes every registered local actor as a remote ref on all
     * linked remote registries
     */
    trait InlifeActorRefsDistribution extends RegistryActor{

      override def specificMessageHandler = {
        case ActorRegistered(actor) =>
          log.debug("Actor [%s] is registered" format(actor))
          registerOnLinks(actor)
        case ActorUnregistered(actor) =>
          log.debug("Actor [%s] is unregistered" format(actor))
          if(isActorLocal(actor)) actor.id match {
            case REGISTRY_ACTOR =>
              ActorRegistry.foreach(act =>
                if(act.getClass.isAssignableFrom(classOf[LocalActorRef])) unregisterOnLinks(act))
            case _ => unregisterOnLinks(actor)
          }
      }

      /**
       * Makes the actor remote, and registers it at remote nodes
       */
      private def registerOnLinks(actor: ActorRef) = if(isActorLocal(actor)){
        ...
        val iterator = linkedRegistries.values.iterator
        while(iterator.hasNext) iterator.next ! RegisterActor(actor)
      }
      ...
    }
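The notification mechanism is a plain observer pattern. The sketch below models it without Akka, assuming Scala 2.13+; `NotifyingRegistry`, `InlifePublisher` and the string messages are hypothetical, and the two event case classes only borrow the names `ActorRegistered`/`ActorUnregistered` used above. The publisher stands in for InlifeActorRefsDistribution: each local registration or unregistration becomes a message for every linked registry, recorded in an outbox instead of being sent over the network.

```scala
import scala.collection.mutable

// Hypothetical events mirroring the registry notifications discussed above.
sealed trait RegistryEvent
final case class ActorRegistered(name: String) extends RegistryEvent
final case class ActorUnregistered(name: String) extends RegistryEvent

// A local registry that notifies its listeners on every change (observer pattern).
final class NotifyingRegistry {
  private val actors = mutable.LinkedHashSet.empty[String]
  private val listeners = mutable.ListBuffer.empty[RegistryEvent => Unit]

  def addListener(listener: RegistryEvent => Unit): Unit = listeners += listener
  private def publish(event: RegistryEvent): Unit = listeners.foreach(_(event))

  // Events are raised only for real changes, so duplicates cause no extra traffic.
  def register(name: String): Unit = if (actors.add(name)) publish(ActorRegistered(name))
  def unregister(name: String): Unit = if (actors.remove(name)) publish(ActorUnregistered(name))
}

// Stand-in for InlifeActorRefsDistribution: turns each local event into a message
// for every linked registry; messages are recorded per link, not sent remotely.
final class InlifePublisher(linkedRegistries: Seq[String]) {
  val outbox = mutable.Map.empty[String, mutable.ListBuffer[String]]

  private def sendToAll(message: String): Unit =
    linkedRegistries.foreach(link => outbox.getOrElseUpdate(link, mutable.ListBuffer.empty) += message)

  val listener: RegistryEvent => Unit = {
    case ActorRegistered(name)   => sendToAll(s"RegisterActor($name)")
    case ActorUnregistered(name) => sendToAll(s"UnregisterActor($name)")
  }
}
```

The `isActorLocal` check from the trait is left out here, since every actor in this toy registry is local.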
Using Actor Registry
Now that we automatically get references to all the actors in the cluster, we can create a balancer that distributes messages across actors of the same type:
    class SimpleTypedBalancer[T](implicit manifest: Manifest[T]) extends Actor{

      def receive = {
        case message: AnyRef =>
          forwardMessage(message, self.getSenderFuture orElse self.getSender, idleWorkerId)
      }

      def idleWorkerId = Futures.awaitOne{
        ActorRegistry.filter{actor =>
          Class.forName(actor.actorClassName).isAssignableFrom(manifest.erasure)
        }.map(_ !!! IsReady()).toList
      }.result.flatMap(_ match {
        case Ready(actorUuid) => Option(actorUuid)
        case _ => None
      })

      def forwardMessage(message: AnyRef, originalSender: Option[AnyRef], workerId: Option[Uuid]) = {
        for{id <- workerId
            worker <- ActorRegistry.actors.find(actor => actor.id == id.toString || actor.uuid == id)}{
          if(originalSender.isDefined) worker.forward(message)
          else worker.sendOneWay(message)
        }
      }
    }
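The core of `idleWorkerId` is “ask every candidate whether it is ready and take whichever answers first”. Because an actor handles its mailbox in order, a busy worker replies to `IsReady` only after its queued messages. Below is a minimal sketch of that selection using the standard library’s `Future.firstCompletedOf` in place of Akka 1.0’s `Futures.awaitOne`; `SimulatedWorker`, `FirstReadyBalancer` and the sleep-based backlog are hypothetical.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical worker: its reply to "are you ready?" arrives only after its
// current backlog, simulated here by sleeping for backlogMillis.
final class SimulatedWorker(val id: String, backlogMillis: Long) {
  def isReady()(implicit ec: ExecutionContext): Future[String] =
    Future { blocking(Thread.sleep(backlogMillis)); id }
}

// Picks the worker whose readiness reply completes first, like idleWorkerId above.
// Returns None if there are no workers or nobody answers within the timeout.
final class FirstReadyBalancer(workers: Seq[SimulatedWorker]) {
  def idleWorkerId(timeout: FiniteDuration)(implicit ec: ExecutionContext): Option[String] =
    if (workers.isEmpty) None
    else Try(Await.result(Future.firstCompletedOf(workers.map(_.isReady())), timeout)).toOption
}
```

Replies that arrive later are simply ignored, and forwarding the message to the chosen worker (the `forwardMessage` step) is left out. `Thread.sleep` only simulates busy workers; a real worker would not block while answering.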
Problem Solved
Assume there’s a node with 3 actors of type `SimpleActor` running:
    RemoteNode.start
    log.info("Starting registry actor at %s:%s" format(RemoteServer.HOSTNAME, RemoteServer.PORT))
    val registryActor = actorOf(new RegistryActor
      with StartupActorRefsDistribution
      with InlifeActorRefsDistribution).start
    RegistryActorUtil.initialize
    (1 to 3).foreach(_ => actorOf[SimpleActor].start)
This node knows nothing about the future infrastructure of the cluster; at the moment, it only exposes the remote API to its registry, RegistryActor. Say we want to use the actors (3 instances of `SimpleActor`) running on node #1 to share the load with the `SimpleActor` actors running on node #2. Node #2 has the same definition as node #1, with the only difference that node #1 is explicitly configured as its neighboring host.
Let’s see whether the messages sent to the balancer are distributed between local and remote actors:
    doBeforeSpec{
      (1 to 3).foreach(_ => actorOf[SimpleActor].start)
      actorOf(new SimpleTypedBalancer[SimpleActor]).start
    }

    "Messages sent to the balancer should be distributed across local and remote workers" in {
      val balancer = ActorRegistry.filter(actor =>
        Class.forName(actor.actorClassName)
          .isAssignableFrom(classOf[SimpleTypedBalancer[SimpleActor]])
      ).head
      log.info("========SENDING MESSAGES TO BALANCER=========")
      val start = System.currentTimeMillis
      val futures = (1 to 30).map(i => balancer !!! "" + i).toList
      log.info("All messages are disaptched...")
      Futures.awaitAll(futures)
      val processedByWorkers = futures.flatMap(future => future.result).toSet.size
      log.info("Process time by %s workers: %s" format(processedByWorkers, System.currentTimeMillis - start))
      processedByWorkers must beGreaterThan(3)
    }
The test passes, which means that more workers than the 3 actors running locally were involved (6 according to the log below), and therefore the remote actors registered locally were used:
    [INFO] [2010-11-23 23:23:18,687] [Thread-18] s.s.a.a.Actor$: Adding RegistryActor as listener to local actor registry
    [INFO] [2010-11-23 23:23:18,703] [Thread-18] s.s.a.a.Actor$: Making RegistryActor remote...
    [INFO] [2010-11-23 23:23:18,703] [Thread-18] s.s.a.a.Actor$: Adding link to a neighbouring host...
    [INFO] [2010-11-23 23:23:18,859] [Thread-18] s.s.a.a.Actor$: ========SENDING MESSAGES TO BALANCER=========
    [INFO] [2010-11-23 23:23:18,890] [Thread-18] s.s.a.a.Actor$: All messages are disaptched...
    [INFO] [2010-11-23 23:23:19,656] [akka:event-driven:dispatcher:global-1] s.s.a.r.RemoteClient: Starting remote client connection to [localhost:9999]
    [INFO] [2010-11-23 23:23:24,906] [Thread-18] s.s.a.a.Actor$: Process time by 6 workers: 6031
    [INFO] [2010-11-23 23:23:24,968] [Thread-18] s.s.a.a.Actor$: ====SHUTTING DOWN ACTOR REGISTRY====
    [info] + Messages sent to the balancer should be distributed across local and remote workers
    [info] == com.vasilrem.akka.easyscale.behavior.RemoteTypedBalancerSpec ==
Reinventing the wheel
As was once mentioned on the Akka mailing lists, one day ActorRegistry will have a remote interface out of the box. Until then, you will have to roll your own solution, or use the experimental JCluster support, which generally targets the same problem but takes a different approach.
The code of the RegistryActor is available on GitHub. It will change over time as I use it in production.
Reference: Even simpler scalability with Akka through RegistryActor from our JCG partner Vasil Remeniuk at the Vasil Remeniuk blog