一致性难题
CAP 定理阐述了在构建分布式系统时候,Consistency(一致性) , Availability(可用性) ,Partition tolerance(分区容错性) ,这三者只能取二。
计算给我们再多的资源(常常以为可以用空间来换取时间,比如加内存,加物理机,甚至加钱),受制于目前计算机体系(多核CPU),理论上不可能三者都满足。
CAP 理论通常都会表述如下:在一致性、可用性和分区容错性这三个特性中,一个分布式系统只能够选择满足其中两个。分区容错性 是我们必须要满足的,它表示如果数据被冗余备份到三个节点,那么如果其中一个节点暂时变得不可用,而另两个节点仍然能够正常运行,那么就认为系统具备分区容错性。
假设我们希望系统具备分区容错性,只在可用性和一致性之间进行妥协。读者可能会问为什么。在接收到一个有超时限制的请求时,如果节点不可用,我们其实就需要在两种方案之间进行选择:要么返回错误(选择一致性),要么继续,即使服务器之间可能会不一致(选择可用性)。等待的时间过长会导致该请求被抛弃,所以时间是一个重要的因素,系统必须要在上面两者中做出决定。
为了达到高可用,就必须满足最终一致性;为了达到最终一致性,就会牺牲可用性,数据插入后并不得到一个真实的返回。这里就出现了一致性问题,即要求强一致性时,系统非常非常慢,因为事件A的操作完成后,才能执行后续操作;若是要求最终一致性,多个节点如果没有同步副本,执行查询并不能得到希望的结果,因为我们并不知道副本什么时候同步完成。
现在认为,只要是最终一致性的就可以接受的范围。并且你会发现,所有力求最终一致性的分布式系统,都应该使用CRDT。
分布式事务常见的有几种解决方案:
两阶段提交(2PC)
补偿事务(TCC)
本地消息表(异步确保)
MQ 事务消息
Sagas 事务模型
具体依赖于系统的实现逻辑方案来决定。这里撇开分布式事务不谈。
Conflict Free Replicated Data Types - CRDT(无冲突复制数据类型)
CRDT,顾名思义就是一类数据结构。指的是副本可以被同步到各个节点上,副本(replicas)与副本之间更新不需要依赖调停器(coordination)。有两种方式实现CRDT,Operation-based CRDTs
和 State-based CRDTs
,两种方式都能实现强最终一致性(strong eventual consistencey)。
根据Shapiro本人的论文2 ,两种实现都是等价的,都可以相互模拟。
State-based replication : 当一个副本(replica)收到来自客户端的一个Update
时,首先更新自己的状态,之后将自身的所有状态发送给另一个副本。间接性地,每个副本都将自己的full state
发送给系统内的其它副本。当一个副本收到来自其它副本的状态,提供一个Merge
函数,将自身本地的状态和接收到的状态合并。如下所示,如果集合的值的状态可以表示为一个半格(semi-lattice )(半格 —— 一个偏序集,带有连续/最小上界)并且更新是递增的(譬如,状态是一个整数,更新是一个自增操作),并且如果Merge
函数计算最小上界,则可以确保副本已经合并了相同的值(接近最小上界)。要让系统尽可能成为一个半格 ,Merge
操作必须满足等幂(idempotent)、结合律(associative)、交换律(commutative)。一个副本对象满足这些特征的CRDT,称为CvRDT(convergent replicated data type) 。
Operation-based replication : 该方式并没有将一个副本的全部状态发送给另一个副本,而是通过广播的形式将Update
操作发送给系统的其他副本,并由它们重演更新操作(类似于状态机复制 )。因为是广播操作,如果有两个更新,u_1 和 u_2 ,作用于副本 i 上,然后副本 i 将这两个更新发送给另外两个副本 r_1 和 r_2 ,这时更新到达副本的顺序是不同的,r_1_可能先收到_u_1 ,u_2 ,r_2 可能是先收到 u_2 然后 u_1 。如何进行合并?不管顺序如何,这些更新是满足交换律的,副本最后得到的状态是相同的。类似于这种广播的形式发送Update
到所有副本,这一类对象称之为CmRDT(commutative replicated data type) 。
CRDT解决了分布式系统的一个有趣而又基本的问题,但是有一些限制。CRDT不实现一致性,只针对部分问题空间的Update
操作的交换,而不是所有。因此并不是所有问题都可以转化为CRDT。
关于CRDT的研究, Shapiro论文提出了几点理论3 :
(Convergent Replicated Data Type (CvRDT)). Assuming eventual delivery and termination, any state-based object that satisfies the monotonic semilattice property is SEC.
(Commutative Replicated Data Type (CmRDT)). Assuming causal delivery of updates and method termination, any op-based object that satisfies the commutativity property for all concurrent updates, and whose delivery precondition is satisfied by causal delivery, is SEC.
(CvRDT emulation). Any SEC op-based object can be emulated by a SEC state-based object of a corresponding interface.
Akka集群分片
作为抛砖引玉,这里先介绍Akka Sharding的实现方式,再来介绍Akka Distributed Data。
Akka Sharding也是一种分布式集群的实现4 ,集群分片主要应用于需要有大量带有状态的Actor处理大量数据的情形。因为要处理Actor的状态,所以很多例子都要求分片上的Actor都是继承了PersistenceActor。为了避免混淆,我们这里不引入PersistenceActor,单独阐述Cluster Sharding的机制。
由于集群分片需要实现位置透明,即ShardRegion.ExtractEntityId
需要通过EntityId
知会分片Actor的具体位置,从而向对应的Actor发送消息;当然你也可以不需要知会Actor的位置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 object CounterClusterConfig { final case class EntityEnvelope (entityId: String , payload: Any ) final case class Get (entityId: String ) val numberOfShards = 100 val extractEntityId: ShardRegion .ExtractEntityId = { case EntityEnvelope (entityId, payload) => (entityId, payload) case msg@Get (entityId) => (entityId, msg) } val extractShardId: ShardRegion .ExtractShardId = { case EntityEnvelope (entityId, _) => (math.abs(entityId.hashCode) % numberOfShards).toString case Get (entityId) => (math.abs(entityId.hashCode) % numberOfShards).toString case ShardRegion .StartEntity (entityId) => (math.abs(entityId.hashCode) % numberOfShards).toString } } object Counter { case object Increment case object Decrement case object Stop final case class CounterChanged (delta: Int ) } class Counter extends Actor with ActorLogging { import com.lightbend.akka.np_sharding.Counter ._ import com.lightbend.akka.np_sharding.CounterClusterConfig .Get import scala.concurrent.duration._ context.setReceiveTimeout(120. seconds) val entityType: String = getClass.getSimpleName val entityId : String = self.path.name override def preStart (): Unit = { log.debug("==>1. I was created..." ) } override def postStop (): Unit = { log.debug("==>4. I was stop..." ) } override def receive : Receive = { case Get (id) => log.debug(s"==>2. $id established..." ) case it@Increment => log.debug("==>3. I am increase..." ) context stop self } }
ShardRegion每次收到来自客户端的消息时,如果分片中没有对应ID的Actor,则会先创建该Actor,然后发送消息;若存在该ID的Actor,则消息直接被Actor处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 val counterRegion: ActorRef = ClusterSharding (system).start( typeName = "Counter" , entityProps = Props [Counter ], settings = ClusterShardingSettings (system), extractEntityId = CounterClusterConfig .extractEntityId, extractShardId = CounterClusterConfig .extractShardId) system.scheduler.schedule(0. second, 5. seconds, new Runnable { override def run (): Unit = { val id = java.util.UUID .randomUUID.toString counterRegion ! Get (id) counterRegion ! EntityEnvelope (id, Increment ) } } ) sys.addShutdownHook { Await .result(system.whenTerminated, Duration .Inf ) }
分片Region类似于线程池,既然可以重复利用,为什么不创建一个监督机制。这样每个Actor都是位置透明的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class CounterSupervisor extends Actor { val counter = context.actorOf(Props [Counter ], "theCounter" ) override val supervisorStrategy = OneForOneStrategy () { case _: IllegalArgumentException ⇒ SupervisorStrategy .Resume case _: ActorInitializationException ⇒ SupervisorStrategy .Stop case _: DeathPactException ⇒ SupervisorStrategy .Stop case _: Exception ⇒ SupervisorStrategy .Restart } def receive = { case msg ⇒ counter forward msg } }``` 默认地,Akka Sharding 使用的是[Distributed Data Mode ](https: ```scala akka.cluster.sharding.state-store-mode = ddata
你也可以使用Persistence Mode,
1 akka.cluster.sharding.state-store-mode = persistence
这个描述的Distributed Data Mode跟接下来的 Akka Distribted Data并不是一回事。关于更多Akka Sharding 的技术细节可以参考官网。
Akka Distributed Data
Akka Distributed Data 是一个用于节点间共享数据的模组。它被设计成Key-Value存储,Value实现了CRDT(Conflict Free Replicated Data Types)。它允许节点的数据可以从其它任意节点更新,二不需要经过Coordinator
处理,CRDT的Value总是合并(converge)。
前面已经描述了CRDT的原理和结构,那么理解Akka Distribted Data就可以得心应手了。区别于Akka Sharding,distributed-data中的Replicator
是API提供的,所以直接拿来用就可以了。
副本(Replicator),声明如下:
1 2 3 4 5 import akka.cluster.ddata._ implicit val cluster = Cluster (context.system) val replicator: ActorRef = DistributedData (context.system).replicator
Akka Distributed Data实际上相当于Redis的Key-Value存储,并且一致性都是可调的。由于Key的特殊性,它是Value类型的编码形式。
1 val Key = ORMapKey .create[String , StoredOrder ]("orders" )
这里的Key
变量以大写开头,允许用match
表达式匹配。这是Scala语言规范,
1 2 3 4 5 8.1.1 Variable Patterns [...] A variable pattern x is a simple identifier which starts with a *lower case* letter. It matches any value, and binds the variable name to that value. [...]
Replicator
实际上是一个简单的Actor,我们通过消息协议进行副本的通讯。例如要处理更新操作,让Replicator
发送Replicator.Update
消息,
Replicator 1 2 3 4 5 6 7 8 9 10 def storeOrderValidation (id: OrderIdentifier , storedOrder: StoredOrder , request: StoreOrderValidation ) = { replicator ! Replicator .Update ( key = Key , initial = ORMap .empty[String , StoredOrder ], writeConsistency = Replicator .WriteMajority (5. seconds), request = Some (request) ) { orders => orders + (id.i.toString -> storedOrder) } }
副本需要知会Key是什么
分布式数据初始化值,这里定义了Key对应的Value是空的。Data Type为ORMap[String, StoredOrder]
写一致性,它可以是WriteLocal
,WriteTo(number of nodes)
,WriteMajority
以及WriteAll
。这里我们使用WriteMajority
。表示立即写入N/2 + 1个节点(N是集群中节点的个数)
可选的。该对象附带更新请求一并传递。
这里是一个函数,作用于要修改的值。例如这里的数据是添加一个新的StoredOrder
到map中。
1 2 3 4 5 object Update { def apply [A <: ReplicatedData ]( key: Key [A ], initial: A , writeConsistency: WriteConsistency , request: Option [Any ] = None )(modify: A ? A ): Update [A ] }
整个过程就是,Update
消息携带的Key
,通过modify
函数,写入Value
到ORMap
中。那么接下来会出现3种情况:
更新成功
更新全部失败
介于两者之间,部分更新成功
如果所有都按计划执行,将返回一个UpdateSuccess
消息,表示更新成功。
1 2 case Replicator .UpdateSuccess (Key , Some (request: StoreOrderValidation )) => request.replyTo ! OrderValidationStored (request.id, request.order)
或者更新失败,返回得到一个ModifyFailure
。
最后一种情况,“我们不确定是否成功,但它确实运行了”,这时包含有UpdateTimeout
和StoreFailure
。
UpdateTimeout
的出现,取决于事先定义的一致性等级。例如这里定义的是Replicator.WriteMajority(5.seconds)
,表示主节点在5秒的时间内没有应答。这种情况出现,可能是其它几个节点处理消息非常慢、或者网络延迟等等因素。如何处理这种情况取决于真实的案例。
StoreFailure
,表示本地持久化存储出现了问题。这种问题出现之前,需先额外定义是否进行本地的持久化。
一个非常实用的操作是,我们可以监听分布式数据的改变。
1 replicator ! Replicator .Subscribe (OrderStorage .Key , self)
当发生改变时,会得到一个Replicator.Changed
消息:
1 2 3 case change @ Replicator .Changed (OrderStorage .Key ) => val allOrders: Map [String , StoredOrder ] = change.get(OrderStorage .Key ).entries
对于Update ,一致性等级大概有:
WriteLocal
the value will immediately only be written to the local replica, and later disseminated with gossip
WriteTo(n)
the value will immediately be written to at least n replicas, including the local replica
WriteMajority
the value will immediately be written to a majority of replicas, i.e. at least N/2 + 1 replicas, where N is the number of nodes in the cluster (or cluster role group)
WriteAll
the value will immediately be written to all nodes in the cluster (or all nodes in the cluster role group)
对于Get ,一致性等级有:
ReadLocal
the value will only be read from the local replica
ReadFrom(n)
the value will be read and merged from n replicas, including the local replica
ReadMajority
the value will be read and merged from a majority of replicas, i.e. at least N/2 + 1 replicas, where N is the number of nodes in the cluster (or cluster role group)
ReadAll
the value will be read and merged from all nodes in the cluster (or all nodes in the cluster role group)
前面说过,Value的实现是一个CvRDT,它的定义为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 trait ReplicatedData { type T < : ReplicatedData def merge (that: T ): T }
该数据结构要求有merge
函数,用于合并其它副本传递过来的数据。
1 2 3 object ORMapKey { def create [A , B <: ReplicatedData ](id: String ): Key [ORMap [A , B ]] = ORMapKey (id) }
该CvRDT被定义为Data Type函数的上界,以实现存储功能。
你会发现,merge
函数返回一个T
,不是Option[T]
也不是Try[T]
——它证明了**merge
总是正常的**。你会发现,merge
函数是一个单调函数,总是向一个方向增长。
Akka api提供了一些基础数据类型 ,用于实现我们的分布式数据的存储。
对于上面的例子,可以直接操作数据,因为它本身就是个Key-Value,下面使用分布式方案实现基于内存的存储实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 import akka.actor.Actor import akka.actor.ActorRef import akka.actor.Props import akka.cluster.Cluster import akka.cluster.ddata.DistributedData import akka.cluster.ddata.LWWMap import akka.cluster.ddata.LWWMapKey object ReplicatedCache { def props : Props = Props [ReplicatedCache ] private final case class Request (key: String , replyTo: ActorRef ) final case class PutInCache (key: String , value: Any ) final case class GetFromCache (key: String ) final case class Cached (key: String , value: Option [Any ] ) final case class Evict (key: String ) } class ReplicatedCache extends Actor { import akka.cluster.ddata.Replicator ._ import ReplicatedCache ._ val replicator = DistributedData (context.system).replicator implicit val cluster = Cluster (context.system) def dataKey (entryKey: String ): LWWMapKey [String , Any ] = LWWMapKey ("cache-" + math.abs(entryKey.hashCode) % 100 ) def receive = { case PutInCache (key, value) => replicator ! Update (dataKey(key), LWWMap (), WriteLocal )(_ + (key -> value)) case Evict (key) => replicator ! Update (dataKey(key), LWWMap (), WriteLocal )(_ - key) case GetFromCache (key) => replicator ! Get (dataKey(key), ReadLocal , Some (Request (key, sender()))) case g @ GetSuccess (LWWMapKey (_), Some (Request (key, replyTo))) => g.dataValue match { case data: LWWMap [_, _] => data.asInstanceOf[LWWMap [String , Any ]].get(key) match { case Some (value) => replyTo ! Cached (key, Some (value)) case None => replyTo ! Cached (key, None ) } } case NotFound (_, Some (Request (key, replyTo))) => replyTo ! Cached (key, None ) case _: UpdateResponse [_] => } }
TDD案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 import scala.concurrent.duration._import akka.cluster.Cluster import akka.cluster.ddata.DistributedData import akka.cluster.ddata.Replicator .GetReplicaCount import akka.cluster.ddata.Replicator .ReplicaCount import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._import com.typesafe.config.ConfigFactory object ReplicatedCacheSpec extends MultiNodeConfig { val node1 = role("node-1" ) val node2 = role("node-2" ) val node3 = role("node-3" ) commonConfig(ConfigFactory .parseString(""" akka.loglevel = INFO akka.actor.provider = "cluster" akka.log-dead-letters-during-shutdown = off """ ))} class ReplicatedCacheSpecMultiJvmNode1 extends ReplicatedCacheSpec class ReplicatedCacheSpecMultiJvmNode2 extends ReplicatedCacheSpec class ReplicatedCacheSpecMultiJvmNode3 extends ReplicatedCacheSpec class ReplicatedCacheSpec extends MultiNodeSpec (ReplicatedCacheSpec ) with STMultiNodeSpec with ImplicitSender { import ReplicatedCacheSpec ._ import ReplicatedCache ._ override def initialParticipants = roles.size val cluster = Cluster (system) val replicatedCache = system.actorOf(ReplicatedCache .props) def join (from: RoleName , to: RoleName ): Unit = { runOn(from) { cluster join node(to).address } enterBarrier(from.name + "-joined" ) } "Demo of a replicated cache" must { "join cluster" in within(20. seconds) { join(node1, node1) join(node2, node1) join(node3, node1) awaitAssert { DistributedData (system).replicator ! GetReplicaCount expectMsg(ReplicaCount (roles.size)) } enterBarrier("after-1" ) } "replicate cached entry" in within(10. seconds) { runOn(node1) { replicatedCache ! PutInCache ("key1" , "A" ) } awaitAssert { val probe = TestProbe () replicatedCache.tell(GetFromCache ("key1" ), probe.ref) probe.expectMsg(Cached ("key1" , Some ("A" ))) } enterBarrier("after-2" ) } "replicate many cached entries" in within(10. seconds) { runOn(node1) { for (i ← 100 to 200 ) replicatedCache ! PutInCache ("key" + i, i) } awaitAssert { val probe = TestProbe () for (i ← 100 to 200 ) { replicatedCache.tell(GetFromCache ("key" + i), probe.ref) probe.expectMsg(Cached ("key" + i, Some (i))) } } enterBarrier("after-3" ) } "replicate evicted entry" in within(15. seconds) { runOn(node1) { replicatedCache ! PutInCache ("key2" , "B" ) } awaitAssert { val probe = TestProbe () replicatedCache.tell(GetFromCache ("key2" ), probe.ref) probe.expectMsg(Cached ("key2" , Some ("B" ))) } enterBarrier("key2-replicated" ) runOn(node3) { replicatedCache ! Evict ("key2" ) } awaitAssert { val probe = TestProbe () replicatedCache.tell(GetFromCache ("key2" ), probe.ref) probe.expectMsg(Cached ("key2" , None )) } enterBarrier("after-4" ) } "replicate updated cached entry" in within(10. seconds) { runOn(node2) { replicatedCache ! PutInCache ("key1" , "A2" ) replicatedCache ! PutInCache ("key1" , "A3" ) } awaitAssert { val probe = TestProbe () replicatedCache.tell(GetFromCache ("key1" ), probe.ref) probe.expectMsg(Cached ("key1" , Some ("A3" ))) } enterBarrier("after-5" ) } } }
性能优化
如果没有做性能优化,将会是萦绕我们夜晚的致命罪孽。在生产环境,我们需要做:
自定义的Data Type需要实现序列化
实现delta-CRDTs
避免更新发送full state
删除完成的记录
序列化是性能优化的关键,Java自身的序列方案并不是最优的。建议使用kryo
或谷歌的protobuf
delta-CRDT主要用于减少full state Update
的发送,它表示以一定的顺序传播更新。
因为是冲突自由(Conflict Free)的,在使用诸如ORMap
时,如果并发地添加和删除一个条目,添加会获胜。你不能删除一个不存在的条目。这会引入一个问题,例如某个节点删除订单的同时,另外一个节点添加订单,添加订单被执行。原来的订单没有删除,所以需要额外修剪未移除的已经“完成”的条目。
总结,在副本同步操作时,有些方面并不完全正确的:
对于OrderHandler
是没有副本机制的——如果某个节点故障,所有当前处理的Update
消息将停留在地域边缘,客户端也不会有任何响应。
不要将东西持久化存储!现实中这是相当疯狂的事情!(设想副本在不断地写入、不断地更新同步)