第九章:Scala之并发编程

主要内容:

  1. 并发编程挑战
  2. actor编程模型
  3. actor中的错误处理
  4. 并发编程中组合Future和Promise

在本章将介绍Scala中最为激动人心的特性:actor库。可以把一个actor认为是一个对象,该对象处理一个消息(请求)并封装状态(actor间的状态是不共享的)。接收一个消息,并在相应中执行一个动作的能力,这样的对象称为一个actor。更高层面上讲,actors是你做面向对象编程(OOP)时应该实现的方式。要记得actor模型鼓励不共享状态的体系架构。在本章,我将解析为什么在任何并发编程里面,这是一个重要的性质。

Future 和 Promise以非阻塞(nonblocking)的方式提供了执行并发操作的抽象。这是一个很好的方式来创建多并发和平行计算,以此计算你的工作(job)。这和你如何组合函数很相似,但,在这里,函数被并发(concurrently)地或平行(parallel)地执行。Future可以认为是一个代理对象,你可以为一个以后会用到结果进行创建。你可以用Promise有提供的结果来完成一个Future。我们将在本章循序渐进得探索Promise 和 Future。首先,让我们先理解下我所理解的并发、并行编程。

9~1〖What is concurrent programming?〗P256

并发,指的是当多于一个任务时,并在重叠的时间内,这些任务能够开始并完成。即使它们运行在同一个时刻(instant)也没有问题。你可以在单核CPU上编写并发程序,当且仅当只有一个任务可以执行在给定的时间点上。典型地,多个任务被执行在一个时间分片管理器(time-slice manager)上,该时间分片管理器就是指一个计划任务(例如JVM)将被确保每个处理在操作时间内被“切分”进行。这就给使用者感觉是并行的(parallelism)。普遍事实上的、标准的处理多任务应用的方式是使用线程。图Figure9.1展示了多任务应用是如何在单核CPU上共享的。

Figure 9.1

正如Figure 9.1,两个线程正在执行由时间分片管理器应用生成的指令。这些指令组的大小各不相同,因为在调度器(schedulers)从一个线程执行结束到执行另外一个线程之前,你不知道有多少指令会被执行。记住,在其它进程运行的同时,需要CPU时间,你可以看到它是那么的不可预测。当有超过一个线程处于就绪状态(ready-to-run state)时,调度器(schedulers)会使用一个优先级机制来安排一个线程的运行。这样一来,当你有指令在占用着资源时,例如从socket或文件系统读取数据。在这种情况下,尽管该线程有机会使用CPU,但它却不能使用因为它处于等待数据的状态,CPU处于闲置状态。我会在9.4小节重现这个问题。

大多数人交替地使用 并发(concurrency)并行编程(parallel programming) ,但有不同,在并行编程中,按字面上的意思,你可以在同一个时间运行多个任务,在多核处理器上是可行的。

一个并发编程,当它运行在多核环境上时,有时候是一个并行编程(在下个小节我将解释为什么并不总是)。这听起来棒极了,因为所有的CPU供应商都向多核CPU方向生产。但也给软件开发者摆出了难题,因为编写并发、并行应用是困难的。可以想象,当在并行模式中执行多任务应用,线程1需要来自线程2的数据,但这数据并不能用。在这种情况下,线程1会等待,直到它得到该数据,这样一来就不再是并行的了。你共享了线程越多的数据和状态,就越难在并行线程中对此进行管理。本章的全部内容,将尽量使你的并发编程运行在并行模式之中。

并发中另一个常用的术语是 分布式计算(distributed computing) 。这里定义的分布式计算是跨越网络的多个计算节点(计算机、虚拟设备),它们在一个给定问题上工作在一起。一个并行进程会作为一个分布式进程运行在一个多网络节点上。在第12章,你将会看到一个例子——当我们在一个远程节点上部署actors以实现网络交流。现在让我们看看用手头上的工具来解决并发问题以及与此有关的各种挑战。

Figure 9.2

9~2〖Challenges with concurrent programming〗P258

第一章讨论了我们当前所面临的危机并以摩尔定律结束。作为一个软件工程师,我们没有选择并应支持多核处理。CPU制造业早已经以构建多核CPU为方向。在未来将看到16核、32核以及64核。我们在企业软件开发中的问题种类也变得越来越大。面对处理能力快速增长的需求,我们必须规划出一个方案来吸收这些多核处理器的有点——否则,我们的编程将变得越来越低效。

但是要编写一个正确的、没有错误的并发程序有困难。一下列出其原因:

  • 只有一般的程序员知道如何编写一个正确的、并发的应用或程序。程序的正确性(correctness)非常重要。
  • 难于调试多线程程序。同样是死锁引起的问题,在本地调试时却不会有任何问题。有时则是好多年过去后才出现线程问题。
  • 线程间鼓励共享状态并发性,这使得程序会由于锁、信号量(semaphores)、线程间依赖的原因难于并行运行。

尽管多线程问题使得编写并发程序困难起来,但主要的罪魁祸首(culprit)是可变的状态(mutable state)。线程鼓励共享的状态并发。下一个小节将探讨共享状态并发有多难。

921〖Difficulties of shared-state concurrency with threads〗P258

使用线程的问题是并发性是一个很底层的抽象。线程太接近硬件,它表示了CPU的任务调度方式。你需要的是可以封装这些线程,并给你某些东西让你可以更容易地实现编程。以Scala的集合为例:特质Traversable定义了一个抽象方法叫def foreach[U](f: Elem => U),并让其它集合库类继承实现。可以想象,你仅仅需要使用foreach来做任何集合排序的操作,却不能使用其它map,fold,filters之类的方法。这样的话Scala编程将变得相当困难。这就是我尤其在线程问题上想说的:他们对于程序员来说太底层了。例如,在Java中,在引入java.util.concurrent包之前,我们只有java.lang.Thread,我们很难在其它类上实现并发应用。引入java.util.concurrent之后,事情变得明朗了。新的java.util.concurrent包中提供了一系列的非常有用的工具,并实现了流行的并发设计模式,尽管如此,我们仍然很难避免线程背后复杂的,来自可变共享数据的问题。它是一个设计上的问题,我们编程人员在使用线程时不得不处理该问题。为了防止数据损坏以及保持数据的一致性,我们使用了锁(locks)。我们使用锁来控制那些被修改和访问的共享数据,但是锁在编程中引入了问题。

