如何解决当我停止我的程序时,如何防止我的 Java 应用程序丢弃未确认的 RabbitMQ 消息?
我写了一个使用RabbitMQ的java项目。我正在调试模式下启动我的 Spring 项目,并在 System.out.println(input);
中显示 ListenerExample.java
的行上设置一个断点。在应用程序的另一部分中,我使用以下代码行向此队列发送消息:
rabbitTemplate.convertAndSend(ManagerExample.getTopicExchangeName(),ManagerExample.getRoutingKey(),"test");
它在我的 IntelliJ 窗口中遇到断点,使用 RabbitMQ 管理器我可以看到 queue1
有一条未确认的消息,没有其他消息。如果我停止程序,那么 queue1
不包含任何消息并且我会丢失消息(它不会在队列中进入就绪状态)。如果我引入死信队列或更改 AckNowledgementMode
那么我仍然会丢失消息。我如何才能以某种方式在另一个队列或 queue1
中保留此消息?
ListenerExample.java:
package rabbitExample;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import java.io.IOException;
@Component
public class ListenerExample {
@RabbitListener(queues = ManagerExample.queueName,containerFactory = "prefetchOneRabbitListenerContainerFactory")
public static void listen(final String input,final Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) final long tag) throws IOException {
System.out.println(input);
channel.basicAck(tag,false);
}
}
ManagerExample.java:
package rabbitExample;
import lombok.Getter;
import org.springframework.amqp.core.AckNowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class ManagerExample {
@Getter
static final String topicExchangeName = "exchange";
@Getter
static final String queueName = "queue1";
@Getter
static final String deadLetterQueueName = "dropped";
@Getter
static final String routingKey = "key";
@Bean
Queue queue() {
return QueueBuilder.durable(queueName)
.build();
}
@Bean
static TopicExchange exchange() {
return new TopicExchange(topicExchangeName);
}
@Bean
Binding binding(final Queue queue,final TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
}
@Bean
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchOneRabbitListenerContainerFactory(final ConnectionFactory rabbitConnectionFactory) {
final SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory);
factory.setPrefetchCount(1);
factory.setAckNowledgeMode(AckNowledgeMode.MANUAL);
return factory;
}
}
解决方法
我正在尝试运行您的示例。一切正常。一条未确认的消息毫无问题地返回到队列中。 您可以尝试对您的“聆听”方法稍作改动,例如:
@RabbitListener(queues = RabbitConfig.QUEUE,containerFactory = "prefetchOneRabbitListenerContainerFactory")
public static void listen(final String input,final Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) final long tag) throws IOException {
System.out.println(input);
System.exit(0);
channel.basicAck(tag,false);
}
你会看到,一切正常。我认为,您的情况的原始问题是“停止”应用程序的方式。我假设您在 Intellij Idea 中单击了“停止”按钮。
但由于未知原因,Intellij Idea 不会立即停止进程和 channel.basicAck(tag,false);
命令
在您按下“停止”按钮后运行。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。