Akka Notes – ActorSystem (Configuration and Scheduling) – 4

As we saw in our previous posts, we can create an Actor using the actorOf method of the ActorSystem. There’s actually much more you can do with the ActorSystem. We’ll touch upon just the Configuration and the Scheduling bits in this write-up.

Let’s look at a subset of the methods available in the ActorSystem.

[Figure: ActorSystem API]

1. Configuration Management

Remember the application.conf file we used for configuring our log level in the previous write-up? This configuration file is just like the .properties files in Java applications, and much more. We’ll soon see how we can use this configuration file to customize our dispatchers, mailboxes etc. (I am not even close to doing justice to the power of the Typesafe Config library. Please go through some examples to really appreciate its awesomeness.)

So, when we create the ActorSystem using the ActorSystem object’s apply method without specifying any configuration, it looks for application.conf, application.json and application.properties in the root of the classpath and loads them automatically.

So:

val system=ActorSystem("UniversityMessagingSystem")

is the same as:

val system=ActorSystem("UniversityMessagingSystem", ConfigFactory.load())

As evidence for that claim, check out the apply method in ActorSystem.scala:

def apply(name: String, config: Option[Config] = None, classLoader: Option[ClassLoader] = None, defaultExecutionContext: Option[ExecutionContext] = None): ActorSystem = {
    val cl = classLoader.getOrElse(findClassLoader())
    val appConfig = config.getOrElse(ConfigFactory.load(cl))
    new ActorSystemImpl(name, appConfig, cl, defaultExecutionContext).start()
  }

a. Overriding default configuration

If you are not keen on using the application.conf (as in test cases) or would like to have your own custom configuration file (as in testing against different configurations or deploying to different environments), you are free to override this by passing in your own configuration instead of the one from the classpath.

ConfigFactory.parseString is one option:

val actorSystem=ActorSystem("UniversityMessageSystem", ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]"""))

or simply in your test case as:

class TeacherTestLogListener extends TestKit(ActorSystem("UniversityMessageSystem", ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]""")))  
  with WordSpecLike
  with MustMatchers
  with BeforeAndAfterAll {

There’s also ConfigFactory.load:

val system = ActorSystem("UniversityMessageSystem", ConfigFactory.load("uat-application.conf"))

If you need access to your own config parameters at runtime, you can get them via the config API like so:

val system=ActorSystem("UniversityMessageSystem", ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]"""))  
println (system.settings.config.getValue("akka.loggers")) // Results in > SimpleConfigList(["akka.testkit.TestEventListener"])
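Besides getValue, the Config API also has typed getters, which are usually more convenient when reading your own settings. The following is a minimal sketch, assuming a hypothetical `university` block that is not part of the original project. Inside an application, the same calls could be made on system.settings.config.

```scala
import com.typesafe.config.ConfigFactory

object ConfigAccessSketch {
  // Hypothetical settings, parsed from a string purely for illustration.
  val config = ConfigFactory.parseString(
    """
      |akka.loggers = ["akka.testkit.TestEventListener"]
      |university {
      |  name = "Hypothetical University"
      |  max-students = 30
      |}
    """.stripMargin)

  // Typed getters return plain values instead of ConfigValue wrappers.
  val name: String = config.getString("university.name")
  val maxStudents: Int = config.getInt("university.max-students")

  // getStringList returns a java.util.List[String].
  val firstLogger: String = config.getStringList("akka.loggers").get(0)

  // hasPath lets you check for optional settings before reading them.
  val hasMotto: Boolean = config.hasPath("university.motto")
}
```

The typed getters throw an exception if the path is missing or has the wrong type, so hasPath is a handy guard for optional keys.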

b. Extending default configuration

Other than overriding, you could also extend the default configuration with your custom configuration using the withFallback method of the Config.

Let’s say your application.conf looks like:

akka{  
    loggers = ["akka.event.slf4j.Slf4jLogger"]
    loglevel = DEBUG
    arun="hello"
}

and you decide to override the akka.loggers property like:

val config=ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]""")
val system=ActorSystem("UniversityMessageSystem", config.withFallback(ConfigFactory.load()))

You end up with a merged configuration of both:

println (system.settings.config.getValue("akka.arun")) //> ConfigString("hello")
println (system.settings.config.getValue("akka.loggers")) //> SimpleConfigList(["akka.testkit.TestEventListener"])

So, why did I tell this whole story on configuration? Because our ActorSystem is the one which loads and provides access to all the configuration information.

IMPORTANT NOTE

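Watch out for the order of fallback here: which is the default and which is the extension configuration. Values in the config on which you call withFallback take precedence, and the fallback only supplies the keys that are missing. Remember, you have to fall back to the default configuration. So:

config.withFallback(ConfigFactory.load())

would work but:

ConfigFactory.load().withFallback(config)

would not give you the results you need, because the akka.loggers value from the default configuration would win and your override would be ignored.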
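To see the difference without touching the classpath, here is a small sketch that uses two parsed strings. The `defaults` value is only a stand-in for application.conf, and the object and helper names are made up for this illustration.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object FallbackOrderSketch {
  // Stand-in for application.conf, so the sketch needs no classpath resource.
  val defaults: Config = ConfigFactory.parseString(
    """
      |akka {
      |  loggers = ["akka.event.slf4j.Slf4jLogger"]
      |  loglevel = DEBUG
      |  arun = "hello"
      |}
    """.stripMargin)

  val overrides: Config =
    ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]""")

  // The receiver of withFallback wins; the argument only fills in missing keys.
  val right: Config = overrides.withFallback(defaults)
  val wrong: Config = defaults.withFallback(overrides)

  // Hypothetical helper that reads the first configured logger.
  def firstLogger(c: Config): String = c.getStringList("akka.loggers").get(0)
}
```

Here firstLogger(right) is the TestEventListener, while firstLogger(wrong) is still the Slf4jLogger. Both merged configurations contain akka.arun, since that key exists only in the defaults.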

2. Scheduler

[Figure: Scheduler]

As you can see from the API of ActorSystem, there is a powerful little method in ActorSystem called scheduler which returns a Scheduler. The Scheduler has a variety of schedule methods with which we could do some fun stuff inside the Actor environment.

a. Schedule something to execute once

[Figure: DelayedActorTimer]

Taking our Student-Teacher example, assume our StudentActor wants to send the message to the teacher not immediately, but 5 seconds after it receives the InitSignal from our test case. Our code then looks like:

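import scala.concurrent.duration._ // brings the `seconds` duration syntax into scope

class StudentDelayedActor (teacherActorRef:ActorRef) extends Actor with ActorLogging {

  def receive = {
    case InitSignal=> {
      // the scheduler needs an implicit ExecutionContext; the actor's dispatcher provides one
      import context.dispatcher
      // deliver QuoteRequest to the teacher once, 5 seconds from now
      context.system.scheduler.scheduleOnce(5 seconds, teacherActorRef, QuoteRequest)
      //teacherActorRef!QuoteRequest
    }
    ...
    ...
  }
}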
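One detail worth knowing is that scheduleOnce returns a Cancellable, so a pending delivery can be called off before its delay elapses. The sketch below is self-contained and deliberately not part of the article's project: the `Recorder` actor and the `Ping` message are hypothetical, and it assumes an Akka version that has ActorSystem.terminate() (2.4 or later).

```scala
import akka.actor.{Actor, ActorSystem, Cancellable, Props}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object ScheduleOnceSketch {
  case object Ping // hypothetical message, used only in this sketch

  // Hypothetical receiver that records the first message it gets.
  class Recorder(received: Promise[Any]) extends Actor {
    def receive: Receive = {
      case msg =>
        received.trySuccess(msg)
        ()
    }
  }

  // Returns the first message the recorder received and whether the cancel succeeded.
  def run(): (Any, Boolean) = {
    val system = ActorSystem("ScheduleOnceSketch")
    import system.dispatcher // implicit ExecutionContext for the scheduler
    try {
      val received = Promise[Any]()
      val recorder = system.actorOf(Props(new Recorder(received)), "recorder")

      // Delivered once, roughly 200 milliseconds from now.
      system.scheduler.scheduleOnce(200.millis, recorder, Ping)

      // Cancelled long before its delay elapses, so it is never delivered.
      val pending: Cancellable =
        system.scheduler.scheduleOnce(1.minute, recorder, "never delivered")
      val cancelled = pending.cancel()

      (Await.result(received.future, 3.seconds), cancelled)
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```

Calling ScheduleOnceSketch.run() should yield Ping as the received message, together with true for the successful cancellation.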

Testcase

Let’s cook up a test case to verify this:

"A delayed student" must {

    "fire the QuoteRequest after 5 seconds when an InitSignal is sent to it" in {

      import me.rerun.akkanotes.messaging.protocols.StudentProtocol._

      val teacherRef = system.actorOf(Props[TeacherActor], "teacherActorDelayed")
      val studentRef = system.actorOf(Props(new StudentDelayedActor(teacherRef)), "studentDelayedActor")

      EventFilter.info (start="Printing from Student Actor", occurrences=1).intercept{
          studentRef!InitSignal
      }
    }

  }

Increasing the timeout for EventFilter interception

Ouch. The test fails, because the default timeout for the EventFilter to wait for the message to appear in the EventStream is 3 seconds, while our log message shows up only after the 5-second delay. Let’s increase that timeout to 7 seconds to make our test case pass. The filter-leeway configuration property helps us achieve that.

class RequestResponseTest extends TestKit(ActorSystem("TestUniversityMessageSystem", ConfigFactory.parseString("""  
                                            akka{
                                              loggers = ["akka.testkit.TestEventListener"]
                                              test{
                                                  filter-leeway = 7s
                                              }
                                            }
                                    """)))
  with WordSpecLike
  with MustMatchers
  with BeforeAndAfterAll 
  with ImplicitSender {
  ...
  ...
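If you want to double-check that the override is what you expect, the value can be read back as a duration. This is only a sketch on a parsed string, with the object name made up for illustration. In a real test you would read it from system.settings.config instead.

```scala
import com.typesafe.config.ConfigFactory
import java.util.concurrent.TimeUnit

object FilterLeewaySketch {
  // Same settings as in the test class above, parsed on their own.
  val testConfig = ConfigFactory.parseString(
    """
      |akka {
      |  loggers = ["akka.testkit.TestEventListener"]
      |  test {
      |    filter-leeway = 7s
      |  }
      |}
    """.stripMargin)

  // HOCON understands duration units such as "7s"; here we ask for milliseconds.
  val leewayMillis: Long =
    testConfig.getDuration("akka.test.filter-leeway", TimeUnit.MILLISECONDS)

  // The leeway must be longer than the 5-second delay used by the student actor.
  val longEnough: Boolean = leewayMillis > 5000L
}
```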

b. Schedule something to execute repeatedly

In order to execute something repeatedly, you use the schedule method of the Scheduler.

One of the frequently used overloads of the schedule method is the one which sends a message to an Actor on a regular basis. It accepts 4 parameters:

  1. How long the initial delay should be before the first execution begins
  2. Frequency of subsequent executions
  3. The target ActorRef that we are going to send a message to
  4. The message

case InitSignal=> {
      import context.dispatcher
      context.system.scheduler.schedule(0 seconds, 5 seconds, teacherActorRef, QuoteRequest)
      //teacherActorRef!QuoteRequest
    }
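A repeating timer keeps firing until it is cancelled, so it is usually a good idea to hold on to the Cancellable that schedule returns and cancel it when the actor stops. The sketch below shows one way to do that. The `CountingTeacher` actor, the `interval` parameter and the local message objects are hypothetical stand-ins, and terminate() assumes Akka 2.4 or later. Note also that in Akka 2.6 and later, schedule is deprecated in favor of scheduleWithFixedDelay and scheduleAtFixedRate. It is kept here to match the article.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Cancellable, Props}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.Await
import scala.concurrent.duration._

object PeriodicScheduleSketch {
  // Stand-ins for the article's protocol messages.
  case object InitSignal
  case object QuoteRequest

  class PeriodicStudentActor(teacherActorRef: ActorRef, interval: FiniteDuration) extends Actor {
    import context.dispatcher // implicit ExecutionContext required by schedule

    private var ticker: Option[Cancellable] = None

    def receive: Receive = {
      case InitSignal =>
        ticker.foreach(_.cancel()) // do not stack timers if InitSignal arrives twice
        ticker = Some(
          context.system.scheduler.schedule(Duration.Zero, interval, teacherActorRef, QuoteRequest))
    }

    // Without this, the timer would keep firing after the student has stopped.
    override def postStop(): Unit = ticker.foreach(_.cancel())
  }

  // Hypothetical teacher that only counts the requests it receives.
  class CountingTeacher(latch: CountDownLatch) extends Actor {
    def receive: Receive = { case QuoteRequest => latch.countDown() }
  }

  // Returns true once three QuoteRequests have reached the teacher.
  def run(): Boolean = {
    val system = ActorSystem("PeriodicScheduleSketch")
    try {
      val latch = new CountDownLatch(3)
      val teacher = system.actorOf(Props(new CountingTeacher(latch)), "teacher")
      val student = system.actorOf(Props(new PeriodicStudentActor(teacher, 50.millis)), "student")
      student ! InitSignal
      latch.await(5, TimeUnit.SECONDS)
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```

The short 50 millisecond interval is only there to keep the sketch quick to run. The article's example uses 5 seconds.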

TRIVIA

The import context.dispatcher statement is very important here.

The schedule methods require a very important implicit parameter, an ExecutionContext. The reason becomes pretty obvious once we see the implementation of the schedule method:

final def schedule(  
    initialDelay: FiniteDuration,
    interval: FiniteDuration,
    receiver: ActorRef,
    message: Any)(implicit executor: ExecutionContext,
                  sender: ActorRef = Actor.noSender): Cancellable =
    schedule(initialDelay, interval, new Runnable {
      def run = {
        receiver ! message
        if (receiver.isTerminated)
          throw new SchedulerException("timer active for terminated actor")
      }
    })

The schedule method just wraps the tell in a Runnable, which is eventually executed by the ExecutionContext that we pass in.

In order to make an ExecutionContext available in scope as an implicit, we leverage the implicit dispatcher available on the context.

From ActorCell.scala (Context):

/**
   * Returns the dispatcher (MessageDispatcher) that is used for this Actor.
   * Importing this member will place an implicit ExecutionContext in scope.
   */
  implicit def dispatcher: ExecutionContextExecutor
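Since the ExecutionContext is an ordinary implicit parameter, it can also be passed explicitly. The sketch below does that outside of any actor, where there is no context to import from, so it uses the ActorSystem's default dispatcher instead. The object name is made up for illustration, and terminate() assumes Akka 2.4 or later.

```scala
import akka.actor.ActorSystem
import scala.concurrent.{Await, ExecutionContext, Promise}
import scala.concurrent.duration._

object ExplicitExecutorSketch {
  def run(): String = {
    val system = ActorSystem("ExplicitExecutorSketch")
    try {
      val done = Promise[String]()

      // Outside an actor, the system's default dispatcher can serve as the ExecutionContext.
      val executor: ExecutionContext = system.dispatcher

      // The function-based overload of scheduleOnce, with the executor supplied
      // explicitly in the last parameter list instead of through an import.
      system.scheduler.scheduleOnce(100.millis) {
        done.trySuccess("scheduled block executed")
        ()
      }(executor)

      Await.result(done.future, 3.seconds)
    } finally Await.result(system.terminate(), 10.seconds)
  }
}
```

Inside an actor, import context.dispatcher remains the more convenient choice, because every scheduler call in scope then picks up the dispatcher automatically.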

Code

As always, the entire project can be downloaded from GitHub here.
