¶主要内容:
- Akka背后哲学思想
- Actor并发、STM、代理以及数据流
- 构建一个大型的可扩展应用:Akkaoogle
本章介绍一个已有的Scala工具包:Akka,让你为JVM平台构建新一代的、事件驱动的、容错的、可伸缩的分布式应用。Akka提供了多并发的抽象,本章着重探索其内容。前面内容,仅介绍了Actor面向消息的(message-oriented)并发使用。这里将继续探索诸如 STM、Agent和Dataflow的并发抽象内容。
为了理解Akka各部分是如何整合在一起的,我们将使用Akka来构建一个真实的搜索应用:Akkaoogle。该应用类似于Froogle,它是google的一个服务,用于搜索产品的最低价格。你将马上构建这个产品,这样你就可以看到应该在什么场合,使用怎样的Akka特性。
注意 本章所有覆盖的Akka特性,同时都有相应的Java API。但不在本章阐述Java方面的内容,你可以查阅[文档](http://akka.io/docs/ for details)了解该方面的内容。
Akka由Scala编写,却在Java和Scala APIs上开发了所有的特性。因为这里只介绍Scala方面的内容,也主要讨论Scala的API,当然也包含有Java的例子。你可以仿照Akkaoogle在Scala上的例子,构建一个Java的Akkaoogle版本,因为两者的API是相同的。最开始会先介绍Akka背后的哲学思想,理解之后将进入到Akka项目中,并尝试解决实际问题。
¶12~1〖The philosophy behind Akka〗P345
Akka背后的哲学思想是简单的:使开发者更容易构建正确的、并发的、可伸缩的和容错的应用。为此,Akka提供了一个高层抽象,用于处理并发性、可伸缩性和错误问题。图12.1展示了3个核心模块concurrency、scalability和fault tolerance。
这些并发模块提供了处理并发性相关问题的选项。到目前为止,我确信你只会Actor(面向消息的并发性)。但Actor不是个从一而终的并发解决方案。你需要理解Akka模块的其它替代方案。在下面的小节将探索所有该内容。在内核,Akka是一个基于事件的(event-based)平台,依赖于Actor的消息传递和伸缩性。Akka任由你使用本地和远程的Actor。通过路由(routing)使用本地Actor,你可以向上扩展;使用远程Actor则帮助你水平扩展。在本章最后,将看到有关于此的更多详细内容。
¶12~2〖Simple concurrency with Akka〗P346
为了拓展你的应用,使用并发。在第9章学习到,线程是实现并发的一个困难的、易出错的方式,它应该作为你的最后的一个选择方案。问题是最优的方案是什么,第二、第三方案呢?本小节将介绍如何选择合适的方案。表 12.1 描述了Akka中的所有可用并发技术。好消息是,你可以组合所有这些并发技术,它是大多数Akka开发者最终的做法。
name | Description |
---|---|
Actors |
一个Actor是一个异步处理消息并封装状态的一个对象。Actor实现了消息传递的并发性。 |
STM(Software transactional memory) |
软件事务内存,模拟数据库事务的机制,控制在并行计算时对共享内存的访问控制。它是锁的一种替代机制,以及提供了可组合性。 |
Agents |
Agents提供了在可变数据(mutable data)上的抽象。它仅允许你通过一个异步的写(write)动作来改变数据。 |
Dataflow |
Dataflow并发性是确定的。这意味着每次执行都表现出相同的行为。因此你的应用最先是死锁的,它会始终是死锁,这有帮助你debug问题。Akka使用Future 来实现 Oz-style 的dataflow |
这些选项提供了设计正确并发应用的灵活性。例如你可以使用Actor来建模一个应用,用STM或Agents来处理可变状态,以及使用Dataflow并发来组合多并发进程。这都有无尽的可能性。
注意 Akka不再包含STM模块,取而代之的是支持Scala STM。
下面开始Akka并发世界之旅————这会是一个有趣的旅程。
¶1221〖Remote actors〗P347
在第9章我们详细探索了Actor内容。Actor编程不仅仅局限于单个JVM,因此每个Actor可以跨多个JVM进行交流。Akka的远程Actor允许你在远程设备上部署Actor,透明地来回发送消息。远程Actor是实现扩展的、分布式应用的一个很好的方式。这些消息,使用Google protocol buffer 进行自动序列化,两个节点间的交流,通过使用JBoss Netty 处理。可以把Google protocl buffer 认为是小巧而又快速的XML,Netty认为是一个非IO阻塞(non-blocking I/O)的实现,它们让Akka高效地使用线程为I/O操作。
Akka实现了透明的远程调用,即在部署时Actor的运程(remoteness)被完全配置。你可以在构建方案时使用本地Actor,在部署期间为每个独立的Actor配置运程详情。
注意 在将来的Akka版本,Netty将由一个基于Actor的I/O库 Actor I/O所代替。
在此之前,先让我们为远程Actor添加依赖项。Akka是模块化的,如其添加整个akka库,你仅需要添加Actor依赖库即可。下面是其远程依赖的build.sbt
文件配置。
1 | resolvers ++= Seq( |
resolvers
定义了依赖的地址,libraryDependencies
则添加远程Actor。
我们将使用第9章的word count例子,更改原来读取文本文件的方式,改为读取URL的内容。目的是链接到URL,并计算页面的单词数。在Java中要创建一个Akka Actor,你需要继承akka.actor.UntypedActor
类,并重写onReceive
方法:
1 | import akka.actor.UntypedActor; |
该类被称为UntypedActor
因为Akka包括有一个TypedActor
概念。这个类型Actor实现了主动对象(active object)模式,即将任意的POJO接口,转变为一个异步的API。
注意 Akka的类型化Actor(typed actor)是主动对象模式(active object pattern)的一个实现。它将同步方法调用变为异步派遣。使用类型化Actor的一个优势在于,你可以有一个静态的编译的类型约定,这样你不需要定义消息。在Akka文档中读取更多有关内容。
因为你的wordCountWorker
需要处理FileToCount
消息,你需要将接收的参数消息转换类型为FileToCount
:
1 | if (message instanceof FileToCount) { |
这里使用了instanceof
来检验接收消息的类型,如果消息不是FileToCount
类型,将抛出一个异常。在Scala代码,需要添加countWords
方法到FileToCount
case 类,以计算所有单词:
1 | case class FileToCount(url: String) { |
在WordCountWorker
Actor,你可以调用 countWords
方法来统计单词数:
1 | FileToCount c = (FileToCount)message; |
要给发送者回复一个响应,使用 getSender().tell(...)
方法。方法 tell
允许Actor向发送者回复。要向主干回复,子Actor需要构造WordCount
消息。
1 | FileToCount c = (FileToCount)message; |
getSelf
方法返回当前Actor的引用。下面代码清单为完整的WordCountWorker
。
1 | package countwords; |
为了利用好远程Actor,我们在一个JVM中,将所有worker actor从master actor分离。为此,需要创建两个Actor系统。配置Akka Actor最简单的方式是在classpath中提供一个配置文件。你可以在akka文档中找到所有配置属性的更多内容。下面例子定义两个Actor系统:主Actor系统和工作Actor系统。
1 | workersystem { |
通过Actor系统提供的灵活性定义配置,将每个Actor系统进行分离。下面元素添加运程(remoteness)到Actor系统:
- Actor provider 改为
akka.remote.RemoteActorRefProvider
。 - 添加主机名。并确保IP地址可访问。
- 添加远程Actor系统监听的端口号。
- 映射将被部署的Actor系统的名称。
现在将上述配置保存到src/main/resources文件夹下的application.conf文件中。要是workersystem
能够在一个不同的JVM上运行,在该终端执行下面代码:
1 | package countwords |
它将开启"workersystem",并监听来自端口号为2560的消息。现在创建一个新的Actor。它用于运行主Actor系统:
1 | case class FileToCount(url: String) { |
当你在两个不同的JVM实例上开启WorkerSystem
和MainSystem
,将实现了workers Actor运行在一个JVM,而主Actor运行在另一个JVM上。现在你实现了Akka的分布式部署方案。
¶1222〖Making mutable data safe with STM〗P351
软件事务内存(STM, Software transactional memory)将一个Java堆栈变为一个事务数据集。STM类似于数据库事务,但是使用内存代替。因为STM不能在内存中实现持久化,你仅能获得事务ACID的前三个属性(atomicity,consistency,isolation,durability):
- 原子性——所偶修改应该遵循“all or nothing” 规则。在STM,所有修改由一个不可分割的事务处理,一项修改失败将导致所有的改动回滚。
- 一致性——表明一个STM事务将系统从一个一致状态到另一个一致性状态。如果你想要删除一个Map中的一个元素,再插入到另外一个Map,那么STM事务最终,两个Map都被适当地修改。
- 隔离性——STM的事务间相互不可见,彼此孤立,不互相干预。
STM最好的部分是锁自由。它从异常中回滚,并且是可组合的。你可以将两个小的STM操作组合成为一个大的STM操作。在展示STM例子之前,先让我们理解一下STM的状态是什么,以及它是如何表示的。
HOW STATE IS DEFINED IN STM
让我们看看在命令式编程中状态是如何处理的。图12.3展示了状态是如何处理的。你直接访问内存中的数据,并改变这个数据。在图中,一个对象A,直接被B和C访问数据。问题是在并发世界这种方式无效。当一些其它驻留在B或C的线程或进程尝试访问数据,而A正尝试修改数据时,会怎样?结果显然不是我们所期望的。
要解决这种方式上的问题,STM定义了不同的可变状态。在STM中,state
被定义为值,它是一个在特殊点上具体标识的实体。一个值value
是指不会变化的(不可变的)。标识identity
是指在一个给定的点上对一个值的引用。图12.4展示了STM的这种结构。可变部分是唯一标识的,它可一系列值相关联。STM使得从一个值到另一个值的可变引用具有原子性。当驻留在B和C的其它进程或线程,访问正在修改的A时,会发生什么?你会看到与B或C关联的值,因为STM事务被隔离的,事务的部分改变在外部不可见。
这种根据标识和值来定义状态的思想,来源于编程语言Clojure 。现在看看在Akka中,STM是如何工作的。
HANDLING MUTABLE DATA IN AKKA USING STM
Akka使用Scala的STM库作为它的STM。在SBT项目中添加该库:
1 | resolvers += ("Typesafe Repository" at "http://repo.typesafe.com/typesafe/ |
为演示STM如何工作,我们以一个小实例着手,你创建一个为可变Map删除和增加元素的原子操作。为了管理可变性,用scala.concurrent.stm.Ref
来封装这个可变的值:
1 | val ref1 = Ref(HashMap[String, Any]( |
这两个值不是别的,就是多并发参与者中的值的可变引用。上述代码片段创建了两个值,指向可变的HashMap。要在Ref上处理操作,你需要使用定义在STM包中的atomic
方法。Scala的STM库创建事务对象,并授权调用者处理事务性的读和写操作。闭包内的任何refs
的改变都在一个STM事务中完成。例如,下面代码中,由ref2向Map添加一个新的元素:
1 | def atomicInsert(key: String, value: Int) = atomic { implicit txn => |
通过调用ref2.get
得到当前由Ref
关联的,使用swap
替代的一个新值。如果操作失败,该操作将会回滚。事务参数被隐式标志,因此你不需要直接传递。
要从ref1
中实现key
的atomic
删除,你需要使用定义在Ref
的transform
方法。方法transform
允许你转换Ref
引用的值:
1 | def atomicDelete(key: String): Option[Any] = atomic { |
函数atomicDelete
返回删除了的 value。为什么返回 old value?因为之后会用到。
接着讨论关于STM的组合性。假设你构建了一个atomicSwap
函数,用于将一个Map的一个元素,移动到另一个Map中。使用STM,这变的容易:你需要做的是封装atomicDelete
和atomicInsert
这两个函数,如下:
1 | def atomicSwap(key: String) = atomic { implicit txn => |
因为ref2
仅接收一个Int
类型值,在插入之前先要解析为Int
。为了完全理解swap
的优雅所在,看下面规范:
1 | "Atomic operations in composition" should { |
STM的方法single
可以不要求事务,访问Ref
的上下文。当尝试转换"service3"时,Integer.parseInt
会抛出一个异常。这时删除动作已经成功了,但多亏了STM,整个事务将回滚。锁(lock)可以做到这点吗?不能。
STM很好地构建了小型的一致性操作,以及能够组合这些操作。要学习有关STM的更多内容,查阅Scala STM文档。
¶1223〖Agents〗P354
Agent提供了对任何独立存储位置异步修改的绑定。一个Agent仅允许提供的一个动作改变这个位置内容。这个动作指的是函数,该函数被异步地应用于Agent的状态,其返回值成为Agent的新状态。 然而,从Agent读取一个值是瞬时的和同步的。因而Ref
和Agent
的不同在于,Ref
是一个同步读和写;Agent
是响应式的(reactive)。要任意异步地提供动作,Akka提供了两个方法:send
和sendOff
。send
方法使用响应式的线程池分配给agent,而sendOff
使用一个专有的线程,适用于一个长时间运行的操作。下面是一个Agent相关例子,通过send
动作,向一个文件写日志信息:
1 | import akka.actor.ActorSystem |
Agent将运行,直到调用close
方法。因为在其背后,agent由Actor被实现,你需要为agent创建一个Actor系统。如果你想要做更多的花费时间的事情,你需要使用sendoff
方法:
1 | a.sendOff { someLongRunningProcess } |
注意在任何时间,仅有一个send
动作被调用。即使是多并发处理发送的操作,这些动作将按照顺序执行。注意这些动作可能在多线程间交错。
为了在你的项目中使用Agents,需要添加下列依赖项到SBT配置文件中:
1 | libraryDependencies ++= Seq( |
当使用atomic
块,以及消息被挂起直到事务完成,Agent也参与到STM事务。这对于有side-effect的动作很有帮助,如记录日志文件,你不想以STM实现。为什么?因为STM事务如果失败,它们自动重试,意味着你的副作用操作被执行了多次。这可能不是你想要的,因此组合agent和STM,会是在STM事务间,执行副作用动作的一个很好的模式。有时候,Agent的异步性质,人们会迷惑因为agent和Actor类似,但是它们完完全全在设计方式上不同。Agent与数据相关联,你从外部,以一个函数的形式,向Agent发起行为。而Actor的行为定义在内部,数据则以消息的形式发送。
你将在使用Akkaoogle日志事务时再一次看到agent,但现在让我们继续我们下一个并发模型:dataflow。Dataflow是一个很好的从一个程序封装并发的实现方式。它可以顺序读取。
¶1224〖Dataflow〗P355
数据流并发是一个确定性的并发模型。运行并工作时,它会一直处于工作中而不会有死锁。又或者,如果一开始是死锁,它会总是处于死锁中。这点在并发应用中是一个有力的保障,你可以更容易理解这些代码。数据流并发允许你编写顺序的代码,以并行处理这些操作。限制是你的代码应该是完全无副作用的。因为这些代码被用于处理有副作用操作时,你不能获得确定性的行为。
Dataflow在Akka中使用了Scala的分离延续(delimited continuations)编译器插件实现。要在SBT项目中使用该插件,在配置中添加下列内容:
1 | scalacOptions += "-P:continuations:enable" |
要用到数据流并发,必须用到数据流变量。一个数据流变量像一个单赋值(single-assignment)变量。一旦被绑定,不会再改变,以及之后任何新值的绑定将被忽略。下列例子定义了一个数据流变量:
1 | val messageFromFuture = Promise[String]() |
这里的Akka Promise
被用作创建一个数据流变量。一个Promise
为一个值的读处理,该值会在将来的某个时候被用到。任何数据流操作,会在Future.flow
块中被处理:
1 | Future.flow { |
上述调用会处于一个线程等待中,直到有一个值被绑定到messageFromFuture
。Future.flow
返回一个Future
,这样你可以处理其它操作,而不用阻塞主线程的执行。你可以把Future
认为是用于接收某些并发操作结果的数据结构。要为数据流变量指派一个值,使用<<
方法:
1 | Future.flow { |
一旦一个值被绑定到一个数据流变量,所有Future
将等待,该值将被非阻塞,以能继续执行。下面列出使用dataflow变量的一个完整例子:
1 | import akka.actor.ActorSystem |
下个小节将使用Akka的概念,构建一个应用,实践Akka并发。
¶12~3〖Building a real-time pricing system: Akkaoogle〗P357
略,请参考本书源码。
¶12~4〖Adding asynchronous HTTP support with Play2-mini〗P375
略,像Play2、Lift、Spray这些REST框架,基本上都封装了并发接口。Play2框架使用的是Netty作为NIO,包含大量易上手的工具和模版。请参考官网用户指导手册了解详情。
¶12~5 〖Summary〗P379
在构建框架和应用中,Akka是一个强大的工具。Akka使得并发对编程者来说更容易,它提升了一个抽象层次。Akka并发框架构建在Actor之上,但也提供了所有流行的并发抽象技术。使得你可以在构建下一代应用中有更多的选择。Akka的STM支持可变数据结构的操作,最重要的是,你可以通过组合小部分原子性操作来解决问题。另外我们还探索了另外一个并发模式Agent,它让你从外部发送行为,修改安全管理器内的数据。额外地,还学习了Dataflow并发,通过数据流,能够让我们编写流式的程序,而不用担心它们之间的并发问题,Dataflow并发代码也很好理解和掌握,但要注意dataflow不适用于处理带有副作用的操作。
Akkaoogle是一个典型的并发应用,通过构建该应用,探索了如何思考并发问题,如何约束,设计以及抉择方案。Akka提供丰富的选项、灵活的配置,你应该从Akka上,拾取一些更好的特性和选项,以更好地适应你的应用项目。Akka早已经实践在各种真实应用中,我们应该炙手可热尝试一番。