Problem Description
锁不能组合 你不能通过组合小的线程安全的操作,实现一个高层次的、线程安全的行为
用了太多或太新的锁 你会在出现问题的时候才知道。这个问题可能不会在产品上显示(甚至好几年之后),获取和释放锁是个昂贵的操作。
死锁和竞价条件(race conditions) 这是线程不确定性的本质。几乎不可能有确定性的编程代码。你可以使用设计模式,例如总是在正确的顺序中获得锁,为了避免死锁,这种机制给程序开发者增加了更多的负担。
难于修复错误 这更多的是线程问题,而不是共享状态问题。尽管如此他也是个大问题。在多线程编程中还没有一个清晰的机制修复这种错误。一般都是查看日志堆栈里面反馈的跟踪信息。

最后但并不是最不重要的是,共享的可变数据使得它难于平行得运行程序,这已经在9.1小节讨论了。最大的问题是:如果线程难于使用,为什么多数程序要用到它?几乎所有多线程编码都有bug,但近期不会带来很大的问题。因为多核架构变得越来越流行,这些bug会出现得非常频繁。

线程问题应该留给一些少数的专家;即我们应该去找更高层的抽象,这个抽象隐藏了多线程的复杂性,并提供了一个易用的API。尽管如此,你也会遇到只能使用线程的场景,但99%的情况下你应该使用其它方式。这种变化会带来代价,这种代价就是我们所有人都要学习一种新的方式来编写或设计并发应用。本章将探索这其中一个新技术。

讨论了足够多的线程、锁问题了,让我们着手如何解决。下面列出三种最流行的并发应用实现。

name Description
软件事务内存(STM) STM是一个并发控制机制,和数据库的事务类似。STM控制了共享内存的访问,而不是处理表、行。一个STM事务执行一小段代码,该代码读取和编写一个共享内存。这种典型的实现了一个无锁的方式并且是可组合的。我会在第12章讨论更多有关STM方面的内容。
数据流并发 数据流并发背后的原理是,在多任务、多线程间共享变量。这些变量在它的声明周期中只被赋值一次。但这些变量中的值可以被读取多次,即使这些变量没有赋值。这给你的编程有更多的确定性,不会有竞价条件和确定的死锁。第12章在akka框架中覆盖数据流并发设计。
消息传递并发(message-passing concurrency) 这是本章节所花费大篇幅要讲的内容。该并发模型中,组件间通过发送消息进行通信。消息既可以是同步的,也可以是异步的,但异步发送信息给其它组件更常用。这些消息是不可变的,并有独立组件的状态中分离出来。你不用共享状态问题——事实上,消息传递并发鼓励非共享架构。最成功的例子是actor模型,自Erlang编程语言开始,成功地使用了actor模型构建大型的、分布式的、并行通信的应用后,actor模型变得流行起来。Scala的actor库中就是这个消息传递并发模型的另外一种实现。

本章剩余的部分将专注于使用Scala actor的消息传递并发内容,让我们现在开始。

9~3〖Implementing message-passing concurrency with actors〗P260

在该并发模型中,actor通过发送和接收消息彼此通信。一个actor处理收到的消息,并执行与之相关联的动作。典型地,这些消息是不可变的(immutable),因为它们没有共享状态。

在一个actor中,有两种主要的通信抽象层:发送和接收。要给一个actor发送消息,你可以使用下面的表达式:

1
a ! msg

你调用了 ! 方法,给actor a 发送了msg消息。当你给一个actor发送消息时,它是一个异步操作,以及它的调用会立即得到返回。这些消息被存储到一个先进先出的队列中。可以认为队列是一个用于存储actor收到的信息的邮箱,每个actor都有自己的邮箱。接收(receive) 操作被定义为一个消息匹配模式:

1
2
3
4
5
receive {
case pattern1 =>
...
case pattern =>
}

区别于其它对象,一个actor的不同之处在于用于对收到的消息给出响应动作。

Scala中以及包含有默认的actor库,从Scala 2.10开始,独立出来成为akka actors。虽然现在有许多actor库,但目前akka才是主流。

注意 由Scala 2.10.1发行开始,Scala的actor库被分离出来,并可能会在将来被删除。为了便于迁移,Scala提供了一个AMK(Actor Migration Kit,迁移包),以方便迁移旧版本的Scala代码到akka库中。

要创建一个actor,继承由Akka库提供的Actor特质,并实现里面的receive方法。下面例子创建了一个简单的actor:

1
2
3
4
5
6
7
case class Name(name: String)

class GreetingsActor extends Actor {
override def receive: Receive = {
case Name(n) => println("Hello " + n)
}
}

GreetingsActor仅能处理类型Name的消息,我将说明,当发送的消息不匹配任何模式时会发生什么。请注意,你不必要为case class创建消息,你可以发送任何可以在Scala模式匹配中可以匹配到的内容。例如,要匹配字符类的消息,你可以这样写:

1
case name: String => println("Hello " + name)

在发送任何消息给GreetingsActor之前,actor需要通过创建一个ActorSystem来初始化。可以认为一个ActorSystem是一个或多个actor的管理器。(ActorSystem在下一个小节介绍)这个 actor system提供了一个方法actorof,它接收一个配置对象(akka.actor.Props),以及一个可选参数name:

1
2
3
4
5
6
7
import akka.actor.Props
import akka.actor.ActorSystem
val system = ActorSystem("greetings")

val a = system.actorOf(Props[GreetingsActor], name = "greetings-actor")
a ! Name("Nilanjan")
system.shutdown()

actor系统会为actor的运行创建基础设施需求。当完成后,system.shutdown()会关闭基础设施和所有actor。消息被异步处理,因此system.shutdown()会停止那些还没有处理完消息的actor。在运行上述代码片段之前,请确保你的构建文件中已经包含以下依赖:

1
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.1.0"

