微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

我的 MirrorMaker2 如何镜像消费者偏移量,为什么这么慢?

如何解决我的 MirrorMaker2 如何镜像消费者偏移量,为什么这么慢?

我正在使用 Mirror Maker 2 从一个 AWS MSK 集群迁移到另一个。源集群运行 Kafka 2.4.1.1,目标集群运行 2.7。

我的 MirrorMaker2 正在使用 Kafka 2.7 SDK 的 M5.large EC2 实例上运行。

我希望将所有主题和消费者偏移量从 $SOURCE_CLUSTER 复制到 $TARGET_CLUSTER

我的 testTopic 似乎已正确复制(包括消费者组偏移量)。我相信这一点,因为当我使用 kafkacattestTopic 消费 $SOURCE_CLUSTER 然后,后来从 $TARGET_CLUSTER 消费时,消息不会在目标上重新消费,因为偏移量已于 $TARGET_CLUSTER 更新(由 MirrorMaker),因此不会重新使用该消息。

但是,当我检查一些较大的主题时,似乎偏移量正在以每秒 2-3 次的速度更新,正如我在下面尝试演示的那样。

这里我描述的是 MyConsumerGroup 上的 $SOURCE_CLUSTER 组。

[ec2-user@ip-x-x-x-x ~]$ k/bin/kafka-consumer-groups.sh --bootstrap-server $SOURCE_CLUSTER --describe --group MyConsumerGroup

GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
MyConsumerGroup MyLargetopic           4          772259          772259          0               <some consumer id> rdkafka
MyConsumerGroup MyLargetopic           8          821326          821326          0               <some consumer id> rdkafka
MyConsumerGroup MyLargetopic           9          786077          786077          0               <some consumer id> rdkafka
MyConsumerGroup MyLargetopic           7          844962          844964          2               <some consumer id> rdkafka
MyConsumerGroup MyLargetopic           0          784451          784451          0               <some consumer id> rdkafka
MyConsumerGroup MyLargetopic           3          845682          845682          0               <some consumer id> rdkafka
MyConsumerGroup MyLargetopic           6          827488          827488          0               <some consumer id> rdkafka
MyConsumerGroup MyLargetopic           2          843823          843823          0               <some consumer id> rdkafka
MyConsumerGroup MyLargetopic           5          818343          818343          0               <some consumer id> rdkafka
MyConsumerGroup MyLargetopic           1          802264          802264          0               <some consumer id> rdkafka

这里我描述的是 MyConsumerGroup 上的 $TARGET_CLUSTER 组。

[ec2-user@ip-x-x-x-x ~]$ k/bin/kafka-consumer-groups.sh --bootstrap-server $TARGET_CLUSTER --describe --group MyConsumerGroup

Consumer group 'MyConsumerGroup' has no active members.

GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
MyConsumerGroup MyLargetopic           7          832171          288324          -543847         -               -               -
MyConsumerGroup MyLargetopic           6          814062          260857          -553205         -               -               -
MyConsumerGroup MyLargetopic           5          801912          254791          -547121         -               -               -
MyConsumerGroup MyLargetopic           4          758982          249167          -509815         -               -               -
MyConsumerGroup MyLargetopic           9          770665          238708          -531957         -               -               -
MyConsumerGroup MyLargetopic           8          806443          267920          -538523         -               -               -
MyConsumerGroup MyLargetopic           3          831331          283500          -547831         -               -               -
MyConsumerGroup MyLargetopic           2          831028          250147          -580881         -               -               -
MyConsumerGroup MyLargetopic           1          789425          272097          -517328         -               -               -
MyConsumerGroup MyLargetopic           0          768326          245568          -522758         -               -               -

上述命令的后续运行表明 LOG-END-OFFSET 每秒递增 2-3。

我的 mm2.properties 文件是:

[ec2-user@ip-x-x-x-x ~]$ cat k/config/mm2.properties
clusters = source,target
source.bootstrap.servers=$SOURCE_CLUSTER
target.bootstrap.servers=$TARGET_CLUSTER

# Source and target clusters configurations.
source.config.storage.replication.factor = 3
target.config.storage.replication.factor = 3

source.offset.storage.replication.factor = 3
target.offset.storage.replication.factor = 3

source.status.storage.replication.factor = 3
target.status.storage.replication.factor = 3

source->target.enabled = true
target->source.enabled = false

source->target.sync.group.offsets.enabled=true
source->target.producer.override.**compression.type=gzip
source->target.emit.heartbeats.enabled = true
source->target.emit.checkpoints.enabled = true

source.producer.override.batch.size = 327680

# Mirror maker configurations.
offset-syncs.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
checkpoints.topic.replication.factor = 3

topics = .*
groups = .*
replication.policy.class=com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy
source.cluster.producer.enable.idempotence = true
target.cluster.producer.enable.idempotence = true

tasks.max = 1
replication.factor = 3
refresh.topics.enabled = true

source.producer.compression.type=gzip
target.producer.compression.type=gzip
source.producer.connections.max.idle.ms=180000

producer.enable.idempotence=true

# Enable heartbeats and checkpoints.

# customize as needed
sync.topic.acls.enabled = false

谁能解释为什么 LOG-END-OFFSET 在我的目标集群上上升如此缓慢?没有消费者连接到 $TARGET_CLUSTER,因此所有更新都通过 MirrorMaker2。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。