第十二章:Akka构建扩展、分布式应用

主要内容:

  1. Akka背后哲学思想
  2. Actor并发、STM、代理以及数据流
  3. 构建一个大型的可扩展应用:Akkaoogle

本章介绍一个已有的Scala工具包:Akka,让你为JVM平台构建新一代的、事件驱动的、容错的、可伸缩的分布式应用。Akka提供了多并发的抽象,本章着重探索其内容。前面内容,仅介绍了Actor面向消息的(message-oriented)并发使用。这里将继续探索诸如 STM、Agent和Dataflow的并发抽象内容。

为了理解Akka各部分是如何整合在一起的,我们将使用Akka来构建一个真实的搜索应用:Akkaoogle。该应用类似于Froogle,它是google的一个服务,用于搜索产品的最低价格。你将马上构建这个产品,这样你就可以看到应该在什么场合,使用怎样的Akka特性。

注意 本章所有覆盖的Akka特性,同时都有相应的Java API。但不在本章阐述Java方面的内容,你可以查阅[文档](http://akka.io/docs/ for details)了解该方面的内容。

Akka由Scala编写,却在Java和Scala APIs上开发了所有的特性。因为这里只介绍Scala方面的内容,也主要讨论Scala的API,当然也包含有Java的例子。你可以仿照Akkaoogle在Scala上的例子,构建一个Java的Akkaoogle版本,因为两者的API是相同的。最开始会先介绍Akka背后的哲学思想,理解之后将进入到Akka项目中,并尝试解决实际问题。

12~1〖The philosophy behind Akka〗P345

Akka背后的哲学思想是简单的:使开发者更容易构建正确的、并发的、可伸缩的和容错的应用。为此,Akka提供了一个高层抽象,用于处理并发性、可伸缩性和错误问题。图12.1展示了3个核心模块concurrency、scalability和fault tolerance。

Figure 12.1

这些并发模块提供了处理并发性相关问题的选项。到目前为止,我确信你只会Actor(面向消息的并发性)。但Actor不是个从一而终的并发解决方案。你需要理解Akka模块的其它替代方案。在下面的小节将探索所有该内容。在内核,Akka是一个基于事件的(event-based)平台,依赖于Actor的消息传递和伸缩性。Akka任由你使用本地和远程的Actor。通过路由(routing)使用本地Actor,你可以向上扩展;使用远程Actor则帮助你水平扩展。在本章最后,将看到有关于此的更多详细内容。

12~2〖Simple concurrency with Akka〗P346

为了拓展你的应用,使用并发。在第9章学习到,线程是实现并发的一个困难的、易出错的方式,它应该作为你的最后的一个选择方案。问题是最优的方案是什么,第二、第三方案呢?本小节将介绍如何选择合适的方案。表 12.1 描述了Akka中的所有可用并发技术。好消息是,你可以组合所有这些并发技术,它是大多数Akka开发者最终的做法。

name Description
Actors 一个Actor是一个异步处理消息并封装状态的一个对象。Actor实现了消息传递的并发性。
STM(Software transactional memory) 软件事务内存,模拟数据库事务的机制,控制在并行计算时对共享内存的访问控制。它是锁的一种替代机制,以及提供了可组合性。
Agents Agents提供了在可变数据(mutable data)上的抽象。它仅允许你通过一个异步的写(write)动作来改变数据。
Dataflow Dataflow并发性是确定的。这意味着每次执行都表现出相同的行为。因此你的应用最先是死锁的,它会始终是死锁,这有帮助你debug问题。Akka使用Future来实现 Oz-style 的dataflow

这些选项提供了设计正确并发应用的灵活性。例如你可以使用Actor来建模一个应用,用STM或Agents来处理可变状态,以及使用Dataflow并发来组合多并发进程。这都有无尽的可能性。

注意 Akka不再包含STM模块,取而代之的是支持Scala STM。

下面开始Akka并发世界之旅————这会是一个有趣的旅程。

1221〖Remote actors〗P347

在第9章我们详细探索了Actor内容。Actor编程不仅仅局限于单个JVM,因此每个Actor可以跨多个JVM进行交流。Akka的远程Actor允许你在远程设备上部署Actor,透明地来回发送消息。远程Actor是实现扩展的、分布式应用的一个很好的方式。这些消息,使用Google protocol buffer 进行自动序列化,两个节点间的交流,通过使用JBoss Netty 处理。可以把Google protocl buffer 认为是小巧而又快速的XML,Netty认为是一个非IO阻塞(non-blocking I/O)的实现,它们让Akka高效地使用线程为I/O操作。

Figure 12.2

Akka实现了透明的远程调用,即在部署时Actor的运程(remoteness)被完全配置。你可以在构建方案时使用本地Actor,在部署期间为每个独立的Actor配置运程详情。

注意 在将来的Akka版本,Netty将由一个基于Actor的I/O库 Actor I/O所代替。

在此之前,先让我们为远程Actor添加依赖项。Akka是模块化的,如其添加整个akka库,你仅需要添加Actor依赖库即可。下面是其远程依赖的build.sbt文件配置。

1
2
3
4
5
6
7
8
resolvers ++= Seq(
"Akka Repo" at "http://akka.io/repository",
"Typesafe Repo" at "http://repo.typesafe.com/typesafe/repo"
)
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.3.6",
"com.typesafe.akka" %% "akka-remote % "2.3.6"
)

