Outline

  • Part 1 - Creating actors and sending messages
  • Part 2 - Actor supervision and death watch
  • Part 3 - Scalability

Motivation

Motivation

  • Concurrency is hard
  • Users expect low latency
  • Users expect availability
  • Applications need to scale

What is Akka

  • "Toolkit" for the JVM
  • Extensive documentation (~350 pages)
  • Concurrency & Distribution
  • Resilient
  • High performance
  • Elastic & Decentralized

Where should you use Akka

  • Transaction processing (banking, gaming, statistics)
  • Service backend (SOAP, REST, WebSockets)
  • Batch processing
  • Concurrency/parallelism

Outline

  • Part 1 - Creating actors and sending messages
  • Part 2 - Actor supervision and death watch
  • Part 3 - Scalability

The Actor Model

  • Actors sends messages asynchronously
  • Actors process messages sequentially

Hello Actor

import akka.actor._

class GreetingActor extends Actor {
  def receive = {
    case message : String => println("Hello " + message)
  }
}

object GreetingActor extends App {
  val system = ActorSystem("MySystem")
  val actorRef = system.actorOf(Props[GreetingActor])
  actorRef ! "Hulk Hogan"
}
GreetingActor_1.scala

Sending Messages

import akka.actor._

case class SayHello(name: String)

class GreetingActor extends Actor {
  def receive = {
    case hello : SayHello => {
      println("Hello " + hello.name)
      sender ! " a reply"
    }
  }
}

object GreetingActor extends App {
  val actorRef = ActorSystem("MySystem").actorOf(Props[GreetingActor])
  actorRef ! SayHello("Pope Benedict")
}
GreetingActor_2.scala

Scheduling Work

import akka.actor._
import scala.concurrent.duration._

case class DoGreeting()

class GreetingActor(delay: FiniteDuration) extends Actor {

  override def preStart() = {
    scheduleNextGreeting()
  }

  def receive = {
    case DoGreeting => {
      println("Hello!")
      scheduleNextGreeting()
    }
  }

  def scheduleNextGreeting() {
    import context.dispatcher
    context.system.scheduler.scheduleOnce(delay, self, DoGreeting)
  }

}
GreetingActor_3.scala

Part 1 - Exercises

Part 1 - Exercises - Running tests in sbt

  • The testOnly task accepts a whitespace separated list of test names to run

    $ ./sbt
    > test-only workshop.part1.BestEverTest workshop.part1.ImaginaryTest
  • The testOnly task supports wildcards and ~ (autorun) as well

    $ ./sbt
    > ~test-only workshop.part*Test
  • Continuously run the part1 tests

    $ ./sbt
    > ~test-only workshop.part1.ComputeActorTest

Outline

  • Part 1 - Creating actors and sending messages
  • Part 2 - Actor supervision and death watch
  • Part 3 - Scalability

Supervision through hierarchies

  • Actor supervision is recursive, enabling delegation of failure handling

Creating a supervisor

import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

class Supervisor extends Actor {

    override val supervisorStrategy =
      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute, loggingEnabled = false) {
        case _: ArithmeticException      => Resume
        case _: NullPointerException     => Restart
        case _: IllegalArgumentException => Stop
        case _: Exception                => Escalate
      }

    def receive = {
      case p: Props => sender ! context.actorOf(p)
    }
}
SupervisorActor.scala

Using Death Watch

import akka.actor.{Props, Actor, Terminated, ActorRef, ActorSystem}

class DeathWatchActor extends Actor {

  override def preStart() {
    val actorRef: ActorRef = context.actorOf(Props[VolatileGreetingActor])
    context.watch(actorRef)
    actorRef ! "print this message, please!"
  }

  def receive = {
    case Terminated(_) => println("looks like an actor has died :(")
  }
}
DeathWatchActor.scala

Life-cycle of Actors

class LifeCycleActor extends Actor {
  def receive = {
    case e: Exception => throw e
  }
  override def preStart() {
    println("preStart() - called by FIRST actor-instance during startup")
  }
  override def postStop() {
    println("postStop() - called by ANY actor-instance during shutdown")
  }
  override def preRestart(reason: Throwable, message: Option[Any]) {
    println("preRestart() - called on ANY running actor about to be restarted")
  }
  override def postRestart(reason: Throwable) {
    println("postRestart() - called on a NEW INSTANCE of this actor after restart")
  }
}
LifeCycleActor.scala

What happens on an actor restart

Part 2 - Architecture


Part 2 - Exercises

Outline

  • Part 1 - Creating actors and sending messages
  • Part 2 - Actor supervision and death watch
  • Part 3 - Scalability

Router example

class ComputeRouter_1 extends Actor {
    val router: ActorRef =
        context.actorOf(RoundRobinPool(50).props(Props[Routee]), "router1")

    def receive = {
        case s : String => router.tell(s, sender())
    }
}

class Routee extends Actor {
    def receive = { case s: String => sender ! s.length() }
}

object ComputeRouterExample extends App {
    val router: ActorRef = ActorSystem("MySystem").actorOf(Props[ComputeRouter_1])
    router ! "how long is this string?"
}
ComputeRouter_1.scala

Part 3 - Exercises

  • Implement SuperComputeActor.scala
  • Make tests in SuperComputerActorTest.scala green