
Flink Fencing Error in Kubernetes HA Mode

How to resolve the Flink fencing error in Kubernetes HA mode

I am using Flink 1.12 and trying to run the JobManager in HA on a Kubernetes cluster (AKS). I am running 2 JobManager and 2 TaskManager pods.

The problem I am facing is that the TaskManagers cannot find the JobManager leader.

The reason is that they try to reach the JobManager through the Kubernetes Service (a ClusterIP Service) instead of the leader pod's IP. As a result, the jobmanager Service sometimes resolves a registration call to the standby JobManager, which prevents the TaskManager from finding the JobManager leader.
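
For reference, here is a minimal sketch of the kind of setup that produces this behaviour. The Service manifest below is reconstructed, not taken from the original post; only the name flink-jobmanager, the ClusterIP type, the RPC port 6123 and the app/component labels appear elsewhere in this post. Because the selector matches both JobManager pods, the Service can route a registration call to either the leader or the standby:

# Reconstructed example, NOT the original manifest: a ClusterIP Service whose
# selector matches both JobManager pods, so RPC traffic sent to
# "flink-jobmanager" may land on the standby instead of the leader.
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  selector:
    app: flink
    component: jobmanager   # matches the leader AND the standby pod
  ports:
    - name: rpc
      port: 6123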

Here is the content of the jobmanager-leader ConfigMap:

{
    "apiVersion": "v1",
    "data": {
        "address": "akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_2",
        "sessionId": "ee14c446-82b0-45ab-b470-ee445ddd0e0f"
    },
    "kind": "ConfigMap",
    "metadata": {
        "annotations": {
            "control-plane.alpha.kubernetes.io/leader": "{\"holderIdentity\":\"e6a42a4f-235e-4b97-93c6-40f4b987f56b\",\"leaseDuration\":15.000000000,\"acquireTime\":\"2021-02-16T05:13:37.365000Z\",\"renewTime\":\"2021-02-16T05:22:17.386000Z\",\"leaderTransitions\":105}"
        },
        "creationTimestamp": "2021-02-15T16:13:26Z",
        "labels": {
            "app": "flinktestk8cluster",
            "configmap-type": "high-availability",
            "type": "flink-native-kubernetes"
        },
        "name": "flinktestk8cluster-bc7b6f9aa8b0a111e1c50b10155a85be-jobmanager-leader",
        "namespace": "default",
        "resourceVersion": "46202881",
        "selfLink": "/api/v1/namespaces/default/configmaps/flinktestk8cluster-bc7b6f9aa8b0a111e1c50b10155a85be-jobmanager-leader",
        "uid": "1d5ca6e3-dc7e-4fb7-9fab-c1bbb956cda9"
    }
}

Here, flink-jobmanager is the name of the JobManager's Kubernetes Service.

Is there a way to fix this? How can I make the JobManager write the pod IP instead of the Service name into the leader ConfigMap?

Here is the exception:

2021-02-12 06:15:53,849 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Registration at ResourceManager failed due to an error
java.util.concurrent.CompletionException: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message RemoteFencedMessage(954fe694bb4d268a2e32b4497e944144,RemoteRpcInvocation(registerTaskExecutor(TaskExecutorRegistration,Time))) sent to akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0 because the fencing token is null.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:661) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_275]
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:235) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_275]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_275]
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1044) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.OnComplete.internal(Future.scala:263) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.OnComplete.internal(Future.scala:261) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:101) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:999) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:458) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message RemoteFencedMessage(954fe694bb4d268a2e32b4497e944144,RemoteRpcInvocation(registerTaskExecutor(TaskExecutorRegistration,Time))) sent to akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0 because the fencing token is null.
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
... 9 more
2021-02-12 06:15:53,849 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Pausing and re-attempting registration in 10000 ms

Solution

The problem is that when you use standby JobManagers, each JobManager pod needs a unique address. Hence, you must not configure the Service as the address the components use to communicate with each other. Instead, you should start each JobManager pod with its pod IP as the jobmanager.rpc.address.

To start each JobManager pod with its own IP, you cannot put the Flink configuration into a shared ConfigMap, because that configuration would be identical for every JobManager pod. Instead, you need to add the following snippet to your JobManager deployment:

env:
  - name: MY_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
  - name: FLINK_PROPERTIES
    value: |
      jobmanager.rpc.address: ${MY_POD_IP}

This way, each JobManager pod uses its pod IP for jobmanager.rpc.address, and that is also what gets written into the K8s HA services (the leader ConfigMaps). With this in place, every consumer of the K8s HA services running inside the K8s cluster can find the current leader.
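
For illustration, after this change the address entry in the jobmanager-leader ConfigMap should contain the leader pod's IP instead of the Service name. A sketch of what the relevant part would then look like (the pod IP 10.244.1.23 is made up for this example):

# Hypothetical excerpt of the leader ConfigMap after the fix; the IP below is an
# example value standing in for whatever IP the current leader pod actually has.
apiVersion: v1
kind: ConfigMap
metadata:
  name: flinktestk8cluster-bc7b6f9aa8b0a111e1c50b10155a85be-jobmanager-leader
data:
  address: akka.tcp://flink@10.244.1.23:6123/user/rpc/jobmanager_2
  sessionId: ee14c446-82b0-45ab-b470-ee445ddd0e0f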

Next, you need to configure the K8s HA services for every deployment that should use them. You can do this by extending the FLINK_PROPERTIES environment variable:

env:
  - name: MY_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
  - name: FLINK_PROPERTIES
    value: |
      jobmanager.rpc.address: ${MY_POD_IP}
      kubernetes.cluster-id: foobar
      high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
      high-availability.storageDir: hdfs:///flink/recovery
      restart-strategy: fixed-delay
      restart-strategy.fixed-delay.attempts: 10

Adding this to your JobManager pod definition, and the snippet below

env:
  - name: FLINK_PROPERTIES
    value: |
      kubernetes.cluster-id: foobar
      high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
      high-availability.storageDir: hdfs:///flink/recovery
      restart-strategy: fixed-delay
      restart-strategy.fixed-delay.attempts: 10

to your TaskManager deployment, should solve the problem.

The complete deployment YAML can be found below:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.12.1
        args: ["jobmanager"]
        env:
          - name: MY_POD_IP
            valueFrom:
              fieldRef:
                fieldPath: status.podIP
          - name: FLINK_PROPERTIES
            value: |
              jobmanager.rpc.address: ${MY_POD_IP}
              kubernetes.cluster-id: foobar
              high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
              high-availability.storageDir: file:///flink/recovery
              restart-strategy: fixed-delay
              restart-strategy.fixed-delay.attempts: 10
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.12.1
        args: ["taskmanager"]
        env:
          - name: FLINK_PROPERTIES
            value: |
              kubernetes.cluster-id: foobar
              high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
              high-availability.storageDir: file:///flink/recovery
              restart-strategy: fixed-delay
              restart-strategy.fixed-delay.attempts: 10
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        - containerPort: 6121
          name: metrics
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