下面为GreetingsActor的完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
object GreetingsActor extends App {

import akka.actor.{Actor, ActorSystem, Props}

case class Name(name: String)

class GreetingsActor extends Actor {
override def receive: Receive = {
case Name(n) => println("Hello " + n)
}
}

val system = ActorSystem("greetings")
val a = system.actorOf(Props[GreetingsActor], name = "greetings-actor")

a ! Name("Nilanjan")

Thread.sleep(50)
system.shutdown()
}

如果所有按计划进行, 你应该在控制台看到"Hello Nilanjan"的信息。祝贺你!你实现了你的第一个Scala actors。现在让我们回过头来,理解为什么我们需要一个actor系统。

931〖What is ActorSystem?〗P262

一个actor系统是一个actor结构层次组,这个组共享一个通用的配置。它同时也是创建和查找actors的入口。典型地,一个基于actor设计的应用就像一个真实世界里面的一个工作组织一样。在一个组织中,工作在部门间彼此传递。每个部门可能会进一步分工,直到成为可由雇员可以管理的大小。相似地,actors形成一个层次结构,该结构中父类委派给子类actor,直到它可以由一个独立的actor处理。

注意 一个ActorSystem是一个重量级的结构,允许1…N个线程,因此能为你的应用创建每一个逻辑子系统。例如,你可以有一个actor系统来处理后台数据库,另一个处理所有的web service调用,等等。Actors非常廉价。一个给定actor仅消费300字节,你可以轻松地创建成千上万个。

在结构层次的最顶部是家长actor,在每个actor系统中会自动创建。所有其它由给定的actor系统创建的actors都成为该家长actor的孩子。在actor系统中,每个actor都有自己的监护人(父actor)自动处理错误。因此,如果一个actor崩溃了,它的父actor会自动重启这个actor。

创建一个actor的最简单的方式是创建一个ActorSystem,并使用actorof方法:

1
2
val system = ActorSystem(name = "word-count")
val m: ActorRef = system.actorof(Props[SomeActor], name = "someActor")

这段代码片段创建了一个名为"word-count"的ActorSystem,方法actorof用于创建一个SomeActor类的actor实例。Props是一个线程安全的、共享的ActorRef配置对象。Props有大量的工具方法用来创建actors。

注意当你在akka中创建一个actor时,你不能获得该actor的直接引用。相反回调了一个ActorRef(actor的引用)的actor。ActorRef最重要的目地是给它所代表的actor发送消息。它也扮演一个保护层,这样你不能直接访问actor和改变它的状态。ActorRef是序列化的,因此如果一个actor崩溃了,作为一个错误处理机制,你可以序列化ActorRef,把它发送给另外一个节点,并重启actor。客户端的actor不会注意。有不同类型的actor引用。在本章我们会浏览本地的actor引用(意思是所有actor运行在本地的单一的JVM上);第12章会浏览远程的actor引用(actor运行在其它远程的JVM上)。

Figure 9.3

actor系统的第二部分是actor路径。一个actor的路径不唯一标识在actor系统中。因为actors创建在层次结构中,它们组织了和文件系统相似的结构。因为一个路径在一个文件系统中指向了一个独立的资源,一个actor路径在一个actor系统中也唯一地标识了actor的引用。注意这些actor不一定要在一个机器上——它们可以是分布式的、多节点的。使用定义在ActorSystem里面的方法,你可以查阅的一个actor引用了actor系统中的哪个存在监护人(supervisor)。下面例子是使用 system / 来获得 WordCountWorker 的引用:

1
2
3
4
5
6
7
8
class WordCountWorker extends Actor { ... }
...
val system = ActorSystem(name = "word-count")
system.actorOf(Props[WordCountWorker], name = "wordCountWorker")
...
val path: ActorPath = system / "WordCountWorker"
val actorRef: ActorRef = system.actorFor(path)
actorRef ! “some message”

system / 方法返回actor路径,以及方法actorFor返回该actor在给定path上的引用映射。如果actorFor在给定的path上查找失败,它会返回该actor系统的引用死信(dead-letter mailbox)。在所有消息未被释放时,它是一个合成的actor。

你可以从查找actors中重新创建actor path。可以在 这里 获取更多有关akka的详细内容。

要在actor系统中关闭所有的actors,调用shutdown方法,该方法会优雅地关闭actors系统中的所有actor。父actor会首先停止所有孩子actors并发送所有未处理的消息到死讯邮箱中。最后重要的一步是actor系统的消息调度器。MessageDispatcher是使所有actors工作的引擎。下一小节将介绍actors是如何工作的。

932〖How do Scala actors work?〗P264

每个actor系统都带有一个默认的MessageDispatcher组件。它的职责是给该actor的邮箱发送一个消息,并执行actor的receive块。每个MessageDispatcher被一个线程池支撑,该线程池使用配置文件配置(更多内容在第12章介绍)。你可以为你的actor系统或指定的actor配置不同类型的dispatchers。至于本章的内容将使用默认的dispatcher(a.k.a event-based dispatcher)。图9.4展示了actor内发送和接收消息是如何工作的。

向一个actor发送一个消息更加简单。要发送一个消息到一个actor的邮箱,ActorRef首先会发送消息到actor关联的MessageDispatcher中(大多数情况下,MessageDispatcher由actor系统配置)。该actor中的MessageDispatcher立刻为邮箱中的消息进行排序。控制立即回到消息的发送方。这就是它如何工作的,当我们发送一个消息到greeting actor时。

Figure 9.4

处理一个消息包含更多的调用过程,让我们继续图Figure 9.4的内容:

  1. 当一个actor在它的邮箱中接收到一个消息时,MessageDispatcher为该actor安排执行计划。发送和处理消息发生在两个不同的线程。在线程池中,如果一个线程可以,则该线程被选择用来执行该actor。如果所有线程都处于繁忙状态,actor会在线程可用时执行。
  2. 可用线程从邮箱中读取消息。
  3. 某一时刻,该actor的receive方法被调用来传递一个消息。

消息调度器(message dispatcher)总是确保一个单一线程总是执行一个给定的actor。该线程可能不是同一个,但总是有一个。这大大地确保了在并发世界里面,你可以安全地使用一个actor里面的可变状态(mutable state),只要它不是共享的。现在,我想我们应该使用actor来构建一个应用程序。

