Cerrar panel

Cerrar panel

Cerrar panel

Cerrar panel

Programación 06 mar 2018

Evolución del actor de Akka: hacia una computación distribuida más segura

En BBVA Labs hemos usado actores Akka en diferentes proyectos durante bastante tiempo, gracias a su capacidad de distribución de la computación en escenarios de alta carga, usando actores con o sin estado que se envían mensajes entre ellos de forma asíncrona.

En este post veremos algunos de los puntos débiles a la hora de trabajar con actores clásicos sin tipado, y veremos qué beneficios nos aporta Akka Typed. Finalmente, presentaremos una breve descripción de los Session Types y la API de Process DSL, creada por el Dr. Rolan Kuhn. Todos los ejemplos están escritas usando la API de Scala.

Akka es un conjunto de herramientas e infraestructura de ejecución y una de las principales implementaciones del modelo actor sobre la JVM. Ofrece una api sencilla para crear sistemas de alta concurrencia además de ofrecer una suite muy completa para procesamiento en stream. Es un proyecto open source con alrededor de quinientos contribuidores que surgió para cubrir las necesidades que requieren aplicaciones de nueva generación recogidas en el reactive manifesto. Documentación en profundidad está disponible en el recientemente renovado website .

Akka es un conjunto de herramientas para construir sistemas concurrentes y distribuidos para Scala y Java bajo el paradigma del modelo de actor”

Actores

Un actor en Akka es fundamentalmente una clase con una partial function que describe su comportamiento o behaviour y un punto de entrada a su buzón: la referencia del actor representada en el tipo ActorRef. Podemos imaginar que un actor es una mínima entidad de ejecución que envía y recibe mensajes. Cada vez que llega un mensaje a su buzón el actor realiza la ejecución correspondiente al código incluido en la partial function y una vez fializada, el actor puede (o no) cambiar su comportamiento para reaccionar a otro tipo de mensajes y realizar una tarea diferente.

Como ejemplo, imaginemos el escenario en el que tenemos que construir un servicio que recibe eventos de compra. Una vez llegan al sistema son enriquecidos con un índice de fraude correspondiente al comercio donde se realiza la operación y que servirá para aceptarla o denegarla en un paso posterior. Este simple protocolo puede ser modelado con un simple actor que va cambiando su comportamiento interno usando la función become. Cuando un evento de compra llega, el actor envía un mensaje de consulta a otro actor que dispone de la información de cada comercio. Cuando la respuesta es recibida, se envía el evento enriquecido hacia el actor que hace la función de registrarlo en un repositorio, donde posteriormente un procesador de fraude analizará la operación entrante.

El siguiente código muestra los tipos utilizados y la implementación del actor que ejecuta y coordina todo el proceso:

// Represents the incoming event
case class EventRow(id: String, customer: String, merchant: String, timestamp: Long)

// Enriched event with the merchant fraud index
case class MerchantFraudIndex(fraudCoefficient: Double)

// type to report if the operation result
sealed trait EventOperation
case class OperationOk(id: String) extends EventOperation
case class OperationError(id: String, err: String) extends EventOperation

// type to handle the process state
sealed trait ActorState
case object Processing extends ActorState
case object Enriched extends ActorState
case object Done extends ActorState

final class EventProcessor(enricherActor: ActorRef, saver: ActorRef, id: String)
   extends Actor {

 // private variable to store internal transaction state
 private var state: ActorState = Processing

 override def receive: Receive = {
   case newEvent: EventRow =>
     enricherActor ! newEvent
     context.become {
       case MerchantFraudIndex(index) =>
         state = Enriched
         context.become {
           case OperationOk(id) =>
             println("Event enriched and stored OK!!")
             state = Done
           case OperationError(id, errmsg) =>
             println(
               s"There was an error processing event ${id}. Message: ${errmsg}")
             state = Done
         }
         saver ! (newEvent, index)
     }
 }
}

En el momento que un evento de compra llega al actor, éste pregunta a EnrichterActorusando la función !. Una vez que envía la petición el actor cambia su comportamiento, aceptado únicamente desde ese momento mensajes del tipo MerchantFraudIndex. El evento entrante más el índice de fraude se envía al Saver en forma de tupla para que sea almacenado. Además, en este ejemplo se hace uso de una variable mutable para que el actor vaya actualizando el estado en el que se encuentra, en caso que quisiéramos monitorizar la situación de las transacciones. El actor garantiza el acceso seguro a esta variable siempre que se actualice dentro del método receive.

Para analizar alguno de los puntos que débiles que pueden provocar errores en tiempo de ejecución cuando se trabaja con actores Akka, se muestra a continuación una posible y simple implementación del EnricherActor:

final class EnricherActor extends Actor {
 override def receive: Receive = {
   case n: EventRow =>
     val fraudIndex: Long = ??? // Get from database or whatever the merchant fraud index
     sender ! MerchantFraudIndex(fraudIndex)
 }
}

El código anterior muestra una mínima definición funcional de un actor: sólo se necesita implementar el método receivede tipo PartialFunction[Any, Unit] para que el código compile. Este tipo permite a cualquier posible cliente enviar cualquier tipo de mensaje, el compilador no alerta ni ofrece información alguna sobre que mensajes son tratados en el destino.

Otro aspecto susceptible a errores en los actores en Akka es la propiedad sender. Representa una especie de acceso directo para enviar mensajes de vuelta hacia el actor que previamente envió un mensaje, pero su uso puede provocar errores en ejecución cuando se trabaja con apis asíncronas dentro del método receive. Muchas apis en Scala y Java trabajan de esta manera, es decir, el hilo no se bloquea esperando a que una ejecución finalice. Si una función retorna un tipo que representa una computación asíncrona, como podría ser un futuro, acceder a la función sender dentro su callback puede referenciar un origen distinto al esperado debido a que otro mensaje desde otro origen puede haber entrado en el actor antes de que la computación haya finalizado:

final class EnricherActor extends Actor {
 override def receive: Receive = {
   case n: EventRow =>
     // Get from database merchant with an asynchronous call
     val fraudIndex: Future[Long] = ???
     fraudIndex.onComplete { case Success(res) => sender ! res // BAD IDEA!!
   }
 }
}

Existen ciertas prácticas para evitar estos posibles errores, como guardar el origen de cada mensaje en otra variable cada vez que llega un mensaje. No deja de ser un parche que cada desarrollador afronta a su manera. La aparición de Akka Typed intenta, entre otras cosas, dar una respuesta a este tipo de cuestiones de una manera más elegante.

Akka Typed se encuentra todavía en estado experimental; a pesar de ello vamos a ver como sería una implementación de este mini protocolo usando esta especificación y ver como nuestros desarrollos pueden ser más seguros.

Akka Typed

La primera cosa que que nos encontramos al empezar con Akka Typed es que un actor no se implementa como una clase que extiende el Actor trait, y que hay que familiarizarse con el concepto de behaviours. Normalmente trataremos con funciones que crean estas behaviors y que serán utilizadas a la hora de crear actores.

Vamos a usar la implementación immutable. El blog de Akka recomienda ir por este camino para evitar los problemas que comentamos anteriormente. Más detalles sobre la diferencia entre el escenario mutable e inmutable se describen en el siguiente blog.

Por ejemplo, el actor Enricher podría tener la siguiente representación:

def enricherBehavior =
 Actor.immutable[EventRow] { (ctx, m) =>
 val fraudIndex: Long = ???
 m.replyTo ! MerchantFraudIndex(fraudIndex)
 Actor.same
}

Lo primero en lo que debemos fijarnos es que la función immutable que crea la behaviour accepta un type parameter. Ésto que indica que la behaviour resultante sólo admite mensajes de tipo EventRow. Además, la función proporciona acceso al ActorContext o contexto de ejecución de actor, ofreciendo, entre otras cosas, la capacidad de crear nuevos actores si fuera necesario. El compilador indicará que el parámetro m es una instancia de tipo EventRow permitiendo el acceso de manera directa a sus propiedades sin la necesidad de tener que hacer el pattern matching como en la versión clásica. Los actores que se crean a partir de esta definición de behaviour solo aceptarán mensajes de este tipo y las referencias para acceder a ellos tambien serán tipos parametrizados. En este caso las referencias serían ActorRef[EventRow].

La función sender desaparece y además no aparece nada para sustituirla, lo cual puede parecer que es como un paso atrás en cuanto a funcionalidad del trait Actor. Sin embargo, eliminar este tipo de propiedades proclives a efectos colaterales parece la decisión correcta. La referencia al origen debe ser incluida de manera explícita en los tipos que se intercambian los actores para construir un determinado protocolo lo cual obliga a una definición más exhaustiva del mismo.

Una implementación del protocolo con Akka Typed podría ser la siguiente:

//The types used to build the protocol are
case class EventRow(id: String, customer: String, merchant: String, timestamp: Long, replyTo: ActorRef[Response])

sealed trait Response
case object OperationOk extends Response
case class OperationError(id: String, err: String) extends Response
case class MerchantFraudIndex(fraudCoefficient: Double) extends Response
 
