Akka Distributed Data Deep Dive

一致性难题

CAP 定理阐述了在构建分布式系统时候,Consistency(一致性)Availability(可用性)Partition tolerance(分区容错性),这三者只能取二。

计算给我们再多的资源(常常以为可以用空间来换取时间,比如加内存,加物理机,甚至加钱),受制于目前计算机体系(多核CPU),理论上不可能三者都满足。

CAP 理论通常都会表述如下:在一致性、可用性和分区容错性这三个特性中,一个分布式系统只能够选择满足其中两个。分区容错性是我们必须要满足的,它表示如果数据被冗余备份到三个节点,那么如果其中一个节点暂时变得不可用,而另两个节点仍然能够正常运行,那么就认为系统具备分区容错性。

假设我们希望系统具备分区容错性,只在可用性和一致性之间进行妥协。读者可能会问为什么。在接收到一个有超时限制的请求时,如果节点不可用,我们其实就需要在两种方案之间进行选择:要么返回错误(选择一致性),要么继续,即使服务器之间可能会不一致(选择可用性)。等待的时间过长会导致该请求被抛弃,所以时间是一个重要的因素,系统必须要在上面两者中做出决定。

为了达到高可用,就必须满足最终一致性;为了达到最终一致性,就会牺牲可用性,数据插入后并不得到一个真实的返回。这里就出现了一致性问题,即要求强一致性时,系统非常非常慢,因为事件A的操作完成后,才能执行后续操作;若是要求最终一致性,多个节点如果没有同步副本,执行查询并不能得到希望的结果,因为我们并不知道副本什么时候同步完成。

现在认为,只要是最终一致性的就可以接受的范围。并且你会发现,所有力求最终一致性的分布式系统,都应该使用CRDT。

分布式事务常见的有几种解决方案:

  • 两阶段提交(2PC)
  • 补偿事务(TCC)
  • 本地消息表(异步确保)
  • MQ 事务消息
  • Sagas 事务模型

具体依赖于系统的实现逻辑方案来决定。这里撇开分布式事务不谈。

Conflict Free Replicated Data Types - CRDT(无冲突复制数据类型)

CRDT,顾名思义就是一类数据结构。指的是副本可以被同步到各个节点上,副本(replicas)与副本之间更新不需要依赖调停器(coordination)。有两种方式实现CRDT,Operation-based CRDTsState-based CRDTs,两种方式都能实现强最终一致性(strong eventual consistencey)。

根据Shapiro本人的论文2,两种实现都是等价的,都可以相互模拟。

State-based replication: 当一个副本(replica)收到来自客户端的一个Update时,首先更新自己的状态,之后将自身的所有状态发送给另一个副本。间接性地,每个副本都将自己的full state发送给系统内的其它副本。当一个副本收到来自其它副本的状态,提供一个Merge函数,将自身本地的状态和接收到的状态合并。如下所示,如果集合的值的状态可以表示为一个半格(semi-lattice )(半格 —— 一个偏序集,带有连续/最小上界)并且更新是递增的(譬如,状态是一个整数,更新是一个自增操作),并且如果Merge函数计算最小上界,则可以确保副本已经合并了相同的值(接近最小上界)。要让系统尽可能成为一个半格Merge操作必须满足等幂(idempotent)、结合律(associative)、交换律(commutative)。一个副本对象满足这些特征的CRDT,称为CvRDT(convergent replicated data type)

State-based approach. “s” denotes the source replica where the initial update is applied

Operation-based replication: 该方式并没有将一个副本的全部状态发送给另一个副本,而是通过广播的形式将Update操作发送给系统的其他副本,并由它们重演更新操作(类似于状态机复制 )。因为是广播操作,如果有两个更新,u_1u_2,作用于副本 i 上,然后副本 i 将这两个更新发送给另外两个副本 r_1r_2 ,这时更新到达副本的顺序是不同的,r_1_可能先收到_u_1u_2r_2 可能是先收到 u_2 然后 u_1 。如何进行合并?不管顺序如何,这些更新是满足交换律的,副本最后得到的状态是相同的。类似于这种广播的形式发送Update到所有副本,这一类对象称之为CmRDT(commutative replicated data type)

Operation-based approach. “s” denotes source replicas and “d” denotes the downstream replicas