933〖Divide and conquer using actors〗P266

下述例子,要求实现从一个目录的所有文件中,统计单词的个数,并按升序排列。一种实现方式是,单线程下在该目录中遍历所有文件,在每个文件统计单词个数,最后将它们排序。但这是实现是顺序的(sequential)。要实现并行处理,我们要通过actor实现分治模式(divide-and-conquer,也称为fork-join)。我们将会有一些列的worker actor处理独立文件,以及一个主要的actor进行排序和计算结果。

Actor API

特质akka.actor.Actor定义了唯一的抽象方法receive 来实现actor的行为。Actor额外定义的方法,对于声明周期钩子和故障处理很有帮助。下面列出一些比较重要的方法。(请查阅scaladoc获得完整的方法列表)

1
def unhandled(message: Any): Unit

当给定的message不能匹配到actor方法receive中的任何一种模式时,unhandled方法会由akka.actor.UnhandledMessage 被调用执行。该方法默认是将message发布给actor系统的事件流(event stream)。你可以配置该事件流,并在日志文件中打印该unhandled messages。

1
val self: ActorRef

这个字段持有对该actor的引用,你可以使用self来给自身发送一个小心:

1
final def sender: ActorRef

该Actor的ActorRef会发送最后接收到的message。当你想要message发送方进行回复时,这会非常有用:

1
val context: ActorContext

这里为actor、message以及创建子actors的工厂方法,提供了一个上下文信息。该上下文也提供了对该actor系统的方法、以及其它actors声明周期钩子的监控。

1
def supervisorStrategy: SupervisorStrategy

该观察者定义了当探测到一个actor失败时将要如何处理,你可以重载并定义自己的观察者模式。我们会在本章中覆盖介绍。

1
def preStart()

该方法会在一个actor第一次启动时调用,它在所有message被处理之前执行。该方法可以用于该actor资源或函数的初始化操作。

1
def preRestart()

Actors可能会在处理message发生错误时重启,该方法在该actor的当前实例中被调用。这里要澄清的是,默认实现是停止所有的子actor并调用postStop 方法。

1
def postStop()

而该方法则是在当前actor实例被停止是调用执行。

1
def postRestart()

当一个actor被重启,旧的actor实例被丢弃,新的actor实例通过actorOf方法被创建。然后postRestart >在该新的实例上被调用。默认实现是调用preStart方法,它和preRestart的调用类似。

为了使用actor来解决单词的计数问题,你将会创建两个actor类:一个用于浏览该目录的所有文件并计算结果,叫做WordCountMaster;另外一个叫做WordCountMaster,用于统计每个文件里面的单词。还有一点要考虑的是message会在actor间通信。首先你需要一个message用于初始化目录文件的计数,以及初始化actor的数量:

1
case class StartCounting(docRoot: String, numActors: Int)

docRoot指定文件的位置,numActors会创建worker actors的个数。主程序会通过传递该消息给主actor开始计数处理。WordCountMaster 和 WordCountWorker彼此间通过消息进行交流。WordCountMaster需要一个message用于发送文件目录给worker actor进行计数,以及需要一个message携带单词计数信息和文件目录名,返回给主actor。下面是这些消息:

1
2
case class FileToCount(fileName: String)
case class WordCount(fileName: String, count: Int)

为了理解这些消息是如何消费(consumed)的,看图 9.5 。该图仅仅显示了一个worker actor,但其它worker actor的数量取决于你在StartCounting中定义的个数。

Figure 9.5

让我们开始 WordCountWorker,这是最容易的一个。这个actor仅处理FileToCount类型信息,该动作关联了 打开文件、统计该文件的单词个数 的消息。统计文件的单词个数,实际上和你之前看到的进程的例子类似:

1
2
3
4
def countWords(fileName:String) = {
val dataFile = new File(fileName)
Source.fromFile(dataFile).getLines.foldRight(0)(_.split(" ").size + _)
}

你使用scala.io.Source来打开文件,统计该文件的所有单词——这是最直接的做法。现在进入到最有趣的环节:receive方法。你已经知道需要处理哪些消息,你需要担心的是,当统计完单词,向WordCountMaster回复。

好消息是,Akka actor在运行时隐式地给sender引用的actor发送每个消息:

1
2
3
4
5
def receive {
case FileToCount(fileName:String) =>
val count = countWords(fileName)
sender ! WordCount (fileName, count)
}

在回复中,你将WordCount消息回发给WordCountMaster 的 actor。

What if an actor performs a blocking operation ?

通常推荐的做法是,你不用从actor处理任何阻塞操作。当你在一个actor中创建一个阻塞时,你也阻塞了一个线程。前面说过,线程是有限资源。所以如果你最终有许多阻塞的actor,你不久将耗尽这些线程,导致actor系统停止。

有时你不会有阻塞之外的其它选择。这种情况下,推荐的做法是,分配不同的 message dispatchers将阻塞actors 从非阻塞actor中分离出来。这里提供了灵活的配置:附加额外的进程进行阻塞分离、吞吐量等等。额外添加这些的好处是,如果系统的一部分由于消息线程繁忙发生过载,其它部分会依然正常工作。

下面是完整的 WordCountWorker 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class WordCountWorker extends Actor {
def countWords(fileName: String) = {
val dataFile = new File(fileName)
Source.fromFile(dataFile).getLines.foldRight(0)(_.split(" ").size + _)
}

def receive = {
case FileToCount(fileName: String) =>
val count = countWords(fileName)
sender ! WordCount(fileName, count)
}

override def postStop(): Unit = {
println(s"Worker actor is stopped: $self")
}

}

这里的方法postStop被重构用于当actor停止时,打印消息到控制台上,这不是必要的。我们用这个来debug消息,确保actor被正确停止。当前的WordCountWorker 仅仅返回FileToCount消息。当它接收到消息,它会统计文件里面单词的个数,并告知主actor对其排序。其它消息会被丢弃,并被unhandled方法处理,因为它有下面提及的方面决定。

What is ActorDSL?

如果你熟悉旧的Scala actors,ActorDSL看起来和Scala actor十分相似。在akka actor库中这是额外新添加的,可以帮助用于创建一次性的workers或在REPL下工作。要导入DSL的所有特性:

