Akka Typed 协议和行为

2019年11月6号LightBend公司发布了AKKA 2.6版本,带来了类型安全的actor,新的Akka Cluster底层通信设施——Artery,带来了更好的稳定性,使用Jackson进行消息序列化,支持SLF4J日志接口。

Why Akka Typed

actor编程模型是一个强有力的抽象模型,尤其擅长解决真实世界建模,容错、并发、分布式系统问题。actor抽象编程模型构建于在互相独立的actor之间发送消息的基础之上,actor可以创建子actor,并负责监管,当子actor出现错误的时候可以重启或者重新创建,这套容错机制给整个actor系统带来了自愈能力。

经典的Akka actor API非常简单,就是提供一组处理和接收消息的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// the Actor trait (AbstractActor class in Java) is the entry point for using the API
class OrderProcessor extends Actor {

// receive方法处理消息
def receive: Receive = {
case order @ OrderProcessor.ProcessOrder =>
// actor方法繁衍新的actor
val connection = context.actorOf(
BankConnection.props(order.bankIndentifier)
)
// !方法表示即发即弃
connection ! BankConnection.ExecuteOrder(order)
}
}

这种编程模型和API在多线程环境中具有显著的优势,每个actor顺序处理接收到的消息,actor的内部状态也只有它本身可以修改,这比并发的修改共享状态容易多了。

天下没有免费的午餐,actor编程模型也有它的缺点,槽点在这篇文章中有提到:Akka anti-patterns series

这些年来我在一些稍微大一些的Akka工程中见到的最大的问题是actor系统随着业务越做越大,并且非常难以扩展。根本原因是这套Akka API没有强制用户采用“协议优先”的规范。实际上Akka官方教程里最先讲述的就是清晰的定义组件之间的通信协议(也就是消息),并使用全路径访问消息。已上面的例子来说,OrderProcessor的通信协议定义如下:

1
2
3
4
5
6
// 伴生对象存放了消息的定义
// 对于集群或持久化系统,消息定义需要使用合适的序列化机制,譬如protobuf
object OrderProcessor {
sealed trait Command
final case class ProcessOrder(bank: BankId, fromAccount: AccountId, toAccount: AccountId, amount: Amount) extends Command
}

即便你遵照Akka最佳实践,但还是无法保证给actor发送一些它不支持消息,actor的receive方法会接受任意类型的消息,当它收到不支持的消息时,便自动转给unhandled方法,此方法默认只会打日志记录一下(需要正确的配置日志打印机制),这对新人来说太坑了,你找不到任何错误,但是系统就是无法正常工作。

更深层次的原因在于缺少一种机制来帮助我们维护actor之间的通信协议。随着消息类型增多,很容易忘记这些actor都支持什么类型的消息。通过单元测试和严格的日志级别会有助于缓解这种问题(只要接受到不支持的消息就打warn日志),但是仍然无法完全避免。

Akka Typed就是为了解决这个问题,新的API是为“协议优先”设计的,在实现功能之前,你必须花一点时间想一想每一个actor要处理哪些消息。经典的Actor API的最佳实践也是如此,但却是可选的,你需要在实现的过程中使要处理消息条理清晰。

看过许多真实的Akka System分享之后,有一点必须强调一下:开发Akka Typed的目的不仅仅是为了以结构化的方式组织消息以及防止丢失那一点点actor不支持的消息,它的主要目的是引导我们优先考虑系统设计。设计一组恰到好处的actor,适当的通信粒度,正确的消息模式,这样就可以构建一个强大的系统,但是它的核心却非常简单,就像高考一样简单。但是我见到太多过度设计,大家倾向于设计过多的actor以及消息,引入了不必要的复杂度,最后尾大不掉。

Let’s build a payment processor

前面我们已经讲过使用Akka Typed可以非常容易的定义协议,但什么是“协议”呢?协议仅仅是“消息”吗?简单来说协议就是:定义一组消息,在两个及以上的组件之间按特定的顺序和组合传递。常见的协议有TCP、HTTPS等,而我们定义的是应用层的协议。你可以认为协议就是增强版的API:API只定义了个体之间的调用格式(参数、请求内容、响应内容等),协议描述了怎么通过组件之间的相互调用使系统到达期望的状态。