CRDT解决了分布式系统的一个有趣而又基本的问题,但是有一些限制。CRDT不实现一致性,只针对部分问题空间的Update操作的交换,而不是所有。因此并不是所有问题都可以转化为CRDT。

关于CRDT的研究, Shapiro论文提出了几点理论3

  • (Convergent Replicated Data Type (CvRDT)). Assuming eventual delivery and termination, any state-based object that satisfies the monotonic semilattice property is SEC.
  • (Commutative Replicated Data Type (CmRDT)). Assuming causal delivery of updates and method termination, any op-based object that satisfies the commutativity property for all concurrent updates, and whose delivery precondition is satisfied by causal delivery, is SEC.
  • (CvRDT emulation). Any SEC op-based object can be emulated by a SEC state-based object of a corresponding interface.

Akka集群分片

作为抛砖引玉,这里先介绍Akka Sharding的实现方式,再来介绍Akka Distributed Data。

Akka Sharding也是一种分布式集群的实现4,集群分片主要应用于需要有大量带有状态的Actor处理大量数据的情形。因为要处理Actor的状态,所以很多例子都要求分片上的Actor都是继承了PersistenceActor。为了避免混淆,我们这里不引入PersistenceActor,单独阐述Cluster Sharding的机制。

由于集群分片需要实现位置透明,即ShardRegion.ExtractEntityId需要通过EntityId知会分片Actor的具体位置,从而向对应的Actor发送消息;当然你也可以不需要知会Actor的位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
object CounterClusterConfig {

final case class EntityEnvelope(entityId: String, payload: Any)
final case class Get(entityId: String)
val numberOfShards = 100 // 最大分片数

val extractEntityId: ShardRegion.ExtractEntityId = {
case EntityEnvelope(entityId, payload) => (entityId, payload)
case msg@Get(entityId) => (entityId, msg)
}

val extractShardId: ShardRegion.ExtractShardId = {
case EntityEnvelope(entityId, _) => (math.abs(entityId.hashCode) % numberOfShards).toString
case Get(entityId) => (math.abs(entityId.hashCode) % numberOfShards).toString
case ShardRegion.StartEntity(entityId) =>
// StartEntity is used by remembering entities feature
(math.abs(entityId.hashCode) % numberOfShards).toString
}
}

object Counter {
case object Increment
case object Decrement

case object Stop
final case class CounterChanged(delta: Int)
}

class Counter extends Actor with ActorLogging {

import com.lightbend.akka.np_sharding.Counter._
import com.lightbend.akka.np_sharding.CounterClusterConfig.Get

import scala.concurrent.duration._

context.setReceiveTimeout(120.seconds)

val entityType: String = getClass.getSimpleName
val entityId : String = self.path.name

override def preStart(): Unit = {
log.debug("==>1. I was created...")
}

override def postStop(): Unit = {
log.debug("==>4. I was stop...")
}

override def receive: Receive = {
case Get(id) =>
log.debug(s"==>2. $id established...")
case it@Increment =>
log.debug("==>3. I am increase...")
context stop self
}
}

ShardRegion每次收到来自客户端的消息时,如果分片中没有对应ID的Actor,则会先创建该Actor,然后发送消息;若存在该ID的Actor,则消息直接被Actor处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
val counterRegion: ActorRef = ClusterSharding(system).start(
typeName = "Counter",
entityProps = Props[Counter],
settings = ClusterShardingSettings(system),
extractEntityId = CounterClusterConfig.extractEntityId,
extractShardId = CounterClusterConfig.extractShardId)

// scheduler sender message
system.scheduler.schedule(0.second,
5.seconds,
new Runnable {
override def run(): Unit = {
val id = java.util.UUID.randomUUID.toString
counterRegion ! Get(id)
counterRegion ! EntityEnvelope(id, Increment)
}
}
)

sys.addShutdownHook {
Await.result(system.whenTerminated, Duration.Inf)
}

分片Region类似于线程池,既然可以重复利用,为什么不创建一个监督机制。这样每个Actor都是位置透明的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class CounterSupervisor extends Actor {
val counter = context.actorOf(Props[Counter], "theCounter")

override val supervisorStrategy = OneForOneStrategy() {
case _: IllegalArgumentExceptionSupervisorStrategy.Resume
case _: ActorInitializationExceptionSupervisorStrategy.Stop
case _: DeathPactExceptionSupervisorStrategy.Stop
case _: ExceptionSupervisorStrategy.Restart
}

def receive = {
case msg ⇒ counter forward msg
}
}

