微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在 Spring RabbitMQ 中,我抛出 AmqpRejectAndDontRequeueException 但消息仍然重新排队

如何解决在 Spring RabbitMQ 中,我抛出 AmqpRejectAndDontRequeueException 但消息仍然重新排队

我的服务监听 RabbitMQ 队列。我在消费者端配置重试策略。当我抛出异常时,所有死信消息都会重新排队。但是取决于我的业务逻辑,在抛出 StopRequeueException (除 SmsException 之外的每个异常)后,我想停止对此消息的重试。但消息仍然重新排队。 这是我的配置

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 3s
          max-attempts: 10
          max-interval: 12s
          multiplier: 2
        missing-queues-fatal: false 
if (!checkMobileService.isMobileNumberAdmitted(mobileNumber())) {
    throw new StopRequeueException("SMS_BIMTEK.MOBILE_NUMBER_IS_NOT_ADMITTED");
}

我的错误处理程序:

public class CustomErrorHandler implements ErrorHandler {

    @Override
    public void handleError(Throwable t) {
        if (!(t.getCause() instanceof SmsException)) {
            throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal",t);
        }
    }
}

解决方法

调用错误处理程序在重试范围之外;重试结束后调用。

需要在重试级别分类哪些异常是可以重试的,并在recoverer中进行转换。

这是一个例子:

@SpringBootApplication
public class So67406799Application {

    public static void main(String[] args) {
        SpringApplication.run(So67406799Application.class,args);
    }

    @Bean
    public RabbitRetryTemplateCustomizer customizer(
            @Value("${spring.rabbitmq.listener.simple.retry.max-attempts}") int attempts) {

        return (target,template) -> template.setRetryPolicy(new SimpleRetryPolicy(attempts,Map.of(StopRequeueException.class,false),true,true));
    }

    @Bean
    MessageRecoverer recoverer() {
        return (msg,cause) -> {
            throw new AmqpRejectAndDontRequeueException("Stop requeue after " +
                    RetrySynchronizationManager.getContext().getRetryCount() + " attempts");
        };
    }

    @RabbitListener(queues = "so67406799")
    void listen(String in) {
        System.out.println(in);
        if (in.equals("dontRetry")) {
            throw new StopRequeueException("test");
        }
        throw new RuntimeException("test");
    }

    @Bean
    Queue queue() {
        return new Queue("so67406799");
    }

}

@SuppressWarnings("serial")
class StopRequeueException extends NestedRuntimeException {

    public StopRequeueException(String msg) {
        super(msg);
    }

}

编辑

定制器被Spring Boot调用一次;在设置重试策略和回退策略后调用它。请参阅RetryTemplateFactory

在这种情况下,定制器将重试策略替换为带有异常分类器的新策略(这就是我们需要在此处注入最大尝试次数的原因)。

请参阅 SimpleRetryPolicy 构造函数。

    /**
     * Create a {@link SimpleRetryPolicy} with the specified number of retry attempts. If
     * traverseCauses is true,the exception causes will be traversed until a match is
     * found. The default value indicates whether to retry or not for exceptions (or super
     * classes) are not found in the map.
     * @param maxAttempts the maximum number of attempts
     * @param retryableExceptions the map of exceptions that are retryable based on the
     * map value (true/false).
     * @param traverseCauses true to traverse the exception cause chain until a classified
     * exception is found or the root cause is reached.
     * @param defaultValue the default action.
     */
    public SimpleRetryPolicy(int maxAttempts,Map<Class<? extends Throwable>,Boolean> retryableExceptions,boolean traverseCauses,boolean defaultValue) {

上面配置中的最后一个布尔值 (true) 是默认行为(重试不在地图中的异常),第三个 (true) 告诉策略遵循原因链来查找异常(例如您的 { {1}} 在错误处理程序中)。地图 getCause() 说不要重试这个。

您也可以反过来配置它(地图值中的默认 <Exception,Boolean>false),明确说明您要重试哪些异常,而不对所有其他异常进行重试。

针对所有异常调用 true,无论是对分类异常立即调用,还是在其他异常重试用尽时调用。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?