1
import akka.actor.ActorDSL._

要创建一个简单的Actor,使用定义在ActorDSL里面的方法 actor,并传递一个Act特质的实例:

1
2
3
4
5
val testActor = actor(new Act{
become {
case "ping" => sender ! "pong"
}
})

方法become 方法添加了该actor需要处理的 message pattern。后台 Act继承了Actor特质,become方法添加了receive块的行为。使用该DSL语法,你不再需要创建一个类。下面是两个actor相互通信的例子,通过发送 ping-pong 信息交流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
object ActorDSLExample extends App {
import akka.actor.ActorDSL._
import akka.actor.ActorSystem

implicit val system = ActorSystem("actor-dsl")

val testActor = actor(new Act {
become {
case "ping" => sender ! "pong"
}
})
actor(new Act {
whenStarting { testActor ! "ping" }
become {
case x =>
println(x)
context.system.shutdown()
}
})
}

该actor系统被声明为implicit值,因此我们不需要显式传递它的方法。whenStarting是该DSL生命周期prestart方法的钩子。

WordCountMaster接收到StartCounting消息后开始计数。该消息包含了两个参数,目录名、worker actor的个数。要浏览目录中的文件,使用java.io.File类中定义了list方法,列出目录中的所有文件:

1
2
private def scanFiles(docRoot: String) =
new File(docRoot).list.map(docRoot + _)

map方法用于带完整路径的list集合。这样,就不用担心子目录。要创建 worker actor,我们使用了numActors参数传递给StartCounting消息来创建多个actor:

1
2
3
4
private def createWorkers(numActors: Int) = {
for (i <- 0 until numActors) yield
context.actorOf(Props[WordCountWorker], name = s"worker-${i}")
}

因为worker actor是WordCountMaster的孩子,context.actorOf工厂方法被使用。

要实现排序,我们需要一个方法遍历所有文件名,并发送一个FileToCount消息给这些worker actor。因为要处理的文件的数量比worker actor的数量多,file以循环的方式发送给每个actor:

1
2
3
4
5
6
private[this] def beginSorting(fileNames: Seq[String],
workers: Seq[ActorRef]) {
fileNames.zipWithIndex.foreach( e => {
workers(e._2 % workers.size) ! FileToCount(e._1)
})
}

方法zipWithIndex对每个集合元素加索引配对,下面是一个例子:

1
2
scala> List("a", "b", "c").zipWithIndex
res2: List[(java.lang.String, Int)] = List((a,0), (b,1), (c,2))