def main(inputEvent: EventRow): Behavior[Response] =
 Actor.deferred[Response] { ctx =>
   val enricherRef = ctx.spawn(enricherBehavior, "enricher")
   val saverRef = ctx.spawn(saverBehavior, "saver")

   enricherRef ! inputEvent
   // Change behavior to wait for fraud index
   Actor.immutable {
     case (ctx, m: MerchantFraudIndex) =>
       saverRef ! ((inputEvent, m))
       Actor.immutable { (ctx, m) =>
         m match {
           case OperationOk => { println("Process OK"); Actor.stopped }
           case OperationError(id, err) => { println(s"There was an error $err"); Actor.stopped }
         }
       }
   }
 }

Los actores que intervienen en el protocolo son creados con la función spawn, la cual requiere una bahaviour principal. El parámetro ctx ofrece esta capacidad tal y como se muestra en el código. Los actores Enricher y Saverofrecen como punto de acceso los tipos ActorRef[EventRow] y ActorRef[(EventRow, MerchantFraudIndex)] respectivamente.

La primera llamada que se realiza va dirigida al actor Enricher mediante la función !. Inmediatamente se retorna la Behavior para tratar la respuesta. Si obviamos los escenarios de error, cuando el mensaje del Enricher llega se procede de igual manera: se envía el evento enriquecido al actor Saver y nos quedamos a la espera de la respuesta del proceso de registro de evento. Cuando el resultado llega el actor principal para.

En el otro extremo, tanto el actor Enricher como el Saver siempre hacen lo mismo: llaman a otro actor que les ofrece el servicio y retornan la respuesta a través de la propiedad replayTo de los mensajes de entrada. Además, tienen la restricción de enviar tipos Response hacia el origen. El compilador no les permitirá enviar de vuelta cualquier tipo como ocurre en el escenario de Akka clásico. La utilización de estos “canales” tipados reducirá el número de mensajes enviados al canal dead-Letters.

Más lejos todavía

Hasta aquí hemos visto como se pueden construir protocolos usando actores. Mediante Akka Typed y el sistema de tipos de Scala podemos protegernos de ciertos tipos de errores en tiempo de ejecución. Sin embargo, cuando el protocolo es más complejo y tanto el numero de actores como de interacciones entre ellos aumenta podemos sopesar el utilizr una especificación de más alto nivel a la hora de definir e implementar protolos distribuidos.

Un concepto que nos puede ayudar a pensar en un nivel de abstraccion mayor son los Session Types. Como hemos visto, los actores que conforman procesos distribuidos interactúan y se coordinan siguiendo la definición de un protocolo. Por poner un ejemplo, cuando un usuario pide acceso a un recurso restringido varios sistemas se intercambian mensajes con una estructura definida hasta que otorga o deniega el acceso. Los Session Types definen la secuencia y la definición de los mensajes que intervienen en la ejecución de un protocolo. Imagina que quieres comunicar dos procesos a través de un canal y el primero envía un mensaje de petición de autenticación en forma de cadena de caracteres o string y el segundo responde con un valor booleano: true o false. Aunque pueda parecer que el canal de comunicación es único entre los dos, el tipo de mensaje es distinto cuando la información viaja de un sentido al otro y viceversa. Este tipo de detalles, además de una representación que modela la secuencia completa de interacciones necesarias es lo que definen los Session Types.

El siguiente ejemplo de definición de Session Type es de la siguiente publicación Scalas,A.Yoshida,N.(2016) Lightweight Session Programming in Scala:

Sh = µX.(!Greet(String).(?Hello(String).X & ?Bye(String).end) ⊕ !Quit.end)

Representa el lado cliente de un “greeting protocol”. El cliente puede enviar un mensaje “Greet” de tipo String o un mensaje Bye. Si se envía la primera opción cambia su estado a la espera de un mensaje Hello o Bye. Si la respuesta que llega es Bye el proceso termina; si es Hello continúa de manera recursiva.

Existen implementaciones de Session Types en diferentes lenguajes como C, Scala o Haskell. Otro herramienta interesante es Scribble, un lenguaje diseñado para modelar protocolos distribuidos construido sobre las bases teóricas del Pi Calculus y los Session Types.

En la siguiente sección vamos a hablar de Process DSL creado por Roland Kuhn @rolandkuhn. Una propuesta para modelar e implementar Session Types sobre Akka Typed.

Proteger recursos, otro protocolo