默认地,Akka Sharding使用的是Distributed Data Mode

1
akka.cluster.sharding.state-store-mode = ddata

你也可以使用Persistence Mode,

1
akka.cluster.sharding.state-store-mode = persistence

这个描述的Distributed Data Mode跟接下来的 Akka Distribted Data并不是一回事。关于更多Akka Sharding 的技术细节可以参考官网。

Akka Distributed Data

Akka Distributed Data 是一个用于节点间共享数据的模组。它被设计成Key-Value存储,Value实现了CRDT(Conflict Free Replicated Data Types)。它允许节点的数据可以从其它任意节点更新,二不需要经过Coordinator处理,CRDT的Value总是合并(converge)。

前面已经描述了CRDT的原理和结构,那么理解Akka Distribted Data就可以得心应手了。区别于Akka Sharding,distributed-data中的Replicator是API提供的,所以直接拿来用就可以了。

副本(Replicator),声明如下:

1
2
3
4
5
import akka.cluster.ddata._

implicit val cluster = Cluster(context.system)

val replicator: ActorRef = DistributedData(context.system).replicator

Akka Distributed Data实际上相当于Redis的Key-Value存储,并且一致性都是可调的。由于Key的特殊性,它是Value类型的编码形式。

1
val Key = ORMapKey.create[String, StoredOrder]("orders")

这里的Key变量以大写开头,允许用match表达式匹配。这是Scala语言规范,

1
2
3
4
5
8.1.1 Variable Patterns

[...]

A variable pattern x is a simple identifier which starts with a *lower case* letter. It matches any value, and binds the variable name to that value. [...]

Replicator实际上是一个简单的Actor,我们通过消息协议进行副本的通讯。例如要处理更新操作,让Replicator发送Replicator.Update消息,

Replicator
1
2
3
4
5
6
7
8
9
10
def storeOrderValidation(id: OrderIdentifier, storedOrder: StoredOrder, request: StoreOrderValidation) = { 
replicator ! Replicator.Update(
key = Key,
initial = ORMap.empty[String, StoredOrder],
writeConsistency = Replicator.WriteMajority(5.seconds),
request = Some(request)
) { orders =>
orders + (id.i.toString -> storedOrder)
}
}

副本需要知会Key是什么
分布式数据初始化值,这里定义了Key对应的Value是空的。Data Type为ORMap[String, StoredOrder]
写一致性,它可以是WriteLocalWriteTo(number of nodes)WriteMajority以及WriteAll。这里我们使用WriteMajority。表示立即写入N/2 + 1个节点(N是集群中节点的个数)
可选的。该对象附带更新请求一并传递。
这里是一个函数,作用于要修改的值。例如这里的数据是添加一个新的StoredOrder到map中。

1
2
3
4
5
object Update {
def apply[A <: ReplicatedData](
key: Key[A], initial: A, writeConsistency: WriteConsistency,
request: Option[Any] = None)(modify: A ? A): Update[A]
}

整个过程就是,Update消息携带的Key,通过modify函数,写入ValueORMap中。那么接下来会出现3种情况:

  • 更新成功
  • 更新全部失败
  • 介于两者之间,部分更新成功

如果所有都按计划执行,将返回一个UpdateSuccess消息,表示更新成功。

1
2
case Replicator.UpdateSuccess(Key, Some(request: StoreOrderValidation)) =>
request.replyTo ! OrderValidationStored(request.id, request.order)

或者更新失败,返回得到一个ModifyFailure

最后一种情况,“我们不确定是否成功,但它确实运行了”,这时包含有UpdateTimeoutStoreFailure

UpdateTimeout的出现,取决于事先定义的一致性等级。例如这里定义的是Replicator.WriteMajority(5.seconds),表示主节点在5秒的时间内没有应答。这种情况出现,可能是其它几个节点处理消息非常慢、或者网络延迟等等因素。如何处理这种情况取决于真实的案例。

StoreFailure,表示本地持久化存储出现了问题。这种问题出现之前,需先额外定义是否进行本地的持久化。

