Outline

  • Part 1 - Creating actors and sending messages
  • Part 2 - Actor supervision and death watch
  • Part 3 - Scalability

Motivation

Motivation

  • Concurrency is hard
  • Users expect low latency
  • Users expect availability
  • Applications need to scale

What is Akka

  • "Toolkit" for the JVM
  • Extensive documentation (~460 pages for Java)
  • Concurrency & Distribution
  • Resilient
  • High performance
  • Elastic & Decentralized

Where should you use Akka

  • Transaction processing (banking, gaming, statistics)
  • Service backend (SOAP, REST, WebSockets)
  • Batch processing
  • Concurrency/parallelism

Outline

  • Part 1 - Creating actors and sending messages
  • Part 2 - Actor supervision and death watch
  • Part 3 - Scalability

The Actor Model

  • Actors sends messages asynchronously
  • Actors process messages sequentially

Hello Actor

import akka.actor._

class GreetingActor extends Actor {
  def receive = {
    case message: String => println("Hello " + message)
  }
}

object GreetingActor extends App {
  val system = ActorSystem("MySystem")
  val greetingActor: ActorRef = system.actorOf(Props[GreetingActor])
  greetingActor ! "Hulk Hogan"
}
GreetingActor_1.java|scala

Sending Messages

import akka.actor._

case class SayHello(name: String)

class GreetingActor extends Actor {
  def receive = {
    case hello: SayHello => {
      println("Hello " + hello.name)
      sender ! hello.name
    }
  }
}

object GreetingActor extends App {
  val greetingActor = ActorSystem("MySystem").actorOf(Props[GreetingActor])
  greetingActor ! SayHello("Pope Benedict")
}
GreetingActor_2.java|scala

Scheduling Work

import akka.actor._
import scala.concurrent.duration._

object DoGreeting

class GreetingActor(delay: FiniteDuration) extends Actor {

  override def preStart() = {
    scheduleNextGreeting()
  }

  def receive = {
    case DoGreeting => {
      println("Hello!")
      scheduleNextGreeting()
    }
  }

  def scheduleNextGreeting() {
    import context.dispatcher
    context.system.scheduler.scheduleOnce(delay, self, DoGreeting)
  }
}
GreetingActor_3.java|scala

Part 1 - Exercises

Exercises - Running tests in SBT

  • Running multiple test classes

    $ ./sbt
    > test-only workshop.part1.ComputeActorTest workshop.part2.ComputeSupervisorTest
  • Using wildcards and ~ (autorun)

    $ ./sbt
    > ~test-only workshop.part2*Test
  • Continuously run the part1 tests

    $ ./sbt
    > ~test-only workshop.part1.ComputeActorTest

Exercises - Running tests in Maven

  • Running multiple test classes

    $ mvn test -Dtest=workshop.part1.ComputeActorTest,ComputeSupervisorTest
  • Using wildcards

    $ mvn test -Dtest="workshop.part2.*"

Outline

  • Part 1 - Creating actors and sending messages
  • Part 2 - Actor supervision and death watch
  • Part 3 - Scalability

Supervision through hierarchies

  • Actor supervision is recursive, enabling delegation of failure handling

Creating a supervisor

import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

class Supervisor extends Actor {

    override val supervisorStrategy =
      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute, loggingEnabled = false) {
        case _: ArithmeticException      => Resume
        case _: NullPointerException     => Restart
        case _: IllegalArgumentException => Stop
        case _: Exception                => Escalate
      }

    def receive = {
      case p: Props => sender ! context.actorOf(p)
    }
}
SupervisorActor.java|scala

Using Death Watch

import akka.actor.{Props, Actor, Terminated, ActorRef, ActorSystem}

class DeathWatchActor extends Actor {

  override def preStart() {
    val greetingActor = context.actorOf(Props[VolatileGreetingActor])
    context.watch(greetingActor)
    greetingActor ! "print this message, please!"
  }

  def receive = {
    case Terminated(_) => println("looks like an actor has died :(")
  }
}
DeathWatchActor.java|scala

Life-cycle of Actors

class LifeCycleActor extends Actor {
  def receive = {
    case e: Exception => throw e
  }
  override def preStart() {
    println("preStart() - called by FIRST actor-instance during startup")
  }
  override def postStop() {
    println("postStop() - called by ANY actor-instance during shutdown")
  }
  override def preRestart(reason: Throwable, message: Option[Any]) {
    println("preRestart() - called on ANY running actor about to be restarted")
  }
  override def postRestart(reason: Throwable) {
    println("postRestart() - called on a NEW INSTANCE of this actor after restart")
  }
}
LifeCycleActor.java|scala

What happens on an actor restart

Part 2 - Architecture


Part 2 - Exercises

  • Supervision

    • Implement ComputeSupervisor
    • Make tests in ComputeSupervisorTest green
  • Death Watch

    • Implement ClientActor
    • Make tests in ClientActorTest green

  • Docs:

Outline

  • Part 1 - Creating actors and sending messages
  • Part 2 - Actor supervision and death watch
  • Part 3 - Scalability

Routers

  • Routers are not actors, they are specifically designed for concurrency
  • Akka comes with different routers

Router example

class ComputeRouter_1 extends Actor {
    val router = context.actorOf(RoundRobinPool(50).props(Props[Routee]), "router1")

    def receive = {
        case s: String => router.tell(s, sender())
    }
}

class Routee extends Actor {
    def receive = { case s: String => sender ! s.length() }
}

object ComputeRouterExample extends App {
    val router: ActorRef = ActorSystem("MySystem").actorOf(Props[ComputeRouter_1])
    router ! "how long is this string?"
}
ComputeRouter_1.java|scala

Part 3 - Exercises

  • Implement SuperComputeActor.java|scala
  • Make tests in SuperComputerActorTest.java|scala green

  • Play around with:
    • Create multiple actors (one for each unit of work)
    • Use a routerJava|Scala
    • Use futuresJava|Scala