微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

从 Akka-2.6.9 更新到 Akka 2.6.10 时,ShardCoordinator$LeastShardAllocationStrategy 中的 NullPointerException

如何解决从 Akka-2.6.9 更新到 Akka 2.6.10 时,ShardCoordinator$LeastShardAllocationStrategy 中的 NullPointerException

我面临 java.lang.NullPointerException,同时将 Akka 版本从 2.6.9 更新到 2.6.10

这是我遇到此错误的示例代码:-

  1. akka-sharding/src/main/resources/application.conf
akka {
  actor {
    provider = "cluster"
  }
  remote.artery {
    canonical {
      hostname = "127.0.0.1"
      port = 2551
    }
  }
  cluster {
    seed-nodes = [
      "akka://ClusterSystem@127.0.0.1:2551","akka://ClusterSystem@127.0.0.1:2552"]
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
}
  1. akka-sharding/src/main/scala/MyActorSharding.scala
import akka.actor.{ActorLogging,ActorRef,ActorSystem,PoisonPill,Props}
import akka.cluster.Cluster
import akka.cluster.sharding.ShardCoordinator.{LeastShardAllocationStrategy,ShardAllocationStrategy}
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.{ClusterSharding,ClusterShardingSettings}
import scala.concurrent.Future
import scala.util.Random

object MyActorSharding extends App {

  class Actor extends akka.actor.Actor with ActorLogging {

    private var index = 0

    override def receive: Receive = {
      case x: Int =>
        index += x
      case "print" =>
        println("???????????????" + index)
        println(self.path.toString)
    }
  }

  class MyShardAllocationStrategy(shardAllocationStrategy: ShardAllocationStrategy)
  extends ShardAllocationStrategy {
    override def allocateShard(requester: ActorRef,shardId: ShardId,currentShardAllocations: Map[ActorRef,IndexedSeq[ShardId]]): Future[ActorRef] = {
      println("Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..")
      shardAllocationStrategy.allocateShard(requester,shardId,currentShardAllocations)
    }

    override def rebalance(currentShardAllocations: Map[ActorRef,IndexedSeq[ShardId]],rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
      println("Rebalance Shard ->>>>>>>>>>>>>>>>...")
      shardAllocationStrategy.rebalance(currentShardAllocations,rebalanceInProgress)
    }
  }

  val extractEntityId: PartialFunction[Any,(String,Any)] = {
    case message =>
      (
        s"${message.toString} -> entityId -> ${new Random().nextInt(34)}",message
      )
  }

  val shardId: Any => String = x =>
    s"${x.toString} -> shardId -> ${new Random().nextInt(34)}"

  val actorProps: Props = Props[Actor]

  val system = ActorSystem("ClusterSystem")
  val cluster = Cluster(system)
  val clusterSharding = ClusterSharding(system)
  val actorRef = clusterSharding.start(
    "MyActor",actorProps,ClusterShardingSettings(system),extractEntityId,new MyShardAllocationStrategy(new LeastShardAllocationStrategy(10,3)),PoisonPill
  )

  actorRef ! 12
  actorRef ! 13
  actorRef ! 14
  actorRef ! "print"

}

现在使用 Akka-2.6.9

akka-sharding/build.sbt

val akkaVersion = "2.6.9"

lazy val root = (project in file("."))
  .settings(
    name := "akka-sharding",version := "0.1",scalaVersion := "2.13.5",libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-cluster" % akkaVersion,"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion
    )
  )

我得到的输出为:-

