如何解决如何使用 MassTransit 和 RabbitMQ 在微服务之间共享消息定义?
我是微服务的新手,我在使用 MassTransit 框架将发布者与订阅者连接起来时遇到了一些问题。我在创建用户时在其中一项服务中生成了一个示例集成事件。定义如下:
public sealed record UserCreatedIntegrationEvent : IntegrationEvent
{
public UserCreatedIntegrationEvent(Guid id,string login,string firstName,string lastName,string mailAddress)
: base(id,nameof(UserCreatedIntegrationEvent))
{
Login = login;
FirstName = firstName;
LastName = lastName;
MailAddress = mailAddress;
}
public string Login { get; }
public string FirstName { get; }
public string LastName { get; }
public string MailAddress { get; }
}
该消息包含在 User.Application 项目中。消息正在消息代理总线上正确发布。现在我需要在其他服务中接收一个事件,所以我需要知道 UserCreatedIntegrationEvent 的定义,反序列化它等。我可以参考 User.Application 项目,但在我看来它可能会导致一些问题,一般来说这种方法我认为违反了微服务自治规则。另一种解决方案是在接收方服务中复制消息定义。然后我复制 UserCreatedIntegrationEvent 并将新粘贴的事件连接到特定的处理程序:
public abstract class IntegrationEventHandler<TIntegrationEvent> : IConsumer<TIntegrationEvent>
where TIntegrationEvent : IntegrationEvent
{
protected ConsumeContext<TIntegrationEvent> ConsumeContext { get; private set; }
public async Task Consume(ConsumeContext<TIntegrationEvent> context)
{
ConsumeContext = context;
await HandleAsync(context.Message);
}
public abstract Task HandleAsync(TIntegrationEvent @event);
}
...
public sealed class UserCreatedIntegrationEventHandler : IntegrationEventHandler<UserCreatedIntegrationEvent>
{
public override async Task HandleAsync(UserCreatedIntegrationEvent @event)
{
throw new System.NotImplementedException();
}
}
问题是,即使我复制了事件定义,所以完全一样,没有调用相应处理程序中的 HandleAsync 方法。但是当我尝试直接从其他服务引用 User.Application.UserCreatedIntegrationEvent 时,该方法被正确调用,但我并不喜欢这个解决方案。如何正确解决问题?我是否应该在其他服务之间复制定义(如何使用 MassTransit 将它们连接起来),也许我应该将合同移至其他一些包并从这两个服务中引用这些包?负责代理依赖注册的代码:
internal static IServiceCollection AddRabbitMQ(this IServiceCollection services,IConfiguration configuration,bool useHealthCheck,Assembly consumersAssembly)
{
var settingsSection = configuration.GetSection(RabbitMQSettingsSectionKey);
var rabbitMQSettings = settingsSection.Get<RabbitMQSettings>();
services
.AddMassTransit(configurator =>
{
configurator.AddConsumers(consumersAssembly);
configurator.SetKebabCaseEndpointNameFormatter();
configurator.UsingRabbitMq((context,busFactoryConfigurator) =>
{
busFactoryConfigurator
.Host(rabbitMQSettings.HostName,rabbitMQSettings.VirtualHostName,hostConfigurator =>
{
hostConfigurator.Username(rabbitMQSettings.UserName);
hostConfigurator.Password(rabbitMQSettings.Password);
});
busFactoryConfigurator.ConfigureEndpoints(context);
});
})
.AddMassTransitHostedService()
.Configure<RabbitMQSettings>(settingsSection)
.AddScoped<IIntegrationEventPublisher,EventBus>();
if (useHealthCheck)
{
services
.AddHealthChecks()
.AddRabbitMQ(string.Format(RabbitMQConnectionStringPattern,rabbitMQSettings.HostName),name: RabbitConnectionCheckName,tags: new[] { DefaultRabbitMQTag });
}
return services;
}
感谢您的帮助!
解决方法
如果您将消息类从发布者复制到消费者,则需要确保消息类型相同,包括命名空间。这是文档中的 clearly highlighted。如果类型不匹配,就可以解释为什么服务没有使用它。
无论是复制文件还是使用合同共享 NuGet 包,都可以。两者都被广泛使用。 MassTransit 拥有 guidelines 不断发展的消息合同,以确保向后兼容。
,来自docs:
重要
MassTransit 使用完整的类型名称,包括命名空间,用于 消息合同。在两个独立的消息类型中创建相同的消息类型时 项目,命名空间必须匹配,否则消息将不会 消耗了。
常见的问题是,虽然合约按名称和所有属性匹配,但命名空间不同。消费者将尝试绑定到包含命名空间的交换。
您可以在 RMQ 管理 UI 中轻松验证这一点,方法是查看使用者队列及其具有的绑定。很可能您有两种不同的交换器:一种是所有已发布的消息都发送到(然后消失),另一种是消费者队列绑定到的交换器,它保持空闲状态。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。