最近的多人游戏开发上,对Akka 的使用非常感兴趣。在我的工作上,我和 @DsFishlabs 使用了 Spring Boot 以及 Spring Cloud 作为我们的内部服务。
在本文,我想尝试基于 Akka
来构建相似的系统,功能方面跟原来差不多,但这里我想尝试一个不同的方式。其中包含有 Scala,Akka Cluster Sharding ,Akka Persistence aka.Event Sourcing 以及 DDD 。
所有代码会在Github上,可能需要看看 extractShardId 方面的资料。
通用语言 & 第一个模型
这里,我将同时扮演开发者(Developer)和领域专家(Domain Expert)的角色,以减少问题的复杂性。我将围绕我自身在公司和游戏的层面出发。每个项目应该开始一种通用语言(an ubiquitous language),以及要设计实现解决方案的需求,让我们定义一些术语。
领域专家说:
" Each Player
has some amount of XP
as well as Credits
"
它引入了3个术语:
Player 游戏玩家,它有唯一标识
XP 每个游戏玩家通过经验值(the amount of XP
)成长
Credits 该游戏中用于交易的通用货币
真实的需求,引导我们建立模型。这里Player
作为一个 Aggregate Root
并附上一个类型Id(PlayerId
)。
它包含上述的所有需求(当然只有一个…)。我们的模型也包括了通用语言中的所有术语(terms)。
对应Scala类实现如下:
1 2 3 4 5 6 7 class Player { var id: PlayerId = PlayerId (UUID .randomUUID.toString) var xp: Int = 0 var credits: Int = 0 } case class PlayerId (value: String )
完美!所有需求实现了…
但只有一个类起不到什么作用,不能向玩家发送信息,也没有初始化的随机UUID
控制,因此让我们补上。
领域专家说:
“A new Player
will be created when a Person registers on the Backend.
It will the be initialized with a given amount of XP
and Creadits
(in this case 0) When a Player
requests his information from the backend then it should return the Id
and the amount of XP
as well as the amount of Credits
”
是的,玩家可以自己创建角色,并初始化一定的经验值和币值(开始时都是0)。并且能获取到创建角色的有关信息。
下面我们来满足这一需求:
第一个Actor
首先我们要将Player
类转换为一个事件溯源(event sourced)Actor,下面为代码:
PlayerActor 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class PlayerActor extends PersistentActor { var id: PlayerId = PlayerId (self.path.name) var xp: Int = _ var credits: Int = _ override def persistenceId : String = "Player-" + self.path.name val log = Logging (context.system, this ) override def receiveCommand = { case init: InitializePlayer => persist(PlayerInitialized (init.playerId, init.xp, init.credits)) { ev => initialize(ev) sender() ! ev } case GetPlayerInformation => sender() ! PlayerInformation (id, xp, credits) case _ => log.info("received unknown message" ) } override def receiveRecover = { case init: PlayerInitialized => initialize(init) case _ => log.info("received unknown message" ) } def initialize (init: PlayerInitialized ) = { this .xp = init.xp this .credits = init.credits } }
之后你将看到我们可以从actor 路径获得 Player
的Id,因此一个Player
总是有一个Id。
要实现需求,Player
应该初始化事件响应。
首先我们会持久化产生的事件,之后调用initialized
方法初始化Player
的状态。
初始化后,我们给sender回应该事件
这里实现了Player
可以获取它的状态。因为没有命令(command)产生的事件,仅返回数据。
当调用这个方法表示该actor的事件进入“重玩”(replayed)。因此直接调用初始化方法,这个不需要持久化。
该方法根据事件来设置XP
和Credits
.
由于要使用Event Sourcing
作为持久化机制,这里扩展PersistentActor
。
伴生对象如下所示,这里我们定义了事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 object PlayerActor { case class InitializePlayer (playerId: PlayerId , xp: Int , credits: Int ) case class GetPlayerInformation (playerId: PlayerId ) case class PlayerInitialized (playerId: PlayerId , xp: Int , credits: Int ) case class PlayerInformation (playerId: PlayerId , xp: Int , credits: Int ) def extractEntityId (): ShardRegion .ExtractEntityId = { case msg@InitializePlayer (id, _, _) => (id.value.toString, msg) case msg@GetPlayerInformation (id) => (id.value.toString, msg) } def extractShardId (numberOfShards: Int ): ShardRegion .ExtractShardId = { case InitializePlayer (id, _, _) => Math .abs(id.hashCode() % numberOfShards).toString case GetPlayerInformation (id) => Math .abs(id.hashCode() % numberOfShards).toString } }
要使用Akka Cluster Sharding 我们需要定义一个extractEntityId
方法,这样akka知道哪个actor应该接受当前的消息。这里返回Player
的Id,同时也是actor的路径名称。
另外也需要定义一个extractShardId
方法,这样akka知道哪个分片负责该Actor(这里定义100个分片),这样将actor分配到每个节点上。
主函数
我们还需要注册一个endpoint将它们连接起来:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 object Application extends App with AkkaInjectable { implicit val system = ActorSystem () implicit val executor = system.dispatcher implicit val materializer = ActorMaterializer () implicit val timeout = Timeout (5 seconds) val config = ConfigFactory .load() val logger: LoggingAdapter = Logging (system, getClass) implicit val appModule = new PlayerModule val player = inject[ActorRef ]('player) val routes = { logRequestResult("server" ) { (post & path("register" )) { complete { val playerId = PlayerId (UUID .randomUUID().toString) (player ? InitializePlayer (playerId, 0 , 0 )).mapTo[PlayerInitialized ].map { ev: PlayerInitialized => ev.playerId.value } } } } } Http ().bindAndHandle(routes, config.getString("http.interface" ), config.getInt("http.port" )) }
载入PlayerModule
。
注入分片actor的引用,这样我们可以发送信息。
创建一个唯一标识的Id。
发送请求。
地址和端口。
这里,我使用了Scaldi 作为依赖注入方式,因此我们需要声明集群分片:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class PlayerModule (implicit system: ActorSystem ) extends Module { val numberOfShards = 100 val playerRegion: ActorRef = ClusterSharding (system).start( typeName = "Player" , entityProps = Props [PlayerActor ], settings = ClusterShardingSettings (system), extractEntityId = PlayerActor .extractEntityId(), extractShardId = PlayerActor .extractShardId(numberOfShards) ) bind[ActorRef ] as 'player to playerRegion }
extractEntityId
方法的引用
extractShardId
方法的引用
player
绑定为分片actor的引用,用作依赖注入实现
这里 可以参考完整代码。
测试
启动服务或执行:
我这里使用了Httpie ,测试更加方便。
你可以POST
到/register
端点:
1 2 3 4 5 6 7 8 9 > http POST :9000/register HTTP/1.1 200 OK Content-Length: 36 Content-Type: text/plain; charset=UTF-8 Date: Sun, 10 Jan 2016 18:44:59 GMT Server: akka-http/2.4.1 62b058cf-6d73-4d5c-9855-2aed6e36ad3d
下面作一个短期的基准(benchmark)测试(10秒,10并发请求):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 > ab -t10 -c10 -mPOST http://localhost:9000/register This is ApacheBench, Version 2.3 <$Revision: 1663405 $> Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/ Licensed to The Apache Software Foundation, http://www.apache.org/ Benchmarking localhost (be patient) Completed 5000 requests Completed 10000 requests Finished 10377 requests Server Software: akka-http/2.4.1 Server Hostname: localhost Server Port: 9000 Document Path: /register Document Length: 36 bytes Concurrency Level: 10 Time taken for tests: 10.000 seconds Complete requests: 10377 Failed requests: 0 Total transferred: 2044269 bytes HTML transferred: 373572 bytes Requests per second: 1037.70 [#/sec] (mean) Time per request: 9.637 [ms] (mean) Time per request: 0.964 [ms] (mean, across all concurrent requests) Transfer rate: 199.64 [Kbytes/sec] received Connection Times (ms) min mean[+/-sd] median max Connect: 0 0 0.1 0 3 Processing: 2 9 15.0 8 474 Waiting: 2 9 15.0 8 474 Total: 3 10 15.0 9 474 Percentage of the requests served within a certain time (ms) 50% 9 66% 9 75% 10 80% 10 90% 11 95% 12 98% 14 99% 17 100% 474 (longest request)
10秒内得到10000新用户,令人印象深刻(我们的spring 服务甚至接近的水平都未到…)