微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如果将两个集成流传递给一个通用 MessageHandler 类,它是否是线程安全的?在 Spring 集成 DSL 中

如何解决如果将两个集成流传递给一个通用 MessageHandler 类,它是否是线程安全的?在 Spring 集成 DSL 中

我有两个集成流 都从 Apache Kafka 接收消息

first IntegrationFlow - 在输入通道中,Consumer1(concurrency=4) 读取topic_1

第二个IntegrationFlow - 在输入通道中,Consumer2(concurrency=4) 读取topic_2

但是这两个IntegrationFlows,将消息发送到输出通道,其中指定了一个公共类 MyMessageHandler

像这样:

@Bean
public IntegrationFlow sendFromQueueFlow1(MyMessageHandler message) {
    return IntegrationFlows
            .from(Kafka
                    .messageDrivenChannelAdapter(consumerFactory1,"topic_1")
                    .configureListenerContainer(configureListenerContainer_priority1)
                    )
            .handle(message)
            .get();
}


@Bean
public IntegrationFlow sendFromQueueFlow2(MyMessageHandler message) {
    return IntegrationFlows
            .from(Kafka
                    .messageDrivenChannelAdapter(consumerFactory2,"topic_2")
                    .configureListenerContainer(configureListenerContainer_priority2)
                    )
            .handle(message)
            .get();
}

class MyMessageHandler方法 send(message),这个方法将消息进一步传递给另一个服务

class MyMessageHandler {
            
    protected void handleMessageInternal(Message<?> message)
    {
        String postResponse = myService.send(message); // remote service calling
        msgsstatisticsService.sendMessage(message,postResponse);
        // *******
    }
}

在每个 IntegrationFlow 中,4 个消费者线程正在工作( 一共8个线程),他们都去一个类MyMessageHandler, 一种方法 send()

会出现什么问题? 两个IntegrationFlow,当他们将消息传递给一个公共类时,他们会看到对方吗???我需要在 MyMessageHandler 类中提供线程安全吗???我需要在 send() 方法前面加上 synchronized 吗???

但是如果我们创建第三个 IntegrationFlow 呢?

所以只有一个 IntegrationFlow 可以通过自身将消息传递到 MyMessageHandler 类?那么它会是线程安全的吗?例子:

@Bean
public IntegrationFlow sendFromQueueFlow1() {
return IntegrationFlows
        .from(Kafka
                .messageDrivenChannelAdapter(consumerFactory1,"topic_1")
                .configureListenerContainer(configureListenerContainer_priority1)
        )
        .channel(**SOME_CHANNEL**())
        .get();

}

@Bean
public IntegrationFlow sendFromQueueFlow2() {
return IntegrationFlows
        .from(Kafka
                .messageDrivenChannelAdapter(consumerFactory2,"topic_2")
                .configureListenerContainer(configureListenerContainer_priority2)
        )
        .channel(**SOME_CHANNEL**())
        .get();

}

@Bean
public MessageChannel **SOME_CHANNEL**() {

    DirectChannel channel = new DirectChannel();
    return channel;
 }

@Bean
public IntegrationFlow sendALLFromQueueFlow(MyMessageHandler message) {

return IntegrationFlows
        .from(**SOME_CHANNEL**())
        .handle(message)
        .get();
}

解决方法

您需要使处理程序代码线程安全。

在整个方法中使用 synchronized 您将有效地禁用并发。

最好使用线程安全技术 - 没有可变字段或使用有限的同步块,仅在关键代码周围。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?