微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

达到水印值时,Spring AMQP客户端挂起

如何解决达到水印值时,Spring AMQP客户端挂起

当RabbitMQ服务器达到水印级别时,尝试向RabbitMQ服务器发送消息的客户端将处于等待状态。我希望Spring AMQP客户端在等待一段时间(500毫秒)后超时。请在下面的堆栈跟踪中找到:-

 "pool-3-thread-1" #17 prio=5 os_prio=0 tid=0x000000002b7ea800 nid=0x5750 in Object.wait() [0x000000002d9fe000]
       java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000071b4daeb8> (a com.rabbitmq.utility.BlockingValueOrException)
        at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:73)
        - locked <0x000000071b4daeb8> (a com.rabbitmq.utility.BlockingValueOrException)
        at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120)
        - locked <0x000000071b4daeb8> (a com.rabbitmq.utility.BlockingValueOrException)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
        at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1540)
        at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:52)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1190)
        - locked <0x000000071b4b1380> (a java.lang.Object)
        at com.sun.proxy.$Proxy42.txCommit(UnkNown Source)
        at org.springframework.amqp.rabbit.connection.RabbitUtils.commitIfNecessary(RabbitUtils.java:141)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doSend(RabbitTemplate.java:2322)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.lambda$send$3(RabbitTemplate.java:1005)
        at org.springframework.amqp.rabbit.core.RabbitTemplate$$Lambda$122/1529091234.doInRabbit(UnkNown Source)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.invokeAction(RabbitTemplate.java:2147)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2106)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2058)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:1004)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1110)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:1101)

我正在使用以下XML配置创建ConnectionFactory bean:-

<bean id="connectionFactory"
        class="com.example.CustomCachingConnectionFactory">
        <constructor-arg value="#{messagingProperties['mq.hostname']}" />
        <property name="virtualHost" value="#{messagingProperties['mq.virtual-host']}" />
        <property name="username" value="#{messagingProperties['mq.username']}" />
        <property name="password" value="#{messagingProperties['mq.password']}" />
        <property name="channelCacheSize" value="25" />
    </bean>

自定义连接工厂:-

public class CustomCachingConnectionFactory extends CachingConnectionFactory {


    private com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory;

    public CustomCachingConnectionFactory(@Nullable String hostname){
        super(hostname);
    }

    @Override
    public ConnectionFactory getRabbitConnectionFactory() {
        rabbitConnectionFactory = super.getRabbitConnectionFactory();
        rabbitConnectionFactory.useNio();
        return rabbitConnectionFactory;
    }

    @postconstruct
    public void init() {
        rabbitConnectionFactory.setNioParams(getNioParams());
    }

    private NioParams getNioParams(){
        NioParams nioParams = new NioParams();
        nioParams.setWriteEnqueuingTimeoutInMs(500);
        return nioParams;
    }
}

请建议如何阻止AMQP客户端无限期等待RabbitMQ服务器响应。

amqp客户端版本-5.7.3

Spring Rabbit版本-2.2.1.RELEASE spring amqp版本-2.2.1.RELEASE 弹簧芯-5.2.1。释放

全线程转储-https://gist.github.com/CODESTHN/7eedd6881b575977a0e7d69d7228325f

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。