How to fix TaskManagers failing to connect to the newly elected JobManager
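I have a Flink cluster running on minikube: 1 JobManager and 3 TaskManagers. I am using the Kubernetes HA services to handle JobManager leader election.
When I kill the JobManager to simulate a crash, the TaskManagers cannot connect to the new JobManager. They keep trying to connect to the IP address of the previously terminated JobManager.
Here is the exception: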
2021-05-05 12:14:28.126 [flink-akka.actor.default-dispatcher-3] WARN akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-7 - Association with remote system [akka.tcp://flink@172.17.0.7:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@172.17.0.7:6123]] Caused by: [java.net.NoRouteToHostException: No route to host]
2021-05-05 12:14:28.131 [flink-akka.actor.default-dispatcher-3] ERROR o.a.f.runtime.rest.handler.cluster.ClusterOverviewHandler - Unhandled exception.
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at scala.concurrent.java8.FuturesConvertersImpl$CF$$anon$1.accept(FutureConvertersImpl.scala:61)
at scala.concurrent.java8.FuturesConvertersImpl$CF$$anon$1.accept(FutureConvertersImpl.scala:53)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.rpc.exceptions.RpcConnectionException: Could not connect to rpc endpoint under address akka.tcp://flink@172.17.0.7:6123/user/rpc/resourcemanager_0.
at org.apache.flink.runtime.rpc.akka.AkkaRpcService.lambda$resolveActorAddress$10(AkkaRpcService.java:570)
at scala.concurrent.java8.FuturesConvertersImpl$CF$$anon$1.accept(FutureConvertersImpl.scala:59)
... 5 common frames omitted
Caused by: org.apache.flink.runtime.rpc.exceptions.RpcConnectionException: Could not connect to rpc endpoint under address akka.tcp://flink@172.17.0.7:6123/user/rpc/resourcemanager_0.
... 7 common frames omitted
Caused by: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@172.17.0.7:6123/),Path(/user/rpc/resourcemanager_0)]
at akka.actor.ActorSelection.$anonfun$resolveOne$1(ActorSelection.scala:71)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:81)
at akka.dispatch.BatchingExecutor.execute(BatchingExecutor.scala:120)
at akka.dispatch.BatchingExecutor.execute$(BatchingExecutor.scala:114)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:80)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:556)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:593)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:582)
at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:104)
at akka.remote.EndpointWriter.postStop(Endpoint.scala:606)
at akka.actor.Actor.aroundPostStop(Actor.scala:536)
at akka.actor.Actor.aroundPostStop$(Actor.scala:536)
at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:458)
at akka.actor.dungeon.FaultHandling.finishTerminate(FaultHandling.scala:210)
at akka.actor.dungeon.FaultHandling.terminate(FaultHandling.scala:172)
at akka.actor.dungeon.FaultHandling.terminate$(FaultHandling.scala:142)
at akka.actor.ActorCell.terminate(ActorCell.scala:429)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:533)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:549)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:283)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
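
To double-check that every failed connection in a log like the one above targets the same stale address, a small script can tally the Akka addresses that appear in it. This is only a rough sketch for inspecting the log: the function name `count_akka_targets` is made up for illustration, and the regular expression assumes addresses of the form `akka.tcp://flink@<host>:<port>` as seen in the trace.

```python
import re
from collections import Counter

# Matches Akka remote addresses such as akka.tcp://flink@172.17.0.7:6123
# (format assumed from the log lines above).
AKKA_ADDRESS = re.compile(r"akka\.tcp://flink@([\w.\-]+):(\d+)")


def count_akka_targets(log_text: str) -> Counter:
    """Count how often each host:port appears in Akka addresses in the log text."""
    return Counter(
        f"{host}:{port}" for host, port in AKKA_ADDRESS.findall(log_text)
    )


if __name__ == "__main__":
    # Two lines copied from the trace above; in practice, read the full log file.
    sample = (
        "Association with remote system [akka.tcp://flink@172.17.0.7:6123] has failed\n"
        "Could not connect to rpc endpoint under address "
        "akka.tcp://flink@172.17.0.7:6123/user/rpc/resourcemanager_0.\n"
    )
    for target, hits in count_akka_targets(sample).most_common():
        print(target, hits)
```

If the only address reported is the old JobManager's pod IP, that is consistent with the behaviour described: the TaskManagers never picked up the new leader address.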