微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何将 RabbitListeners 绑定到 CloudAMQP?

如何解决如何将 RabbitListeners 绑定到 CloudAMQP?

我目前在我的两个应用程序(网络/工作者)之间实现 RabbitMQ 消息传递时遇到问题。我的 RabbitMQ 服务托管在 CloudamQP(Heroku 插件)上。但是,我声明的任何 @RabbitListener 似乎都在尝试连接到 localhost 而不是云服务。

将以下组件添加到我的工作应用程序后:

@Service
public class TaskConsumer {
    @RabbitListener(queues = "worker.rpc.requests",containerFactory = "rabbitListenerContainerFactory")
    public String fetch(String p) {
        return p;
    }
}

我遇到以下错误

2021-07-05 14:38:23.006  INFO 18840 --- [ntContainer#0-3] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2021-07-05 14:38:32.145  WARN 18840 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception,processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
2021-07-05 14:38:32.145  INFO 18840 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@32c8d668: tags=[[]],channel=null,ackNowledgeMode=AUTO local queue size=0

如何绑定 RabbitListener 使其连接到 AMQP 环境?这是我的配置:

@Configuration
@EnableRabbit
public class RabbitConfig {

    protected final String workerQueueName = "worker.rpc.requests";
    protected final String routingKeyName = "rpc";
    protected final String directExcName = "worker.exchange";

    @Bean
    public ConnectionFactory connectionFactory() {
        final URI ampqUrl;
        try {
            ampqUrl = new URI(getEnvOrThrow("CLOUdamQP_URL"));
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }

        final CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUsername(ampqUrl.getUserInfo().split(":")[0]);
        factory.setPassword(ampqUrl.getUserInfo().split(":")[1]);
        factory.setHost(ampqUrl.getHost());
        factory.setPort(ampqUrl.getPort());
        factory.setVirtualHost(ampqUrl.getPath().substring(1));

        try {
            factory.getRabbitConnectionFactory().setUri(ampqUrl);
        } catch (URISyntaxException e) {
            e.printstacktrace();
        } catch (NoSuchAlgorithmException e) {
            e.printstacktrace();
        } catch (KeyManagementException e) {
            e.printstacktrace();
        }

        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());

        SimpleMessageListenerContainer container = factory
                .createListenerContainer();
        factory.setConcurrentConsumers(50);
        factory.setMaxConcurrentConsumers(100);
        container.setStartConsumerMinInterval(3000);
        container.setQueues(queue());
        factory.setMaxConcurrentConsumers(5);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setRoutingKey(this.workerQueueName);
        template.setDefaultReceiveQueue(this.workerQueueName);
        return template;
    }

    @Bean
    public Queue queue() {
        return new Queue(this.workerQueueName);
    }

    @Bean
    public DirectExchange direct() {
        return new DirectExchange(this.directExcName);
    }

    @Bean
    public Binding binding(DirectExchange direct,Queue autoDeleteQueue1) {
        return BindingBuilder.bind(autoDeleteQueue1)
                .to(direct)
                .with(this.routingKeyName);
    }

    /**
     * required for executing adminstration functions against an AMQP broker
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    private static String getEnvOrThrow(String name) {
        final String env = getenv(name);
        if (env == null) {
            throw new IllegalStateException("Environment variable [" + name + "] is not set."); 
        }
        return env;
    }

}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?