当 WordCountMaster 接收到 StartCounting消息,将会创建worker actors并浏览文件,并将这些文件发送给每个worker。下面是 WordCountMaster 的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class WordCountMaster extends Actor {

var fileNames : Seq[String] = Nil
var sortedCount: Seq[(String, Int)] = Nil

def receive = {
case StartCounting(docRoot, numActors) =>
val workers = createWorkers(numActors)
fileNames = scanFiles(docRoot)
beginSorting(fileNames, workers)

case WordCount(fileName, count) =>
sortedCount = sortedCount :+(fileName, count
sortedCount = sortedCount.sortWith(_._2 < _._2)
if (sortedCount.size == fileNames.size) {
println("final result " + sortedCount)
finishSorting()
}
}

override def postStop(): Unit = {
println(s"Master actor is stopped: $self")
}

private def createWorkers(numActors: Int) = {
for (i <- 0 until numActors) yield context.actorOf(Props[WordCountWorker], name = s"worker-$i")
}

private def scanFiles(docRoot: String) =
new File(docRoot).list.map(docRoot + _)

private[this] def beginSorting(fileNames: Seq[String], workers: Seq[ActorRef]) {
fileNames.zipWithIndex.foreach(e => {
workers(e._2 % workers.size) ! FileToCount(e._1)
})
}

private[this] def finishSorting() {
context.system.shutdown()
}
}

字段fileNames存储了所有我们需要处理的文件。我会最后将会使用这个字段来确保我们已经接收了所有的回复。sortedCount被用于存储结果。有一点要重要提醒的是,这里使用可变状态是安全的,因为actor系统会确保不会存在两个线程在同一时刻执行同一个actor的实例。但你需要确保 可变状态不会泄漏 在该actor系统的外部。

接下来,WordCountMaster需要处理来自WordCountWorker发送的WordCount消息。该消息包含了文件名和单词统计数。该信息被存储在sortedCount中,并被排序:

1
2
3
case WordCount(fileName, count) =>
sortedCount ::= (fileName, count)
sortedCount = sortedCount.sortWith(_._2 < _._2)

最后一步是当所有文件被处理完后,终止操作。一种实现方式是,比较sortedCount的大小和文件个数,并在控制台上打印输出终止的actor信息:

1
2
3
4
if(sortedCount.size == fileNames.size) {
println("final result " + sortedCount)
finishSorting()
}

我们可以使用context.children来访问所有的worker actors并使用如下方式终止:

1
context.children.foreach(context.stop(_))

关闭一个actor系统的最简单方式是使用该actor系统的shutdown方法,我们可以通过context.system如下的方式来访问actor系统:

1
2
3
private[this] def finishSorting() {
context.system.shutdown()
}

下面列出 WordCountWorker 和 WordCountMaster 这两个actor的完整实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import akka.actor.{Actor, ActorRef, Props}
import java.io._
import scala.io._

case class FileToCount(fileName: String)
case class WordCount(fileName: String, count: Int)
case class StartCounting(docRoot: String, numActors: Int)

class WordCountWorker extends Actor {
def countWords(fileName: String) = {
val dataFile = new File(fileName)
Source.fromFile(dataFile).getLines.foldRight(0)(_.split(" ").size + _)
}

def receive = {
case FileToCount(fileName: String) => // [7]. WordCountWorker 接收到FileToCount,开始countWords
val count = countWords(fileName)
sender ! WordCount(fileName, count) // [8]. sender 为 WordCountMaster
}

override def postStop(): Unit = {
println(s"Worker actor is stopped: $self")
}

}

class WordCountMaster extends Actor {

var fileNames : Seq[String] = Nil
var sortedCount: Seq[(String, Int)] = Nil

def receive = {
case StartCounting(docRoot, numActors) => // [1]. 主程序进入receive方法,匹配StartCounting
val workers = createWorkers(numActors) // [2]. 根据参数创建worker actor个数
fileNames = scanFiles(docRoot) // [3]. 列出目录下的所有文件名,存于Seq容器中
beginSorting(fileNames, workers) // [4]. 排序

case WordCount(fileName, count) => // [9]. WordCount
sortedCount = sortedCount :+(fileName, count)
sortedCount = sortedCount.sortWith(_._2 < _._2) // [10]. 对容器Seq[(String,Int)] 的 (String,Int)._2 排序
if (sortedCount.size == fileNames.size) {
println("final result " + sortedCount)
finishSorting() // [11]. 结束上下文
}
}

override def postStop(): Unit = {
println(s"Master actor is stopped: $self")
}

private def createWorkers(numActors: Int) = {
for (i <- 0 until numActors) yield context.actorOf(Props[WordCountWorker], name = s"worker-$i")
}

private def scanFiles(docRoot: String) =
new File(docRoot).list.map(docRoot + _)

private[this] def beginSorting(fileNames: Seq[String], workers: Seq[ActorRef]) {
fileNames.zipWithIndex.foreach(e => { // [5]. foreach 中 e._1 为 循环变量 , e._2 为 index
workers(e._2 % workers.size) ! FileToCount(e._1) // [6]. e._2 % workers.size平均分配任务,WordCountWorker开始receive
})
}

private[this] def finishSorting() {
context.system.shutdown()
}
}

WordCountWorker 和 WordCountMaster都被定义为actor。两者之间的交互发生于不可变的消息(immutable message)。当WordCountMaster接收到StartCounting消息,它便创建了给定数目的worker actor。一旦这个actor被启动,WordCountMaster发送FileToCount消息给所有worker actor。当worker actor完成了文件单词的统计后,它便想主actor发送WordCount消息。当sortedCount的大小和文件数目刚好相等,停掉所有的actor操作。

剩下来的内容就是Main方法还没有写。现在,不在创建一个新的actor,而是创建一个带Main方法的对象。

1
2
3
4
5
6
7
8
9
10
import akka.actor.{ActorSystem, Props}

object Main {
def main(args: Array[String]) {
val system = ActorSystem("word-count-system")

val m = system.actorOf(Props[WordCountMaster], name="master")
m ! StartCounting("src/main/resources/", 2)
}
}

本小节中,你学习了关于actor的许多有趣的内容。并学习了如何使用actor来设计你的应用。创建一个独立的不可变的消息(self-contained immutable message),并终止actor之间的相互通讯,是actor操作的重要部分内容。对于理解“所有通信通过消息传递,并且只能由消息传递”同样重要。这引出了actors和OOP的相似之处。当 Alan Kay最先以OOP方式时,他的伟大想法是 “消息传递”。事实上,actor的操作有更多的面向对象。

假如发生错误会怎样?在并发/并行的编程世界里面许多事情都会出错。如果我们在读取文件时,出现IOException怎么办?下面让我们在基于actor的应用中,如何来处理这些错误。

933〖Fault tolerance made easy with a supervisor〗P274

akka鼓励非防御型的编程,也就是说,在应用声明周期中,即使是失败,也是一个有效的状态。作为一个编程人员,我们不能保留每个错误,所以,最好为你的应用准备errors。你可以通过akka提供的观察者层次结构(supervisor hierarchy)实现容错支持(fault-tolerance support)。

可以把这个观察者认为是一个actor,它链接了观察actors,以及当其中一个actor死亡后进行重启。一个观察者的职责是启动、终止、监控子actor。它和链接的机制相同。但akka提供了更好的抽象,叫做 监管策略(supervision strategies)

图 9.6展示了一个观察者层次结构的例子。

Figure 9.6

你不会被限制于一个观察者。你可以有一个观察者链接到另外一个观察者。这样你可以观察另外一个观察者是否处于崩溃情况。很难用一个机箱来建立一个容错系统,因此我建议你在跨平台上有观察者层次的延伸。这样,如果一个节点(机器)坏了,你可以在不同的机箱中重启一个actor。谨记,,请委派工作,这样如果一个崩溃发生了,其它观察者可以恢复。现在让我们看看akka里面实现的容错策略。

SUPERVISION STRATEGIES IN AKKA

akka带来了两个重启策略:One-for-One 和 All-for-One。在One-for-One策略中。如果一个actor死亡,将重新创建。如果actor在系统中是独立的,这策略很棒。它不依赖其它正常工作的actor。

Figure 9.7

如果在一个工作流中有多个actor参与,重启一个单独的actor可能会不起作用。这种情况下,使用 All-for-One 重启策略,即被观察者观察的一个actor死亡,则该观察者观察的所有actors都将被重启。

Figure 9.8

那么,这些在代码中怎样表示?在akka,默认地,每一个actor都有一个观察者,以及父actor为子actor的观察者。当没有观察者被定义,将使用默认策略(OneForOne),即是在子actor有Exception失败时重启。下面例子配置了WordCounterWorker的OneForOneStrategy:

1
2
3
4
5
6
7
8
9
import akka.actor.SupervisorStrategy._
class WordCountWorker extends Actor {
. . .
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3,
withinTimeRange = 5 seconds) {
case _: Exception => Restart
}
. . .
}

你用你自己的故障处理方式,重载了actor的supervisorStrategy属性。例如,在java.lang.Exception情况,你的模式将会匹配,并直接让父actor丢弃旧的actor实例,并用一个新的actor实例代替。如果没有模式匹配,故障被升级为父actor。类似地,下面例子配置了WordCounterMaster使用AllForOneStrategy:

1
2
3
4
5
6
7
8
9
class WordCountMaster extends Actor {
. . .
override val supervisorStrategy = AllForOneStrategy() {
case _: Exception =>
println("Restarting...")
Restart
}
. . .
}

一个带有观察者的单词统计的例子在本章的代码中。下一小节讨论在并发环境中可变数据的工作实现。

9~4〖Composing concurrent programs with Future and Promise〗P276

