微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

春季集成重试策略

如何解决春季集成重试策略

我想用Spring集成创建一个简单的IntegrationFlow,但遇到了困难。

我想创建一个集成流程,该流程从Rabbit Mq中的队列中获取消息并将消息发布到端点Rest。

我要处理的问题是,当请求失败时,它会不断重试,如何在此代码中实施重试策略? 例如,我要3次重试,第一次重试1秒后,第二次重试5秒,第三次重试1分钟。


        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        RestTemplate restTemplate = new RestTemplate();
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(BOUTIQUE_QUEUE_NAME);
        container.setAckNowledgeMode(AckNowledgeMode.AUTO);
        return IntegrationFlows.from(Amqp.inboundAdapter(container)) /* Get Message from RabbitMQ */
                .handle(msg ->
                {
                    String msgString = new String((byte[]) msg.getPayload(),StandardCharsets.UTF_8);
                    httpentity<String> requestBody = new httpentity<String>(msgString,headers);
                    restTemplate.postForObject(ENDPOINT_LOCAL_URL,requestBody,String.class);
                    System.out.println(msgString);
                   
                })
                .get();
    }

解决方法

将重试拦截器添加到侦听器容器的通知链。参见https://docs.spring.io/spring-amqp/docs/2.2.10.RELEASE/reference/html/#retryhttps://docs.spring.io/spring-amqp/docs/2.2.10.RELEASE/reference/html/#async-listeners

编辑

@SpringBootApplication
public class So63596805Application {

    private static final Logger LOG = LoggerFactory.getLogger(So63596805Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So63596805Application.class,args);
    }

    @Bean
    IntegrationFlow flow(SimpleRabbitListenerContainerFactory factory,RabbitTemplate template) {
        SimpleMessageListenerContainer container = factory.createListenerContainer();
        container.setQueueNames("foo");
        container.setAdviceChain(RetryInterceptorBuilder.stateless()
                .maxAttempts(5)
                .backOffOptions(1000,2.0,10000)
                .recoverer((msg,cause) -> LOG.info("Retries exhausted for " + msg))
                .build());
        return IntegrationFlows.from(Amqp.inboundAdapter(container))
                .handle(msg -> {
                    LOG.info(msg.getPayload().toString());
                    throw new RuntimeException("test");
                })
                .get();
    }

}

这使用指数退避策略。

如果您使用

.maxAttempts(4)
.backOffOptions(1000,5.0,60000)

1、5和25秒后,您将获得3次重试。

1000,8.0,60000将为您提供1、8和60秒。

如果必须具有规格(1、5、60),则需要自定义BackOffPolicy。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。