An Akka DDD from scratch

最近的多人游戏开发上,对Akka 的使用非常感兴趣。在我的工作上,我和 @DsFishlabs 使用了 Spring Boot 以及 Spring Cloud 作为我们的内部服务。

在本文,我想尝试基于 Akka 来构建相似的系统,功能方面跟原来差不多,但这里我想尝试一个不同的方式。其中包含有 Scala,Akka Cluster ShardingAkka Persistence aka.Event Sourcing 以及 DDD

所有代码会在Github上,可能需要看看 extractShardId 方面的资料。

通用语言 & 第一个模型

这里,我将同时扮演开发者(Developer)和领域专家(Domain Expert)的角色,以减少问题的复杂性。我将围绕我自身在公司和游戏的层面出发。每个项目应该开始一种通用语言(an ubiquitous language),以及要设计实现解决方案的需求,让我们定义一些术语。

领域专家说:
" Each Player has some amount of XP as well as Credits "

它引入了3个术语:

  • Player 游戏玩家,它有唯一标识
  • XP 每个游戏玩家通过经验值(the amount of XP)成长
  • Credits 该游戏中用于交易的通用货币

first-model

真实的需求,引导我们建立模型。这里Player 作为一个 Aggregate Root 并附上一个类型Id(PlayerId)。

它包含上述的所有需求(当然只有一个…)。我们的模型也包括了通用语言中的所有术语(terms)。

对应Scala类实现如下:
1
2
3
4
5
6
7
class Player {
var id: PlayerId = PlayerId(UUID.randomUUID.toString)
var xp: Int = 0
var credits: Int = 0
}

case class PlayerId(value: String)

完美!所有需求实现了…

但只有一个类起不到什么作用,不能向玩家发送信息,也没有初始化的随机UUID控制,因此让我们补上。

领域专家说:
“A new Player will be created when a Person registers on the Backend.
It will the be initialized with a given amount of XP and Creadits (in this case 0) When a Player requests his information from the backend then it should return the Id and the amount of XP as well as the amount of Credits

是的,玩家可以自己创建角色,并初始化一定的经验值和币值(开始时都是0)。并且能获取到创建角色的有关信息。

下面我们来满足这一需求:

第一个Actor

首先我们要将Player类转换为一个事件溯源(event sourced)Actor,下面为代码:

PlayerActor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class PlayerActor extends PersistentActor {

var id: PlayerId = PlayerId(self.path.name)
var xp: Int = _
var credits: Int = _

// self.path.name is the entity identifier (utf-8 URL-encoded)
override def persistenceId: String = "Player-" + self.path.name

val log = Logging(context.system, this)

override def receiveCommand = {
case init: InitializePlayer =>
persist(PlayerInitialized(init.playerId, init.xp, init.credits)) { ev =>
initialize(ev)
sender() ! ev
}
case GetPlayerInformation => sender() ! PlayerInformation(id, xp, credits)
case _ => log.info("received unknown message")
}

override def receiveRecover = {
case init: PlayerInitialized => initialize(init)
case _ => log.info("received unknown message")
}

def initialize(init: PlayerInitialized) = {
this.xp = init.xp
this.credits = init.credits
}
}

之后你将看到我们可以从actor 路径获得 Player的Id,因此一个Player总是有一个Id。
要实现需求,Player应该初始化事件响应。
首先我们会持久化产生的事件,之后调用initialized方法初始化Player的状态。
初始化后,我们给sender回应该事件
这里实现了Player可以获取它的状态。因为没有命令(command)产生的事件,仅返回数据。
当调用这个方法表示该actor的事件进入“重玩”(replayed)。因此直接调用初始化方法,这个不需要持久化。
该方法根据事件来设置XPCredits.

由于要使用Event Sourcing作为持久化机制,这里扩展PersistentActor

