如何解决如何使JMS onMessage方法异步
我正在研究一种侦听一个IBM MQ的JMS POC。 MQConnectionFactory
已用于与MQ连接,并使用onMessage()
方法来监听输入消息。收到消息后,将使用MarshallingMessageConverter
将其转换为所需的对象类型,并在执行更多操作后将其推入另一个队列(响应)。
到目前为止,它运行良好。但是,似乎队列中的所有消息都被同步使用,例如仅在完成第一条消息的处理后,第二条消息才进入onMessage()
方法。
Q1。我如何使其异步以提高性能?
第二季度。是否建议使其异步?
下面是我的代码段:
config.xml:
<bean id="oxmMessageConverter"
class="org.springframework.jms.support.converter.MarshallingMessageConverter">
<property name="marshaller" ref="jaxbMapper" />
<property name="unmarshaller" ref="jaxbMapper" />
<property name="targettype">
<util:constant
static-field="org.springframework.jms.support.converter.MessageType.TEXT" />
</property>
</bean>
<bean id="jmsResponseSenderA" class="org.springframework.jms.core.jmstemplate">
<property name="connectionFactory" ref="jmsMQConnectionFactoryA" />
<property name="defaultDestination" ref="jmsResponseQueue" />
<property name="messageConverter" ref="oxmMessageConverter" />
</bean>
<bean id="jmsMQConnectionFactoryA" class="com.ibm.mq.jms.MQConnectionFactory">
<property name="transportType">
<util:constant
static-field="com.ibm.msg.client.wmq.common.CommonConstants.WMQ_CM_CLIENT" />
</property>
<property name="clientReconnectOptions">
<util:constant
static-field="com.ibm.msg.client.wmq.common.CommonConstants.WMQ_CLIENT_RECONNECT_Q_MGR" />
</property>
<property name="queueManager" value="${queue.manager.A}" />
<property name="CCDTURL" value="${ccdt.url}"></property>
</bean>
监听器类:
public class ListenerServiceImpl implements MessageListener {
public void onMessage(Message message) {
// action1
// action2
// action3
jmsResponseSender.convertAndSend(response);
}
}
解决方法
JMS onMessage()
实现的MessageListener
方法已经已经异步了。您的问题是您只有1个MessageConsumer
。您需要配置多个使用者,以便他们可以使用自己的MessageListener
同时使用消息。
看起来您正在使用 spring-jms
。如果您还使用 mq-jms-spring-boot-starter
,则可以配置连接池。您可以在 https://github.com/ibm-messaging/mq-jms-spring
对于您的情况,假设您想要一个包含 5 个连接的池:
ibm.mq.pool.enabled=true
ibm.mq.pool.maxConnections=5
当您使用池化时,您需要确保您的 onMessage
方法符合可重入性。在我的测试中,我有一个带有并发集的 @JmsListener
,并且休眠以便我可以看到并行处理。
@JmsListener(destination = "${queue.name}",concurrency = "3-5")
public void receiveString(String comment) {
String fid = Integer.toString(rand.nextInt(99));
logger.info(fid);
logger.info(fid + " ========================================");
logger.info(fid + " Received string message is: " + comment);
logger.info(fid + " Sleeping");
try {
Thread.sleep(10000);
} catch (InterruptedException e) { }
logger.info(fid + " Waking" );
logger.info(fid + " ========================================");
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。