一个非常实用的操作是,我们可以监听分布式数据的改变。

1
replicator ! Replicator.Subscribe(OrderStorage.Key, self)

当发生改变时,会得到一个Replicator.Changed消息:

1
2
3
case change @ Replicator.Changed(OrderStorage.Key) =>
val allOrders: Map[String, StoredOrder] = change.get(OrderStorage.Key).entries
// do something with the orders

对于Update,一致性等级大概有:

  • WriteLocal the value will immediately only be written to the local replica, and later disseminated with gossip
  • WriteTo(n) the value will immediately be written to at least n replicas, including the local replica
  • WriteMajority the value will immediately be written to a majority of replicas, i.e. at least N/2 + 1 replicas, where N is the number of nodes in the cluster (or cluster role group)
  • WriteAll the value will immediately be written to all nodes in the cluster (or all nodes in the cluster role group)

对于Get,一致性等级有:

  • ReadLocal the value will only be read from the local replica
  • ReadFrom(n) the value will be read and merged from n replicas, including the local replica
  • ReadMajority the value will be read and merged from a majority of replicas, i.e. at least N/2 + 1 replicas, where N is the number of nodes in the cluster (or cluster role group)
  • ReadAll the value will be read and merged from all nodes in the cluster (or all nodes in the cluster role group)

前面说过,Value的实现是一个CvRDT,它的定义为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Interface for implementing a state based convergent
* replicated data type (CvRDT).
* [...]
**/
trait ReplicatedData {
/**
* The type of the concrete implementation, e.g. `GSet[A]`.
* To be specified by subclass.
*/
type T <: ReplicatedData

/**
* Monotonic merge function.
*/
def merge(that: T): T

}

该数据结构要求有merge函数,用于合并其它副本传递过来的数据。

1
2
3
object ORMapKey {
def create[A, B <: ReplicatedData](id: String): Key[ORMap[A, B]] = ORMapKey(id)
}

该CvRDT被定义为Data Type函数的上界,以实现存储功能。

你会发现,merge函数返回一个T ,不是Option[T]也不是Try[T]——它证明了**merge总是正常的**。你会发现,merge函数是一个单调函数,总是向一个方向增长。

单调变换

Akka api提供了一些基础数据类型,用于实现我们的分布式数据的存储。

对于上面的例子,可以直接操作数据,因为它本身就是个Key-Value,下面使用分布式方案实现基于内存的存储实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.LWWMap
import akka.cluster.ddata.LWWMapKey

object ReplicatedCache {

def props: Props = Props[ReplicatedCache]

private final case class Request(key: String, replyTo: ActorRef)

final case class PutInCache(key: String, value: Any)
final case class GetFromCache(key: String)
final case class Cached(key: String, value: Option[Any])
final case class Evict(key: String)
}

class ReplicatedCache extends Actor {
import akka.cluster.ddata.Replicator._
import ReplicatedCache._

val replicator = DistributedData(context.system).replicator
implicit val cluster = Cluster(context.system)

def dataKey(entryKey: String): LWWMapKey[String, Any] =
LWWMapKey("cache-" + math.abs(entryKey.hashCode) % 100)

def receive = {
case PutInCache(key, value) =>
replicator ! Update(dataKey(key), LWWMap(), WriteLocal)(_ + (key -> value))
case Evict(key) =>
replicator ! Update(dataKey(key), LWWMap(), WriteLocal)(_ - key)
case GetFromCache(key) =>
replicator ! Get(dataKey(key), ReadLocal, Some(Request(key, sender())))
case g @ GetSuccess(LWWMapKey(_), Some(Request(key, replyTo))) =>
g.dataValue match {
case data: LWWMap[_, _] => data.asInstanceOf[LWWMap[String, Any]].get(key) match {
case Some(value) => replyTo ! Cached(key, Some(value))
case None => replyTo ! Cached(key, None)
}
}
case NotFound(_, Some(Request(key, replyTo))) =>
replyTo ! Cached(key, None)
case _: UpdateResponse[_] => // ok
}

}

TDD案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import scala.concurrent.duration._
import akka.cluster.Cluster
import akka.cluster.ddata.DistributedData
import akka.cluster.ddata.Replicator.GetReplicaCount
import akka.cluster.ddata.Replicator.ReplicaCount
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory

object ReplicatedCacheSpec extends MultiNodeConfig {
val node1 = role("node-1")
val node2 = role("node-2")
val node3 = role("node-3")

commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.log-dead-letters-during-shutdown = off
"""))

}

class ReplicatedCacheSpecMultiJvmNode1 extends ReplicatedCacheSpec
class ReplicatedCacheSpecMultiJvmNode2 extends ReplicatedCacheSpec
class ReplicatedCacheSpecMultiJvmNode3 extends ReplicatedCacheSpec

class ReplicatedCacheSpec extends MultiNodeSpec(ReplicatedCacheSpec) with STMultiNodeSpec with ImplicitSender {
import ReplicatedCacheSpec._
import ReplicatedCache._

override def initialParticipants = roles.size

val cluster = Cluster(system)
val replicatedCache = system.actorOf(ReplicatedCache.props)

def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster join node(to).address
}
enterBarrier(from.name + "-joined")
}

"Demo of a replicated cache" must {
"join cluster" in within(20.seconds) {
join(node1, node1)
join(node2, node1)
join(node3, node1)

awaitAssert {
DistributedData(system).replicator ! GetReplicaCount
expectMsg(ReplicaCount(roles.size))
}
enterBarrier("after-1")
}

"replicate cached entry" in within(10.seconds) {
runOn(node1) {
replicatedCache ! PutInCache("key1", "A")
}

awaitAssert {
val probe = TestProbe()
replicatedCache.tell(GetFromCache("key1"), probe.ref)
probe.expectMsg(Cached("key1", Some("A")))
}

enterBarrier("after-2")
}

"replicate many cached entries" in within(10.seconds) {
runOn(node1) {
for (i ← 100 to 200)
replicatedCache ! PutInCache("key" + i, i)
}

awaitAssert {
val probe = TestProbe()
for (i ← 100 to 200) {
replicatedCache.tell(GetFromCache("key" + i), probe.ref)
probe.expectMsg(Cached("key" + i, Some(i)))
}
}

enterBarrier("after-3")
}

"replicate evicted entry" in within(15.seconds) {
runOn(node1) {
replicatedCache ! PutInCache("key2", "B")
}

awaitAssert {
val probe = TestProbe()
replicatedCache.tell(GetFromCache("key2"), probe.ref)
probe.expectMsg(Cached("key2", Some("B")))
}
enterBarrier("key2-replicated")

runOn(node3) {
replicatedCache ! Evict("key2")
}

awaitAssert {
val probe = TestProbe()
replicatedCache.tell(GetFromCache("key2"), probe.ref)
probe.expectMsg(Cached("key2", None))
}

enterBarrier("after-4")
}

"replicate updated cached entry" in within(10.seconds) {
runOn(node2) {
replicatedCache ! PutInCache("key1", "A2")
replicatedCache ! PutInCache("key1", "A3")
}

awaitAssert {
val probe = TestProbe()
replicatedCache.tell(GetFromCache("key1"), probe.ref)
probe.expectMsg(Cached("key1", Some("A3")))
}

enterBarrier("after-5")
}

}

}

性能优化

如果没有做性能优化,将会是萦绕我们夜晚的致命罪孽。在生产环境,我们需要做:

  • 自定义的Data Type需要实现序列化
  • 实现delta-CRDTs避免更新发送full state
  • 删除完成的记录

序列化是性能优化的关键,Java自身的序列方案并不是最优的。建议使用kryo或谷歌的protobuf

delta-CRDT主要用于减少full state Update的发送,它表示以一定的顺序传播更新。

因为是冲突自由(Conflict Free)的,在使用诸如ORMap时,如果并发地添加和删除一个条目,添加会获胜。你不能删除一个不存在的条目。这会引入一个问题,例如某个节点删除订单的同时,另外一个节点添加订单,添加订单被执行。原来的订单没有删除,所以需要额外修剪未移除的已经“完成”的条目。

总结,在副本同步操作时,有些方面并不完全正确的:

  • 对于OrderHandler是没有副本机制的——如果某个节点故障,所有当前处理的Update消息将停留在地域边缘,客户端也不会有任何响应。
  • 不要将东西持久化存储!现实中这是相当疯狂的事情!(设想副本在不断地写入、不断地更新同步)