resolvers定义了依赖的地址,libraryDependencies则添加远程Actor。

我们将使用第9章的word count例子,更改原来读取文本文件的方式,改为读取URL的内容。目的是链接到URL,并计算页面的单词数。在Java中要创建一个Akka Actor,你需要继承akka.actor.UntypedActor类,并重写onReceive方法:

1
2
3
4
5
6
import akka.actor.UntypedActor;
public class WordCountWorker extends UntypedActor {
@Override
public void onReceive(Object message) {
}
}

该类被称为UntypedActor因为Akka包括有一个TypedActor概念。这个类型Actor实现了主动对象(active object)模式,即将任意的POJO接口,转变为一个异步的API。

注意 Akka的类型化Actor(typed actor)是主动对象模式(active object pattern)的一个实现。它将同步方法调用变为异步派遣。使用类型化Actor的一个优势在于,你可以有一个静态的编译的类型约定,这样你不需要定义消息。在Akka文档中读取更多有关内容。

因为你的wordCountWorker需要处理FileToCount消息,你需要将接收的参数消息转换类型为FileToCount

1
2
3
4
5
if (message instanceof FileToCount) {
FileToCount c = (FileToCount)message;
} else {
throw new IllegalArgumentException("Unknown message: " + message);
}

这里使用了instanceof来检验接收消息的类型,如果消息不是FileToCount类型,将抛出一个异常。在Scala代码,需要添加countWords方法到FileToCount case 类,以计算所有单词:

1
2
3
4
5
case class FileToCount(url: String) {
def countWords = {
Source.fromURL(new URL(url)).getLines.foldRight(0)(_.split(" ").size + _)
}
}

WordCountWorker Actor,你可以调用 countWords 方法来统计单词数:

1
2
FileToCount c = (FileToCount)message;
Integer count = c.countWords();

要给发送者回复一个响应,使用 getSender().tell(...) 方法。方法 tell 允许Actor向发送者回复。要向主干回复,子Actor需要构造WordCount 消息。

1
2
3
FileToCount c = (FileToCount)message;
Integer count = c.countWords();
getSender().tell(new WordCount(c.url(), count));

getSelf 方法返回当前Actor的引用。下面代码清单为完整的WordCountWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package countwords;
import akka.actor.UntypedActor;

public class WordCountWorker extends UntypedActor {
@Override
public void onReceive(Object message) {
if (message instanceof FileToCount) {
FileToCount c = (FileToCount)message;
Integer count = c.countWords();
getSender().tell(new WordCount(c.url(), count), getSelf());
}
else {
throw new IllegalArgumentException("Unknown message: " + message);
}
}
}

为了利用好远程Actor,我们在一个JVM中,将所有worker actor从master actor分离。为此,需要创建两个Actor系统。配置Akka Actor最简单的方式是在classpath中提供一个配置文件。你可以在akka文档中找到所有配置属性的更多内容。下面例子定义两个Actor系统:主Actor系统和工作Actor系统。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
workersystem {
akka {
actor{provider = "akka.remote.RemoteActorRefProvider"}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
// transports = "akka.remote.netty.NettyRemoteTransport"
//log-sent-messages = on
//log-received-messages = on
netty.tcp {
hostname = "127.0.0.1"
port = 2560
}
}
}
}

mainsystem {
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.tcp.hostname = "127.0.0.1"
actor {
deployment {
/wordCountMaster {
remote = "akka.tcp://workersystem@127.0.0.1:2560"
}
}
}
}
}

通过Actor系统提供的灵活性定义配置,将每个Actor系统进行分离。下面元素添加运程(remoteness)到Actor系统:

  • Actor provider 改为 akka.remote.RemoteActorRefProvider
  • 添加主机名。并确保IP地址可访问。
  • 添加远程Actor系统监听的端口号。
  • 映射将被部署的Actor系统的名称。

