Scala的Future实现是非常cool的,它很容易并行处理代码,除此之外易于组合,因为它是一个monad
实现。
我们在创建一个Future的同时,它被切分为多个线程执行:
1 2 3 4 5
| val list = List(1,2) def doubleFuture(i: Int) = Future {i * 2}
list map doubleFuture Future.traverse(list)(doubleFuture)
|
因为map
执行后会返回得到一个List[Future[Int]]
,更通常的一种用法是使用Future.traverse
将它转换为Future[List[Int]]
,它最终结果是相同的,以及并行地去执行Future里面的内容,只不过仅返回一个Future类型。
有时候,我们并不希望并行地去处理所有Iterator
里面的内容?又或者我们希望在处理过程中,某个计算failed掉了,可以终止整个处理进程?实际上,在工作中有大量的这种情形需要这样特别对待:Future 按顺序一个一个地处理,某一个failed,终止进程。很明显,我们不期待使用Await
,它会阻塞上一个进程。下面的“技巧”用到了foldLeft
的方式,这样List里面下一个值的计算,开始于前一个计算完成时。
1 2 3 4 5 6 7 8 9
| def serialiseFutures[A, B](l: Iterable[A])(fn: A => Future[B]) (implicit ec: ExecutionContext): Future[List[B]] = l.foldLeft(Future(List.empty[B])) { (previousFuture, next) => for { previousResults <- previousFuture next <- fn(next) } yield previousResults :+ next }
|
它仅在上一个future是complete或successful下才被调用。它会被立即执行,但Future实际上是顺序执行的。
上面代码的返回类型是Future[List[B]]
。理想情况下,它应该返回Future[C[B]]
,C是我们真正传递的集合类型。我们可以借助CanBuildFrom
进行优化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| def serialiseFutures[A, B, C[A] <: Iterable[A]] (collection: C[A])(fn: A => Future[B])( implicit ec: ExecutionContext, cbf: CanBuildFrom[C[B], B, C[B]]): Future[C[B]] = { val builder = cbf() builder.sizeHint(collection.size)
collection.foldLeft(Future(builder)) { (previousFuture, next) => for { previousResults <- previousFuture next <- fn(next) } yield previousResults += next } map { builder => builder.result } }
|
单元测试如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
| val start = System.currentTimeMillis val doubled = Await.result({ serialiseFutures(List(10, 20)) { i => Future { Thread.sleep(i) i * 2 } } }, 1 second)
val timeElapsed = System.currentTimeMillis - start timeElapsed should be >= (30l) doubled should be(List(20, 40))
|
那是否可以通过顺序执行中抛出异常的方式,终止整个for循环的调用吗?
实际上,循环中某个Future fail掉了,循环是不会终止的。因为原生的for-comprehension(它是一个flatMap实现).
另外一种实现方式是使用Akka Stream的mapAsyncUnordered
来处理,由Stream的Strategy来处理异常:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| val futureFlow = Flow.fromGraph(GraphDSL.create() { implicit builder => import GraphDSL.Implicits._
val dangerousFlow = Flow[Int].mapAsyncUnordered[Int](5)(makeFuture)
val safeFlow = Flow[Int] val bcast = builder.add(Broadcast[Int](2)) val zip = builder.add(Zip[Int, Int])
bcast ~> dangerousFlow ~> zip.in0 bcast ~> safeFlow ~> zip.in1
FlowShape(bcast.in, zip.out) })
Source(1 to 10) .via(futureFlow) .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) .runForeach(println)
|
由于Broadcast
使用back-pressure机制,执行时需要等待上游的完成才进行下一步操作,因此也是按照顺序执行的,该方式跟Future.foldLeft
的实现是等价的。我们还是希望可以各个Future异步地调用处理,并快速失败。但这并不符合响应式设计,因为如果异步地不按顺序执行,我们并不知哪个快哪个慢,这是一种严重的内存消耗、硬件消耗!!
如果一定要实现性能最优,我们可能需要额外的GraphStage
,并赋予因子factor
,用于记录上一次下游向上游拉取所用的时间,并进行决策。