Akka Streaming学习

时至今日,我们从因特尔上消遣服务包含许多流媒体数据(streaming data),包括下载、上传、点对点的数据传送。把数据整体看做是一个元素集合的流(a stream of elements),对我们计算机发送和接收这些数据起了很大帮助,因为数据变得越来越庞大,以“流”的方式处理数据显得很有必要。

Actors看起来也可以处理“流”:它们顺序地接收一系列消息,并对消息进行传输。但我们发现,在actor之间要实现一个stable streaming,会显得冗余乏味(tedious)和易出错(error-prone),因为在处理发送(sending)和接收(receiving)时,我们还需要担心buffer溢出或邮箱溢出问题。另外一个陷阱(pitfall)是,Actor的消息会丢失,对丢失的消息要进行转发,以免stream一直停留在receiving的那一方。当处理完streams,Actor不能担保不会有连接(wiring)错误的出现。

基于这些原因,Akka Stream API便被提出。目的是提供一个直观的、安全的方式来**定制(formulate)**流处理,使我们在资源限制使用的情况(即控制内存溢出的情况)下高效处理。那么如何实现?akka stream实现了一个有 “back-pressure” 的特性,它来源于 Reactive Streams ,akka是该规范的初始成员。

Reactive Streams规范实现只有4个接口:

Publisher

1
2
3
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}

Subscriber

1
2
3
4
5
6
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}

Subscription

1
2
3
4
public interface Subscription {
public void request(long n);
public void cancel();
}

Processor

1
2
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

该协议的主要目标是实现 back-pressureasyncnon-blocking boundaries and interoperability

  • back-pressure:反作用力,具体实现细节为,Publisher实现了一个subscribe方法,意思是,Publisher发布者拥有对Subscriber订阅的功能,所以Publisher本身也是个Subscriber,只是它仅能对唯一一个对象进行订阅——Subscriber。反作用力体现在,Publisher可以由该方法,得知Subscriber的订阅能力,由此控制“订阅”的“速度”,不会超出“发布”的速度很多,以此控制避免内存溢出等问题。
  • async:异步。可以看到这4个接口的方法都没有返回值,所以这些方法实现以非阻塞方式控制,实现对流的异步操作处理。
  • non-blocking:非阻塞。Subscriber除了onSubscribe外,包含有onNext方法,当一个Element处理完成(异步方式),调用onNext方法,处理下一个Element,不会阻塞整个操作(出现异常或错误,具体看代码实现细节,akka stream对错误处理为supervision strategy方式,删除出错的element,继续处理下一个onNext,参考kafka设计)。

快速开始

  • Source: something with exactly one output stream
  • Sink: something with exactly one input stream
  • Flow: something with exactly one input and one output stream
  • BidiFlow: something with exactly two input streams and two output streams that conceptually behave like two Flows of opposite direction
  • Graph: a packaged stream processing topology that exposes a certain set of input and output ports, characterized by an object of type Shape.

一个stream通常以一个source开始,我们需要引入相应的包

1
2
import akka.stream._
import akka.stream.scaladsl._

另外还需引入常用的执行关联包

1
2
3
4
5
6
import akka.{ NotUsed, Done }
import akka.actor.ActorSystem
import akka.util.ByteString
import scala.concurrent._
import scala.concurrent.duration._
import java.nio.file.Paths

以及Main方法

1
2
3
object Main extends App {
// Code here
}

一个最简单的Source,包含1 to 100整数

1
val source: Source[Int, NotUsed] = Source(1 to 100)

其中Source参数化两个类型:第一个是Int为该source 发射元素的类型,第二个为运行该source可能产生的 辅助值(auxiliary value)(如网络source会得到端口号或地址),由SourcesSinks运行后产生的auxiliary value,术语上被称作materialized value。不产生辅助信息时,用akka.NotUsed表示——这里例子的整型肯定不会产生辅助信息。

创建了source,我们便可以发射这100个自然数,但Source没有激活。因此我们需要执行:

1
source.runForeach(i => println(i))(materializer)

这行执行在一个consumer 函数完成——以包含“run”的方法名执行;包含其它类似的方法。

在App执行该代码时,程序并没有终止退出,因为ActorSystem没有终止。runForeach返回一个Future[Done]表示stream完成

1
2
3
4
val done: Future[Done] = source.runForeach(i => println(i))(materializer)

implicit val ec = system.dispatcher
done.onComplete(_ => system.terminate())

我们想知道materializer表示什么。首先我们要创建一个Actor system

1
2
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()