Consideremos que una aplicación necesita acceder a un recurso guardado en una base de datos o sistema de ficheros y que está debidamente protegido. Para permitir el acceso es necesario conseguir un token emitido por otro sistema. Debido a restricciones de seguridad, este token debe ser firmado y conseguirlo sólo es accesible a través de una referencia de actor que necesita un identificador de usuario. Una vez el usuario consigue el token el siguiente paso es proceder a la autorización, donde el usuario que solicita el acceso obtendrá los atributos de seguridad de los que dispone y que serán enviados hacia el sistema que custodia el recurso protegido junto con la petición de acceso.

En lugar de empezar a codificar el protocolo directamente en Akka vamos a utilizar la aproximación basada en el DSL. Existen tres protagonistas principales que intervienen en esta implementación: Akka Typed que proporciona la estructura de actores y comunicación subyacente, la librería de programación genérica Shapeless y el propio compilador de Scala.

En lugar de representar cada paso del protocolo con una Behaviour lo vamos a hacer utilizando un conjunto de operaciones. Una operación representa un efecto que proporciona un valor en tiempo de ejecución y que puede ser encadenada a otra formando una composición secuencial. Cada conjunto de estas acciones forma un proceso que se ejecutará dentro de un actor. Al igual que los actores, estos procesos, disponen de sus propias referencias para poder enviarles mensajes.

En el ejemplo siguiente, a parte de todo lo necesario lo relativo al DSL inyectado vía el implícito OpDSL, se muestran tres operaciones que forman el proceso de autenticación: la primera consiste en obtener la referencia del proceso donde se ejecutarán las operaciones, la segunda envía la petición de autenticación a un actor y por último la operación que pone al proceso en modo de espera hasta que el token correspondiente a autenticación aceptada sea recibido:

trait Response
case object AuthenticationError extends Response
case class AuthenticationOk(token: String) extends Response
case class AuthenticationError(err: String) extends Response
case class AuthorizationResponse(claims: List[String]) extends Response
case class Result(content: String) extends Response

case class AuthenticationRequest(id: String, replyTo: ActorRef[Response])

def doAuthentication(authZ: ActorRef[AuthenticationRequest], id: String) =
 OpDSL[Response] { implicit opDSL =>
   for {
     self <- opProcessSelf
     _    <- opSend(authZ, AuthenticationRequest(id, self))
     AuthenticationOk(token) <- opRead
   } yield token
 }.withTimeout(3.seconds)

El proceso tiene un limite de tiempo de espera de tres segundos. Si se sobrepasa es abortado. La función de autenticación devuelve un tipo Process con tres type parameters como retorno:

Process[Response, String, ::[E.Send[AuthenticationRequest],::[E.Read[Response], _0]]]

El primero corresponde al canal de entrada del proceso hacia donde el actor que autentica enviará la respuesta. El segundo es el token de retorno de tipo String y por último el efecto que muestra la secuencia de operaciones que conforman el proceso de autenticación: “envía una petición de autenticación y ponte a la espera hasta que llegue la respuesta”.

Los procesos siguientes tienen una estructura similar, excepto por el actor remoto que provee el servicio y el valor de retorno.

case class AuthorizationRequest(id: String, feature: String, replyTo: ActorRef[Response])
case class QueryFeature(featureId: String, userAttributes: List[String], replyTo: ActorRef[Response])

def doAuthorization(authoZ: ActorRef[AuthorizationRequest], token: String, resource: String) =
 OpDSL[Response] { implicit opDSL =>
   for {
     self <- opProcessSelf
     _    <- opSend(authoZ, AuthorizationRequest(token, resource, self))
     AuthorizationResponse(claims) <- opRead 
   } yield claims 
 } 

def doQuery(query: ActorRef[QueryFeature], claims: List[String], resource: String) = OpDSL[Response] { implicit opDSL =>
   for {
     self <- opProcessSelf
     _    <- opSend(query, QueryFeature(resource, claims, self))
     Result(value)     <- opRead
   } yield value
}

El proceso principal consiste en unir los tres “subprocesos” en una estructura “for-comprehension” que las compone de manera secuencial. La representación de la función principal quedaría de la siguiente forma:

case object AuthenticationService extends ServiceKey[AuthenticationRequest]
case object AuthorizationService extends ServiceKey[AuthorizationRequest]
case object QueryService extends ServiceKey[QueryFeature]

def main(id: String, feature: String) = {
 OpDSL[Nothing] { implicit opDSL =>
   for {
     self <- opActorSelf
     authRef <- opCall(getService(AuthenticationService).named("authentication_service"))
     token <- opCall(doAuthentication(authRef, id).named("send_auth"))
     authoRef  <- opCall(getService(AuthorizationService).named("authorization_service"))
     claims  <- opCall(doAuthorization(authoRef, token, feature).named("send_autho"))
     srv  <- opCall(getService(QueryService).named("query_service"))
     result   <- opCall(doQuery(srv, claims, feature).named("send_query"))
   } yield ()
 }
}

