How to fix an Artemis Qpid consumer that cannot establish a connection with the broker
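Below is the configuration of the Qpid client, which uses a Camel context to connect to the Artemis broker.
The context file starts as expected, but the Qpid client cannot connect to the Apache Artemis 2.14.0 broker.
The client never establishes the connection and its threads end up blocked. I am not sure what is actually happening on the client side.
However, for another service we use a single Spring Camel context file that contains all the queues and references, and that works as expected.
The producer uses a CamelContextAware implementation and a ProducerTemplate to push the message data.
Is it possible that the producer and the consumer on the queue are conflicting here?
All I can see is some blocked threads on the consumer side.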
common-context.xml
...
<bean id="jmsConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
<property name="remoteURI" value="failover:(amqp://host1:5672,amqp://host2:5672)"/>
</bean>
<bean id="pooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory"
init-method="start" destroy-method="stop" >
<property name="maxConnections" value="3" />
<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="pooledConnectionFactory" />
<property name="concurrentConsumers" value="3" />
</bean>
<bean id="env-queue" class="org.apache.camel.component.amqp.AMQPComponent">
<property name="configuration" ref="jmsConfig" />
</bean>
...
service-context.xml (note: this is a separate XML file; with it the application ends up with two Camel contexts, even on Camel 2.20.0)
...
<import resource="classpath:/common-context.xml"/>
...
<route id="messageProcessor" shutdownRoute="Default" shutdownRunningTask="CompleteAllTasks">
<from uri="env-queue:queue:order-demo-queue" id="OrderInfo"/>
<camel:setProperty propertyName="orderType">
<camel:jsonpath trim="true">$.orderType</camel:jsonpath>
</camel:setProperty>
...
Logs:
2020-10-05 23:25:21,078: Thread-1 DEBUG (DefaultCamelContext.java:3993) - Starting consumer (order: 1000) on route: messageProcessor
2020-10-05 23:25:21,079: Thread-1 DEBUG (DefaultManagementAgent.java:470) - Registered MBean with ObjectName: org.apache.camel:context=orderventprocesor,type=consumers,name=JmsConsumer(0x3fed52a3)
2020-10-05 23:25:21,079: Thread-1 DEBUG (DefaultConsumer.java:144) - Starting consumer: Consumer[env-queue://queue:order-demo-queue]
2020-10-05 23:25:21,138: Thread-1 DEBUG (FailoverProvider.java:153) - Initiating initial connection attempt task
2020-10-05 23:25:21,140: FailoverProvider: async work thread DEBUG (FailoverProvider.java:744) - Connection attempt:[1] to: amqp://host1:5672 in-progress
2020-10-05 23:25:21,145: Thread-1 DEBUG (AbstractJmsListeningContainer.java:382) - Established shared JMS Connection
2020-10-05 23:25:21,145: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@51524bce
2020-10-05 23:25:21,146: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@708b22ca
2020-10-05 23:25:21,147: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@314f82d7
2020-10-05 23:25:21,148: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@73719eb3
2020-10-05 23:25:21,148: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@30beb728
2020-10-05 23:25:21,155: FailoverProvider: async work thread INFO (FailoverProvider.java:751) - Connection attempt:[1] to: amqp://host1:5672 failed
2020-10-05 23:25:21,156: FailoverProvider: async work thread DEBUG (FailoverProvider.java:744) - Connection attempt:[1] to: amqp://host2:5672 in-progress
2020-10-05 23:25:21,157: FailoverProvider: async work thread INFO (FailoverProvider.java:751) - Connection attempt:[1] to: amqp://host2:5672 failed
2020-10-05 23:25:21,167: FailoverProvider: async work thread DEBUG (FailoverProvider.java:744) - Connection attempt:[2] to: amqp://host1:5672 in-progress
Thread dump (shown below):
"Camel (orderventprocesor) thread #5 - JmsConsumer[order-demo-queue]" #45 daemon prio=5 os_prio=0 tid=0x00007faf395b3000 nid=0x58e3 waiting for monitor entry [0x00007faf1290c000]
INFO | jvm 1 | 2020/10/06 00:01:38 | java.lang.Thread.State: BLOCKED (on object monitor)
INFO | jvm 1 | 2020/10/06 00:01:38 | at org.springframework.jms.listener.AbstractJmsListeningContainer.getSharedConnection(AbstractJmsListeningContainer.java:492)
INFO | jvm 1 | 2020/10/06 00:01:38 | - waiting to lock <0x00000007ba976500> (a java.lang.Object)
INFO | jvm 1 | 2020/10/06 00:01:38 | at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1170)
INFO | jvm 1 | 2020/10/06 00:01:38 | at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1149)
INFO | jvm 1 | 2020/10/06 00:01:38 | at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142)
INFO | jvm 1 | 2020/10/06 00:01:38 | at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039)
INFO | jvm 1 | 2020/10/06 00:01:38 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
INFO | jvm 1 | 2020/10/06 00:01:38 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
INFO | jvm 1 | 2020/10/06 00:01:38 | at java.lang.Thread.run(Thread.java:748)
Update:
When I changed the remoteURI to a plain amqp://localhost:5672, I saw an exception related to netty-resolver:
java.lang.NoClassDefFoundError: "io/netty/resolver/AddressResolverGroup"
After adding the netty-resolver 4.1.51 jar, I now see the following exception.
2020-10-09 15:11:04.944 Camel (orderventprocesor) thread #6 - JmsConsumer[order-demo-queue] ERROR JmsConnection:165 - Failed to connect to remote at: amqp://localhost:5672
2020-10-09 15:11:04.946 Camel (orderventprocesor) thread #6 - JmsConsumer[order-demo-queue] ERROR DefaultJmsMessageListenerContainer:934 - Could not refresh JMS Connection for destination 'order-demo-queue' - retrying using FixedBackOff{interval=5000,currentAttempts=0,maxAttempts=unlimited}. Cause: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
javax.jms.JMSException: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
at org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34)
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:176)
at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:212)
at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:199)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createProviderConnection(JmsPoolConnectionFactory.java:651)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory$1.makeObject(JmsPoolConnectionFactory.java:108)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory$1.makeObject(JmsPoolConnectionFactory.java:105)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.create(GenericKeyedObjectPool.java:1041)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.addObject(GenericKeyedObjectPool.java:1221)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createJmsPoolConnection(JmsPoolConnectionFactory.java:705)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createConnection(JmsPoolConnectionFactory.java:237)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createConnection(JmsPoolConnectionFactory.java:232)
at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:180)
at org.springframework.jms.listener.AbstractJmsListeningContainer.createSharedConnection(AbstractJmsListeningContainer.java:413)
at org.springframework.jms.listener.AbstractJmsListeningContainer.refreshSharedConnection(AbstractJmsListeningContainer.java:398)
at org.springframework.jms.listener.DefaultMessageListenerContainer.refreshConnectionUntilSuccessful(DefaultMessageListenerContainer.java:915)
at org.springframework.jms.listener.DefaultMessageListenerContainer.recoverAfterListenerSetupFailure(DefaultMessageListenerContainer.java:890)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1061)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.qpid.jms.provider.exceptions.ProviderIOException: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
at org.apache.qpid.jms.provider.exceptions.ProviderExceptionSupport.createOrPassthroughFatal(ProviderExceptionSupport.java:46)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.connect(AmqpProvider.java:308)
at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:162)
... 19 more
Caused by: java.lang.AbstractMethodError: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)
at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:50)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:70)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:65)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:56)
at org.apache.qpid.jms.transports.netty.NettyTcpTransport.connect(NettyTcpTransport.java:151)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.connect(AmqpProvider.java:230)
... 20 more
2020-10-09 15:11:09.949 Camel (orderventprocesor) thread #6 - JmsConsumer[order-demo-queue] ERROR JmsConnection:165 - Failed to connect to remote at: amqp://localhost:5672
2020-10-09 15:11:09.950 Camel (orderventprocesor) thread #6 - JmsConsumer[order-demo-queue] ERROR DefaultJmsMessageListenerContainer:934 - Could not refresh JMS Connection for destination 'order-demo-queue' - retrying using FixedBackOff{interval=5000,currentAttempts=1,maxAttempts=unlimited}. Cause: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
javax.jms.JMSException: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
at org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34)
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:176)
at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:212)
at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:199)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createProviderConnection(JmsPoolConnectionFactory.java:651)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory$1.makeObject(JmsPoolConnectionFactory.java:108)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory$1.makeObject(JmsPoolConnectionFactory.java:105)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.create(GenericKeyedObjectPool.java:1041)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.addObject(GenericKeyedObjectPool.java:1221)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createJmsPoolConnection(JmsPoolConnectionFactory.java:705)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createConnection(JmsPoolConnectionFactory.java:237)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createConnection(JmsPoolConnectionFactory.java:232)
at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:180)
at org.springframework.jms.listener.AbstractJmsListeningContainer.createSharedConnection(AbstractJmsListeningContainer.java:413)
at org.springframework.jms.listener.AbstractJmsListeningContainer.refreshSharedConnection(AbstractJmsListeningContainer.java:398)
at org.springframework.jms.listener.DefaultMessageListenerContainer.refreshConnectionUntilSuccessful(DefaultMessageListenerContainer.java:915)
at org.springframework.jms.listener.DefaultMessageListenerContainer.recoverAfterListenerSetupFailure(DefaultMessageListenerContainer.java:890)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1061)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.qpid.jms.provider.exceptions.ProviderIOException: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
at org.apache.qpid.jms.provider.exceptions.ProviderExceptionSupport.createOrPassthroughFatal(ProviderExceptionSupport.java:46)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.connect(AmqpProvider.java:308)
at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:162)
... 19 more
Caused by: java.lang.AbstractMethodError: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)
at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:50)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:70)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:65)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:56)
at org.apache.qpid.jms.transports.netty.NettyTcpTransport.connect(NettyTcpTransport.java:151)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.connect(AmqpProvider.java:230)
... 20 more
Solution
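The Qpid client is attempting to connect using the configured failover mechanism. It appears that the client cannot connect, which could be because no broker service is running or reachable at all, or because of a misconfiguration such as an SSL connection to a non-SSL acceptor or the reverse. To conform with the JMS specification, the client blocks on connection start while there is no connection. You need to dig deeper into why your broker cannot be reached.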
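As a starting point for that digging, here is a minimal sketch of two small diagnostics. It assumes the `host1:5672` and `host2:5672` endpoints from the question. `BrokerProbe`, `failFastUri`, and `canReach` are hypothetical names, not part of Qpid JMS or Camel, and the attempt count and timeout are arbitrary. `failover.startupMaxReconnectAttempts` is a Qpid JMS failover URI option that bounds the initial connection attempts, so a failed start is reported rather than retried indefinitely.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper for illustration; not part of Qpid JMS or Camel. */
final class BrokerProbe {

    /** Builds a failover URI that gives up after a bounded number of initial attempts. */
    static String failFastUri(List<String> hostPorts, int startupAttempts) {
        String peers = hostPorts.stream()
                .map(hostPort -> "amqp://" + hostPort)
                .collect(Collectors.joining(","));
        return "failover:(" + peers + ")?failover.startupMaxReconnectAttempts=" + startupAttempts;
    }

    /** Returns true if a plain TCP connection to host:port succeeds within timeoutMillis. */
    static boolean canReach(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Endpoints taken from the question; adjust to the real broker hosts.
        List<String> brokers = Arrays.asList("host1:5672", "host2:5672");
        System.out.println(failFastUri(brokers, 3));
        for (String hostPort : brokers) {
            String[] parts = hostPort.split(":");
            boolean ok = canReach(parts[0], Integer.parseInt(parts[1]), 2000);
            System.out.println(hostPort + " reachable: " + ok);
        }
    }
}
```

A successful TCP connect only shows that something is listening on the port. It does not confirm that the acceptor speaks AMQP or that the SSL settings match. If the resulting URI is placed in the Spring XML `remoteURI` property with more than one option, each `&` has to be written as `&amp;`.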
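With the AMQP failover configuration, the client kept alternating between the hosts without logging any exception.
I updated the ConnectionFactory remoteURI to use a single host (amqp://localhost:5672) and noticed that the exception pointed at io.netty.resolver.
I added the following dependencies, and the connection is now established as expected.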
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver</artifactId>
<version>4.1.51.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>4.1.51.Final</version>
</dependency>
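A `NoClassDefFoundError` followed by an `AbstractMethodError` inside Netty usually points to missing or mixed Netty jar versions on the classpath. The following is a minimal sketch for checking which jar each of the classes named in the stack traces is loaded from. `NettyClasspathCheck` and `describe` are hypothetical names, and the version is only printed if the jar's manifest declares one.

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical diagnostic for illustration; not part of Netty or Qpid JMS. */
final class NettyClasspathCheck {

    /** Describes where a class was loaded from, or reports that it is missing. */
    static String describe(String className) {
        try {
            // Load without running static initializers; only the origin matters here.
            Class<?> type = Class.forName(className, false,
                    NettyClasspathCheck.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            URL location = (source == null) ? null : source.getLocation();
            Package pkg = type.getPackage();
            String version = (pkg == null) ? null : pkg.getImplementationVersion();
            return className + " -> "
                    + (location == null ? "(bootstrap or unknown)" : location.toString())
                    + " version=" + (version == null ? "unknown" : version);
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " -> NOT FOUND (" + e.getClass().getSimpleName() + ")";
        }
    }

    public static void main(String[] args) {
        // Class names copied from the exceptions in the question.
        String[] names = {
            "io.netty.resolver.AddressResolverGroup",
            "io.netty.util.concurrent.MultithreadEventExecutorGroup",
            "io.netty.channel.nio.NioEventLoopGroup"
        };
        for (String name : names) {
            System.out.println(describe(name));
        }
    }
}
```

Run it with the same classpath as the service. If the classes resolve to jars with different Netty versions, aligning every `io.netty` artifact on one version is the likely fix. `mvn dependency:tree` shows which dependency pulls in each jar.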