
One Kafka consumer in a group always rejects the coordinator, but only when both Spark and Kafka are in EC2


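I have a Java application that uses spark-streaming-kafka-0-10_2.11 to consume topics from a Kafka 2.5.1 cluster in EC2. It only works when run from a Spark cluster or standalone installation outside AWS: when Spark is also hosted in EC2, the Kafka consumer group never fully initializes. Of three topics in total, only two consumers ever connect, while the third repeatedly rejects the group coordinator as "unavailable or invalid".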

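It is always the same third topic whose consumer fails, even though the second and third topics are configured identically and are both empty; the only difference between them is the name. Deleting and recreating the third topic doesn't change anything. Leaving topic 3 out of the application code (the first two can't easily be disentangled) lets the application start successfully.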

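All of the Spark installations involved are version 2.4.5, with the Google Guava JAR updated from the shipped 14.0.1 to 19.0; otherwise there is no special configuration.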

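Kafka is a three-node EC2 cluster, with each node hosting one broker, one ZooKeeper instance, and one Spark worker. Everything can reach and talk to everything else. In server.properties, listeners is set to the internal DNS name and advertised.listeners to the external DNS name: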

listeners=PLAINTEXT://ip-abc-def-ghi-jkl.region.compute.internal:9092
advertised.listeners=PLAINTEXT://ec2-mno-pqr-stu-vwx.region.compute.amazonaws.com:9092
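The split between these two settings matters when reading the logs below: brokers return the advertised.listeners address to clients in metadata, which is why the consumer reports the group coordinator by its public ec2-… name even though bootstrap.servers points at an internal name. A minimal sketch using only java.util.Properties and java.net.URI (the class name ListenerCheck is hypothetical, and it assumes one listener per setting, as here) extracts host:port from each setting so the two can be compared:

```java
import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.util.Properties;

public class ListenerCheck {
    // Extracts "host:port" from a single listener such as PLAINTEXT://host:9092.
    // Assumes exactly one listener per setting, as in the server.properties above.
    static String hostPort(String listener) {
        URI uri = URI.create(listener.trim());
        return uri.getHost() + ":" + uri.getPort();
    }

    public static void main(String[] args) throws IOException {
        // The two server.properties lines, with the placeholder host names kept as-is.
        String serverProperties = String.join("\n",
                "listeners=PLAINTEXT://ip-abc-def-ghi-jkl.region.compute.internal:9092",
                "advertised.listeners=PLAINTEXT://ec2-mno-pqr-stu-vwx.region.compute.amazonaws.com:9092");

        Properties props = new Properties();
        props.load(new StringReader(serverProperties));

        // The broker listens on this address...
        System.out.println("listen:    " + hostPort(props.getProperty("listeners")));
        // ...but hands this one to clients in metadata responses.
        System.out.println("advertise: " + hostPort(props.getProperty("advertised.listeners")));
    }
}
```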

The failing Spark application, launched from inside EC2:

2020-10-08/21:09:32.694/UTC org.apache.kafka.clients.consumer.ConsumerConfig INFO ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [ip-(broker 1 private dns).region.compute.internal:9092]
        check.crcs = true
        client.id =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = mygroup
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2,TLSv1.1,TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2020-10-08/21:09:32.843/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka version : 2.0.0
2020-10-08/21:09:32.844/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka commitId : 3402a8361b734732
2020-10-08/21:09:33.098/UTC org.apache.kafka.clients.Metadata INFO Cluster ID: mk34tRyzT1m1VR1ZC9GYnQ
2020-10-08/21:09:33.100/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1,groupId=mygroup] discovered group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null)
2020-10-08/21:09:33.135/UTC org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO [Consumer clientId=consumer-1,groupId=mygroup] Revoking previously assigned partitions []
2020-10-08/21:09:33.135/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1,groupId=mygroup] (Re-)joining group
2020-10-08/21:09:39.155/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1,groupId=mygroup] Successfully joined group with generation 1
2020-10-08/21:09:39.159/UTC org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO [Consumer clientId=consumer-1,groupId=mygroup] Setting newly assigned partitions [partitions here]
2020-10-08/21:09:39.188/UTC org.apache.kafka.clients.consumer.internals.Fetcher INFO [Consumer clientId=consumer-1,groupId=mygroup] Resetting offset for partition station-data-19 to offset 1976.

(more offset resets; consumer 2 has also joined the group successfully between 21:09:32 and 21:09:39. No activity from consumer 3 yet, unlike launches from an external Spark. Consumer 3 spin-up starts next)

2020-10-08/21:09:39.200/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka version : 2.0.0
2020-10-08/21:09:39.205/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka commitId : 3402a8361b734732
2020-10-08/21:09:39.213/UTC org.apache.kafka.clients.Metadata INFO Cluster ID: mk34tRyzT1m1VR1ZC9GYnQ
2020-10-08/21:09:39.214/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3,groupId=mygroup] discovered group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null)
2020-10-08/21:09:39.219/UTC org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO [Consumer clientId=consumer-3,groupId=mygroup] Revoking previously assigned partitions []
2020-10-08/21:09:39.219/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3,groupId=mygroup] (Re-)joining group
2020-10-08/21:09:42.268/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1,groupId=mygroup] Attempt to heartbeat failed since group is rebalancing
2020-10-08/21:09:42.268/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-2,groupId=mygroup] Attempt to heartbeat failed since group is rebalancing

(more heartbeat failures for consumers 1 and 2)

2020-10-08/21:10:09.254/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3,groupId=mygroup] Group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
2020-10-08/21:10:09.330/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-2,groupId=mygroup] Attempt to heartbeat failed since group is rebalancing
2020-10-08/21:10:09.331/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1,groupId=mygroup] Attempt to heartbeat failed since group is rebalancing
2020-10-08/21:10:09.377/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3,groupId=mygroup] discovered group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null)
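The consumer-side timestamps make the stall easy to measure. Consumer 3 logs "(Re-)joining group" at 21:09:39.219 and gives up on the coordinator at 21:10:09.254, about 30 seconds later, which is roughly the request.timeout.ms = 30000 in the config dump above; whether that is a clue or a coincidence, these two lines alone don't say. A small java.time sketch (the LogGap class is hypothetical, and the log lines are abbreviated) computes such gaps from the "yyyy-MM-dd/HH:mm:ss.SSS/UTC" prefix used in this log:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class LogGap {
    // Matches the consumer-side prefix, e.g. "2020-10-08/21:09:39.219/UTC".
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd/HH:mm:ss.SSS");

    // Parses the timestamp that precedes "/UTC" at the start of a log line.
    static LocalDateTime timestamp(String logLine) {
        int end = logLine.indexOf("/UTC");
        return LocalDateTime.parse(logLine.substring(0, end), FMT);
    }

    // Milliseconds elapsed between two log lines (all timestamps are UTC).
    static long gapMillis(String earlier, String later) {
        return Duration.between(timestamp(earlier), timestamp(later)).toMillis();
    }

    public static void main(String[] args) {
        String joining = "2020-10-08/21:09:39.219/UTC ... [Consumer clientId=consumer-3,groupId=mygroup] (Re-)joining group";
        String invalid = "2020-10-08/21:10:09.254/UTC ... [Consumer clientId=consumer-3,groupId=mygroup] Group coordinator ... is unavailable or invalid";
        System.out.println(gapMillis(joining, invalid) + " ms"); // prints "30035 ms"
    }
}
```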

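When Spark runs outside EC2, this doesn't happen: all three consumers discover the coordinator, join the group more or less simultaneously, work out their offsets, and they're off to the races. But when the application is submitted to the Spark cluster in EC2, only the first two consumers successfully join the group. The third consumer doesn't start initializing until the first two have connected and reset their offsets; it then discovers the group coordinator, tries to talk to it (triggering a rebalance, which prevents the other consumers from heartbeating), fails and decides the coordinator is invalid, then finds the same coordinator again, repeating ad nauseam.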

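The only notable configuration difference between a failed Spark submit like this one and a successful Spark submit from outside EC2 is that the latter's bootstrap.servers must point to the brokers' external DNS names. However, launches from inside EC2 fail whether bootstrap.servers points to the brokers' external or internal names, and whether it lists one broker or several.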

Here is the Kafka server.log from broker 3, identified above as the group coordinator:

[2020-10-08 21:09:33,128] INFO [GroupCoordinator 3]: Dynamic Member with unknown member id joins group mygroup in Empty state. Created a new member id consumer-2-cd4f7a30-d897-4902-81e7-4211b6a1e233 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:33,128] INFO [GroupCoordinator 3]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 0 (__consumer_offsets-25) (reason: Adding new member consumer-2-cd4f7a30-d897-4902-81e7-4211b6a1e233 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:33,136] INFO [GroupCoordinator 3]: Dynamic Member with unknown member id joins group mygroup in PreparingRebalance state. Created a new member id consumer-1-63909784-c821-4903-a08a-98a250d49b19 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,128] INFO [GroupCoordinator 3]: Stabilized group mygroup generation 1 (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,146] INFO [GroupCoordinator 3]: Assignment received from leader for group mygroup for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,222] INFO [GroupCoordinator 3]: Dynamic Member with unknown member id joins group mygroup in Stable state. Created a new member id consumer-3-3a91c573-a90b-4b5c-9707-af285bf9bbac for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,222] INFO [GroupCoordinator 3]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 1 (__consumer_offsets-25) (reason: Adding new member consumer-3-3a91c573-a90b-4b5c-9707-af285bf9bbac with group instance id None) (kafka.coordinator.group.GroupCoordinator)

... (more unknown member ids joining the group in PreparingRebalance)

[2020-10-08 21:14:37,756] INFO [GroupCoordinator 3]: Member consumer-2-cd4f7a30-d897-4902-81e7-4211b6a1e233 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:37,757] INFO [GroupCoordinator 3]: Member consumer-1-63909784-c821-4903-a08a-98a250d49b19 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:37,757] INFO [GroupCoordinator 3]: Stabilized group mygroup generation 2 (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:39,625] INFO [GroupMetadataManager brokerId=3] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-10-08 21:14:47,758] INFO [GroupCoordinator 3]: Member consumer-3-057c4229-7183-4733-b973-9f758b9a69d0 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:47,758] INFO [GroupCoordinator 3]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 2 (__consumer_offsets-25) (reason: removing member consumer-3-057c4229-7183-4733-b973-9f758b9a69d0 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:47,758] INFO [GroupCoordinator 3]: Member consumer-3-58c8ca8c-2daa-46c9-964b-7be883193287 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

... (more members failing and being removed)

[2020-10-08 21:14:47,759] INFO [GroupCoordinator 3]: Group mygroup with generation 3 is now empty (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)

Solution

This turned out to be the key:

The third consumer doesn't start initializing until the first two have connected and reset their offsets

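While consumer 3 was spinning up, the first two consumers showed no activity for several seconds, after which the heartbeat failures began.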

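The Spark driver was an older instance type (m3.large) with two vCPU cores. Our successful tests had come from newer machines with more cores, and when we restricted CPU availability on a test machine with taskset, we could reproduce the problem exactly. Giving spark-submit three cores succeeded.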
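The behavior described here (the third consumer not even starting until the first two finish initializing) can be illustrated with a deliberately coarse model: assume each consumer's startup occupies one execution slot until it has connected and reset its offsets, and let a fixed thread pool stand in for the driver's cores. This is an analogy built on java.util.concurrent, not a model of Spark's scheduler or of the Kafka client; SlotStarvation and startedBeforeRelease are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SlotStarvation {
    // Coarse analogy: each consumer's startup ("connect and reset offsets") holds one
    // pool thread until it finishes; the pool size stands in for available cores.
    static int startedBeforeRelease(int slots, int consumers) {
        ExecutorService pool = Executors.newFixedThreadPool(slots);
        CountDownLatch release = new CountDownLatch(1); // lets every startup finish
        CountDownLatch busy = new CountDownLatch(Math.min(slots, consumers));
        List<String> started = Collections.synchronizedList(new ArrayList<>());

        for (int i = 1; i <= consumers; i++) {
            String id = "consumer-" + i;
            pool.submit(() -> {
                started.add(id);   // this consumer has begun initializing
                busy.countDown();
                release.await();   // still initializing, so the thread stays occupied
                return null;
            });
        }
        try {
            busy.await();          // every available slot is now taken
            Thread.sleep(200);     // any queued startup cannot run in the meantime
            return started.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();   // let all startups, including queued ones, complete
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("2 slots: " + startedBeforeRelease(2, 3) + " of 3 started"); // 2 of 3
        System.out.println("3 slots: " + startedBeforeRelease(3, 3) + " of 3 started"); // 3 of 3
    }
}
```

With two slots, only two of the three startups are running before any is allowed to finish, loosely mirroring the two-vCPU m3.large; with three slots all three start together, mirroring the run where spark-submit was given three cores.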

Using the "simple" Kafka 0.8+ API had never been a problem for us, but moving to the "new" Consumer API for 0.10+ seems to require one core per consumer in the group.
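If the one-core-per-consumer observation holds for a given deployment, a cheap guard is to compare the processors visible to the driver JVM with the number of consumers before starting the streams. A minimal sketch, assuming three consumers (one per topic, as in the question); CorePreflight and enoughCores are hypothetical names, and the threshold is the answer's rule of thumb rather than a documented Kafka or Spark requirement:

```java
public class CorePreflight {
    // Rule of thumb from the answer above (an observation, not a documented limit):
    // the driver should have at least one core per consumer in the group.
    static boolean enoughCores(int availableCores, int consumersInGroup) {
        return availableCores >= consumersInGroup;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        int consumers = 3; // one consumer per subscribed topic in this application
        if (enoughCores(cores, consumers)) {
            System.out.printf("%d cores for %d consumers: OK%n", cores, consumers);
        } else {
            System.err.printf("Only %d cores for %d consumers: the group may fail to stabilize%n",
                    cores, consumers);
        }
    }
}
```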
