如何解决使用 Azure 函数和服务总线的 MassTransit 异常处理和错误队列
我在 Azure Functions v3 中使用服务总线作为传输运行 MassTransit 7.1.8。当使用者第一次抛出异常时,会在我的服务总线命名空间中创建两个主题:masstransit~fault
(没有任何订阅)和 masstransit~fault--mynamespace~myevent--
(有一个订阅 Fault-MassTransit
)。>
Azure 函数 Startup.cs
public override void Configure(IFunctionsHostBuilder builder)
{
builder.Services.AddMassTransitForAzureFunctions(cfg =>
{
cfg.AddConsumersFromNamespaceContaining<MyConsumer>();
});
}
Azure 函数
[FunctionName("MyFunction")]
public async Task Run(
[ServiceBusTrigger("my-topic","my-subscription",Connection = "AzureWebJobsServiceBus")] Message message,ILogger log)
{
try
{
await _receiver.HandleConsumer<MyConsumer>("my-topic",message,default(CancellationToken));
}
catch (System.Exception ex)
{
}
}
消费者
public class MyConsumer : IConsumer<MyEvent>
{
public async Task Consume(ConsumeContext<MyEvent> context)
{
throw new System.Exception("Very bad things happened inside the consumer.");
}
}
我还必须在 masstransit~fault
上手动创建订阅才能开始在那里接收消息。
我的问题:
-
MyFunction
正在吞下异常以防止将消息发送到 DLQ。这个可以吗?吞下异常感觉很奇怪,但另一方面,MassTransit 会将故障发送到错误队列,这样它就不会丢失。 - 文档谈到创建一个
_error
队列(适当的前缀),但我没有看到任何这样的队列或主题,只有masstransit~fault
和masstransit~fault--mynamespace~myevent--
。我猜masstransit~fault--mynamespace~myevent--
是否相当于 RabbitMQ_error
的服务总线? -
masstransit~fault--mynamespace~myevent--
有消息发送给它,但是当我查看订阅时它是空的。订阅有一个默认过滤器,可以接受所有内容。我是否需要针对该主题进行一些额外配置? - 有没有办法重命名上面的错误主题(可能同时是 changed)?
解决方法
Azure Functions 并没有真正使用 MassTransit 作为交通工具。但是,可以配置某些方面来模仿相同的行为。在队列上设置 DeliveryCount = 1
将阻止 Azure 重新传递消息(这会导致重试五次,默认值为 5
)。
回答各种问题:
- 这是交付计数,MassTransit 发布故障。
- 可以配置重试策略(如下图)
- 使用 Azure Functions 时没有_错误队列,因为 MassTransit 不是交通工具。
- 除非您为故障配置消费者,否则它不会被消费。
- 您可以使用实体名称格式化程序(也如下所示)更改故障主题。
为 Azure 函数配置总线重试:
.AddMassTransitForAzureFunctions(cfg =>
{
cfg.SetKebabCaseEndpointNameFormatter();
cfg.AddConsumersFromNamespaceContaining<ConsumerNamespace>();
},(context,cfg) =>
{
cfg.MessageTopology.SetEntityNameFormatter(new CleanEntityNameFormatter(cfg.MessageTopology.EntityNameFormatter));
cfg.UseMessageRetry(r => r.Intervals(200,500,1000,2000));
})
使用 Azure Functions,当重试用完时,Azure 会将消息移动到死信队列(逻辑队列是队列本身的一部分,在 Azure 门户中可见)。
指定的实体名称格式化程序将更改故障的主题名称。它必须用于每个将产生/消耗故障的总线实例,以确保名称匹配。
using MassTransit;
using MassTransit.Internals.Extensions;
using MassTransit.Topology;
public class CleanEntityNameFormatter :
IEntityNameFormatter
{
readonly IEntityNameFormatter _entityNameFormatter;
public CleanEntityNameFormatter(IEntityNameFormatter entityNameFormatter)
{
_entityNameFormatter = entityNameFormatter;
}
public string FormatEntityName<T>()
{
if (typeof(T).ClosesType(typeof(Fault<>),out Type[] types))
{
var name = (string) typeof(IEntityNameFormatter)
.GetMethod("FormatEntityName")
.MakeGenericMethod(types)
.Invoke(_entityNameFormatter,Array.Empty<object>());
var suffix = typeof(T).Name.Split('`').First();
return $"{name}-{suffix}";
}
return _entityNameFormatter.FormatEntityName<T>();
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。