时至今日,我们从因特尔上消遣服务包含许多流媒体数据(streaming data),包括下载、上传、点对点的数据传送。把数据整体看做是一个元素集合的流(a stream of elements),对我们计算机发送和接收这些数据起了很大帮助,因为数据变得越来越庞大,以“流”的方式处理数据显得很有必要。
Actors看起来也可以处理“流”:它们顺序地接收一系列消息,并对消息进行传输。但我们发现,在actor之间要实现一个stable streaming,会显得冗余乏味(tedious)和易出错(error-prone),因为在处理发送(sending)和接收(receiving)时,我们还需要担心buffer溢出或邮箱溢出问题。另外一个陷阱(pitfall)是,Actor的消息会丢失,对丢失的消息要进行转发,以免stream一直停留在receiving的那一方。当处理完streams,Actor不能担保不会有连接(wiring)错误的出现。
基于这些原因,Akka Stream API便被提出。目的是提供一个直观的、安全的方式来**定制(formulate)**流处理,使我们在资源限制使用的情况(即控制内存溢出的情况)下高效处理。那么如何实现?akka stream实现了一个有 “back-pressure” 的特性,它来源于 Reactive Streams ,akka是该规范的初始成员。
Reactive Streams规范实现只有4个接口:
Publisher
1 | public interface Publisher<T> { |
Subscriber
1 | public interface Subscriber<T> { |
Subscription
1 | public interface Subscription { |
Processor
1 | public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { |
该协议的主要目标是实现 back-pressure
、async
、non-blocking boundaries and interoperability
。
back-pressure
:反作用力,具体实现细节为,Publisher
实现了一个subscribe
方法,意思是,Publisher发布者拥有对Subscriber订阅的功能,所以Publisher本身也是个Subscriber,只是它仅能对唯一一个对象进行订阅——Subscriber。反作用力体现在,Publisher
可以由该方法,得知Subscriber
的订阅能力,由此控制“订阅”的“速度”,不会超出“发布”的速度很多,以此控制避免内存溢出等问题。async
:异步。可以看到这4个接口的方法都没有返回值,所以这些方法实现以非阻塞方式控制,实现对流的异步操作处理。non-blocking
:非阻塞。Subscriber
除了onSubscribe
外,包含有onNext
方法,当一个Element处理完成(异步方式),调用onNext
方法,处理下一个Element,不会阻塞整个操作(出现异常或错误,具体看代码实现细节,akka stream对错误处理为supervision strategy方式,删除出错的element,继续处理下一个onNext,参考kafka设计)。
¶快速开始
- Source: something with exactly one output stream
- Sink: something with exactly one input stream
- Flow: something with exactly one input and one output stream
- BidiFlow: something with exactly two input streams and two output streams that conceptually behave like two Flows of opposite direction
- Graph: a packaged stream processing topology that exposes a certain set of input and output ports, characterized by an object of type Shape.
一个stream通常以一个source开始,我们需要引入相应的包
1 | import akka.stream._ |
另外还需引入常用的执行关联包
1 | import akka.{ NotUsed, Done } |
以及Main方法
1 | object Main extends App { |
一个最简单的Source
,包含1 to 100整数
1 | val source: Source[Int, NotUsed] = Source(1 to 100) |
其中Source
参数化两个类型:第一个是Int
为该source 发射元素的类型,第二个为运行该source可能产生的 辅助值(auxiliary value)
(如网络source会得到端口号或地址),由Sources
和Sinks
运行后产生的auxiliary value,术语上被称作materialized value
。不产生辅助信息时,用akka.NotUsed
表示——这里例子的整型肯定不会产生辅助信息。
创建了source,我们便可以发射这100个自然数,但Source
没有激活。因此我们需要执行:
1 | source.runForeach(i => println(i))(materializer) |
这行执行在一个consumer 函数完成——以包含“run”的方法名执行;包含其它类似的方法。
在App执行该代码时,程序并没有终止退出,因为ActorSystem
没有终止。runForeach
返回一个Future[Done]
表示stream完成
1 | val done: Future[Done] = source.runForeach(i => println(i))(materializer) |
我们想知道materializer
表示什么。首先我们要创建一个Actor system
1 | implicit val system = ActorSystem("QuickStart") |
你也可以通过ActorContext
来创建。Materializer
是stream执行引擎的一个工厂,它使得streams可以执行——你现在不需要了解它的更多细节,仅需要知道你可以调用Source
的run*
方法。物化(materializer)实际上就是将Source
、Sink
、Flow
构建起来的蓝图(blueprint)提供可执行实现。
1 | val factorials = source.scan(BigInt(1))((acc, next) => acc * next) |
我们要时刻记住,Source
内部实际上不会进行任何计算处理,它仅仅是一个描述(description),当 run* 方法是计算它内部描述的内容。
在Akka Streams的术语中Source
表示整个stream的输入,Sink
表示整个stream的输出。在Akka Stream结构中,Source
仅有一个output channel不包含input channel,Sink
则刚好相反,包含一个input channel,不包含output channel。
下面是另外一个参考例子:
1 | import akka.NotUsed |
¶片段重用
Akka Streams中所有模块都可以实现resuable,例如
1 | def lineSink(filename: String): Sink[String, Future[IOResult]] = |
链接Source
和Flow
后得到的auxiliary information就是“materialized value”,我们可以直接将新创建的Sink
贴到我们的factorials
source上
1 | factorials.map(_.toString).runWith(lineSink("factorial2.txt")) |
¶基于时间处理
现在我们通过zip实现将两个source转换为一个stream
1 | factorials |
这段代码依赖于时间执行,我们使用throttle
来协调stream的速率(每1秒钟,传输一个元素),以此保证,即使是百万级以上的数据,也不会出现JVM内存溢出的情况。这里的throttle
在Akka Streams被称作 combinators——协调器,Akka Streams中所有 combinators 都遵循back-pressure设计实现。
¶Reactive Tweets
下面是一个例子,对非结构化数据的处理
首先定义我们的case class
1 | final case class Author(handle: String) |
引入隐式运行变量
1 | implicit val system = ActorSystem("reactive-tweets") |
创建Source
1 | val tweets: Source[Tweet, NotUsed] |
重用Source
1 | val authors: Source[Author, NotUsed] = |
物化(materializer)
1 | authors.runWith(Sink.foreach(println)) |
¶Flow处理
1 | val count: Flow[Tweet, Int, NotUsed] = Flow[Tweet].map(_ => 1) |