微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

RabbitMQ:发布者确认中的奇怪行为

如何解决RabbitMQ:发布者确认中的奇怪行为

我是 RabbitMQ 的新手,我使用 Publisher Confirm 来确保消息成功传递。

但是我面临着一个奇怪的行为,那就是当我发布一条消息并获取一个 sequenceNumber 函数通道时。BasicAcks 被触发 n 次,其中 n 是 DeliveryTag,所以如果此消息的 DeliveryTag 是 5 函数通道.BasicAcks 被解雇了 5 次?!!

这是我的代码

            IModel channel = _customrabbitMQ.GetChannel();
            IBasicProperties properties = _customrabbitMQ.GetBasicProperties();
            ulong sequenceNumber = channel.NextPublishSeqNo;

         
            message.SequenceNumber = sequenceNumber.ToString();
            _context.Messages.Update(message);
            await _context.SaveChangesAsync();

            try
            {
                _customrabbitMQ.AddOutstandingConfirm(sequenceNumber,message.Id);
                channel.Basicpublish(message.ExchangeName,message.RoutingKey,properties,Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message)));
                Console.WriteLine("Message Published");
            }
            catch (Exception ex)
            {
                // Todo: log to custom logger here
                Console.WriteLine($"Error => {ex.Message}");
            }

            channel.BasicAcks += async (sender,ea) =>
            {
                try
                {
                    Console.WriteLine("Message Confirmed");

                    using var scope = _provider.CreateScope();
                    var _context = scope.ServiceProvider.GetService<Data.DataContext>();
                    var _customrabbitMQ = scope.ServiceProvider.GetService<CustomrabbitMQ>();

                    Guid messageId = _customrabbitMQ.GetoutstandingConfirm(ea.DeliveryTag);

                    Message message = await _context.Messages.Where(m => m.Id == messageId).FirstOrDefaultAsync();
                    message.Status = MessageStatuses.INQUEUE;
                    _context.Messages.Update(message);
                    await _context.SaveChangesAsync();

                    _customrabbitMQ.RemoveOutstandingConfirm(ea.DeliveryTag);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Error => {ex.Message}");
                }
            };

            channel.BasicNacks += (sender,ea) =>
            {
                Console.WriteLine("Message Not Confirmed");
                _customrabbitMQ.RemoveOutstandingConfirm(ea.DeliveryTag);
            };
        }

那么,为什么会发生这种情况以及如何阻止它并使其仅确认一次?

提前致谢。

解决方法

我认为问题在于每次发布消息时都会初始化一个侦听器。 因此,当您使用 DT = 1 发布消息时,您通过添加 channel.BasicAck += fun 创建了一个侦听器,而当使用 DT = 2 添加另一条消息时,您创建了另一个侦听器,因此当 DT_2 上的确认返回时将被推送给 2 个侦听器,这就解释了为什么您会收到具有相同数量的交付标签的确认。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。