Using ImmediateRequeueMessageRecoverer in Spring Integration for AMQP?

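We have noticed that when a Spring Integration endpoint (fed from RabbitMQ) receives a bad message, the message is not retried. If the problem is in our business code (the "service method" that receives the message) and it throws an exception, the message is retried as expected.

This is our configuration: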

var myService = ...
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, queueName)
                .id(integrationFlowId)
                .autoStartup(autoStartup)
                .configureContainer(c -> c.acknowledgeMode(MANUAL)
                        .prefetchCount(10)
                        .concurrentConsumers(1)
                        .maxConcurrentConsumers(3))
                .messageConverter(messageConverter))
                .aggregate(...)
                .handle(myService, "myMethod", e -> e.advice(myAdvice()))
                .get();

The myAdvice method is implemented like this:

// Wait 200 ms before the first retry, double the wait each time, cap it at 5 s.
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(200L);
backOffPolicy.setMultiplier(2);
backOffPolicy.setMaxInterval(5000L);

RetryTemplate retryTemplate = new RetryTemplate();
// MAX_VALUE is Integer.MAX_VALUE (static import), so retries are effectively unbounded.
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(MAX_VALUE));
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.registerListener(new RetryListenerSupport() {
    @Override
    public <T, E extends Throwable> void onError(RetryContext ctx, RetryCallback<T, E> callback, Throwable e) {
        log.error("Caught {} due to {} (count = {})", e.getClass().getSimpleName(), e.getMessage(), ctx.getRetryCount(), e);
    }
});
StatelessRetryOperationsInterceptorFactoryBean bean = new StatelessRetryOperationsInterceptorFactoryBean();
bean.setRetryOperations(retryTemplate);
bean.setMessageRecoverer(new ImmediateRequeueMessageRecoverer());
return bean.getObject();
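
To make the retry timing concrete, here is a minimal standalone sketch of the wait sequence that the back-off settings above imply. It assumes the conventional exponential back-off behaviour (each wait is the previous one times the multiplier, capped at the maximum); `BackoffSchedule` and `delays` are hypothetical names used only for illustration and are not part of Spring Retry.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring Retry: it only reproduces the wait
// sequence implied by an initial interval, a multiplier and a maximum interval.
final class BackoffSchedule {

    static List<Long> delays(long initialInterval, double multiplier, long maxInterval, int retries) {
        List<Long> result = new ArrayList<>();
        long interval = initialInterval;
        for (int i = 0; i < retries; i++) {
            // Each wait is the current interval, capped at the maximum.
            result.add(Math.min(interval, maxInterval));
            // Grow the interval for the next attempt until it reaches the cap.
            if (interval < maxInterval) {
                interval = (long) (interval * multiplier);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Values from the configuration above: 200 ms initial, multiplier 2, 5000 ms cap.
        System.out.println(delays(200L, 2.0, 5000L, 7));
        // prints [200, 400, 800, 1600, 3200, 5000, 5000]
    }
}
```

With a retry limit of MAX_VALUE, a message that keeps failing in the service method would settle into one attempt roughly every five seconds.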

The problem is that if we publish a message (for example { "yo" : "MTV Raps" }) that the org.springframework.amqp.support.converter.MessageConverter cannot convert to a DTO, the message is not retried:

[my-service-97c696799-6xs26] org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1436)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1720)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1495)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
[my-service-97c696799-6xs26]    at java.base/java.lang.Thread.run(Thread.java:831)
[my-service-97c696799-6xs26] Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1746)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1636)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
[my-service-97c696799-6xs26]    ... 6 common frames omitted
[my-service-97c696799-6xs26] Caused by: org.springframework.amqp.support.converter.MessageConversionException: Don't know how to convert (Body:'{ "yo" : "MTV Raps" }' MessageProperties [headers={content_type=application/json},contentType=application/json,contentLength=0,receivedDeliveryMode=NON_PERSISTENT,redelivered=false,receivedExchange=,receivedRoutingKey=myservice.routingkey,deliveryTag=1,consumerTag=amq.ctag-9De2w0uuQxnve_9k6HZ7tw,consumerQueue=myservice.myqueue]) to an object because no event type was found
[my-service-97c696799-6xs26]    at com.mycompany.RabbitMQEventMessageConverter.fromMessage(RabbitMQEventMessageConverter.java:47)
[my-service-97c696799-6xs26]    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.convertPayload(AmqpInboundChannelAdapter.java:361)
[my-service-97c696799-6xs26]    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createMessageFromAmqp(AmqpInboundChannelAdapter.java:342)
[my-service-97c696799-6xs26]    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:334)
[my-service-97c696799-6xs26]    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:299)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
[my-service-97c696799-6xs26]    ... 10 common frames omitted

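It seems that the ImmediateRequeueMessageRecoverer specified in myAdvice() is not used, and that a default AmqpRejectAndDontRequeueException is thrown instead. My guess is that the Spring infrastructure has not yet reached the myAdvice() advice at the point where the failure occurs. I have tried to find a way to switch the message recoverer in configureContainer, but I cannot find one.

Does anyone know how I can requeue/retry failed messages before Spring Integration invokes the "service method"?

We are using Spring Integration 5.4.6 and Spring Boot 2.4.6.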

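Solution

The conversion is performed before the message is created, that is, inside the inbound adapter (see AmqpInboundChannelAdapter$Listener.convertPayload in the stack trace) and before the flow reaches the advised handler, so the retry advice never sees the failure.

Conversion errors are generally considered fatal: there is no point retrying, because the same message will fail again.

Add an .errorChannel to the inbound adapter; the flow downstream of that channel will receive an ErrorMessage for the conversion error.

However, it will also receive error messages from the downstream flow, so you have to handle all error types there.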
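
Because a single error channel sees both conversion failures and failures from the downstream flow, the handler has to tell them apart. The sketch below shows one way to do that in plain Java by walking the cause chain. `ConversionFailure`, `ErrorRouting`, `classify` and the `Action` values are hypothetical stand-ins so the example runs without Spring on the classpath; in the real flow the check would be against MessageConversionException.

```java
// Stand-in for org.springframework.amqp.support.converter.MessageConversionException,
// declared here only so the sketch compiles without Spring (an assumption for illustration).
class ConversionFailure extends RuntimeException {
    ConversionFailure(String message) {
        super(message);
    }
}

// Hypothetical helper: decides what the error flow should do with a failure.
final class ErrorRouting {

    enum Action { REQUEUE, HANDLE_DOWNSTREAM_ERROR }

    // Walks the cause chain so that a wrapped conversion failure is still recognised.
    static Action classify(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof ConversionFailure) {
                return Action.REQUEUE;
            }
        }
        return Action.HANDLE_DOWNSTREAM_ERROR;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("Listener threw exception",
                new ConversionFailure("no event type was found"));
        System.out.println(classify(wrapped));                                            // prints REQUEUE
        System.out.println(classify(new IllegalStateException("service method failed"))); // prints HANDLE_DOWNSTREAM_ERROR
    }
}
```

Checking the whole chain rather than only the direct cause is a design choice: it keeps the classification stable if the framework adds another wrapper exception around the original failure.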

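EDIT

You can add an error channel and handle the conversion exception in its flow. Bear in mind, though, that the message will be redelivered over and over again, with no delay between deliveries.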

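import org.springframework.amqp.ImmediateRequeueAmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.support.ErrorMessage;

@SpringBootApplication
public class So67801807Application {

    public static void main(String[] args) {
        SpringApplication.run(So67801807Application.class, args);
    }

    @Bean
    IntegrationFlow flow(ConnectionFactory cf) {
        return IntegrationFlows.from(Amqp.inboundAdapter(cf, "foo")
                    .messageConverter(new MC())
                    // conversion failures and downstream errors are both sent here
                    .errorChannel("errors"))
                .handle(...)
                .get();
    }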

    @Bean
    IntegrationFlow errorFlow() {
        return IntegrationFlows.from("errors")
                .handle(msg -> {
                    if (((ErrorMessage) msg).getPayload().getCause() instanceof MessageConversionException) {
                        throw new ImmediateRequeueAmqpException("Requeuing due to conversion");
                    }
                    else {
                        // handle some other exception thrown by the downstream flow
                    }
                })
                .get();
    }

}

class MC implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return null;
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        throw new MessageConversionException("test");
    }

}

Alternatively, you can add a custom error handler to the container. The default error handler treats conversion exceptions as fatal.

https://docs.spring.io/spring-amqp/docs/current/reference/html/#exception-handling