[INFO] [05/04/2021 15:37:29.970] [main] [ArteryTcpTransport(akka://ClusterSystem)] Remoting started with transport [Artery tcp]; listening on address [akka://ClusterSystem@127.0.0.1:2551] with UID [3133807086855433188]
[INFO] [05/04/2021 15:37:29.990] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Starting up,Akka version [2.6.9] ...
[INFO] [05/04/2021 15:37:30.232] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [05/04/2021 15:37:30.233] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Started up successfully
[INFO] [05/04/2021 15:37:30.327] [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/system/cluster/core/daemon/downingProvider] SBR started. Config: stableAfter: 20000 ms,strategy: KeepMajority,selfUniqueAddress: UniqueAddress(akka://ClusterSystem@127.0.0.1:2551,3133807086855433188),selfDc: default
[INFO] [05/04/2021 15:37:30.756] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Idle entities will be passivated after [2.000 min]
[WARN] [05/04/2021 15:37:30.806] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.stream.Log(akka://ClusterSystem/system/Materializers/StreamSupervisor-1)] [outbound connection to [akka://ClusterSystem@127.0.0.1:2552],control stream] Upstream Failed,cause: StreamTcpException: Tcp command [Connect(127.0.0.1/<unresolved>:2552,None,List(),Some(5000 milliseconds),true)] Failed because of java.net.ConnectException: Connection refused
[WARN] [05/04/2021 15:37:30.807] [ClusterSystem-akka.remote.default-remote-dispatcher-8] [akka.stream.Log(akka://ClusterSystem/system/Materializers/StreamSupervisor-1)] [outbound connection to [akka://ClusterSystem@127.0.0.1:2552],message stream] Upstream Failed,true)] Failed because of java.net.ConnectException: Connection refused
[INFO] [akkaMemberChanged][05/04/2021 15:37:35.428] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Node [akka://ClusterSystem@127.0.0.1:2551] is JOINING itself (with roles [dc-default]) and forming new cluster
[INFO] [05/04/2021 15:37:35.430] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - is the new leader among reachable nodes (more leaders may exist)
[INFO] [akkaMemberChanged][05/04/2021 15:37:35.435] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - leader is moving node [akka://ClusterSystem@127.0.0.1:2551] to [Up]
[INFO] [05/04/2021 15:37:35.443] [ClusterSystem-akka.actor.default-dispatcher-4] [akka://ClusterSystem/system/cluster/core/daemon/downingProvider] This node is Now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
[INFO] [akkaClusterSingletonStarted][05/04/2021 15:37:35.449] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton manager starting singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton]
[INFO] [05/04/2021 15:37:35.451] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [Start -> Oldest]
[INFO] [05/04/2021 15:37:35.462] [ClusterSystem-akka.actor.default-dispatcher-11] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] ShardCoordinator was moved to the active state State(Map())
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
???????????????0
akka://ClusterSystem/system/sharding/MyActor/print+-%3E+shardId+-%3E+26/print+-%3E+entityId+-%3E+1
Rebalance Shard ->>>>>>>>>>>>>>>>...
[WARN] [05/04/2021 15:37:50.452] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [Association(akka://ClusterSystem)] Outbound control stream to [akka://ClusterSystem@127.0.0.1:2552] Failed. Restarting it. akka.remote.artery.OutboundHandshake$HandshakeTimeoutException: Handshake with [akka://ClusterSystem@127.0.0.1:2552] did not complete within 20000 ms
Rebalance Shard ->>>>>>>>>>>>>>>>...
Rebalance Shard ->>>>>>>>>>>>>>>>...
Rebalance Shard ->>>>>>>>>>>>>>>>...
[INFO] [05/04/2021 15:38:18.737] [ClusterSystem-akka.actor.default-dispatcher-11] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] Starting rebalance for shards [print -> shardId -> 26,14 -> shardId -> 10,13 -> shardId -> 4,12 -> shardId -> 20]. Current shards rebalancing: []
[INFO] [akkaMemberChanged][05/04/2021 15:38:18.756] [ClusterSystem-akka.actor.internal-dispatcher-3] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Marked address [akka://ClusterSystem@127.0.0.1:2551] as [Leaving]
[INFO] [05/04/2021 15:38:18.759] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Exited [akka://ClusterSystem@127.0.0.1:2551]
[INFO] [05/04/2021 15:38:18.759] [ClusterSystem-akka.actor.internal-dispatcher-2] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Oldest observed OldestChanged: [akka://ClusterSystem@127.0.0.1:2551 -> None]
[INFO] [05/04/2021 15:38:18.760] [ClusterSystem-akka.actor.internal-dispatcher-2] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [Oldest -> WasOldest]
[INFO] [akkaMemberChanged][05/04/2021 15:38:19.101] [ClusterSystem-akka.actor.internal-dispatcher-6] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - leader is moving node [akka://ClusterSystem@127.0.0.1:2551] to [Exiting]
[INFO] [05/04/2021 15:38:19.771] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton manager stopping singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton]
[INFO] [05/04/2021 15:38:19.771] [ClusterSystem-akka.actor.internal-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [WasOldest -> Stopping]
[INFO] [akkaClusterSingletonTerminated][05/04/2021 15:38:19.773] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton] was terminated
[INFO] [05/04/2021 15:38:19.778] [ClusterSystem-akka.actor.internal-dispatcher-6] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Exiting completed
[INFO] [05/04/2021 15:38:19.779] [ClusterSystem-akka.actor.internal-dispatcher-6] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Shutting down...
[INFO] [05/04/2021 15:38:19.780] [ClusterSystem-akka.actor.internal-dispatcher-6] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Successfully shut down
[INFO] [akkaDeadLetter][05/04/2021 15:38:19.780] [ClusterSystem-akka.actor.default-dispatcher-26] [akka://ClusterSystem/system/clusterEventBusListener] Message [akka.cluster.ClusterEvent$SeenChanged] to Actor[akka://ClusterSystem/system/clusterEventBusListener#636946749] was unhandled. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [05/04/2021 15:38:19.785] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Shutting down remote daemon.
[INFO] [05/04/2021 15:38:19.785] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Remote daemon shut down; proceeding with flushing remote transports.
[INFO] [05/04/2021 15:38:19.799] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Remoting shut down.

Akka-2.6.10

  1. akka-sharding/build.sbt
val akkaVersion = "2.6.10"

lazy val root = (project in file("."))
  .settings(
    name := "akka-sharding","com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion
    )
  )

我在 rebalanceallocateShard

时收到 java.lang.NullPointerException
[INFO] [05/04/2021 15:41:44.061] [main] [ArteryTcpTransport(akka://ClusterSystem)] Remoting started with transport [Artery tcp]; listening on address [akka://ClusterSystem@127.0.0.1:2551] with UID [-8668970757003113418]
[INFO] [05/04/2021 15:41:44.081] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Starting up,Akka version [2.6.10] ...
[INFO] [05/04/2021 15:41:44.198] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [05/04/2021 15:41:44.198] [main] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Started up successfully
[INFO] [05/04/2021 15:41:44.246] [ClusterSystem-akka.actor.default-dispatcher-10] [akka://ClusterSystem/system/cluster/core/daemon/downingProvider] SBR started. Config: strategy [KeepMajority],stable-after [20 seconds],down-all-when-unstable [15 seconds],selfUniqueAddress [akka://ClusterSystem@127.0.0.1:2551#-8668970757003113418],selfDc [default].
[INFO] [05/04/2021 15:41:44.496] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Idle entities will be passivated after [2.000 min]
[WARN] [05/04/2021 15:41:44.541] [ClusterSystem-akka.remote.default-remote-dispatcher-7] [akka.stream.Log(akka://ClusterSystem/system/Materializers/StreamSupervisor-1)] [outbound connection to [akka://ClusterSystem@127.0.0.1:2552],true)] Failed because of java.net.ConnectException: Connection refused
[WARN] [05/04/2021 15:41:44.541] [ClusterSystem-akka.remote.default-remote-dispatcher-9] [akka.stream.Log(akka://ClusterSystem/system/Materializers/StreamSupervisor-1)] [outbound connection to [akka://ClusterSystem@127.0.0.1:2552],true)] Failed because of java.net.ConnectException: Connection refused
[INFO] [akkaMemberChanged][05/04/2021 15:41:49.349] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Node [akka://ClusterSystem@127.0.0.1:2551] is JOINING itself (with roles [dc-default],version [0.0.0]) and forming new cluster
[INFO] [05/04/2021 15:41:49.351] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - is the new leader among reachable nodes (more leaders may exist)
[INFO] [akkaMemberChanged][05/04/2021 15:41:49.357] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - leader is moving node [akka://ClusterSystem@127.0.0.1:2551] to [Up]
[INFO] [05/04/2021 15:41:49.367] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/cluster/core/daemon/downingProvider] This node is Now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
[INFO] [akkaClusterSingletonStarted][05/04/2021 15:41:49.372] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton manager starting singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton]
[INFO] [05/04/2021 15:41:49.372] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [Start -> Oldest]
[INFO] [05/04/2021 15:41:49.384] [ClusterSystem-akka.actor.default-dispatcher-10] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] MyActor: ShardCoordinator was moved to the active state State(Map())
Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..
[ERROR] [05/04/2021 15:41:49.655] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] null
java.lang.NullPointerException
    at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.clusterState(AbstractLeastShardAllocationStrategy.scala:78)
    at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.regionEntriesFor(AbstractLeastShardAllocationStrategy.scala:124)
    at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.allocateShard(AbstractLeastShardAllocationStrategy.scala:85)
    at MyActorSharding$MyShardAllocationStrategy.allocateShard(MyActorSharding.scala:31)
    at akka.cluster.sharding.ShardCoordinator$$anonfun$active$1.applyOrElse(ShardCoordinator.scala:748)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:244)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.cluster.sharding.DDataShardCoordinator.akka$actor$Timers$$super$aroundReceive(ShardCoordinator.scala:1335)
    at akka.actor.Timers.aroundReceive(Timers.scala:56)
    at akka.actor.Timers.aroundReceive$(Timers.scala:41)
    at akka.cluster.sharding.DDataShardCoordinator.aroundReceive(ShardCoordinator.scala:1335)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.MailBox.processMailBox(MailBox.scala:270)
    at akka.dispatch.MailBox.run(MailBox.scala:231)
    at akka.dispatch.MailBox.exec(MailBox.scala:243)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

[INFO] [akkaDeadLetter][05/04/2021 15:41:49.669] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] Message [akka.cluster.sharding.ShardCoordinator$Internal$GetShardHome] from Actor[akka://ClusterSystem/system/sharding/MyActor#-1153927922] to Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829] was unhandled. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][05/04/2021 15:41:49.669] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] Message [akka.cluster.sharding.ShardCoordinator$Internal$GetShardHome] from Actor[akka://ClusterSystem/system/sharding/MyActor#-1153927922] to Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829] was unhandled. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][05/04/2021 15:41:49.670] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] Message [akka.cluster.sharding.ShardCoordinator$Internal$GetShardHome] from Actor[akka://ClusterSystem/system/sharding/MyActor#-1153927922] to Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829] was unhandled. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [05/04/2021 15:41:50.171] [ClusterSystem-akka.actor.default-dispatcher-10] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] MyActor: ShardCoordinator was moved to the active state State(Map())
[WARN] [05/04/2021 15:41:54.591] [ClusterSystem-akka.actor.internal-dispatcher-2] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Requested shard homes [12 -> shardId -> 7,13 -> shardId -> 33,14 -> shardId -> 13,print -> shardId -> 29] from coordinator at [Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829]]. [4] total buffered messages.
Rebalance Shard ->>>>>>>>>>>>>>>>...
[ERROR] [05/04/2021 15:41:59.672] [ClusterSystem-akka.actor.default-dispatcher-22] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] null
java.lang.NullPointerException
    at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.clusterState(AbstractLeastShardAllocationStrategy.scala:78)
    at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.regionEntriesFor(AbstractLeastShardAllocationStrategy.scala:124)
    at akka.cluster.sharding.ShardCoordinator$LeastShardAllocationStrategy.rebalance(ShardCoordinator.scala:294)
    at MyActorSharding$MyShardAllocationStrategy.rebalance(MyActorSharding.scala:37)
    at akka.cluster.sharding.ShardCoordinator$$anonfun$active$1.applyOrElse(ShardCoordinator.scala:790)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:244)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.cluster.sharding.DDataShardCoordinator.akka$actor$Timers$$super$aroundReceive(ShardCoordinator.scala:1335)
    at akka.actor.Timers.aroundReceive(Timers.scala:56)
    at akka.actor.Timers.aroundReceive$(Timers.scala:41)
    at akka.cluster.sharding.DDataShardCoordinator.aroundReceive(ShardCoordinator.scala:1335)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.MailBox.processMailBox(MailBox.scala:270)
    at akka.dispatch.MailBox.run(MailBox.scala:231)
    at akka.dispatch.MailBox.exec(MailBox.scala:243)
    ...

[INFO] [05/04/2021 15:42:00.191] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] MyActor: ShardCoordinator was moved to the active state State(Map())
[WARN] [05/04/2021 15:42:04.352] [ClusterSystem-akka.remote.default-remote-dispatcher-9] [Association(akka://ClusterSystem)] Outbound control stream to [akka://ClusterSystem@127.0.0.1:2552] Failed. Restarting it. akka.remote.artery.OutboundHandshake$HandshakeTimeoutException: Handshake with [akka://ClusterSystem@127.0.0.1:2552] did not complete within 20000 ms
[WARN] [05/04/2021 15:42:04.671] [ClusterSystem-akka.actor.internal-dispatcher-4] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Requested shard homes [12 -> shardId -> 7,print -> shardId -> 29] from coordinator at [Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829]]. [4] total buffered messages.
Rebalance Shard ->>>>>>>>>>>>>>>>...
[ERROR] [05/04/2021 15:42:09.692] [ClusterSystem-akka.actor.default-dispatcher-10] [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator] null
java.lang.NullPointerException
    at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.clusterState(AbstractLeastShardAllocationStrategy.scala:78)
    at akka.cluster.sharding.internal.AbstractLeastShardAllocationStrategy.regionEntriesFor(AbstractLeastShardAllocationStrategy.scala:124)
    at akka.cluster.sharding.ShardCoordinator$LeastShardAllocationStrategy.rebalance(ShardCoordinator.scala:294)
    at MyActorSharding$MyShardAllocationStrategy.rebalance(MyActorSharding.scala:37)
    at akka.cluster.sharding.ShardCoordinator$$anonfun$active$1.applyOrElse(ShardCoordinator.scala:790)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:244)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.cluster.sharding.DDataShardCoordinator.akka$actor$Timers$$super$aroundReceive(ShardCoordinator.scala:1335)
    at akka.actor.Timers.aroundReceive(Timers.scala:56)
    at akka.actor.Timers.aroundReceive$(Timers.scala:41)
    at akka.cluster.sharding.DDataShardCoordinator.aroundReceive(ShardCoordinator.scala:1335)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.MailBox.processMailBox(MailBox.scala:270)
    at akka.dispatch.MailBox.run(MailBox.scala:231)
    at akka.dispatch.MailBox.exec(MailBox.scala:243)
    ...

[INFO] [05/04/2021 15:42:10.211] [ClusterSystem-akka.actor.default-dispatcher-6] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator/singleton/coordinator] MyActor: ShardCoordinator was moved to the active state State(Map())
[WARN] [05/04/2021 15:42:14.771] [ClusterSystem-akka.actor.internal-dispatcher-2] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Requested shard homes [12 -> shardId -> 7,print -> shardId -> 29] from coordinator at [Actor[akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton/coordinator#-88352829]]. [4] total buffered messages.
[WARN] [05/04/2021 15:42:15.521] [ClusterSystem-akka.actor.internal-dispatcher-2] [CoordinatedShutdown(akka://ClusterSystem)] Coordinated shutdown phase [cluster-sharding-shutdown-region] timed out after 10000 milliseconds
[INFO] [akkaMemberChanged][05/04/2021 15:42:15.524] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Marked address [akka://ClusterSystem@127.0.0.1:2551] as [Leaving]
[INFO] [05/04/2021 15:42:15.527] [ClusterSystem-akka.actor.internal-dispatcher-4] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Exited [akka://ClusterSystem@127.0.0.1:2551]
[INFO] [05/04/2021 15:42:15.528] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Oldest observed OldestChanged: [akka://ClusterSystem@127.0.0.1:2551 -> None]
[INFO] [05/04/2021 15:42:15.529] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [Oldest -> WasOldest]
[INFO] [akkaMemberChanged][05/04/2021 15:42:15.771] [ClusterSystem-akka.actor.internal-dispatcher-4] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - leader is moving node [akka://ClusterSystem@127.0.0.1:2551] to [Exiting]
[INFO] [05/04/2021 15:42:16.541] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton manager stopping singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton]
[INFO] [05/04/2021 15:42:16.541] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] ClusterSingletonManager state change [WasOldest -> Stopping]
[INFO] [akkaClusterSingletonTerminated][05/04/2021 15:42:16.544] [ClusterSystem-akka.actor.internal-dispatcher-3] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActorCoordinator] Singleton actor [akka://ClusterSystem/system/sharding/MyActorCoordinator/singleton] was terminated
[WARN] [05/04/2021 15:42:16.544] [ClusterSystem-akka.actor.internal-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/sharding/MyActor] MyActor: Trying to register to coordinator at [ActorSelection[Anchor(akka://ClusterSystem/),Path(/system/sharding/MyActorCoordinator/singleton/coordinator)]],but no ackNowledgement. Total [4] buffered messages. [Coordinator [Member(akka://ClusterSystem@127.0.0.1:2551,Exiting)] is reachable.]
[INFO] [05/04/2021 15:42:16.548] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Exiting completed
[INFO] [05/04/2021 15:42:16.549] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Shutting down...
[INFO] [05/04/2021 15:42:16.550] [ClusterSystem-akka.actor.internal-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Successfully shut down
[INFO] [05/04/2021 15:42:16.557] [ClusterSystem-akka.remote.default-remote-dispatcher-7] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Shutting down remote daemon.
[INFO] [05/04/2021 15:42:16.558] [ClusterSystem-akka.remote.default-remote-dispatcher-7] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Remote daemon shut down; proceeding with flushing remote transports.
[INFO] [05/04/2021 15:42:16.573] [ClusterSystem-akka.remote.default-remote-dispatcher-9] [akka://ClusterSystem@127.0.0.1:2551/system/remoting-terminator] Remoting shut down.

谁能告诉我为什么会这样?

我知道 ShardAllocationStrategy 在 2.6.10 中从 2.6.9 改变了。

发行说明:- https://akka.io/blog/news/2020/10/09/akka-2.6.10-released 文档:- https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#shard-allocation

解决方法

您可以将 StartableAllocationStrategy 用于自定义 MyShardAllocationStrategy。此外,您需要将 shardAllocationStrategy 变量的类型更改为 LeastShardAllocationStrategy

完整代码供参考:

import akka.actor.{ActorLogging,ActorRef,ActorSystem,PoisonPill,Props}
import akka.cluster.Cluster
import akka.cluster.sharding.ShardCoordinator.{LeastShardAllocationStrategy,ShardAllocationStrategy,StartableAllocationStrategy}
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
import akka.cluster.sharding.{ClusterSharding,ClusterShardingSettings}
import scala.concurrent.Future
import scala.util.Random
object MyActorSharding extends App {
  class Actor extends akka.actor.Actor with ActorLogging {
    private var index = 0
    override def receive: Receive = {
      case x: Int =>
        index += x
      case "print" =>
        println("???????????????" + index)
        println(self.path.toString)
    }
  }
  class MyShardAllocationStrategy(shardAllocationStrategy: LeastShardAllocationStrategy)
    extends StartableAllocationStrategy {
    override def allocateShard(requester: ActorRef,shardId: ShardId,currentShardAllocations: Map[ActorRef,IndexedSeq[ShardId]]): Future[ActorRef] = {
      println("Allocate Shard ->>>>>>>>>>>>>>>>>>>>>..")
      shardAllocationStrategy.allocateShard(requester,shardId,currentShardAllocations)
    }
    override def rebalance(currentShardAllocations: Map[ActorRef,IndexedSeq[ShardId]],rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
      println("Rebalance Shard ->>>>>>>>>>>>>>>>...")
      shardAllocationStrategy.rebalance(currentShardAllocations,rebalanceInProgress)
    }
    override def start(): Unit = {
      shardAllocationStrategy.start(system)
    }
  }
  val extractEntityId: PartialFunction[Any,(String,Any)] = {
    case message =>
      (
        s"${message.toString} -> entityId -> ${new Random().nextInt(34)}",message
      )
  }
  val shardId: Any => String = x =>
    s"${x.toString} -> shardId -> ${new Random().nextInt(34)}"
  val actorProps: Props = Props[Actor]
  val system = ActorSystem("ClusterSystem")
  val cluster = Cluster(system)
  val clusterSharding = ClusterSharding(system)
  val actorRef = clusterSharding.start(
    "MyActor",actorProps,ClusterShardingSettings(system),extractEntityId,new MyShardAllocationStrategy(new LeastShardAllocationStrategy(1,1)),PoisonPill
  )
  actorRef ! 12
  actorRef ! 13
  actorRef ! 14
  actorRef ! "print"
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?