当我停止我的程序时,如何防止我的 Java 应用程序丢弃未确认的 RabbitMQ 消息?

如何解决当我停止我的程序时,如何防止我的 Java 应用程序丢弃未确认的 RabbitMQ 消息?

我写了一个使用RabbitMQ的java项目。我正在调试模式下启动我的 Spring 项目,并在 System.out.println(input);显示 ListenerExample.java 的行上设置一个断点。在应用程序的另一部分中,我使用以下代码行向此队列发送消息:

rabbitTemplate.convertAndSend(ManagerExample.getTopicExchangeName(),ManagerExample.getRoutingKey(),"test");

它在我的 IntelliJ 窗口中遇到断点,使用 RabbitMQ 管理器我可以看到 queue1 有一条未确认的消息,没有其他消息。如果我停止程序,那么 queue1 不包含任何消息并且我会丢失消息(它不会在队列中进入就绪状态)。如果我引入死信队列或更改 AckNowledgementMode 那么我仍然会丢失消息。我如何才能以某种方式在另一个队列或 queue1 中保留此消息?

ListenerExample.java:

package rabbitExample;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;

import java.io.IOException;

@Component
public class ListenerExample {
  @RabbitListener(queues = ManagerExample.queueName,containerFactory = "prefetchOneRabbitListenerContainerFactory")
  public static void listen(final String input,final Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) final long tag) throws IOException {
    System.out.println(input);
    channel.basicAck(tag,false);
  }
}

ManagerExample.java:

package rabbitExample;

import lombok.Getter;
import org.springframework.amqp.core.AckNowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class ManagerExample {
  @Getter
  static final String topicExchangeName = "exchange";
  @Getter
  static final String queueName = "queue1";
  @Getter
  static final String deadLetterQueueName = "dropped";
  @Getter
  static final String routingKey = "key";

  @Bean
  Queue queue() {
    return QueueBuilder.durable(queueName)
      .build();
  }

  @Bean
  static TopicExchange exchange() {
    return new TopicExchange(topicExchangeName);
  }

  @Bean
  Binding binding(final Queue queue,final TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(routingKey);
  }

  @Bean
  public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchOneRabbitListenerContainerFactory(final ConnectionFactory rabbitConnectionFactory) {
    final SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory);
    factory.setPrefetchCount(1);
    factory.setAckNowledgeMode(AckNowledgeMode.MANUAL);
    return factory;
  }
}

解决方法

我正在尝试运行您的示例。一切正常。一条未确认的消息毫无问题地返回到队列中。 您可以尝试对您的“聆听”方法稍作改动,例如:

  @RabbitListener(queues = RabbitConfig.QUEUE,containerFactory = "prefetchOneRabbitListenerContainerFactory")
  public static void listen(final String input,final Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) final long tag) throws IOException {
        System.out.println(input);
        System.exit(0);
        channel.basicAck(tag,false);
  }

你会看到,一切正常。我认为,您的情况的原始问题是“停止”应用程序的方式。我假设您在 Intellij Idea 中单击了“停止”按钮。 但由于未知原因,Intellij Idea 不会立即停止进程和 channel.basicAck(tag,false); 命令 在您按下“停止”按钮后运行。

Example

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?