一个Future对象可以存放一个可用的值,顾名思义,Future就是之后的时间。它本质上扮演了一个真实值的代理,该值暂时不存在。通常该值由一些异步计算执行产生。创建一个 Future 最简单的方式就是使用apply方法:

1
2
3
def someFuture[T]: Future[T] = Future {
someComputation()
}

这里 someFuture 存储计算结果,以及T表示结果的类型。因为Future是被异步执行,我们需要指定scala.concurrent.ExecutionContext.ExecutionContext是一个抽象于线程池的,所有执行的任务被提交到该线程池。这里的任务,由Future计算处理。有许多方式来配置并创建ExecutionContext,但本章我们将使用默认的、全局的scala库提供的执行上下文:

1
import ExecutionContext.Implicits.global

当Future有值,它被认为是完整的。它也可以用一个exception表示。要在Future被完成后处理操作,我们可以使用onComplete回调函数:

1
2
3
4
someFuture.onComplete {
case Success(result) => println(result)
case Failure(t) => t.printStackTrace
}

因为Future可以是一个成功或失败状态,onComplete允许你同时处理这两种情况。(查阅 scala.concurrent.Future scaladoc 有关内容)

Future 也可以通过 Promise 创建。可以把 Promise 认为是一个可写的、单任务的容器。你可以用 Promise 来创建一个 Future,当 Promise 被一个值填充,Future被完成:

1
2
3
4
5
6
7
8
9
10
val promise: Promise[String] = Promise[String]()
val future = promise.future
...
val anotherFuture = Future {
...
promise.success("Done")
doSomethingElse()
}
...
future.onSuccess { case msg => startTheNextStep() }

这里我们创建了两个Future,一个使用Future的方法,另一个由Promise创建。anotherFuture 完成 promise 由调用 success 方法实现(也可以使用 failure 方法完成 promise)。一旦 promise 被完成,你不能再调用 success。否则,将抛出一个异常。promise 会自动完成 future,以及 onSuccess 回调方法被自动调用。请注意,Future注册的回调方法,在 future 完成后仅被执行一次。Scala的 Future 和 Promise API有许多非常有用的方法,请检查scala文档了解更多内容。

现在你可能想知道什么时候使用 Future 什么时候使用 actor。一个通常使用 Future的情况是,处理一些并发计算,但又不需要actor做额外的效用。Scala Future库最引人注目的特性是,它允许我们组合并发操作,这在actor中很难实现。让我们使用Future 和 Promise实现单词计数问题。

941〖Divide and conquer with Future〗P278

现在使用Future重新实现单词计数问题。现在我们将单词计数问题分割成几个步骤,以便独立解决每个问题。因为Future允许函数式的组合,我们应该组合这些步骤来解决我们的问题。我们将该问题分成4步:

  1. 在给定的目录中,浏览所有文件
  2. 对给定的一个文件进行单词统计
  3. 累加和排序结果
  4. 生成结果

我们已经知道如何浏览给定目录中的文件,但这次希望异步处理:

1
2
3
private def scanFiles(docRoot: String): Future[Seq[String]] = Future { 
new File(docRoot).list.map(docRoot + _)
}

类似地,在Future内部对给定的文件进行单词计数。如果出现问题,我们使用 recover 方法注册回调:

1
2
3
4
5
6
7
8
9
10
private def processFile(fileName: String): Future[(String, Int)] =
Future {
val dataFile = new File(fileName)
val wordCount = Source.fromFile(dataFile).getLines.foldRight(0)(_.split(" ").size + _)
(fileName, wordCount)
} recover {
case e: java.io.IOException =>
println("Something went wrong " + e)
(fileName, 0)
}

当在Future内部出现 IOException异常时,recover回调被执行。因为每个文件在Future内部被处理,我们最终会得到一个 futures 集合:

1
val futures: Seq[Future[(String, Int)]] = fileNames.map(name => processFile(name))

问题来了。我们怎么知道所有的Future会完成?我们不可能为每个future注册回调,因为每个都是独立的、并且完成时间也不相同。如其是Seq[Future[(String, Int)]],我们更希望得到的是Future[Seq[(String, Int)]],这样我们就可以对结果进行累加和排序。这实际上就是Future.sequence被设计的原因。它接收一个futures集合,并得到一个Future:

1
val singleFuture: Future[Seq[(String, Int)]] = Future.sequence(futures)

你可以调用map方法,并进行排序:

1
2
3
4
5
6
7
8
9
10
11
private def processFile(fileName: String): Future[(String, Int)] =
Future {
val dataFile = new File(fileName)
val wordCount = Source.fromFile(dataFile).getLines.foldRight(0)(_.split(" ").size + _)
(fileName, wordCount)
} recover {
// recover 注册回调,用于失败处理
case e: java.io.IOException =>
println("Something went wrong " + e)
(fileName, 0)
}

如果你没有猜到,Future是一个monad例子。它实现了map、flatMap以及filter操作,以及必要的函数式组件。现在你可以组合 scanFiles 和 processFiles 产生排序结果:

1
2
3
4
5
6
7
8
val path                 = "src/main/resources/"

val futureWithResult: Future[Seq[(String, Int)]] = for {
files <- scanFiles(path)
result <- processFiles(files)
} yield {
result
}

这个的 for-comprehensions 组合了scanFiles 和 processFiles,并产生另外一个future。注意这里每个操作的处理都是异步的,我们以一种非阻塞的方式组合这些 futures。这里的 for-comprehensions 创建了另外一个future仅在scanFiles 和 processFiles 同时完成时创建。它在两个操作之间也扮演了管道的角色,即当scanFiles的所有输出发送给processFiles时。

最后一步,我们可以用一个Promise实现,当futureWithResult完成时。下面是使用Future实现单词统计例子的完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import java.io.File

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
import scala.io.Source
import scala.util.{Failure, Success}

