Close panel

Close panel

Close panel

Close panel

Programming 06 Mar 2018

Akka Actors evolution. Towards type-safe distributed computations

At BBVA Labs, we have been using Akka Actors in different projects for a long time due to its inherent computation distribution capabilities in high load scenarios, using actors as stateless or stateful units of execution that send messages among them in an asynchronous fashion.

In this post, we will cover some of the weak points of working with untyped classic actors and we will show what are the benefits that Akka Typed brings to the scene. Finally, we will present a brief description of session types and the Process DSL API created by Dr. Roland Kuhn. All examples are written using the Scala API.

Akka is a complete set of tools and runtime infrastructure, and one of the main players in the industry. It allows to build concurrent, scalable and resilient systems in Scala and Java, besides of having a complete and very powerful suite for stream processing. Akka is an open source project with almost five hundred contributors which arose in order to address the requirements for new applications covered in the reactive manifesto. Deeper information about it is available on the akka.io website which has been recently changed and offers, in author’s opinion, a much better understanding.

Akka is a toolkit for building highly concurrent, distributed, and resilient message-driven applications for Java and Scala”

Actors

An Akka actor is mainly a class with one partial function which provides its behavior and an actor reference, represented in the ActorRef type, as a point of entry to its mailbox. We can consider that an actor is a computation unit which sends and receives messages. Once a message is received by the actor, it behaves according to the code inside its receive partial function. When the execution is finished, the actor can (or cannot) change its behavior to accept different kind of messages and perform a different task.

Let’s assume for example that we have to build a service which receives purchase events that must be enriched with a merchant fraud index to allow or deny the operation by a third party which needs this kind of information. This small protocol can be modeled by one actor switching its internal behavior using the become or unbecome functions. Once a purchase event is received, the actor asks another service about the fraud index of the merchant to whom the purchase has been sent. When the response is received, the actor sends the result to the saver which stores the result in some place for being processed later by a fraud detector.

The following code shows the types used and the actor implementation:

// Represents the incoming event
case class EventRow(id: String, customer: String, merchant: String, timestamp: Long)

// Enriched event with the merchant fraud index
case class MerchantFraudIndex(fraudCoefficient: Double)

// Coproduct to report if the operation was correctly processed
sealed trait EventOperation
case class OperationOk(id: String) extends EventOperation
case class OperationError(id: String, err: String) extends EventOperation

// Coproduct to handle the process state
sealed trait ActorState
case object Processing extends ActorState
case object Enriched extends ActorState
case object Done extends ActorState

final class EventProcessor(enricherActor: ActorRef, saver: ActorRef, id: String)
   extends Actor {

 // private variable to store internal transaction state
 private var state: ActorState = Processing

 override def receive: Receive = {
   case newEvent: EventRow =>
     enricherActor ! newEvent
     context.become {
       case MerchantFraudIndex(index) =>
         state = Enriched
         context.become {
           case OperationOk(id) =>
             println("Event enriched and stored OK!!")
             state = Done
           case OperationError(id, errmsg) =>
             println(
               s"There was an error processing event ${id}. Message: ${errmsg}")
             state = Done
         }
         saver ! (newEvent, index)
     }
 }
}

Each time one event enters the system, a new EventProcessor actor is spawned to do the task. The actor behaves according to its partial function and waits for an EventRow, then calls the enricher actor and changes its internal behavior to accept the merchant fraud index required for the saver actor. Besides, the actor uses a private mutable variable which acts as an internal state, to be monitorized if it is desired. Akka actors guarantee safe concurrent access to this variable when it is updated inside the receive method if we need to track the state of each transaction.

To analyze the common pitfalls which may happen dealing with classic Akka actors, a possible implementation of the enricher actor is shown below:

final class EnricherActor extends Actor {
 override def receive: Receive = {
   case n: EventRow =>
     val fraudIndex: Long = ??? // Get from database or whatever merchant fraud index
     sender ! MerchantFraudIndex(fraudIndex)
 }
}

The code above shows a kind of actor’s minimum viable product. For actors it is only necessary to implement their partial function, which is of the type PartialFunction[Any, Unit]. Clients can send data of any type through actor references, they are not required by the compiler to send messages of a specific type.

