微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如果我手动配置 DirectMessageListenerContainer,则忽略 spring.rabbitmq.listener.simple.retry.enabled=true

如何解决如果我手动配置 DirectMessageListenerContainer,则忽略 spring.rabbitmq.listener.simple.retry.enabled=true

我正在尝试使用属性在rabbitmq 上激活deadletterqueue

spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=10

当我使用注解时它工作正常

public class SimpleConsumer {

    @RabbitListener(queues = "messages.queue")
    public void handleMessage(String message){
        throw new RuntimeException(); 
    }
}

但是如果我手动配置 MessageListenerContainer,它就不起作用。

低于我的配置:

@Bean
SimpleMessageListenerContainer directMessageListenerContainer(
        ConnectionFactory connectionFactory,Queue simpleQueue,MessageConverter jsonMessageConverter,SimpleConsumer simpleConsumer)
{

    return new SimpleMessageListenerContainer(connectionFactory){{
        setQueues(simpleQueue);
        setMessageListener(new MessageListenerAdapter(simpleConsumer,jsonMessageConverter));
       // setDefaultRequeueRejected(false);
    }};

}

如果我将 setDefaultRequeueRejected 设置为 true,它会尝试解决消费者无限时间(如果抛出异常)。

如果我将 setDefaultRequeueRejected 设置为 false,它会尝试解析消费者一次,然后使用 deadLetterConsumer。

@RabbitListener(queues = "messages.queue") 在使用 spring.rabbitmq.listener 配置的幕后做了什么?

在我的github代码下面

https://github.com/crakdelpol/dead-letter-spike.git

查看分支“按配置重试”

解决方法

它在容器的通知链中添加了一个重试拦截器。见the documentation

Spring Retry 提供了几个 AOP 拦截器和很大的灵活性来指定重试的参数(尝试次数、异常类型、退避算法等)。 Spring AMQP 还提供了一些方便的工厂 bean,用于以方便的形式为 AMQP 用例创建 Spring Retry 拦截器,具有强类型的回调接口,您可以使用它们来实现自定义恢复逻辑。有关详细信息,请参阅 StatefulRetryOperationsInterceptorStatelessRetryOperationsInterceptor 的 Javadoc 和属性。

...

@Bean
public StatefulRetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateful()
            .maxAttempts(5)
            .backOffOptions(1000,2.0,10000) // initialInterval,multiplier,maxInterval
            .build();
}

然后将拦截器添加到容器 adviceChain 中。

编辑

请参阅我向您指出的文档;您需要将恢复器添加到拦截器中:

当所有重试都用完时,将调用 MessageRecover。 RejectAndDontRequeueRecoverer 正是这样做的。默认的 MessageRecoverer 使用错误的消息并发出警告消息。

这是一个完整的例子:

@SpringBootApplication
public class So67433138Application {

    public static void main(String[] args) {
        SpringApplication.run(So67433138Application.class,args);
    }

    @Bean
    Queue queue() {
        return QueueBuilder.durable("so67433138")
                .deadLetterExchange("")
                .deadLetterRoutingKey("so67433138.dlq")
                .build();
    }

    @Bean
    Queue dlq() {
        return new Queue("so67433138.dlq");
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory cf) {
        SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer(cf);
        smlc.setQueueNames("so67433138");
        smlc.setAdviceChain(RetryInterceptorBuilder.stateless()
                .maxAttempts(5)
                .backOffOptions(1_000,10_000)
                .recoverer(new RejectAndDontRequeueRecoverer())
                .build());
        smlc.setMessageListener(msg -> {
            System.out.println(new String(msg.getBody()));
            throw new RuntimeException("test");
        });
        return smlc;
    }

    @RabbitListener(queues = "so67433138.dlq")
    void dlq(String in) {
        System.out.println("From DLQ: " + in);
    }

}
test
test
test
test
test
2021-05-12 11:19:42.034 WARN 70667 ---[    container-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message ...
...
From DLQ: test

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?