现在将上述配置保存到src/main/resources文件夹下的application.conf文件中。要是workersystem能够在一个不同的JVM上运行,在该终端执行下面代码:

1
2
3
4
5
6
7
package countwords
import akka.actor._
import com.typesafe.config.ConfigFactory
object WorkerSystem extends App {
val workerSystem = ActorSystem("workersystem",
ConfigFactory.load.getConfig("workersystem"))
}

它将开启"workersystem",并监听来自端口号为2560的消息。现在创建一个新的Actor。它用于运行主Actor系统:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
case class FileToCount(url: String) {
def countWords = {
Source.fromURL(new URL(url)).getLines.foldRight(0)(_.split(" ").size + _)
}
}
case class WordCount(url: String, count: Int)
case class StartCounting(urls: Seq[String], numActors: Int)

object MainSystem {
class MainActor(accumulator: ActorRef) extends Actor {
def receive = {
case "start" =>
val urls = List("http://www.infoq.com/",
"http://www.dzone.com/links/index.html",
"http://www.manning.com/",
"http://www.reddit.com/")
accumulator ! StartCounting(urls, 4)

}
}

def main(args: Array[String]) = run

private def run = {
val mainSystem = ActorSystem("main", ConfigFactory.load.getConfig("mainsystem"))
val accumulator = mainSystem.actorOf(Props[WordCountMaster], name = "wordCountMaster")
val m = mainSystem.actorOf(Props(new MainActor(accumulator)))
m ! "start"

}
}

当你在两个不同的JVM实例上开启WorkerSystemMainSystem,将实现了workers Actor运行在一个JVM,而主Actor运行在另一个JVM上。现在你实现了Akka的分布式部署方案。

Figure 12.remote

1222〖Making mutable data safe with STM〗P351

软件事务内存(STM, Software transactional memory)将一个Java堆栈变为一个事务数据集。STM类似于数据库事务,但是使用内存代替。因为STM不能在内存中实现持久化,你仅能获得事务ACID的前三个属性(atomicity,consistency,isolation,durability):

  • 原子性——所偶修改应该遵循“all or nothing” 规则。在STM,所有修改由一个不可分割的事务处理,一项修改失败将导致所有的改动回滚。
  • 一致性——表明一个STM事务将系统从一个一致状态到另一个一致性状态。如果你想要删除一个Map中的一个元素,再插入到另外一个Map,那么STM事务最终,两个Map都被适当地修改。
  • 隔离性——STM的事务间相互不可见,彼此孤立,不互相干预。

STM最好的部分是锁自由。它从异常中回滚,并且是可组合的。你可以将两个小的STM操作组合成为一个大的STM操作。在展示STM例子之前,先让我们理解一下STM的状态是什么,以及它是如何表示的。

HOW STATE IS DEFINED IN STM

让我们看看在命令式编程中状态是如何处理的。图12.3展示了状态是如何处理的。你直接访问内存中的数据,并改变这个数据。在图中,一个对象A,直接被B和C访问数据。问题是在并发世界这种方式无效。当一些其它驻留在B或C的线程或进程尝试访问数据,而A正尝试修改数据时,会怎样?结果显然不是我们所期望的。

Figure 12.3

要解决这种方式上的问题,STM定义了不同的可变状态。在STM中,state被定义为值,它是一个在特殊点上具体标识的实体。一个值value是指不会变化的(不可变的)。标识identity是指在一个给定的点上对一个值的引用。图12.4展示了STM的这种结构。可变部分是唯一标识的,它可一系列值相关联。STM使得从一个值到另一个值的可变引用具有原子性。当驻留在B和C的其它进程或线程,访问正在修改的A时,会发生什么?你会看到与B或C关联的值,因为STM事务被隔离的,事务的部分改变在外部不可见。

这种根据标识和值来定义状态的思想,来源于编程语言Clojure 。现在看看在Akka中,STM是如何工作的。

HANDLING MUTABLE DATA IN AKKA USING STM

Akka使用Scala的STM库作为它的STM。在SBT项目中添加该库:

