如何解决如何在 kubernetes 部署中扩展 sarama 消费者组?
我正在尝试让一些消费者处理来自 kafka 的消息,我想实现 kubernetes 部署可扩展性以实现弹性消息处理能力。
我从 Sarama 官方指南 https://pkg.go.dev/github.com/Shopify/sarama#NewConsumerGroup 中找到了此代码:
package main
import (
"context"
"fmt"
)
type exampleConsumerGroupHandler struct{}
func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession,claim ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("Message topic:%q partition:%d offset:%d\n",msg.Topic,msg.Partition,msg.Offset)
sess.MarkMessage(msg,"")
}
return nil
}
func main() {
config := NewTestConfig()
config.Version = V2_0_0_0 // specify appropriate version
config.Consumer.Return.Errors = true
group,err := NewConsumerGroup([]string{"localhost:9092"},"my-group",config)
if err != nil {
panic(err)
}
defer func() { _ = group.Close() }()
// Track errors
go func() {
for err := range group.Errors() {
fmt.Println("ERROR",err)
}
}()
// Iterate over consumer sessions.
ctx := context.Background()
for {
topics := []string{"my-topic"}
handler := exampleConsumerGroupHandler{}
// `Consume` should be called inside an infinite loop,when a
// server-side rebalance happens,the consumer session will need to be
// recreated to get the new claims
err := group.Consume(ctx,topics,handler)
if err != nil {
panic(err)
}
}
}
我有一些问题:
- 如何设置消费者组中的消费者数量?
- 如果我在 Pod 中部署这个程序,我可以安全地扩展它吗?我的意思是,假设一个程序正在运行,并且我将副本从 1 扩展到 2,另一个具有相同组 ID 的
NewConsumerGroup
调用是否可以完美运行而不会发生冲突?
提前致谢。
注意:我使用的是 Kafka 2.8,我听说 Sarama_cluster 包已弃用。
解决方法
提醒组不能超过主题分区数
缩放 pod 是使用消费者组的正确方法,使用相同的组名是正确的,但是我建议将其和代理地址提取到环境变量中,以便在部署时轻松更改它们
容器化代码无法使用 localhost 作为 Kafka 连接字符串,因为这将是 pod 本身
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。