在Akka Typed API中,协议由一组消息class和对应类型的actor组成。下面的例子展示了从configuration组件获取配置数据的协议:

1
2
3
4
5
6
7
8
9
10
11
sealed trait ConfigurationMessage
final case class RetrieveConfiguration(merchantId: MerchantId, replyTo: ActorRef[ConfigurationResponse]) extends ConfigurationMessage

sealed trait ConfigurationResponse
final case class ConfigurationFound(merchantId: MerchantId, merchantConfiguration: MerchantConfiguration) extends ConfigurationResponse
final case class ConfigurationNotFound(merchanId: MerchantId) extends ConfigurationResponse

case class MerchantId(id: String) extends AnyVal
case class BankIdentifier(id: String) extends AnyVal

case class MerchantConfiguration(bankIdentifier: BankIdentifier)

这个例子遵循了请求-响应的消息设计模式,欲知更多详情,请参见本书:Reactive Design Patterns

如果你以前用过经典的Actor API,你会发现这里的实现方式有两个不同的地方,第一个是消息发送者的引用包含在消息的定义中,经典的Actor API是通过Akka提供的sender()方法来获取发送者的。第二个是消息class中包含的ActorRef是有类型的,发送者使用它的时候就可以清楚的知道应该发送什么类型的消息。我们使用接口ConfigurationResponse定义了配置数据的返回格式,它有两个实现类,这样发送者就可以发送两种格式的消息。

看了Actor的定义之后,就能理解为什么Akka Typed比经典的Actor更容易且更安全的解决协议问题,Configuration的定义如下:

1
2
3
class Configuration(context: ActorContext[ConfigurationMessage]) extends AbstractBehavior[ConfigurationMessage] {
// ...
}

我们定义的actor继承AbstractBehavior,并带有指定的类型,它只能处理ConfigurationMessage类型的消息,编译器可以帮助我们检查消息的发送者发送的消息是否正确。

上面的例子中我们使用面向对象的编程方式定义了Actor,稍后我们会展示函数式编程风格。

Implementing our first typed actor

Configuration提供查询功能:根据商户Id查询支付方式。我们继续使用面向对象的编程方式,如果使用过经典的Akka API,你对这种使用方式应该非常熟悉。

继承AbstractBehavior就必须实现onMessage方法,它返回一个Behavior

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// the AbstractBehavior trait is the entry point for using the object-oriented style API
class Configuration(context: ActorContext[ConfigurationMessage]) extends AbstractBehavior[ConfigurationMessage] {

// the mutable state here holds the configuration values of each merchant we know about
var configurations: Map[MerchantId, MerchantConfiguration] = Map.empty

// the onMessage method defines the initial behavior applied to a message upon reception
override def onMessage(msg: ConfigurationMessage): Behavior[ConfigurationMessage] = msg match {
case RetrieveConfiguration(merchantId, replyTo) =>
configurations.get(merchantId) match {
case Some(configuration) =>
// reply to the sender using the fire-and-forget paradigm
replyTo ! ConfigurationFound(merchantId, configuration)
case None =>
// reply to the sender using the fire-and-forget paradigm
replyTo ! ConfigurationNotFound(merchantId)
}
// lastly, return the Behavior to be applied to the next received message
// in this case, that's just the same Behavior as we already have
this
}
}

这个actor与我们在本文开头使用经典的actor API定义的actor非常相似:覆盖onMessage方法,并根据指定的消息类型做出对应的响应。

不同点在于onMessage对应的方法返回的是一个Behavior,一个actor接收到消息之后的行为包含如下3个步骤:

发送一条或多条消息给其他的actor
创建子acotr
返回一个新的行为,准备接收下一个消息
在Akka Typed API中,一个Behavior即代表了处理当前消息的行为,也表明了如何处理下一个消息——通过返回一个新的Behavior。也可以只是返回当前行为(就像上面的例子一样),因为使用面向对象风格的actor继承自AbstractBehavior,它本身就是一个Behavior,所以可以使用return this。

