如何解决RabbitMQ 捕获队列长度异常
背景信息
第一个被称为 powershell 脚本,它通知系统数据已准备好从客户 1 的数据库中提取。这被插入到队列 1 中。
当第二个微服务获得准备就绪的信息时,它正在侦听队列一。它连接到数据库并开始用需要发送的数据填充队列 2。
第三个微服务正在列到队列 2 中,并开始将排队的数据发送到第三方网关。 (在我的系统上会有一个限制,我可以向网关发出多少请求,我还没有被告知这是什么,因为网关服务器仍然处于开发阶段。)
我的问题是队列 2 可能由于某些原因而过载
- 服务客户立即准备就绪。
- 网关关闭,队列未清空。
所以我试图确保如果队列 2 已满,第二个微服务会回到队列 1,它需要重试。我担心的是,如果微服务两个 nack 回到队列一个(不删除它),它会立即再次尝试。我需要在这里实施某种退避。我想知道我是否应该在微服务 2 中实现指数回退,而不是将其回退。但是在某些时候,如果网关关闭了几天,我需要停止它我不确定这是否应该永远继续尝试。
我已经开始执行一些检查,在队列 2 开始加载之前检查它有多少空间。
我以前使用过 RabbitMQ,但我从来没有做过所有设置。
我做了什么
我已将队列配置为 10 个非常低的限制,我正在尝试测试 Polly 重试和断路器。
如您所见,其最大长度为 10。
var args = new Dictionary<string,object>
{
{"x-max-length",_queueSettings.MaxLength},{"x-overflow","reject-publish"},{"x-max-length-bytes",_queueSettings.MaxLengthInBytes}
};
using var channel = _connection.CreateModel();
channel.QueueDeclare(queue: _queueSettings.Name,durable: _queueSettings.Durable,exclusive: false,autoDelete: false,arguments: args);
问题是当它运行时它不会失败。它只是没有向队列中添加任何内容,并且所有内容都可能只是被转储到死信中。我需要它返回到我的应用程序,以便我可以将它设置为停止发送(断路器),直到队列被清除,以便我可以再次添加内容。否则我只会失去请求。
不带波莉发送
channel.ConfirmSelect();
// uses a 5 second timeout
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; // persistent
_logger.LogTrace("Publishing report to RabbitMQ: {RecordId}",@jsonReportData.RecordId);
channel.Basicpublish(exchange: "",routingKey: _queueSettings.Name,basicProperties: properties,body: body);
与波莉一起发送
var policy = RetryPolicy
.Handle<OperationInterruptedException>()
.WaitAndRetry(1,retryAttempt => TimeSpan.FromSeconds(Math.Pow(2,retryAttempt)),(ex,time) =>
{
_logger.LogWarning(ex,"Could not publish RecordId: {RecordId} after {Timeout}s ({ExceptionMessage})",@jsonReportData.RecordId,$"{time.TotalSeconds:n1}",ex.Message);
});
channel.ConfirmSelect();
// uses a 5 second timeout
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
policy.Execute(() =>
{
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; // persistent
_logger.LogTrace("Publishing report to RabbitMQ: {RecordId}",@jsonReportData.RecordId);
channel.Basicpublish(exchange: "",body: body);
});
我发誓它在我上次尝试添加最大长度时抛出了一个异常,但现在它只是停在 10 并且忽略了其他。
听
我也尝试添加一个侦听器,但在发布到队列后我没有从 Rabbit 获取任何信息也无济于事。
channel.ConfirmSelect();
channel.BasicAcks += (sender,ea) =>
{
// code when message is confirmed
int i = 1;
};
channel.BasicNacks += (sender,ea) =>
{
int i = 1;
};
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。