非阻塞方式处理迭代Future问题

Scala的Future实现是非常cool的,它很容易并行处理代码,除此之外易于组合,因为它是一个monad实现。

我们在创建一个Future的同时,它被切分为多个线程执行:

1
2
3
4
5
val list = List(1,2)
def doubleFuture(i: Int) = Future {i * 2}

list map doubleFuture //returns List[Future[Int]]
Future.traverse(list)(doubleFuture) //returns Future[List[Int]]

因为map执行后会返回得到一个List[Future[Int]],更通常的一种用法是使用Future.traverse将它转换为Future[List[Int]],它最终结果是相同的,以及并行地去执行Future里面的内容,只不过仅返回一个Future类型。

有时候,我们并不希望并行地去处理所有Iterator里面的内容?又或者我们希望在处理过程中,某个计算failed掉了,可以终止整个处理进程?实际上,在工作中有大量的这种情形需要这样特别对待:Future 按顺序一个一个地处理,某一个failed,终止进程。很明显,我们不期待使用Await,它会阻塞上一个进程。下面的“技巧”用到了foldLeft的方式,这样List里面下一个值的计算,开始于前一个计算完成时。

1
2
3
4
5
6
7
8
9
def serialiseFutures[A, B](l: Iterable[A])(fn: A => Future[B])
(implicit ec: ExecutionContext): Future[List[B]] =
l.foldLeft(Future(List.empty[B])) {
(previousFuture, next) =>
for {
previousResults <- previousFuture
next <- fn(next)
} yield previousResults :+ next
}

它仅在上一个future是complete或successful下才被调用。它会被立即执行,但Future实际上是顺序执行的。

上面代码的返回类型是Future[List[B]]。理想情况下,它应该返回Future[C[B]],C是我们真正传递的集合类型。我们可以借助CanBuildFrom进行优化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def serialiseFutures[A, B, C[A] <: Iterable[A]]
(collection: C[A])(fn: A => Future[B])(
implicit ec: ExecutionContext,
cbf: CanBuildFrom[C[B], B, C[B]]): Future[C[B]] = {
val builder = cbf()
builder.sizeHint(collection.size)

collection.foldLeft(Future(builder)) {
(previousFuture, next) =>
for {
previousResults <- previousFuture
next <- fn(next)
} yield previousResults += next
} map { builder => builder.result }
}

单元测试如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
val start = System.currentTimeMillis
val doubled = Await.result({
serialiseFutures(List(10, 20)) { i =>
Future {
Thread.sleep(i)
i * 2
}
}
}, 1 second)

val timeElapsed = System.currentTimeMillis - start
timeElapsed should be >= (30l)
doubled should be(List(20, 40))

那是否可以通过顺序执行中抛出异常的方式,终止整个for循环的调用吗?

实际上,循环中某个Future fail掉了,循环是不会终止的。因为原生的for-comprehension(它是一个flatMap实现).

另外一种实现方式是使用Akka Stream的mapAsyncUnordered来处理,由Stream的Strategy来处理异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
val futureFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._

val dangerousFlow = Flow[Int].mapAsyncUnordered[Int](5)(makeFuture)

val safeFlow = Flow[Int]
val bcast = builder.add(Broadcast[Int](2))
val zip = builder.add(Zip[Int, Int])

bcast ~> dangerousFlow ~> zip.in0
bcast ~> safeFlow ~> zip.in1

FlowShape(bcast.in, zip.out)
})

Source(1 to 10)
.via(futureFlow)
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runForeach(println)

由于Broadcast使用back-pressure机制,执行时需要等待上游的完成才进行下一步操作,因此也是按照顺序执行的,该方式跟Future.foldLeft的实现是等价的。我们还是希望可以各个Future异步地调用处理,并快速失败。但这并不符合响应式设计,因为如果异步地不按顺序执行,我们并不知哪个快哪个慢,这是一种严重的内存消耗、硬件消耗!!

如果一定要实现性能最优,我们可能需要额外的GraphStage,并赋予因子factor,用于记录上一次下游向上游拉取所用的时间,并进行决策。