本系列教程后面会讨论更多关于Behavior的用法,使用Akka Typed API定义的actor的一个优点就是非常容易组合和测试。

Typed Akka TestKit可以帮助你轻而易举的对actor进行测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class ConfigurationSpec extends ScalaTestWithActorTestKit with WordSpecLike {

"The Configuration actor" should {

"not find a configuration for an unknown merchant" in {
// define a probe which allows it to easily send messages
val probe = createTestProbe[ConfigurationResponse]()

// spawn a new Configuration actor as child of the TestKit's guardian actor
val configurationActor = spawn(Configuration())

// send a message to the actor under test with the probe's reference as sender
configurationActor ! Configuration.RetrieveConfiguration(MerchantId("unknown"), probe.ref)

// expect a certain type of message as response. there are many different ways to retrieve
// or to expect messages
val response = probe.expectMessageType[Configuration.ConfigurationNotFound]
response.merchanId shouldBe MerchantId("unknown")
}

}

}

Supervising and starting the actor

actor System为actor提供运行环境、分配资源、基础设施。在这个系统中,每一个actor都有一个父actor,最顶层的actor叫做根节点(root),使用/代表,它的两个直接子actor是/user/system/user用于在用户空间创建子actor,/system属于akka系统内部管理,所以我们创建的所有的actor都从属于/user

Akka Typed与经典的Actor API有一个非常重要的不同点:/user的处理逻辑。在经典的Akka API中,Akka提供的/useractor负责监管一切;但是Akka Typed把这个权力交给了用户。也就是说应用程序的开发者在实现actor的时候同时也必须多考虑一下actor都会有哪些行为。

在创建Configuration actor的时候,我们大可以直接把它传给ActorSystem并把它作为监管者,但当创建更多actor的时候,这些actor全部都由Configuration actor监管就不合适了。而且在actor模型中父监管机制采用级联的方式处理actor失败的问题:父actor负责决定如何处理子actor(当它抛异常的时候),因此如何对actor分组直接影响了监管策略。同样的我们应该使用一个专用的父actor做为监管actor,由它来决定如何处理子actor的失败问题。Akka Typed API中默认的监管策略是停止失败的子actor(经典的Akka API是重启)。由我们指定监管actor可以开发更灵活的监管策略,根据不同的异常做出相应的决策。综上所述我们决定使用PaymentProcessor actor做为所有actor的监管者,actor层级如下图所示:

![/img/akka/typed/supervison.png]

PaymentProcessor的功能目前非常简单,启动的时候创建一个子actor——Configuration,它是无状态的,也不接收任何消息,这次我们使用函数式编程的风格,无需继承任何接口,只需要返回一个Behavior:

1
2
3
4
5
6
7
8
9
object PaymentProcessor {

def apply() = Behaviors.setup[Nothing] { context =>
context.log.info("Typed Payment Processor started")
context.spawn(Configuration(), "config")
Behaviors.empty
}

}

Behaviors.setup()方法是创建Behavior的入口,该方法包含一个ActorContext变量,我们用它打日志,记录actor已经启动,并使用spawn()方法创建了一个Configuration actor,第一个参数用于创建actor,第二个参数是actor的名字,它在actor路径中是/user/config

因为PaymentProcessor不处理任何消息,所以这里使用了setup[Nothing]

Configuration actor使用静态的create函数创建Behavior

1
2
3
4
5
object Configuration {

def apply(): Behavior[ConfigurationMessage] = Behaviors.setup(context => new Configuration(context))

}

现在万事俱备,只欠东风,需要启动ActorSystem来创建我们的监管actor。Akka提供了静态方法用来创建监管actor:

1
2
3
4
5
6
7
object Main extends App {

override def main(args: Array[String]): Unit = {
ActorSystem[Nothing](PaymentProcessor(), "typed-payment-processor")
}

}

搞定!现在运行Main方法,就可以看到PaymentProcessor启动了:

1
2
[info] Running io.bernhardt.typedpayment.Main
[INFO] [07/10/2019 09:36:42.483] [typed-payment-processor-akka.actor.default-dispatcher-5] [akka://typed-payment-processor/user] Typed Payment Processor started