Another controversial feature is the sender property. It is a very common used property to send back a response to the caller, but it is prone to errors in runtime if your actors deal with asynchronous operations in the receive method. Many Scala an Java APIs work asynchronously, the calling thread does not stop waiting to one function to complete. If one function returns a type which represents an asynchronous computation and thereby may change the context of the execution, accessing to the actor’s sender function in the type’s callback could refer to a wrong sender because another message might have been pulled from the mailbox before that computation is completed.

final class EnricherActor extends Actor {
 override def receive: Receive = {
   case n: EventRow =>
     // Get from database merchant with an asynchronous call
     val fraudIndex: Future[Long] = ???
     fraudIndex.onComplete { case Success(res) => sender ! res // bad idea
   }
 }
}

Although there are also workarounds to handle this possible inconsistency, like saving the sender in another variable when a new message reaches it, it is still something to muddle through. Akka Typed comes into the picture to give elegant answers to these questions.

Akka Typed is still in an experimentational state; nevertheless we will describe how a representation of the same protocol could be to see if we can obtain some benefit with this alternative.

Akka Typed

We will use the immutable behaviors implementation. Akka documentation encourages using that approach to avoid possible problems that could happen in the mutable scenario as we described in the common untyped actors. More details, pros and cons are described in this blog series if a further analysis comparing the mutable and immutable is desired.

The first thing that can look odd to developers used to working with Akka Actors is that there is no Actor trait to extend from. Instead, they have to get used to working with immutable behaviors and other concepts we will explain. For example, the enricher actor can be represented as the following behavior:

def enricherBehavior =
 Actor.immutable[EventRow] { (ctx, m) =>
 val fraudIndex: Long = ???
 m.replyTo ! MerchantFraudIndex(fraudIndex)
 Actor.same
}

First, notice that the behavior implementation is parameterized, which means that it only accepts EventRow data. Besides, the immutable function provides access to the actor context thus giving the capability of creating actors if that was necessary, among other things it provides. The compiler also infers the type of the m parameter giving direct access to the object properties. Actors references created from this behavior definition will only accept messages of the type EventRow when other actors want to send messages through them, thereby providing type safety at compile time.

There is no sender function anymore, you must include the actor reference in your types explicitly for the target actors to return back messages and thereby eliminating any possible runtime error caused by some kind of side effect as we have seen before. The delivery for the enricher actor includes the main actor reference explicitly via the replyTo property of the EventRow type. It might seem like a regression turning away this actor sender property and needing the sender reference as a property of case classes, but in this way it makes the protocol definition more revealing.

The types used to build the protocol are the following:

case class EventRow(id: String, customer: String, merchant: String, timestamp: Long, replyTo: ActorRef[Response])

sealed trait Response
case object OperationOk extends Response
case class OperationError(id: String, err: String) extends Response
case class MerchantFraudIndex(fraudCoefficient: Double) extends Response

The whole protocol implementation is coded inside the main behavior, as described below:

def main(inputEvent: EventRow): Behavior[Response] =
 Actor.deferred[Response] { ctx =>
    val enricherRef = ctx.spawn(enricherBehavior, "enricher")
   val saverRef = ctx.spawn(saverBehavior, "saver")

   enricherRef ! inputEvent
   // Change behavior to wait for fraud index
   Actor.immutable {
     case (ctx, m: MerchantFraudIndex) =>
       saverRef ! ((inputEvent, m))
       Actor.immutable { (ctx, m) =>
         m match {
           case OperationOk => { println("Process OK"); Actor.stopped }
           case OperationError(id, err) => { println(s"There was an error $err"); Actor.stopped }
         }
       }
   }
 }

Typed actor references are instantiated via the spawn function which needs one behavior object and a string which will be the name for the underlying created actor. In the example, the enricher and saver actors will be accessible through an ActorRef[EventRow] and an ActorRef[(EventRow, MerchantFraudIndex)] respectively.

The first call is to the enricher actor, then a Behavior[Response] is returned to deal with fraud index responses. Considering only the happy path, when the response arrives another new behavior is returned to deal with saver responses. When the saver actor sends back the result, it is printed and the main actor stops.

The enricher and saver always return the same behavior, they do not change as long as the messages are getting through; they always do the same thing: calling a third party and responding with a Response type. These actors are constrained to return Response objects through the replayTo property if they want to send their responses back.

This way of using Akka Typed can reduce runtime errors as far as possible. Developers who work with Akka in complex scenarios can benefit from these new “typed features” reducing the number of messages falling into the dead letters channel.