你也可以通过ActorContext来创建。Materializer是stream执行引擎的一个工厂,它使得streams可以执行——你现在不需要了解它的更多细节,仅需要知道你可以调用Sourcerun*方法。物化(materializer)实际上就是将SourceSinkFlow构建起来的蓝图(blueprint)提供可执行实现。

1
2
3
4
5
6
val factorials = source.scan(BigInt(1))((acc, next) => acc * next)

val result: Future[IOResult] =
factorials
.map(num => ByteString(s"$num\n"))
.runWith(FileIO.toPath(Paths.get("factorials.txt")))

我们要时刻记住,Source内部实际上不会进行任何计算处理,它仅仅是一个描述(description),当 run* 方法是计算它内部描述的内容。

在Akka Streams的术语中Source表示整个stream的输入,Sink表示整个stream的输出。在Akka Stream结构中,Source仅有一个output channel不包含input channel,Sink则刚好相反,包含一个input channel,不包含output channel。

下面是另外一个参考例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

final case class Author(handle: String)

final case class Hashtag(name: String)

final case class Tweet(author: Author, timestamp: Long, body: String) {
def hashtags: Set[Hashtag] =
body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
}

val akkaTag = Hashtag("#akka")

val tweets: Source[Tweet, NotUsed] = Source(
Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") ::
Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") ::
Tweet(Author("bantonsson"), System.currentTimeMillis, "#akka !") ::
Tweet(Author("drewhk"), System.currentTimeMillis, "#akka !") ::
Tweet(Author("ktosopl"), System.currentTimeMillis, "#akka on the rocks!") ::
Tweet(Author("mmartynas"), System.currentTimeMillis, "wow #akka !") ::
Tweet(Author("akkateam"), System.currentTimeMillis, "#akka rocks!") ::
Tweet(Author("bananaman"), System.currentTimeMillis, "#bananas rock!") ::
Tweet(Author("appleman"), System.currentTimeMillis, "#apples rock!") ::
Tweet(Author("drama"), System.currentTimeMillis, "we compared #apples to #oranges!") ::
Nil)

implicit val system = ActorSystem("reactive-tweets")
implicit val materializer = ActorMaterializer()

tweets
.filterNot(_.hashtags.contains(akkaTag))
.mapConcat(_.hashtags)
.map(_.name.toUpperCase)
.runWith(Sink.foreach(println))

// $FiddleDependency org.akka-js %%% akkajsactorstream % 1.2.5.1

片段重用

Akka Streams中所有模块都可以实现resuable,例如

1
2
3
4
def lineSink(filename: String): Sink[String, Future[IOResult]] =
Flow[String]
.map(s => ByteString(s + "\n"))
.toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)

链接SourceFlow后得到的auxiliary information就是“materialized value”,我们可以直接将新创建的Sink贴到我们的factorials source上

1
factorials.map(_.toString).runWith(lineSink("factorial2.txt"))

基于时间处理

现在我们通过zip实现将两个source转换为一个stream

1
2
3
4
factorials
.zipWith(Source(0 to 100))((num, idx) => s"$idx! = $num")
.throttle(1, 1.second, 1, ThrottleMode.shaping)
.runForeach(println)

这段代码依赖于时间执行,我们使用throttle来协调stream的速率(每1秒钟,传输一个元素),以此保证,即使是百万级以上的数据,也不会出现JVM内存溢出的情况。这里的throttle在Akka Streams被称作 combinators——协调器,Akka Streams中所有 combinators 都遵循back-pressure设计实现。

Reactive Tweets

下面是一个例子,对非结构化数据的处理

首先定义我们的case class

1
2
3
4
5
6
7
8
9
10
final case class Author(handle: String)

final case class Hashtag(name: String)

final case class Tweet(author: Author, timestamp: Long, body: String) {
def hashtags: Set[Hashtag] =
body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
}

val akkaTag = Hashtag("#akka")

引入隐式运行变量

1
2
implicit val system = ActorSystem("reactive-tweets")
implicit val materializer = ActorMaterializer()

创建Source

1
val tweets: Source[Tweet, NotUsed]

重用Source

1
2
3
4
val authors: Source[Author, NotUsed] =
tweets
.filter(_.hashtags.contains(akkaTag))
.map(_.author)

物化(materializer)

1
authors.runWith(Sink.foreach(println))

Flow处理

1
2
3
4
5
6
7
8
9
10
11
12
val count: Flow[Tweet, Int, NotUsed] = Flow[Tweet].map(_ => 1)

val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

val counterGraph: RunnableGraph[Future[Int]] =
tweets
.via(count)
.toMat(sumSink)(Keep.right)

val sum: Future[Int] = counterGraph.run()

sum.foreach(c => println(s"Total tweets processed: $c"))