如何解决pubSubSource:两次接收相同的消息 说明如何复制
说明
- 我在 Kafka Connect 分布式模式下有一个 pubSubSource 连接器,它只是从 PubSub 订阅中读取并写入 Kafka 主题。问题是,即使我将一条消息发布到 GCP PubSub,我也会在我的 Kafka 主题中收到此消息两次。
如何复制
-
部署 Kafka 和 Kafka 连接
-
使用以下
pubSubSource
配置创建连接器:curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "pubSubSource","config": { "connector.class":"com.google.pubsub.kafka.source.CloudPubSubSourceConnector","key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter","tasks.max":"1","cps.subscription":"pubsub-test-sub","kafka.topic":"kafka-sub-topic","cps.project":"test-project123","gcp.credentials.file.path":"/tmp/gcp-creds/account-key.json" } }'
-
以下是 Kafka 连接配置:
"plugin.path": "/usr/share/java,/usr/share/confluent-hub-components" "key.converter": "org.apache.kafka.connect.json.JsonConverter" "value.converter": "org.apache.kafka.connect.json.JsonConverter" "key.converter.schemas.enable": "false" "value.converter.schemas.enable": "false" "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter" "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter" "config.storage.replication.factor": "1" "offset.storage.replication.factor": "1" "status.storage.replication.factor": "1"
-
使用以下命令将消息发布到 PubSub 主题:
gcloud pubsub topics publish test-topic --message='{"someKey":"someValue"}'
-
从目标 Kafka 主题读取消息:
/usr/bin/kafka-console-consumer --bootstrap-server xx.xxx.xxx.xx:9092 --topic kafka-topic --from-beginning # Output {"someKey":"someValue"} {"someKey":"someValue"}
为什么会这样,是不是我做错了什么?
解决方法
我在 https://cloud.google.com/pubsub/docs/faq 找到了以下信息,您似乎遇到了同样的问题。您可以尝试生成大消息,看看结果是否相同?
来自链接的详细信息:
为什么重复消息太多? Pub/Sub 保证至少一次消息传递,这意味着偶尔会出现重复。但是,高重复率可能表明客户端未在配置的 ack_deadline_seconds 内确认消息,并且 Pub/Sub 正在重试消息传递。这可以在请求订阅的监控指标 pubsub.googleapis.com/subscription/pull_ack_message_operation_count 和推送订阅的 pubsub.googleapis.com/subscription/push_request_count 中观察到。在 /response_code 中查找提升的 expired 或 webhook_timeout 值。如果有很多小消息,这种情况尤其可能发生,因为 Pub/Sub 可能会在内部对消息进行批处理,并且部分确认的批处理将被完全重新传送。
另一种可能是订阅者没有确认某些消息,因为处理这些特定消息的代码路径失败,并且从未进行 Acknowledge 调用;或者推送端点永远不会响应或响应错误。
如何检测重复消息? Pub/Sub 为每条消息分配一个唯一的 message_id,可用于检测订阅者收到的重复消息。但是,这不会允许您检测由对同一数据的多个发布请求产生的重复项。检测这些将需要发布者提供唯一的消息标识符。如需进一步讨论,请参阅 Pub/Sub I/O。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。