Far beyond

Up to this point we have seen protocols built with actors consisting of a series of operations executed by actors which must be coordinated through messages interchange. With Akka Typed and the Scala type system we protect ourselves from some runtime errors, but, when the protocol definition is complex and the number of actors and interactions grow, maybe some kind of a high level expressiveness could be helpful.

An important concept to have a look at is Session Types. Concurrent and distributed processes interact according to a specific protocol, e.g. when a user requests access to a secured resource, several processes interchange messages to grant or deny access. Session Types specify the sequence and structure of messages that take part in application protocols. Imagine that you want to intercommunicate two processes through one channel. The first process sends its authentication data as a string and the receiver responds with a true or false. Although it is seen as the same channel, the data type in each end is different. Session Types model all these communications details, besides creating types that model the whole interaction. The next example is from Scalas,A.Yoshida,N.(2016) Lightweight Session Programming in Scala:

Sh = µX.(!Greet(String).(?Hello(String).X & ?Bye(String).end) ⊕ !Quit.end)

The representation above represents the client side of a “greeting protocol”. The client can send a Greet message of type String or Quit. If he chooses the first option it changes his behavior to wait for Hello or Bye. If the response is Bye, the process ends; if it is Hello, it continues recursively.

There are several session types implementations in different programming languages such as C, Scala or Haskell. Another very interesting resource is Scrible, a language which describes distributed protocols among systems. It would be interesting describing and building a protocol on top of Akka using this, wouldn’t it?

In the next section, we will describe the Process DSL by Roland Kuhn. A proposal of modeling session types on top of Akka Typed.

Protecting resources, another protocol

Let’s assume that your application needs to access some resource stored in some database or file system and is properly protected. To allow access, it is necessary to have one token emitted from another system. Due to security restrictions, this token must be signed and getting one is only possible via an actor reference. Then, authorization takes place to get the user claims which provide the capabilities that the user owns.

Instead of using Akka directly we are going to use the DSL approach. There are three main players which take part in this implementation: Akka Typed providing the underlying communication infrastructure, the “always present” Shapeless library and the Scala compiler itself.

Scared, who said so …

If you are comfortable with compec like monads or free monad interpreters you are going to feel at home, because the next code is like an armistice agreement between the actors anarchy and the strictness of static functional programming.

The concurrent code which runs inside one actor is represented as a sequence of operations. One operation is modeled as an effect that provides a runtime value which can be used to create sequential or monadic (apart from laws) operations composition to be executed inside a process. Processes could be described as concurrent execution units which run within actors. They also have their own references like actors do.

In the next example, apart from the OpDSL related stuff needed, there are three operations which form the process: the first operation gets the reference of the process itself, the second sends the authentication request to an external actor and puts the process in waiting state, and the third builds the response with the token once it has been received.

trait Response
case object AuthenticationError extends Response
case class AuthenticationOk(token: String) extends Response
case class AuthenticationError(err: String) extends Response
case class AuthorizationResponse(claims: List[String]) extends Response
case class Result(content: String) extends Response

case class AuthenticationRequest(id: String, replyTo: ActorRef[Response])

def doAuthentication(authZ: ActorRef[AuthenticationRequest], id: String) =
 OpDSL[Response] { implicit opDSL =>
   for {
     self <- opProcessSelf
     _    <- opSend(authZ, AuthenticationRequest(id, self))
     AuthenticationOk(token) <- opRead
   } yield token
 }.withTimeout(3.seconds)

The process has a limit of three seconds to perform the operation, otherwise it is aborted. The previous function returns the following type:

Process[Response, String, ::[E.Send[AuthenticationRequest],::[E.Read[Response], _0]]]

The first parameter corresponds to the input channel type to receive the response. The second is the token (a string) that the process will spawn after its execution, and finally the composed effect that describes the two operations which take part.

The following two steps of the protocol have the same structure, except the type of the remote actor to which the request is made and the yield value:

case class AuthorizationRequest(id: String, feature: String, replyTo: ActorRef[Response])
case class QueryFeature(featureId: String, userAttributes: List[String], replyTo: ActorRef[Response])

def doAuthorization(authoZ: ActorRef[AuthorizationRequest], token: String, resource: String) =
 OpDSL[Response] { implicit opDSL =>
   for {
     self <- opProcessSelf
     _    <- opSend(authoZ, AuthorizationRequest(token, resource, self))
     AuthorizationResponse(claims) <- opRead 
   } yield claims 
 } 