伴生对象如下所示,这里我们定义了事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
object PlayerActor {
// define compatible commands
case class InitializePlayer(playerId: PlayerId, xp: Int, credits: Int)
case class GetPlayerInformation(playerId: PlayerId)

// define compatible events
case class PlayerInitialized(playerId: PlayerId, xp: Int, credits: Int)

// custom responses
case class PlayerInformation(playerId: PlayerId, xp: Int, credits: Int)

def extractEntityId(): ShardRegion.ExtractEntityId = {
case msg@InitializePlayer(id, _, _) => (id.value.toString, msg)
case msg@GetPlayerInformation(id) => (id.value.toString, msg)
}

def extractShardId(numberOfShards: Int): ShardRegion.ExtractShardId = {
case InitializePlayer(id, _, _) => Math.abs(id.hashCode() % numberOfShards).toString
case GetPlayerInformation(id) => Math.abs(id.hashCode() % numberOfShards).toString
}
}

要使用Akka Cluster Sharding 我们需要定义一个extractEntityId方法,这样akka知道哪个actor应该接受当前的消息。这里返回Player的Id,同时也是actor的路径名称。
另外也需要定义一个extractShardId方法,这样akka知道哪个分片负责该Actor(这里定义100个分片),这样将actor分配到每个节点上。

主函数

我们还需要注册一个endpoint将它们连接起来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
object Application extends App with AkkaInjectable {

implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()
implicit val timeout = Timeout(5 seconds)

val config = ConfigFactory.load()

val logger: LoggingAdapter = Logging(system, getClass)

implicit val appModule = new PlayerModule
val player = inject[ActorRef]('player)

val routes = {
logRequestResult("server") {
(post & path("register")) {
complete {
// create a new user and send it a message
val playerId = PlayerId(UUID.randomUUID().toString)
(player ? InitializePlayer(playerId, 0, 0)).mapTo[PlayerInitialized].map { ev: PlayerInitialized => ev.playerId.value }
}
}
}
}

Http().bindAndHandle(routes, config.getString("http.interface"), config.getInt("http.port"))
}

载入PlayerModule
注入分片actor的引用,这样我们可以发送信息。
创建一个唯一标识的Id。
发送请求。
地址和端口。

这里,我使用了Scaldi作为依赖注入方式,因此我们需要声明集群分片:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class PlayerModule(implicit system: ActorSystem) extends Module {

val numberOfShards = 100

val playerRegion: ActorRef = ClusterSharding(system).start(
typeName = "Player",
entityProps = Props[PlayerActor],
settings = ClusterShardingSettings(system),
extractEntityId = PlayerActor.extractEntityId(),
extractShardId = PlayerActor.extractShardId(numberOfShards)
)

bind[ActorRef] as 'player to playerRegion

}

extractEntityId方法的引用
extractShardId方法的引用
player绑定为分片actor的引用,用作依赖注入实现

这里可以参考完整代码。

测试

启动服务或执行:

1
sbt run

我这里使用了Httpie,测试更加方便。

你可以POST/register端点:

1
2
3
4
5
6
7
8
9
> http POST :9000/register

HTTP/1.1 200 OK
Content-Length: 36
Content-Type: text/plain; charset=UTF-8
Date: Sun, 10 Jan 2016 18:44:59 GMT
Server: akka-http/2.4.1

62b058cf-6d73-4d5c-9855-2aed6e36ad3d

下面作一个短期的基准(benchmark)测试(10秒,10并发请求):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
> ab -t10 -c10 -mPOST http://localhost:9000/register

This is ApacheBench, Version 2.3 <$Revision: 1663405 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 5000 requests
Completed 10000 requests
Finished 10377 requests


Server Software: akka-http/2.4.1
Server Hostname: localhost
Server Port: 9000

Document Path: /register
Document Length: 36 bytes

Concurrency Level: 10
Time taken for tests: 10.000 seconds
Complete requests: 10377
Failed requests: 0
Total transferred: 2044269 bytes
HTML transferred: 373572 bytes
Requests per second: 1037.70 [#/sec] (mean)
Time per request: 9.637 [ms] (mean)
Time per request: 0.964 [ms] (mean, across all concurrent requests)
Transfer rate: 199.64 [Kbytes/sec] received

Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 0 0.1 0 3
Processing: 2 9 15.0 8 474
Waiting: 2 9 15.0 8 474
Total: 3 10 15.0 9 474

Percentage of the requests served within a certain time (ms)
50% 9
66% 9
75% 10
80% 10
90% 11
95% 12
98% 14
99% 17
100% 474 (longest request)

10秒内得到10000新用户,令人印象深刻(我们的spring 服务甚至接近的水平都未到…)