如何解决我的 MirrorMaker2 如何镜像消费者偏移量,为什么这么慢?
我正在使用 Mirror Maker 2 从一个 AWS MSK 集群迁移到另一个。源集群运行 Kafka 2.4.1.1,目标集群运行 2.7。
我的 MirrorMaker2 正在使用 Kafka 2.7 SDK 的 M5.large EC2 实例上运行。
我希望将所有主题和消费者偏移量从 $SOURCE_CLUSTER
复制到 $TARGET_CLUSTER
。
我的 testTopic
似乎已正确复制(包括消费者组偏移量)。我相信这一点,因为当我使用 kafkacat
从 testTopic
消费 $SOURCE_CLUSTER
然后,后来从 $TARGET_CLUSTER
消费时,消息不会在目标上重新消费,因为偏移量已于 $TARGET_CLUSTER
更新(由 MirrorMaker),因此不会重新使用该消息。
但是,当我检查一些较大的主题时,似乎偏移量正在以每秒 2-3 次的速度更新,正如我在下面尝试演示的那样。
这里我描述的是 MyConsumerGroup
上的 $SOURCE_CLUSTER
组。
[ec2-user@ip-x-x-x-x ~]$ k/bin/kafka-consumer-groups.sh --bootstrap-server $SOURCE_CLUSTER --describe --group MyConsumerGroup
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
MyConsumerGroup MyLargetopic 4 772259 772259 0 <some consumer id> rdkafka
MyConsumerGroup MyLargetopic 8 821326 821326 0 <some consumer id> rdkafka
MyConsumerGroup MyLargetopic 9 786077 786077 0 <some consumer id> rdkafka
MyConsumerGroup MyLargetopic 7 844962 844964 2 <some consumer id> rdkafka
MyConsumerGroup MyLargetopic 0 784451 784451 0 <some consumer id> rdkafka
MyConsumerGroup MyLargetopic 3 845682 845682 0 <some consumer id> rdkafka
MyConsumerGroup MyLargetopic 6 827488 827488 0 <some consumer id> rdkafka
MyConsumerGroup MyLargetopic 2 843823 843823 0 <some consumer id> rdkafka
MyConsumerGroup MyLargetopic 5 818343 818343 0 <some consumer id> rdkafka
MyConsumerGroup MyLargetopic 1 802264 802264 0 <some consumer id> rdkafka
这里我描述的是 MyConsumerGroup
上的 $TARGET_CLUSTER
组。
[ec2-user@ip-x-x-x-x ~]$ k/bin/kafka-consumer-groups.sh --bootstrap-server $TARGET_CLUSTER --describe --group MyConsumerGroup
Consumer group 'MyConsumerGroup' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
MyConsumerGroup MyLargetopic 7 832171 288324 -543847 - - -
MyConsumerGroup MyLargetopic 6 814062 260857 -553205 - - -
MyConsumerGroup MyLargetopic 5 801912 254791 -547121 - - -
MyConsumerGroup MyLargetopic 4 758982 249167 -509815 - - -
MyConsumerGroup MyLargetopic 9 770665 238708 -531957 - - -
MyConsumerGroup MyLargetopic 8 806443 267920 -538523 - - -
MyConsumerGroup MyLargetopic 3 831331 283500 -547831 - - -
MyConsumerGroup MyLargetopic 2 831028 250147 -580881 - - -
MyConsumerGroup MyLargetopic 1 789425 272097 -517328 - - -
MyConsumerGroup MyLargetopic 0 768326 245568 -522758 - - -
上述命令的后续运行表明 LOG-END-OFFSET
每秒递增 2-3。
我的 mm2.properties
文件是:
[ec2-user@ip-x-x-x-x ~]$ cat k/config/mm2.properties
clusters = source,target
source.bootstrap.servers=$SOURCE_CLUSTER
target.bootstrap.servers=$TARGET_CLUSTER
# Source and target clusters configurations.
source.config.storage.replication.factor = 3
target.config.storage.replication.factor = 3
source.offset.storage.replication.factor = 3
target.offset.storage.replication.factor = 3
source.status.storage.replication.factor = 3
target.status.storage.replication.factor = 3
source->target.enabled = true
target->source.enabled = false
source->target.sync.group.offsets.enabled=true
source->target.producer.override.**compression.type=gzip
source->target.emit.heartbeats.enabled = true
source->target.emit.checkpoints.enabled = true
source.producer.override.batch.size = 327680
# Mirror maker configurations.
offset-syncs.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
checkpoints.topic.replication.factor = 3
topics = .*
groups = .*
replication.policy.class=com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy
source.cluster.producer.enable.idempotence = true
target.cluster.producer.enable.idempotence = true
tasks.max = 1
replication.factor = 3
refresh.topics.enabled = true
source.producer.compression.type=gzip
target.producer.compression.type=gzip
source.producer.connections.max.idle.ms=180000
producer.enable.idempotence=true
# Enable heartbeats and checkpoints.
# customize as needed
sync.topic.acls.enabled = false
谁能解释为什么 LOG-END-OFFSET
在我的目标集群上上升如此缓慢?没有消费者连接到 $TARGET_CLUSTER
,因此所有更新都通过 MirrorMaker2。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。