微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用 MirrorMaker 2.0 迁移消费者偏移量?

如何解决如何使用 MirrorMaker 2.0 迁移消费者偏移量?

在 Kafka 2.7.0 中,我使用 Mirromaker 2.0 作为 Kafka 连接器将所有主题从主 Kafka 集群复制到备份集群。

除了__consumer_offsets之外,所有主题都被完美复制。以下是连接配置:

{
    "name": "test-connector","config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector","topics.blacklist": "some-random-topic","replication.policy.separator": "","source.cluster.alias": "","target.cluster.alias": "","exclude.internal.topics":"false","tasks.max": "10","key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter","value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter","source.cluster.bootstrap.servers": "xx.xx.xxx.xx:9094","target.cluster.bootstrap.servers": "yy.yy.yyy.yy:9094","topics": "test-topic-from-primary,primary-kafka-connect-offset,primary-kafka-connect-config,primary-kafka-connect-status,__consumer_offsets"
    }
}

在类似的问题 here 中,接受的答案如下:

在您的 consumer.config添加

exclude.internal.topics=false

并将其添加到您的 producer.config 中:

client.id=__admin_client

我应该将这些添加到我的配置中的什么位置?

此处的 Connector Configuration Properties 没有名为 client.id属性,但我已将 exclude.internal.topics 的值设置为 false

这里有我遗漏的东西吗?

更新

我了解到 Kafka 2.7 及更高版本支持使用 here 中提到的 MirrorCheckpointTask 自动消费者偏移同步。

我为此创建了一个具有以下配置的连接器:

{
    "name": "mirror-checkpoint-connector","config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector","sync.group.offsets.enabled": "true","topics": "__consumer_offsets"
    }
}

仍然没有帮助。 这是正确的方法吗?有什么需要吗?

解决方法

您不想复制 connsumer_offsets。由于各种原因,从 src 到目标集群的偏移量不会相同。

MirrorMaker2 提供了进行偏移平移的能力。它将使用从 src 集群生成的转换偏移量填充目标集群。 https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。