def doQuery(query: ActorRef[QueryFeature], claims: List[String], resource: String) = OpDSL[Response] { implicit opDSL =>
   for {
     self <- opProcessSelf
     _    <- opSend(query, QueryFeature(resource, claims, self))
     Result(value)     <- opRead
   } yield value
}

The main process that builds the protocol consists of joining the three “subprocesses” in a for comprehension structure. To see the flatMap implementation type of the Process in more detail take a look at the github repository. The whole implementation would be as follows:

case object AuthenticationService extends ServiceKey[AuthenticationRequest]
case object AuthorizationService extends ServiceKey[AuthorizationRequest]
case object QueryService extends ServiceKey[QueryFeature]

def main(id: String, feature: String) = {
 OpDSL[Nothing] { implicit opDSL =>
   for {
     self <- opActorSelf
     authRef <- opCall(getService(AuthenticationService).named("authentication_service"))
     token <- opCall(doAuthentication(authRef, id).named("send_auth"))
     authoRef  <- opCall(getService(AuthorizationService).named("authorization_service"))
     claims  <- opCall(doAuthorization(authoRef, token, feature).named("send_autho"))
     srv  <- opCall(getService(QueryService).named("query_service"))
     result   <- opCall(doQuery(srv, claims, feature).named("send_query"))
   } yield ()
 }
}

The opCall function executes one process inside the parent actor and produces a runtime value that will be used in the next step, as described in the authentication case. Observe that before executing each step, the actor reference is previously obtained, this is made calling the getService function within a process that will provide the actor reference required. The getService function asks the receptionist to get the actor reference. More information about this pattern is available on the Akka website. As an example of service registration, in the receptionist there should be something like that for the authentication service:

val authBehavior: Behavior[AuthenticationRequest] =
 Actor.deferred { ctx ⇒
   ctx.system.receptionist ! Receptionist.Register(AuthenticationService, ctx.self)(ctx.system.deadLetters)
   Actor.immutable[AuthenticationRequest] { case (ctx, msg) ⇒
     msg.replyTo ! AuthenticationOk("token")
     Actor.same
   }
 }

The main function returns the whole protocol composed effect which must be interpreted and executed by an interpreter which is provided by the library.

The idea behind how this effect is accumulated is based on Shapeless HList. These structures are used to work with heterogeneous lists, which means that they can contain values of different types. For example:

1 :: "foo" :: true :: HNil

This instance represents a list which contains an integer as head and a tail which is another HList composed by a string and a boolean. Extrapolating this representation to a sequence of effects can provide the whole protocol description. As we said before, when a monadic operation composition is done to build a protocol with this DSL, every flatMap prepends the new effect to the last, stacking them in a pseudo HList, giving a kind of Session Type representation as a Scala type that can be checked at compile time.

Our protocol could be represented like this:

object QueryFeatureProtocol extends E.Protocol {
  type Session =
  E.Send[AuthenticationRequest] ::
  E.Read[Response] ::
  E.Choice[(E.Halt :: _0) :+: _0 :+: CNil] ::
  E.Send[AuthorizationRequest] ::
  E.Read[Response] ::
  E.Choice[(E.Halt :: _0) :+: _0 :+: CNil] ::
  E.Send[QueryFeature] ::
  E.Read[Response] ::
  E.Choice[(E.Halt :: _0) :+: _0 :+: CNil] ::
  _0
}

The Choice effect is a Shapeless coproduct which shows that the operation can fail and stop, or produce a empty effect, represented by _0 which means that the process continues. Remember that the whole process is built inside an actor and if some of the underlying process crashes, then it stops.

The actual implementation, at the moment, needs to call one “checking“ function explicitly:

E.vetExternalProtocol(QueryFeatureProtocol, main(???, ???))

If the protocol does not comply with the description above, the compiler will tell what it expects from the main function return type.

Conclusions

Akka is intended to deal with high load and distributed applications. Typically developers must deal with complex asynchronous and distributed scenarios which may require more features in the programming API to make the code safer and more expressive. Here, Akka Type will play an important role and can likely gain more presence in the future. The Process DSL is very experimental and needs more support by the community. Anyway, building a production ready framework based on session types on top of Akka represents a challenge that should be considered.

Other interesting stories