1
2
3
4
5
6
resolvers += ("Typesafe Repository" at "http://repo.typesafe.com/typesafe/
releases/")
libraryDependencies ++= Seq(
"org.scala-stm" %% "scala-stm" % "0.7",
"org.specs2" %% "specs2" % "1.13" % "test"
)

为演示STM如何工作,我们以一个小实例着手,你创建一个为可变Map删除和增加元素的原子操作。为了管理可变性,用scala.concurrent.stm.Ref来封装这个可变的值:

1
2
3
4
5
val ref1 = Ref(HashMap[String, Any](
"service1" -> "10",
"service2" -> "20",
"service3" -> null))
val ref2 = Ref(HashMap[String, Int]())

这两个值不是别的,就是多并发参与者中的值的可变引用。上述代码片段创建了两个值,指向可变的HashMap。要在Ref上处理操作,你需要使用定义在STM包中的atomic方法。Scala的STM库创建事务对象,并授权调用者处理事务性的读和写操作。闭包内的任何refs的改变都在一个STM事务中完成。例如,下面代码中,由ref2向Map添加一个新的元素:

1
2
3
4
5
def atomicInsert(key: String, value: Int) = atomic { implicit txn =>
val oldMap = ref2.get
val newMap = oldMap + ( key -> value)
ref2.swap(newMap)
}

通过调用ref2.get得到当前由Ref关联的,使用swap替代的一个新值。如果操作失败,该操作将会回滚。事务参数被隐式标志,因此你不需要直接传递。

要从ref1中实现keyatomic删除,你需要使用定义在Reftransform方法。方法transform允许你转换Ref引用的值:

1
2
3
4
5
6
def atomicDelete(key: String): Option[Any] = atomic {
val oldMap = ref1.get
val value = oldMap.get(key)
ref1.transform(_ - key)
value
}

函数atomicDelete返回删除了的 value。为什么返回 old value?因为之后会用到。

接着讨论关于STM的组合性。假设你构建了一个atomicSwap函数,用于将一个Map的一个元素,移动到另一个Map中。使用STM,这变的容易:你需要做的是封装atomicDeleteatomicInsert这两个函数,如下:

1
2
3
4
def atomicSwap(key: String) = atomic { implicit txn =>
val value: Option[Any] = atomicDelete(key)
atomicInsert(key, Integer.parseInt(value.get.toString))
}

因为ref2仅接收一个Int类型值,在插入之前先要解析为Int。为了完全理解swap的优雅所在,看下面规范:

1
2
3
4
5
6
7
"Atomic operations in composition" should {
"rollback on exception" in {
swap("service3")
ref1.single().contains("service3") must beEqualTo(true)
ref2.single().contains("service3") must beEqualTo(false)
}
}

STM的方法single可以不要求事务,访问Ref的上下文。当尝试转换"service3"时,Integer.parseInt会抛出一个异常。这时删除动作已经成功了,但多亏了STM,整个事务将回滚。锁(lock)可以做到这点吗?不能。

STM很好地构建了小型的一致性操作,以及能够组合这些操作。要学习有关STM的更多内容,查阅Scala STM文档。

1223〖Agents〗P354

Agent提供了对任何独立存储位置异步修改的绑定。一个Agent仅允许提供的一个动作改变这个位置内容。这个动作指的是函数,该函数被异步地应用于Agent的状态,其返回值成为Agent的新状态。 然而,从Agent读取一个值是瞬时的和同步的。因而RefAgent的不同在于,Ref是一个同步读和写;Agent是响应式的(reactive)。要任意异步地提供动作,Akka提供了两个方法:sendsendOffsend方法使用响应式的线程池分配给agent,而sendOff使用一个专有的线程,适用于一个长时间运行的操作。下面是一个Agent相关例子,通过send动作,向一个文件写日志信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import akka.actor.ActorSystem
import akka.agent.Agent
import org.specs2.mutable.Specification

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

implicit val system = ActorSystem("agentExample")
val writer = new FileWriter("src/test/resources/log.txt")
val a = Agent(writer)
a.send { w => w.write("This is a log message"); w}

// wait for reply
val future = a.future
Await.result(future, 100 seconds) should be(writer)

writer.close

Agent将运行,直到调用close方法。因为在其背后,agent由Actor被实现,你需要为agent创建一个Actor系统。如果你想要做更多的花费时间的事情,你需要使用sendoff方法:

1
a.sendOff { someLongRunningProcess }

注意在任何时间,仅有一个send动作被调用。即使是多并发处理发送的操作,这些动作将按照顺序执行。注意这些动作可能在多线程间交错。

为了在你的项目中使用Agents,需要添加下列依赖项到SBT配置文件中:

1
2
3
4
5
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.6.3",
"com.typesafe.akka" %% "akka-agent" % "2.6.3",
"org.specs2" %% "specs2" % "3.8.5" % "test"
)

当使用atomic块,以及消息被挂起直到事务完成,Agent也参与到STM事务。这对于有side-effect的动作很有帮助,如记录日志文件,你不想以STM实现。为什么?因为STM事务如果失败,它们自动重试,意味着你的副作用操作被执行了多次。这可能不是你想要的,因此组合agent和STM,会是在STM事务间,执行副作用动作的一个很好的模式。有时候,Agent的异步性质,人们会迷惑因为agent和Actor类似,但是它们完完全全在设计方式上不同。Agent与数据相关联,你从外部,以一个函数的形式,向Agent发起行为。而Actor的行为定义在内部,数据则以消息的形式发送。

