如何解决如果将两个集成流传递给一个通用 MessageHandler 类,它是否是线程安全的?在 Spring 集成 DSL 中
我有两个集成流 都从 Apache Kafka 接收消息
first IntegrationFlow - 在输入通道中,Consumer1(concurrency=4) 读取topic_1
第二个IntegrationFlow - 在输入通道中,Consumer2(concurrency=4) 读取topic_2
但是这两个IntegrationFlows,将消息发送到输出通道,其中指定了一个公共类 MyMessageHandler
像这样:
@Bean
public IntegrationFlow sendFromQueueFlow1(MyMessageHandler message) {
return IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory1,"topic_1")
.configureListenerContainer(configureListenerContainer_priority1)
)
.handle(message)
.get();
}
@Bean
public IntegrationFlow sendFromQueueFlow2(MyMessageHandler message) {
return IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory2,"topic_2")
.configureListenerContainer(configureListenerContainer_priority2)
)
.handle(message)
.get();
}
class MyMessageHandler 有方法 send(message),这个方法将消息进一步传递给另一个服务
class MyMessageHandler {
protected void handleMessageInternal(Message<?> message)
{
String postResponse = myService.send(message); // remote service calling
msgsstatisticsService.sendMessage(message,postResponse);
// *******
}
}
在每个 IntegrationFlow 中,4 个消费者线程正在工作( 一共8个线程),他们都去一个类MyMessageHandler, 一种方法 send()
会出现什么问题? 两个IntegrationFlow,当他们将消息传递给一个公共类时,他们会看到对方吗???我需要在 MyMessageHandler 类中提供线程安全吗???我需要在 send() 方法前面加上 synchronized 吗???
但是如果我们创建第三个 IntegrationFlow 呢?
所以只有一个 IntegrationFlow 可以通过自身将消息传递到 MyMessageHandler 类?那么它会是线程安全的吗?例子:
@Bean
public IntegrationFlow sendFromQueueFlow1() {
return IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory1,"topic_1")
.configureListenerContainer(configureListenerContainer_priority1)
)
.channel(**SOME_CHANNEL**())
.get();
}
@Bean
public IntegrationFlow sendFromQueueFlow2() {
return IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory2,"topic_2")
.configureListenerContainer(configureListenerContainer_priority2)
)
.channel(**SOME_CHANNEL**())
.get();
}
@Bean
public MessageChannel **SOME_CHANNEL**() {
DirectChannel channel = new DirectChannel();
return channel;
}
@Bean
public IntegrationFlow sendALLFromQueueFlow(MyMessageHandler message) {
return IntegrationFlows
.from(**SOME_CHANNEL**())
.handle(message)
.get();
}
解决方法
您需要使处理程序代码线程安全。
在整个方法中使用 synchronized
您将有效地禁用并发。
最好使用线程安全技术 - 没有可变字段或使用有限的同步块,仅在关键代码周围。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。