如何解决在运行时为Rebus生成消息处理程序
我要使用Rebus订阅和处理Rabbitmq中的消息时遇到了一个问题。第三方程序集中定义了多种消息类型,新的消息类型将定期添加到该程序集中。
我需要以某种方式使Rebus订阅并处理所有这些消息类型,并将它们转发(发布)到另一个Rabbitmq实例。我的服务本质上是转发消息,并在这样做时添加自定义的重载标头。
问题是我不想为每种消息类型生成处理程序类(因为功能是相同的,而不管消息类型如何)。我也不想每次在第三方程序集中添加新消息类型时更新代码(编写新的处理程序类)。
我尝试使用TypeBuilder为通过反射找到的每种类型动态创建消息处理程序类,但是感觉有点混乱,所以我希望有另一种方法吗?
public void SubscribeAndHandleMessages()
{
// These types will be determined runtime by using reflection but thats omitted for clarity
var messageTypes = new List<Type>(){typeof(MessageA),typeof(MessageB)};
var activator = new BuiltinHandlerActivator();
Configure.With(activator)
.Transport(t => t.UseRabbitMq(_rabbitConnectionString,"MyQueue"))
.Start();
//Subscribe and register handlers
foreach (var type in messageTypes)
{
activator.Bus.Subscribe(type); //This works,I can see the queue subscribing to the correct topics
activator.Handle<type>(async (bus,context,message) => //This doesnt work since type is not kNown at compile time
{
//Forwarding to another rabbit instance,same handling for all types of messages
});
}
}
解决方法
建立必要的订阅后,您只需要能够处理收到的所有消息。
使用Rebus的最佳方法是避免普通的消息处理管道(反序列化=>查找处理程序=>调度),而是以原始格式(即,以“传输消息”格式)处理消息。
您可以使用Rebus的传输消息转发功能来做到这一点。有了它,一个100%的通用消息处理程序可能看起来像这样:
Configure.With(activator)
.Transport(t => t.UseInMemoryTransport(new InMemNetwork(),"router-tjek"))
.Routing(r => r.AddTransportMessageForwarder(async transportMessage =>
{
var headers = transportMessage.Headers; //< Dictionary<string,string>
var body = transportMessage.Body; //< byte[]
// handle the message here,e.g.
// by deserializing the body into a JObject,// storing the bytes in a database,or by
// forwarding the message to another queue
return // appropriate forward action here
}))
.Start();
您可以在此处了解更多信息:Transport message forwarding
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。