微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

akka 演员监督策略不适用于使用 ActorRefFactory 创建的子演员

如何解决akka 演员监督策略不适用于使用 ActorRefFactory 创建的子演员

我正在通过 ActorRefFactory 创建子演员,以便我可以注入它 我正在为这些子演员使用一对一的监督策略,但是当他们抛出异常时,我无法在日志中看到监督 startegy 正在运行,因为我将其设置为 Resume

这是我的代码

val system = actorInjector.getInstance(classOf[ActorSystem])


val eventWriteActor = (f:ActorRefFactory) => f.actorOf(Props(new EventWriteActor(insertService)),"eventWriteActor")
  val actorsList:List[ActorRefFactory=>ActorRef]=List(eventWriteActor,eventQueryActor)
  val actorManager: ActorRef = system.actorOf(Props(new ActorManager(actorsList)),name = "ActorManager")

ActorManager.scala //父actor

class ActorManager(childMaker: List[ActorRefFactory => ActorRef]) extends Actor {
  val seconds = 10
  var eventWriteActor: Option[ActorRef] = None
  var eventQueryActor: Option[ActorRef] = None

  override val supervisorStrategy = OneForOnestrategy(
    maxNrOfRetries = seconds,withinTimeRange = seconds seconds) {
    case mex: MongoException =>
      log.error("Got some MongoException,Supervision Strategy says Resume exception is {}",mex)
      Resume
    case e: Exception =>
      log.error("Got some Exception,e)
      Resume
  }
  if (childMaker.nonEmpty) {
    eventWriteActor = Some(childMaker(0)(context))
    eventQueryActor = Some(childMaker(1)(context))
  }
  def receive: PartialFunction[Any,Unit] = {
    case InsertEvent(event) =>
      eventWriteActor match {
        case Some(writeActor)=>
          ask(writeActor,InsertEventInMongo(event)).pipeto(sender)
        case None=>log.info("eventWriteActor is empty")
      }
}
}

童星

class EventWriteActor @Inject() (insertEventServiceTrait:InsertEventServiceTrait) extends Actor {
def receive: PartialFunction[Any,Unit] = {
    case InsertEventInMongo(event) =>
      senderRef = Some(sender)
      insertEventServiceTrait.insertEventInMongo(event,senderRef)
  }
}

服务类

class InsertEventService @Inject()(eventRepo: EventRepository) extends InsertEventServiceTrait {
override def insertEventInMongo(event: Event,senderRef: Option[ActorRef]): Unit = {
    eventRepository.insertEvent(event: Event,senderRef: Option[ActorRef])
  }
}

仓库类

class EventRepositoryImpl @Inject()(mongodb: MongoFactoryTrait) extends EventRepository {
  val eventInsert = new EventInsert(mongodb)
  override def insertEvent(event: Event,senderRef: Option[akka.actor.ActorRef]): Unit = {
    eventInsert.eventInsert(event: Event,senderRef: Option[akka.actor.ActorRef])
  }
}

域类

class EventInsert(mongoFactoryTrait: MongoFactoryTrait) {
def eventInsert(event: Event,senderRef: Option[akka.actor.ActorRef]):Unit = {
    var isInserted: Boolean = false
    val document = insertDocument(event)
    val collection: MongoCollection[Document] = mongoFactoryTrait.collectionMongo(Event_COLLECTION_NAME)
    val insertionResult: SingleObservable[Completed] = collection.insertOne(document)

    insertionResult.subscribe(new Observer[Completed] {

      override def onNext(result: Completed): Unit = {
        log.info("insertEvent: Event successfully inserted")
        isInserted = true
      }

      override def onError(e: Throwable) = {
        isInserted = false
        e match {
          case mongoEx: MongoException =>
            log.error("insertEvent: Mongoexception in inserting Event",mongoEx)
            senderRef match {
              case Some(ref: ActorRef) =>
                ref ! akka.actor.Status.Failure(mongoEx)
              case None => log.warn("insertEvent: Actor reference is null")
            }
          case e: Exception =>
            isInserted = false
            log.debug("insertEvent: Event record not inserted,isInserted ={}",isInserted)
            senderRef match {
              case Some(ref: ActorRef) =>
                ref ! akka.actor.Status.Failure(e)
              case None => log.warn("insertEvent: Actor reference is null")
            }
        }
      }

      override def onComplete(): Unit = {
        isInserted = true
        
        senderRef match {
          case Some(ref: ActorRef) =>
            ref ! isInserted
          case None => log.warn("insertEvent: Actor reference is null")
        }
      }
    })
  }

我正在添加一个具有相同密钥的文档,以便代码抛出 MongodublicateKey 异常,我可以测试代码,但这里是我得到的日志,它没有在演员中显示主管演员

17:27:30.500 9358 [InnocuousThread-7] operation DEBUG - Unable to retry operation INSERT due to error "com.mongodb.MongoBulkWriteException: Bulk write operation error on server localhost:27017. Write errors: [BulkWriteError{index=0,code=11000,message='E11000 duplicate key error collection: myprojectdb.Event index: _id_ dup key: { _id: "123" }',details={}}]. "
17:27:30.505 9363 [InnocuousThread-7] EventInsert ERROR - insertEvent: Mongoexception in inserting Event
com.mongodb.MongoWriteException: E11000 duplicate key error collection: myprojectdb.Event index: _id_ dup key: { _id: "123" }
    at com.mongodb.async.client.MongoCollectionImpl$10.onResult(MongoCollectionImpl.java:1140)
    at com.mongodb.async.client.MongoCollectionImpl$10.onResult(MongoCollectionImpl.java:1127)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
    at com.mongodb.async.client.OperationExecutorImpl$2$1$1.onResult(OperationExecutorImpl.java:140)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
    at com.mongodb.operation.OperationHelper$ConnectionReleasingWrappedCallback.onResult(OperationHelper.java:432)
    at com.mongodb.operation.MixedBulkWriteOperation.addBatchResult(MixedBulkWriteOperation.java:525)
    at com.mongodb.operation.MixedBulkWriteOperation.access$1600(MixedBulkWriteOperation.java:72)
    at com.mongodb.operation.MixedBulkWriteOperation$6.onResult(MixedBulkWriteOperation.java:507)
    at com.mongodb.operation.MixedBulkWriteOperation$6.onResult(MixedBulkWriteOperation.java:479)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
    at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:253)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
    at com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:85)
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:467)
    at com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111)
    at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
    at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:399)
    at com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:376)
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:677)
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:644)
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514)
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511)
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220)
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203)
    at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
    at java.base/sun.nio.ch.Invoker.invokeDirect(Invoker.java:158)
    at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:560)
    at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:277)
    at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:298)
    at com.mongodb.internal.connection.AsynchronousSocketChannelStream$AsynchronousSocketChannelAdapter.read(AsynchronousSocketChannelStream.java:137)
    at com.mongodb.internal.connection.AsynchronousChannelStream.readAsync(AsynchronousChannelStream.java:105)
    at com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:511)
    at com.mongodb.internal.connection.InternalStreamConnection.access$1000(InternalStreamConnection.java:76)
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:634)
    at com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:619)
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:514)
    at com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:511)
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:220)
    at com.mongodb.internal.connection.AsynchronousChannelStream$BasicCompletionHandler.completed(AsynchronousChannelStream.java:203)
    at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
    at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishRead(UnixAsynchronousSocketChannelImpl.java:437)
    at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:191)
    at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:213)
    at java.base/sun.nio.ch.EPollPort$EventHandlerTask.run(EPollPort.java:306)
    at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
    at java.base/jdk.internal.misc.InnocuousThread.run(InnocuousThread.java:134)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?