如何解决Akka Streams (Scala):过滤掉异常
我的 akka 流管道中的一个步骤是在接收到无效输入时抛出异常的转换。我想丢弃这些有问题的输入。所以,我想出了以下解决方案:
...
.map( input => Try( transformation( input ) ).toOption )
.filter( _.nonEmpty )
.map( _.get )
...
对于实际上只是一个 flatMap 的东西需要 3 个步骤。
有没有更直接的 akka 方法来做到这一点?
解决方法
您可以使用监督策略。取自文档:
val decider: Supervision.Decider = {
case _: ArithmeticException => Supervision.Resume
case _ => Supervision.Stop
}
val flow = Flow[Int]
.filter(100 / _ < 50)
.map(elem => 100 / (5 - elem))
.withAttributes(ActorAttributes.supervisionStrategy(decider))
您可以配置决策器以执行您需要的任何操作。如果您需要为所有异常跳过该元素,请使用
case _: Throwable => Supervision.Resume
看看https://doc.akka.io/docs/akka/current/stream/stream-error.html
,如果您想按照示例代码中的指示默默地丢弃异常,这里有几种方法可以减少步骤:
// A dummy transformation
def transformation(i: Int): Int = 100 / i
// #1: Use `collect`
Source(List(5,2,1)).
map(input => Try(transformation(input)).toOption).
collect{ case x if x.nonEmpty => x.get }.
runForeach(println)
// Result: 20,50,100
// #2: Use `mapConcat`
Source(List(5,1)).
mapConcat(input => List(Try(transformation(input)).toOption).flatten).
runForeach(println)
// Result: 20,100
请注意,Akka Source/Flow 没有 flatMap
,尽管 mapConcat
(和 flatMapConcat
)的功能有些类似。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。