如何解决如何使用 ConnectionListner 和/或 ChannelListner 记录 RabbitMQ 中消息传递的失败/成功
我正在尝试记录在 RabbitMQ 中发送消息期间发生的任何信息或异常,为此我尝试在现有连接工厂中添加 ConnectionListener。
kRabbitTemplate.getConnectionFactory().addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
System.out.println("Connection Created");
}
@Override
public void onShutDown(ShutdownSignalException signal) {
System.out.println("Connection Shutdown "+signal.getMessage());
}
});
kRabbitTemplate.convertAndSend(exchange,routingkey,empDTO);
为了测试异常场景,我从RabbitMQ控制台解绑甚至删除了队列。但我没有收到任何异常或任何关闭方法调用。
虽然,当我停止 RabbitMQ 服务时,我得到了
Exception in thread "Thread-5" org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
但是这个异常不是来自我添加的监听器。
我想知道
- 为什么我没有从关闭方法中得到任何异常或调用
- 如何使用 ConnectionListner 和/或 ChannelListner 来记录消息传递的失败/成功。
- 我们可以使用 AMQP 附加程序吗?如果可以,我们该怎么做? (任何示例/教程)
- 确保消息发送的其他方法是什么?
注意:我不想使用发布者确认的方法。
解决方法
Connection Refused
不是 ShutdownSignalException
- 从未建立连接,因为服务器/端口上不存在代理。
您不能使用侦听器来确认单个消息的发送或返回;使用发布者确认并返回。
https://docs.spring.io/spring-amqp/docs/current/reference/html/#publishing-is-async
有关如何使用附加程序的信息,请参阅文档。
https://docs.spring.io/spring-amqp/docs/current/reference/html/#logging
编辑
要获得连接失败的通知,您目前需要使用其他技术,具体取决于您是发送还是接收。
下面是一个示例:
@SpringBootApplication
public class So66882099Application {
private static final Logger log = LoggerFactory.getLogger(So66882099Application.class);
public static void main(String[] args) {
SpringApplication.run(So66882099Application.class,args);
}
@RabbitListener(queues = "foo")
void listen(String in) {
}
// consumer side listeners for no connection
@EventListener
void consumerFailed(ListenerContainerConsumerFailedEvent event) {
log.error(event + " via event listener");
if (event.getThrowable() instanceof AmqpConnectException) {
log.error("Broker down?");
}
}
// or
@Bean
ApplicationListener<ListenerContainerConsumerFailedEvent> eventListener() {
return event -> log.error(event + " via application listener");
}
// producer side - use a RetryListener
@Bean
RabbitTemplate template(ConnectionFactory cf) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cf);
RetryTemplate retry = new RetryTemplate();
// configure retries here as needed
retry.registerListener(new RetryListener() {
@Override
public <T,E extends Throwable> boolean open(RetryContext context,RetryCallback<T,E> callback) {
return true;
}
@Override
public <T,E extends Throwable> void onError(RetryContext context,E> callback,Throwable throwable) {
log.error("Send failed " + throwable.getMessage());
}
@Override
public <T,E extends Throwable> void close(RetryContext context,Throwable throwable) {
}
});
rabbitTemplate.setRetryTemplate(retry);
return rabbitTemplate;
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
try {
template.convertAndSend("foo","bar");
}
catch (Exception e) {
e.printStackTrace();
}
};
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。