微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

主题的偏移量如何在KafkaKafka_net中工作

如何解决主题的偏移量如何在KafkaKafka_net中工作

我有一个基本的生产者应用程序和一个消费者应用程序。如果我同时运行这两个程序,并且都开始专注于各自的主题,那么我的工作系统很好。我的想法是,如果我启动了生产者并发送了一条消息,那么我将能够启动消费者,并让其接受该消息。我错了。

除非两者都已启动并且正在运行,否则我会丢失消息(否则它们不会被消耗)。

我的消费类应用看起来像这样...

Uri uri = new Uri("http://localhost:9092");

KafkaOptions options = new KafkaOptions(uri);                
brokerRouter brokerRouter = new brokerRouter(options);
Consumer consumer = new Consumer(new ConsumerOptions(receivetopic,brokerRouter));
             

List<OffsetResponse> offset = consumer.GetTopicOffsetAsync(receivetopic,100000).Result;

IEnumerable<OffsetPosition> t = from x in offset select new OffsetPosition(x.PartitionId,x.Offsets.Max());
consumer.SetoffsetPosition(t.ToArray());
IEnumerable<KafkaNet.Protocol.Message> msgs = consumer.Consume();

foreach (KafkaNet.Protocol.Message msg in msgs)
{
    do some stuff here based on the message received
}

除非两行之间都有代码,否则每次启动应用程序时它都会从头开始。 什么是管理主题偏移的正确方法,以便断开连接后消费消息?

如果我跑步

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic chat-message-reply-XXX consumer-property fetch-size=40000000 --from-beginning 

我可以看到消息,但是当我将应用程序连接到该主题时,consumer.Consume()不会拾取它尚未看到的消息。我已经尝试过,并且没有运行上面的bat文件,看看是否有任何区别。当我查看consumer.SetoffsetPosition(t.ToArray())调用(特别是t)时,它表明偏移量是该主题所有消息的计数。

请帮助

解决方法

auto.offset.reset中的ConsumerOptions配置设置为earliest。消费者组启动消费消息时,它将从最新的偏移量开始消费,因为auto.offset.reset的默认值是最新的。

但是我现在查看了 kafka-net API,它没有AutoOffsetReset属性,并且在消费者中的配置似乎还不够。它还缺少方法摘要的文档。

我建议您使用 Confluent .NET Kafka Nuget软件包,因为它由 Confluent 本身拥有。

此外,为什么要调用GetTopicOffsets并在使用者中再次设置该偏移量。我认为在配置使用者时,您应该只使用Consume()开始阅读消息。

尝试一下:

static void Main(string[] args)
{
    var uri = new Uri("http://localhost:9092");

    var kafkaOptions = new KafkaOptions(uri);                
    var brokerRouter = new BrokerRouter(kafkaOptions);
    var consumerOptions = new ConsumerOptions(receivedTopic,brokerRouter);
    var consumer = new Consumer(consumerOptions);

    foreach (var msg in consumer.Consume())
    {
        var value = Encoding.UTF8.GetString(msg.Value);

        // Process value here
    }
}

此外,在KafkaOptionsConsumerOptions中启用日志,它们将为您提供很多帮助:

var kafkaOptions = new KafkaOptions(uri)
            {
                Log = new ConsoleLog()
            };
var consumerOptions = new ConsumerOptions(topic,brokerRouter)
            {
                Log = new ConsoleLog()
            });
,

我转而使用Confluent的C#.NET软件包,现在可以使用了。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。