如何解决如何应对akka流中的异常上游故障
我的akka Flow[I,O]
不受控制,因为它来自某些第三方代码。每当输入元素不产生输出元素时,我就需要做出反应(例如,由于流程的某些部分引发了异常)。为此,我需要产生故障的输入元素。我在流上没有找到任何API或类似的东西,它允许我注册处理程序或以任何方式对其作出反应。我该怎么办?
解决方法
当akka流引发异常时,您想Resume
而不是Stop
。收集所有成功的元素之后,您可以Seq#diff
来说明由于抛出异常而删除了哪些元素。
import scala.concurrent.ExecutionContext
import scala.util.{Failure,Success}
object Exception {
case class MyException(n: Int) extends RuntimeException
def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem("Exception")
implicit val ec: ExecutionContext = system.dispatcher
val decider: Supervision.Decider = {
case _: MyException => Supervision.Resume
case _ => Supervision.Stop
}
val flow = Flow[Int]
.map(n =>
if (n % 2 == 1) throw MyException(n)
else n
)
val in = 1 to 10
val outFuture = Source(in)
.via(flow)
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(Sink.seq)
outFuture.onComplete {
case Success(out) =>
println("dropped elements are " + (in.diff(out)))
case Failure(_) =>
println("unknown failure")
}
}
}
控制台输出为:
dropped elements are Vector(1,3,5,7,9)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。