你将在使用Akkaoogle日志事务时再一次看到agent,但现在让我们继续我们下一个并发模型:dataflow。Dataflow是一个很好的从一个程序封装并发的实现方式。它可以顺序读取。

1224〖Dataflow〗P355

数据流并发是一个确定性的并发模型。运行并工作时,它会一直处于工作中而不会有死锁。又或者,如果一开始是死锁,它会总是处于死锁中。这点在并发应用中是一个有力的保障,你可以更容易理解这些代码。数据流并发允许你编写顺序的代码,以并行处理这些操作。限制是你的代码应该是完全无副作用的。因为这些代码被用于处理有副作用操作时,你不能获得确定性的行为。

Dataflow在Akka中使用了Scala的分离延续(delimited continuations)编译器插件实现。要在SBT项目中使用该插件,在配置中添加下列内容:

1
2
3
4
5
6
7
8
scalacOptions += "-P:continuations:enable"

libraryDependencies += "com.typesafe.akka" % "akka-dataflow_2.10" % "2.3.6"

autoCompilerPlugins := true

libraryDependencies <+=
scalaVersion { v => compilerPlugin("org.scala-lang.plugins" % "continuations" % v) }

要用到数据流并发,必须用到数据流变量。一个数据流变量像一个单赋值(single-assignment)变量。一旦被绑定,不会再改变,以及之后任何新值的绑定将被忽略。下列例子定义了一个数据流变量:

1
val messageFromFuture = Promise[String]()

这里的Akka Promise被用作创建一个数据流变量。一个Promise为一个值的读处理,该值会在将来的某个时候被用到。任何数据流操作,会在Future.flow块中被处理:

1
2
3
Future.flow {
messageFromFuture()
}

上述调用会处于一个线程等待中,直到有一个值被绑定到messageFromFutureFuture.flow返回一个Future,这样你可以处理其它操作,而不用阻塞主线程的执行。你可以把Future认为是用于接收某些并发操作结果的数据结构。要为数据流变量指派一个值,使用<<方法:

1
2
3
Future.flow {
messsageFromFuture << "Future looks very cool"
}

一旦一个值被绑定到一个数据流变量,所有Future将等待,该值将被非阻塞,以能继续执行。下面列出使用dataflow变量的一个完整例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import akka.actor.ActorSystem
import akka.dataflow._

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Promise
/**
* @author barudisshu
*/
object DataflowExample extends App {

implicit val system = ActorSystem("dataflow")
val messageFromFuture, rawMessage, parsedMessage = Promise[String]()
flow {
messageFromFuture << parsedMessage()
println("z = " + messageFromFuture())
}
flow {
rawMessage << "dlrow olleh"
}
flow {
parsedMessage << toAscii(rawMessage())
}

def toAscii(s: String) = s.reverse

}

下个小节将使用Akka的概念,构建一个应用,实践Akka并发。

12~3〖Building a real-time pricing system: Akkaoogle〗P357

略,请参考本书源码。

12~4〖Adding asynchronous HTTP support with Play2-mini〗P375

略,像Play2、Lift、Spray这些REST框架,基本上都封装了并发接口。Play2框架使用的是Netty作为NIO,包含大量易上手的工具和模版。请参考官网用户指导手册了解详情。

12~5 〖Summary〗P379

在构建框架和应用中,Akka是一个强大的工具。Akka使得并发对编程者来说更容易,它提升了一个抽象层次。Akka并发框架构建在Actor之上,但也提供了所有流行的并发抽象技术。使得你可以在构建下一代应用中有更多的选择。Akka的STM支持可变数据结构的操作,最重要的是,你可以通过组合小部分原子性操作来解决问题。另外我们还探索了另外一个并发模式Agent,它让你从外部发送行为,修改安全管理器内的数据。额外地,还学习了Dataflow并发,通过数据流,能够让我们编写流式的程序,而不用担心它们之间的并发问题,Dataflow并发代码也很好理解和掌握,但要注意dataflow不适用于处理带有副作用的操作。

Akkaoogle是一个典型的并发应用,通过构建该应用,探索了如何思考并发问题,如何约束,设计以及抉择方案。Akka提供丰富的选项、灵活的配置,你应该从Akka上,拾取一些更好的特性和选项,以更好地适应你的应用项目。Akka早已经实践在各种真实应用中,我们应该炙手可热尝试一番。