如何解决删除 RabbitMQ 消费者并在浏览器的 RabbitMQ 控制台中查看
免责声明:RabbitMq 和/或 Spring Integration 和/或 Spring Cloud Stream 中的 Noobie。
我有以下课程:
@Component
public class RabbitMQChannelBindingFactory {
...
private org.springframework.cloud.stream.binder.rabbitRabbitMessageChannelBinder binder;
private org.springframework.cloud.stream.config.BindingServiceProperties bindingServiceProperties;
private org.springframework.cloud.stream.binding.BindingService bindingService;
private org.springframework.beans.factory.config.ConfigurableListablebeanfactory beanfactory;
private org.springframework.cloud.stream.binding.SubscribableChannelBindingTargetFactory bindingTargetFactory;
private org.springframework.amqp.rabbit.connection.ConnectionFactory rabbitConnectionFactory;
...
}
需要什么?
我有一个机制可以创建一个 Exchange+Queue+Consumer,我有一个机制可以破坏这些。 交换和队列的自动删除设置为 true。
有什么问题?
破坏所有这 3 个元素的继承机制不起作用。
它只删除 Exchange。队列不会被删除,因为它仍然有一个消费者,我也可以在我的应用程序中看到它。
尝试了什么?
我尝试使用 JVisualVM 获取客户标签的 String 实例,然后我沿着层次结构移除消费者。
我已经更改了应用程序中的 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer,以便它首先由类加载器加载。
我在里面添加了这样的东西,以便跟踪在我的应用程序中创建的所有消费者:
public class BlockingQueueConsumer {
...
public static List<BlockingQueueConsumer> all = new ArrayList<>();
public BlockingQueueConsumer(...) {
...
all.add(this);
...
}
...
}
完成上一步后,我在 RabbitMQChannelBindingFactory 中添加了另一个方法 为所有消费者调用取消方法的类,如下所示:
class RabbitMQChannelBindingFactory {
public void disconnect(...) {
BlockingQueueConsumer lastBlockingQueueConsumer =
BlockingQueueConsumer.all.get(BlockingQueueConsumer.all.size() - 1);
lastBlockingQueueConsumer.getConsumerTags()
.forEach(consumerTag -> basicCancel(lastBlockingQueueConsumer,consumerTag));
}
}
此时在加载了 RabbitMQ Console 的浏览器上,我们可以看到 Queue 被删除(除了 Exchange 和 Consumer)。
有什么问题?
我找不到将 BlockingQueueConsumer 连接到自动装配属性的方法。
例如我试过
public void deleterabbitMQConsumer() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
rabbitTemplate.execute(channel -> {
if (channel instanceof ChannelN) {
ChannelN channelN = (ChannelN) channel;
return true;
}
return false;
});
}
但似乎ChannelN内部没有消费者。
请您给我一个方向,首先需要了解什么?
或者有哪些资源可以帮助我?
我已经尝试了 https://stackoverflow.com/a/27633771/13622666 解决方案。
解决方案
@Component
public class RabbitMQChannelBindingFactory {
...
private org.springframework.cloud.stream.binder.rabbitRabbitMessageChannelBinder binder;
private void connectAnddisconnectConsumer(...) {
...
Binding<MessageChannel> messageChannelBinding =
binder.bindConsumer(exchangeName,"",channel,consumerProperties);
... // receive messages
messageChannelBinding.stop();
...
}
}
还有堆栈跟踪:
messageChannelBinding.stop();
DefaultBinding#stop
AbstractEndpoint#stop()
AmqpInboundChannelAdapter#doStop
AbstractMessageListenerContainer#stop()
AbstractMessageListenerContainer#doStop
AbstractMessageListenerContainer#shutdown
SimpleMessageListenerContainer#doShutdown
BlockingQueueConsumer#basicCancel(boolean)
解决方法
投票结束。你不能像那样滥用 Java 类系统,而是更专注于学习你使用的库。可能有人已经询问过您正在寻找的解决方案。正如 Gary 所说:Spring Cloud Stream 绑定上只有 if else
,它将停止 for
,反过来,它将取消它在队列中的所有消费者。您的自动删除队列将从 RabbitMQ 中删除。没有理由破坏交易所。虽然您可以通过 stop()
做到这一点。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。