动态添加队列使用者春季问题

如何解决动态添加队列使用者春季问题

我正在尝试以排他模式将新队列添加到现有侦听器,但是出现问题。 代码

@GetMapping(value = "/consumer")
    public void consumer() {
        Set<String> queue = new HashSet<>(Arrays.asList("Test queue1","Test queue2","Test queue3"));
//        for (int i = 0; i < 10; i++) {
            addQueuetoListener("Test container1",queue);
//        }
    }
public void addQueuetoListener(String containerId,String queueName) {
        SimpleMessageListenerContainer listener = (SimpleMessageListenerContainer)
                rabbitListenerEndpointRegistry.getListenerContainer(containerId);
        if (Objects.nonNull(listener)) {
            String[] queueNames = listener.getQueueNames();
            System.out.println("Queue Names: " + Arrays.toString(queueNames) + " going to Add: " + queueName);
            System.out.println("search result for queue name: " + queueName + "     -->" + Arrays.binarySearch(queueNames,queueName));
            //do not add same queue name to listener
            if (Arrays.binarySearch(queueNames,queueName) < 0) {
                System.out.println("Adding queue: " + queueName);
                listener.addQueueNames(queueName);
                System.out.println("added queue: " + queueName);
            }
        }
    }

    public void addQueuetoListener(String containerId,Set<String> queueName) {
        for (String queue : queueName) {
            addQueuetoListener(containerId,queue);
        }
    }

兔子监听器代码

@RabbitListener(id = "Test container1",queues = "#{rabbitMQConsumer.getEntriesQueueName()}",containerFactory = "consumerBatchContainerFactory")
    public void receiveMessageBatch(List<Message> messages) throws InterruptedException,IOException {
        for (Message message : messages) {
            Thread.sleep(30000);
//      ObjectMapper jsonObjectMapper = new ObjectMapper();
//      Employee employee = jsonObjectMapper.readValue(Arrays.toString(message.getBody()),Employee.class);
            String queueName = (String) message.getMessageProperties().getHeaders().get("queueName");
            System.out.println("RabbitMQConsumer.receiveMessage QueueName:" + queueName);
//      System.out.println("Recieved Message From RabbitMQ: " + employee + "      QueueName:"+ queueName);
        }
    }

    public List<String> getEntriesQueueName() {
        return Collections.emptyList();
//        return Arrays.asList("Test queue1","Test queue3");
    }

错误:当我设置listener.setIsExclusive(true);

Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=403,reply-text=ACCESS_REFUSED - queue 'Test queue3' in vhost 'my_vhost' in exclusive use,class-id=60,method-id=20)

甚至尝试在Rabbitlistener上设置独家设置

@RabbitListener(id = "Test container1",containerFactory = "consumerBatchContainerFactory",exclusive = true)

但有相同的错误

代码参考:https://github.com/rajat-g/SpringBootRabbitMQConsumer

解决方法

有一个bug in the listener container when you call addQueueNames,我们最终要回收消费者多次,这是一场竞赛-我们不等待消费者被取消。

改为使用listener.addQueues(new Queue(queueName));

此外,我将setExclusive移到了容器工厂bean ...

factory.setContainerCustomizer(container -> container.setExclusive(true));

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?