La función opCall ejecuta un proceso dentro del actor padre y produce un valor en tiempo de ejecución necesario para el siguiente paso. Importante ver que antes de cada llamada a cada uno de los subprocesos se obtiene una referencia al actor que proporciona el servicio. Ésto se realiza mediante la llamada a la función getService que a su vez se ejecuta dentro de otro proceso y que llama al “recepcionista” para obtener la referencia. Más información acerca del patrón recepcionista está disponible en la documentación de Akka. Como ejemplo, el siguiente código muestra el registro del actor en el repcecionista que realiza el proceso de autenticación:

val authBehavior: Behavior[AuthenticationRequest] =
 Actor.deferred { ctx ⇒
   ctx.system.receptionist ! Receptionist.Register(AuthenticationService, ctx.self)(ctx.system.deadLetters)
   Actor.immutable[AuthenticationRequest] { case (ctx, msg) ⇒
     msg.replyTo ! AuthenticationOk("token")
     Actor.same
   }
 }

La función principal que define todo el protocolo retorna el tipo que representa el efecto compuesto que debe ser interpretado y ejecutado mediante un interpreter incluido dentro del proyecto. El código se puede consultar el repositorio de Github.

La idea detrás de como el efecto se va acumulando conforme añadimos subprocesos para construir el protocolo está basado en el tipo Shapeless HList. Esta estructura se usa para trabajar con listas de tipos heterogéneos, es decir, a diferencia de las colecciones habituales en Scala formadas por elementos del mismo tipo, éstas pueden contener miembros de distinta naturaleza. Por ejemplo:

1 :: "foo" :: true :: HNil

La linea anterior representa una instancia del tipo HList de tres elementos de tipos diferentes. Un entero como valor inicial, un tipo String y un booleano. Extrapolando este concepto a la secuenciación de efectos podemos tener la representación de todo el protocolo en un sólo tipo. Como dijimos anteriormente, cuando se realiza la composición secuencial de operaciones para crear el protocolo, cada flatMap que se ejecuta añade un nuevo efecto al tipo dando como resultado una estructura parecida al tipo HList de Shapeless. De esta forma creamos una representación de un Session Type como un tipo de Scala para verificar en tiempo de compilación si la implementación de un protocolo cumple la especificación requerida o no.

Nuestro protocolo quedaría de la siguiente forma:

object QueryFeatureProtocol extends E.Protocol {
  type Session =
  E.Send[AuthenticationRequest] ::
  E.Read[Response] ::
  E.Choice[(E.Halt :: _0) :+: _0 :+: CNil] ::
  E.Send[AuthorizationRequest] ::
  E.Read[Response] ::
  E.Choice[(E.Halt :: _0) :+: _0 :+: CNil] ::
  E.Send[QueryFeature] ::
  E.Read[Response] ::
  E.Choice[(E.Halt :: _0) :+: _0 :+: CNil] ::
  _0
}

El efecto Choice es un coproducto de Shapeless que indica que la operación puede fallar y parar, o bien producir un efecto vacío, representado por _0 que significa que el proceso continúa.

La implementación actual necesita por el momento checkear el protocolo contra el tipo de manera explícita:

E.vetExternalProtocol(QueryFeatureProtocol, main(???, ???))

Si una nueva implementación del protocolo no cumple con con tipo, el compilador alertará del error y mostrará el tipo esperado.

Conclusiones

Akka se creó para crear aplicaciones pensadas para cubrir escenarios de alta concurrencia y de procesamiento distribuido siguiendo el modelo de Actor. En estos casos los desarrolladores se embarcan en proyectos de gran complejidad con multitud de dificultades a afrontar: elevado número de actores, muchos tipos de mensajes, reutilización de código, evitar efectos colaterales, etc… que requieren a Akka evolucionar su api para poder ofrecer mejoras a la hora enfrentarse a ellas. Akka typed puede representar un paso hacia delante en este sentido, y probablemente gane más protagonismo en los próximos meses o años conviviendo con la versión clásica. El Process DSL todavía está en una fase muy experimental y necesita que la comunidad se fije en él y evalúe la necesidad de estas representaciones de alto nivel. En cualquier caso, construir una implementación de Session Types lista para poner en producción sobre Akka presenta un reto a tener en cuenta.

Otras historias interesantes