How to fix the Flink fencing error in Kubernetes HA mode
I am using Flink 1.12 and trying to run the JobManager in HA mode on a Kubernetes cluster (AKS). I am running 2 JobManager and 2 TaskManager pods.
The problem I am facing is that the TaskManagers cannot find the JobManager leader.
The reason is that they hit the Kubernetes "Service" for the jobmanager (a ClusterIP Service) instead of the leader's pod IP. As a result, the jobmanager Service sometimes routes a registration call to the standby JobManager, which prevents the TaskManagers from finding the JobManager leader.
{
  "apiVersion": "v1",
  "data": {
    "address": "akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2",
    "sessionId": "ee14c446-82b0-45ab-b470-ee445ddd0e0f"
  },
  "kind": "ConfigMap",
  "metadata": {
    "annotations": {
      "control-plane.alpha.kubernetes.io/leader": "{\"holderIdentity\":\"e6a42a4f-235e-4b97-93c6-40f4b987f56b\",\"leaseDuration\":15.000000000,\"acquireTime\":\"2021-02-16T05:13:37.365000Z\",\"renewTime\":\"2021-02-16T05:22:17.386000Z\",\"leaderTransitions\":105}"
    },
    "creationTimestamp": "2021-02-15T16:13:26Z",
    "labels": {
      "app": "flinktestk8cluster",
      "configmap-type": "high-availability",
      "type": "flink-native-kubernetes"
    },
    "name": "flinktestk8cluster-bc7b6f9aa8b0a111e1c50b10155a85be-jobmanager-leader",
    "namespace": "default",
    "resourceVersion": "46202881",
    "selfLink": "/api/v1/namespaces/default/configmaps/flinktestk8cluster-bc7b6f9aa8b0a111e1c50b10155a85be-jobmanager-leader",
    "uid": "1d5ca6e3-dc7e-4fb7-9fab-c1bbb956cda9"
  }
}
Here, flink-jobmanager is the name of the JobManager's Kubernetes Service.
Is there a way to fix this? How can I make the JobManager write the pod IP instead of the Service name into the leader ConfigMap?
Here is the exception:
2021-02-12 06:15:53,849 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Registration at ResourceManager failed due to an error
java.util.concurrent.CompletionException: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message RemoteFencedMessage(954fe694bb4d268a2e32b4497e944144,RemoteRpcInvocation(registerTaskExecutor(TaskExecutorRegistration,Time))) sent to akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0 because the fencing token is null.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:661) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_275]
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:235) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_275]
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1044) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.OnComplete.internal(Future.scala:263) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.OnComplete.internal(Future.scala:261) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:101) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:999) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:458) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message RemoteFencedMessage(954fe694bb4d268a2e32b4497e944144,RemoteRpcInvocation(registerTaskExecutor(TaskExecutorRegistration,Time))) sent to akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0 because the fencing token is null.
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
... 9 more
2021-02-12 06:15:53,849 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Pausing and re-attempting registration in 10000 ms
Solution
The problem is that each JobManager pod needs a unique address when you run standby JobManagers. Therefore you must not configure the Service as the address the components use to talk to each other. Instead, you should start each JobManager pod with its pod IP as the jobmanager.rpc.address.
In order to start each JobManager pod with its own IP, you cannot put the Flink configuration into a shared ConfigMap, because that would give every JobManager pod the same configuration. Instead, you need to add the following snippet to your JobManager deployment:
env:
  - name: MY_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
  - name: FLINK_PROPERTIES
    value: |
      jobmanager.rpc.address: ${MY_POD_IP}
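To see why this works, here is a minimal sketch of what the official flink image's entrypoint is assumed to do with FLINK_PROPERTIES: append it to flink-conf.yaml after expanding ${...} environment references. The pod IP and the /tmp path below are illustrative; in the real pod, MY_POD_IP is injected by the downward API snippet above.

```shell
export MY_POD_IP="10.244.1.17"   # hypothetical pod IP; injected via the downward API in the pod
FLINK_PROPERTIES='jobmanager.rpc.address: ${MY_POD_IP}'

conf=/tmp/flink-conf.yaml
: > "$conf"   # the real entrypoint appends to the image's own flink-conf.yaml

# Expand the ${MY_POD_IP} reference and write the result into the config:
printf '%s\n' "$FLINK_PROPERTIES" \
  | sed "s|\${MY_POD_IP}|$MY_POD_IP|g" >> "$conf"

cat "$conf"
# → jobmanager.rpc.address: 10.244.1.17
```

Because the expansion happens per pod at container start, each JobManager ends up with its own IP as its RPC address, which is exactly what the leader election needs.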
This way, we tell each JobManager pod to use the pod's IP for jobmanager.rpc.address, which is also what gets written to the Kubernetes HA services. With that in place, every user of the Kubernetes HA services running inside the cluster can find the current leader.
Next, you need to configure the Kubernetes HA services for every deployment that should use them. You can do this by extending the FLINK_PROPERTIES environment variable:
env:
  - name: MY_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
  - name: FLINK_PROPERTIES
    value: |
      jobmanager.rpc.address: ${MY_POD_IP}
      kubernetes.cluster-id: foobar
      high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
      high-availability.storageDir: hdfs:///flink/recovery
      restart-strategy: fixed-delay
      restart-strategy.fixed-delay.attempts: 10
Adding this snippet to your JobManager pod definition, and the following one
env:
  - name: FLINK_PROPERTIES
    value: |
      kubernetes.cluster-id: foobar
      high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
      high-availability.storageDir: hdfs:///flink/recovery
      restart-strategy: fixed-delay
      restart-strategy.fixed-delay.attempts: 10
to your TaskManager deployment, should solve the problem.
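To verify the fix took effect, you can read the leader address back out of the HA ConfigMap and check that it now carries a pod IP rather than the Service name. A minimal sketch (the ConfigMap name in the comment is the cluster-specific one from the question; the address value here is a hypothetical example of the format shown in the ConfigMap dump above):

```shell
# On a live cluster you would fetch the address with something like:
#   kubectl get configmap \
#     flinktestk8cluster-bc7b6f9aa8b0a111e1c50b10155a85be-jobmanager-leader \
#     -o jsonpath='{.data.address}'
addr='akka.tcp://flink@10.244.1.17:6123/user/rpc/jobmanager_2'  # hypothetical pod IP

# Extract the host part between '@' and ':' to see who the leader is:
host=$(printf '%s' "$addr" | sed -E 's|.*@([^:]+):.*|\1|')
echo "$host"
# → 10.244.1.17
```

If the host is still the Service name (flink-jobmanager), the JobManager pods are not yet picking up jobmanager.rpc.address from the pod IP.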
The complete deployment yamls can be found here:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: flink:1.12.1
          args: ["jobmanager"]
          env:
            - name: MY_POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: FLINK_PROPERTIES
              value: |
                jobmanager.rpc.address: ${MY_POD_IP}
                kubernetes.cluster-id: foobar
                high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
                high-availability.storageDir: file:///flink/recovery
                restart-strategy: fixed-delay
                restart-strategy.fixed-delay.attempts: 10
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          securityContext:
            runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
        - name: taskmanager
          image: flink:1.12.1
          args: ["taskmanager"]
          env:
            - name: FLINK_PROPERTIES
              value: |
                kubernetes.cluster-id: foobar
                high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
                high-availability.storageDir: file:///flink/recovery
                restart-strategy: fixed-delay
                restart-strategy.fixed-delay.attempts: 10
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
            - containerPort: 6121
              name: metrics
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          securityContext:
            runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary