微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在运行时为Rebus生成消息处理程序

如何解决在运行时为Rebus生成消息处理程序

我要使用Rebus订阅和处理Rabbitmq中的消息时遇到了一个问题。第三方程序集中定义了多种消息类型,新的消息类型将定期添加到该程序集中。

我需要以某种方式使Rebus订阅并处理所有这些消息类型,并将它们转发(发布)到另一个Rabbitmq实例。我的服务本质上是转发消息,并在这样做时添加自定义的重载标头。

问题是我不想为每种消息类型生成处理程序类(因为功能是相同的,而不管消息类型如何)。我也不想每次在第三方程序集中添加新消息类型时更新代码(编写新的处理程序类)。

我尝试使用TypeBuilder为通过反射找到的每种类型动态创建消息处理程序类,但是感觉有点混乱,所以我希望有另一种方法吗?

下面的代码概述了即使代码未编译,我希望达到的目标。

public void SubscribeAndHandleMessages()
        {
            // These types will be determined runtime by using reflection but thats omitted for clarity
            var messageTypes = new List<Type>(){typeof(MessageA),typeof(MessageB)}; 

            var activator = new BuiltinHandlerActivator();

            Configure.With(activator)
                .Transport(t => t.UseRabbitMq(_rabbitConnectionString,"MyQueue"))
                .Start();

            //Subscribe and register handlers
            foreach (var type in messageTypes)
            {
                activator.Bus.Subscribe(type); //This works,I can see the queue subscribing to the correct topics
                activator.Handle<type>(async (bus,context,message) => //This doesnt work since type is not kNown at compile time
                {
                    //Forwarding to another rabbit instance,same handling for all types of messages
                });
            }
        }

解决方法

建立必要的订阅后,您只需要能够处理收到的所有消息。

使用Rebus的最佳方法是避免普通的消息处理管道(反序列化=>查找处理程序=>调度),而是以原始格式(即,以“传输消息”格式)处理消息。

您可以使用Rebus的传输消息转发功能来做到这一点。有了它,一个100%的通用消息处理程序可能看起来像这样:

Configure.With(activator)
    .Transport(t => t.UseInMemoryTransport(new InMemNetwork(),"router-tjek"))
    .Routing(r => r.AddTransportMessageForwarder(async transportMessage =>
    {
        var headers = transportMessage.Headers; //< Dictionary<string,string>
        var body = transportMessage.Body;       //< byte[]

        // handle the message here,e.g.
        // by deserializing the body into a JObject,// storing the bytes in a database,or by
        // forwarding the message to another queue
        return // appropriate forward action here
    }))
    .Start();

您可以在此处了解更多信息:Transport message forwarding

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?