object Main extends App {
val promiseOfFinalResult = Promise[Seq[(String, Int)]]()
val path = "src/main/resources/"

val futureWithResult: Future[Seq[(String, Int)]] = for {
files <- scanFiles(path)
result <- processFiles(files)
} yield {
result
}

futureWithResult.onSuccess { case r => promiseOfFinalResult.success(r) }

promiseOfFinalResult.future.onComplete {
case Success(result) => result foreach (a => println(a._1.substring(1 + a._1.lastIndexOf('/'), a._1.length) + " -> " + a._2))
case Failure(t) => t.printStackTrace()
}

private def processFiles(fileNames: Seq[String]): Future[Seq[(String, Int)]] = {
val futures: Seq[Future[(String, Int)]] = fileNames.map(name => processFile(name))
val singleFuture: Future[Seq[(String, Int)]] = Future.sequence(futures)
singleFuture.map(r => r.sortWith(_._2 < _._2))
}

private def processFile(fileName: String): Future[(String, Int)] =
Future {
val dataFile = new File(fileName)
val wordCount = Source.fromFile(dataFile).getLines.foldRight(0)(_.split(" ").size + _)
(fileName, wordCount)
} recover {
case e: java.io.IOException =>
println("Something went wrong " + e)
(fileName, 0)
}

private def scanFiles(docRoot: String): Future[Seq[String]] = Future {
new File(docRoot).list.map(docRoot + _)
}

}

你可以看到非常容易地开始使用Future,以及它非常强大,因为它允许你做函数式的组合。另一方面,actor允许你构造你的应用,并提供了一个故障处理策略。你不必要在它们两者之间选择。你可以把你的应用分割为actor,然后这些actor可以使用Future的构建块来处理异步操作。下一小节,我们将看看如何在actor内部使用Future。

942〖Mixing Future with actors〗P280

在使用Akka actor工作有两种普遍的模式调用:

  1. 给一个actor发送消息,并从这个actor接收消息。我们使用了一个“即发即弃(fire-and-forget)” 方法 !。同样,获得一个响应也是一个常用的情况。(a.k.a ask pattern)
  2. 并发任务(Future)完成(a.k.a pipe pattern),给发送方回复。

让我们以一个例子来证明这两个模式。下面代码片段,我们有两个actor,一个父actor和子actor:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
implicit val timeout = Timeout(5.seconds) 

class GreetingsActor extends Actor {
val messageActor = context.actorOf(Props[GreetingsChildActor])
def receive = {
case name =>
val f: Future[String] = (messageActor ask name).mapTo[String]
f pipeTo sender
}
}

class GreetingsChildActor extends Actor {
def receive = {...
}
}

GreetingsActor接收 name 然后发送消息给子actor生成一个消息。在这里我们使用ActorRef的ask方法(你也可以使用 ? 表示)发送和接收消息。因为消息是异步处理的,ask方法会返回一个Future。mapTo 消息允许用户将消息从Future[Any]转换为Future[String]。问题是我们不知道消息什么时候准备好,以让我们回复给发送者。pipeTo 解决了这个问题,通过连结Future,只要当Future完成,便接收future内部的响应内容并向发送者发送消息。要看完整代码,请查阅本章基础代码内容。

9~5〖When should you not use actors?〗P281

本章着重讲述了actor在构建面向消息并发的应用中的构建。我们也讨论了可变数据是引起大多数并发问题之根本,以及actor如何消除使用这些非共享的构建。但我们如何在多个组件间进行状态共享?

  • Shared State —— 一个典型例子,你想要将纸币进行转换,你想要在不同的应用之间有一致的观念。你需要更多的不仅是actor,你需要事务支持。如STM(软件事务内存)是解决这类问题的一个很好的选择,或者你需要在消息传递间构建事务。(第12章会看到相关例子)

  • Cost Of asynchronous programming —— 对于大多数编程开发者,要习惯异步编程,是一个思考模式的转变。它花费时间和精力来熟悉你未曾用过的。调试和测试大型的面向消息的应用是困难的。同时,异步消息的传递,使得它难于跟踪和隔离一个问题。这还不是别的,特别是使用actor model,更多的是继承了基于消息应用的复杂性。之后的 Akka TestKit 和 Typesafe console将帮助迁移这些测试和调试问题。

  • Performance —— 如果你的应用需要很高的性能处理,因为actors可能会增加开销,你可能最好需要用一个更底层的抽象,如线程。但相反,99.9%的应用,我认为actors的性能已经足够。

9~6〖Summary〗P281

本章你学习了一个新的并发编程趁势,以及状态共享的并发问题处理。它清晰地指出,如果你建立更高的抽象层次,你就更容易构建并发应用程序。本章关注消息间传递的并发性,以及如何使用actor来实现。

我们也学习了关于Future 和 Promise 方面的内容,以及如何使用函数式的组件由组合小的并发操作 构建 大型的程序。构建容错性的应用的最大挑战在于,如何高效地处理错误并修复它们。通过学习观察者策略,解决了这些基于actor应用 的错误。这使得构建一个长期运行的应用能够自动化地从错误和异常中修复。

下一章将开始关注于Scala 应用的单元测试,以及探索如何通过各种各样的有用的工具来帮助编写Scala单元测试。这部分对应你来说不难。它还将介绍如何编写actor的单元测试。


  1. Java thread states (download), http://mng.bz/w1VH.
  2. Herb Sutter, “The Free Lunch Is Over: A Fundamental Turn Toward Concurrency in Software,” December 2004, CPU trends graph updated August 2009, www.gotw.ca/publications/concurrency-ddj.htm.
  3. Multicore CPU trend (graphic), www.gotw.ca/images/CPU.png.
  4. See “The Scala Actors Migration Guide,” http://docs.scala-lang.org/overviews/core/actors-migrationguide.html.
  5. See “Actor References, Paths, and Addresses,” version 2.1.0, http://doc.akka.io/docs/akka/2.1.0/general/addressing.html.
  6. Brian Goetz, “Java theory and practice: Stick a fork in it, Part 1,” developerWorks, Nov. 13, 2007, http://mng.bz/aNZn.
  7. Alan Curtis Kay, http://en.wikipedia.org/wiki/Alan_Kay.
  8. Alan Kay, “Prototypes vs. classes was: Re: Sun’s HotSpot,” Oct 10, 1998, http://mng.bz/L12u.
  9. “Testing Actor Systems (Scala), TestKit Example,” http://doc.akka.io/docs/akka/2.1.0/scala/testing.html.
  10. “Typesafe Console,” http://typesafe.com/products/console.