Spring Cloud Stream with the RabbitMQ binder and a transactional consumer/producer with database operations

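I have a Spring Cloud Stream application that receives messages from RabbitMQ through the Rabbit binder, updates my database, and sends one or more messages. My application can be summarized by this demo app.

The problem is that @Transactional does not seem to work (or at least that is my impression): if an exception is thrown, the database is rolled back, but the messages are still sent, even though the consumer and producer are configured as transacted by default.

What I want to achieve is this: when an exception occurs, the consumed message should go to the DLQ after the retries, the database should be rolled back, and no messages should be sent.

How can I achieve this?

This is the output of the demo app when I send a message to the my-input exchange: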

2021-01-19 14:31:20.804 ERROR 59593 --- [nput.my-group-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking MyListener#process[1 args]; nested exception is java.lang.RuntimeException: MyError, failedMessage=GenericMessage [payload=byte[4],headers={amqp_receivedDeliveryMode=NON_PERSISTENT,amqp_receivedRoutingKey=#,amqp_receivedExchange=my-input,amqp_deliveryTag=2,deliveryAttempt=3,amqp_consumerQueue=my-input.my-group,amqp_redelivered=false,id=006f733f-5eab-9119-347a-625570383c47,amqp_consumerTag=amq.ctag-CnT_p-IXTJqIBNNG4sGPoQ,sourceData=(Body:'[B@177259f3(byte[4])' MessageProperties [headers={},contentLength=0,receivedDeliveryMode=NON_PERSISTENT,redelivered=false,receivedExchange=my-input,receivedRoutingKey=#,deliveryTag=2,consumerTag=amq.ctag-CnT_p-IXTJqIBNNG4sGPoQ,consumerQueue=my-input.my-group]),contentType=application/json,timestamp=1611063077789}]
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:66)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:308)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:304)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: MyError
    at com.example.demo.MyListener.process(DemoApplication.kt:46)
    at com.example.demo.MyListener$$FastClassBySpringCGLIB$$4381219a.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
    at com.example.demo.MyListener$$EnhancerBySpringCGLIB$$f4ed3689.process(<generated>)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
    ... 29 more

message should not be received here hello world
employee name still toto == toto
message should not be received here hello world
employee name still toto == toto
message should not be received here hello world
employee name still toto == toto

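Solution

Because you republish the failed message to the DLQ, the transaction succeeds from Rabbit's point of view: the original message is acknowledged and removed from the queue, and the Rabbit transaction is committed.

You can't do what you want with republishToDlq.

It will work if you use the normal DLQ mechanism (republishToDlq=false, where the broker sends the original message to the DLQ) instead of republishing with the additional metadata.

If you want to republish with the metadata, you could publish to the DLQ manually with a non-transactional RabbitTemplate (so the DLQ publish is not rolled back along with the other publishes).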
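The three options above can be compared with a toy model of a transacted channel. This is only a sketch: `DlqTransactionSketch`, `FailureHandling`, and `deliver` are hypothetical names invented for illustration and are not part of Spring AMQP or Spring Cloud Stream. The model assumes that publishes on a transacted channel are buffered until commit, and that a listener which fails is rolled back unless something swallows the exception.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a transacted channel; none of these types are Spring AMQP classes. */
final class DlqTransactionSketch {

    /** Hypothetical labels for the three ways of handling a failed delivery. */
    enum FailureHandling { REPUBLISH_IN_TX, BROKER_DLQ, REPUBLISH_OUTSIDE_TX }

    /** Processes one delivery whose listener sends and then fails; returns what the broker routes. */
    static List<String> deliver(String payload, FailureHandling handling) {
        List<String> routed = new ArrayList<>();  // messages the broker actually routes
        List<String> pending = new ArrayList<>(); // publishes buffered in the open transaction
        try {
            try {
                pending.add("my-output:" + payload.toUpperCase());
                throw new IllegalStateException("MyError"); // listener fails after sending
            } catch (IllegalStateException e) {
                switch (handling) {
                    case REPUBLISH_IN_TX:
                        pending.add("DLX:" + payload); // published on the same transacted channel
                        break;                         // exception swallowed by the recoverer
                    case REPUBLISH_OUTSIDE_TX:
                        routed.add("DLX:" + payload);  // separate, non-transacted publish
                        throw e;                       // still fail the delivery
                    default:
                        throw e;                       // let the failure propagate
                }
            }
            routed.addAll(pending); // listener appears to succeed: ack and commit
        } catch (IllegalStateException e) {
            pending.clear(); // rollback: buffered publishes are discarded
            if (handling == FailureHandling.BROKER_DLQ) {
                routed.add("DLX:" + payload); // the broker dead-letters the rejected original
            }
        }
        return routed;
    }

    public static void main(String[] args) {
        for (FailureHandling handling : FailureHandling.values()) {
            System.out.println(handling + " -> " + deliver("hello", handling));
        }
    }
}
```

In this model only REPUBLISH_IN_TX lets the my-output message escape, because swallowing the exception turns the failure into a commit. The other two modes route nothing but the dead-lettered copy.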

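EDIT

Here is an example of how to do what you need.

A few things to note:

  1. We have to add an error handler that rethrows the exception.
  2. We have to move the retries from the binder to the listener container; otherwise the retries happen inside the transaction, and if a retry succeeds, multiple messages are deposited in the output queue.
  3. For stateful retry to work, we must be able to identify each message uniquely; the simplest solution is to have the sender set a unique message_id property (for example a UUID).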
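
// Imports assume Spring Cloud Stream 3.x and Spring AMQP 2.x package names.
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.retry.interceptor.MethodInvocationRecoverer;
import org.springframework.retry.support.RetrySynchronizationManager;

@SpringBootApplication
@EnableBinding(Processor.class)
public class So65792643Application {

    public static void main(String[] args) {
        SpringApplication.run(So65792643Application.class, args);
    }

    @Autowired
    Processor processor;

    // Sends the upper-cased payload first, then succeeds only for "okAfterRetry" on the
    // first retry (retry count 1); every other delivery throws, so the send must roll back.
    @StreamListener(Processor.INPUT)
    public void in(Message<String> in) {
        System.out.println(in.getPayload());
        processor.output().send(new GenericMessage<>(in.getPayload().toUpperCase()));
        int attempt = RetrySynchronizationManager.getContext().getRetryCount();
        if (in.getPayload().equals("okAfterRetry") && attempt == 1) {
            System.out.println("success");
        }
        else {
            throw new RuntimeException();
        }
    }

    // Republishes a failed message, with exception details in its headers, to exchange
    // "DLX" using routing key "rk".
    @Bean
    RepublishMessageRecoverer repub(RabbitTemplate template) {
        RepublishMessageRecoverer repub =
                new RepublishMessageRecoverer(template, "DLX", "rk");
        return repub;
    }

    @Bean
    Queue dlq() {
        return new Queue("my-output.dlq");
    }

    @Bean
    DirectExchange dlx() {
        return new DirectExchange("DLX");
    }

    @Bean
    Binding dlqBinding() {
        return BindingBuilder.bind(dlq()).to(dlx()).with("rk");
    }

    // Note 1: rethrow the exception from the binding's error channel so that it
    // propagates back to the listener container.
    @ServiceActivator(inputChannel = "my-input.group1.errors")
    void errorHandler(ErrorMessage message) {
        MessagingException mex = (MessagingException) message.getPayload();
        throw mex;
    }

    @RabbitListener(queues = "my-output.dlq")
    void dlqListen(Message<String> in) {
        System.out.println("DLQ:" + in);
    }

    @RabbitListener(queues = "my-output.group2")
    void outListen(String in) {
        if (in.equals("OKAFTERRETRY")) {
            System.out.println(in);
        }
        else {
            System.out.println("Should not see this:" + in);
        }
    }

    /*
     * We must move retries from the binder to stateful retries in the container so that
     * each retry is rolled back, to avoid multiple publishes to output.
     * See max-attempts: 1 in the yaml.
     * In order for stateful retry to work, inbound messages must have a unique message_id
     * property.
     */
    @Bean
    ListenerContainerCustomizer<AbstractMessageListenerContainer> customizer(RepublishMessageRecoverer repub) {
        return (container, destinationName, group) -> {
            if ("group1".equals(group)) {
                container.setAdviceChain(RetryInterceptorBuilder.stateful()
                        .backOffOptions(1000, 2.0, 10000)
                        .maxAttempts(2)
                        .recoverer(recoverer(repub))
                        .keyGenerator(args -> {
                            // or generate a unique key some other way
                            return ((org.springframework.amqp.core.Message) args[1]).getMessageProperties()
                                    .getMessageId();
                        })
                        .build());
            }
        };
    }

    // Once retries are exhausted: republish to the DLQ, then reject the original
    // message without requeueing it.
    private MethodInvocationRecoverer<?> recoverer(RepublishMessageRecoverer repub) {
        return (args, cause) -> {
            repub.recover(((ListenerExecutionFailedException) cause).getFailedMessage(), cause);
            throw new AmqpRejectAndDontRequeueException(cause);
        };
    }

}

spring: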
  cloud:
    stream:
      rabbit:
        default:
          producer:
            transacted: true
          consumer:
            transacted: true
            requeue-rejected: true
      bindings:
        input:
          destination: my-input
          group: group1
          consumer:
            max-attempts: 1
        output:
          destination: my-output
          producer:
            required-groups: group2

okAfterRetry
2021-01-20 12:45:24.385  WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
okAfterRetry
success
OKAFTERRETRY

notOkAfterRetry
2021-01-20 12:45:39.336  WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
notOkAfterRetry
2021-01-20 12:45:39.339  WARN 77477 --- [-input.group1-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
...
DLQ:GenericMessage [payload=notOkAfterRetry,...,x-exception-message...
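
To illustrate notes 2 and 3, here is a toy comparison of retrying inside one transaction with stateful retry across redeliveries. It is a sketch under stated assumptions, not the binder's or the container's implementation: `RetryPlacementSketch` and its methods are hypothetical names, the transaction is modelled as a buffer of pending publishes, and redelivery is modelled as a loop. The listener mirrors the example above by always sending first and succeeding only on the attempt with retry count 1.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of where retries run relative to the transaction; not Spring code. */
final class RetryPlacementSketch {

    static final int MAX_ATTEMPTS = 2;

    /** Always publishes into the open transaction, then succeeds only when attempt == 1. */
    static void listener(String payload, int attempt, List<String> txBuffer) {
        txBuffer.add(payload.toUpperCase());
        if (attempt != 1) {
            throw new IllegalStateException("failed on attempt " + attempt);
        }
    }

    /** Retry inside the transaction: every attempt shares one buffer of pending publishes. */
    static List<String> retryInsideTransaction(String payload) {
        List<String> output = new ArrayList<>();
        List<String> txBuffer = new ArrayList<>();
        for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
            try {
                listener(payload, attempt, txBuffer);
                output.addAll(txBuffer); // commit: publishes from failed attempts go out too
                return output;
            } catch (IllegalStateException e) {
                // stay in the same transaction and try again
            }
        }
        return output; // retries exhausted: nothing was committed
    }

    /** Stateful retry: each attempt is a new delivery with its own transaction. */
    static List<String> statefulRetry(String messageId, String payload) {
        List<String> output = new ArrayList<>();
        Map<String, Integer> attemptsById = new HashMap<>(); // retry state keyed by message id
        while (true) {
            int attempt = attemptsById.getOrDefault(messageId, 0);
            if (attempt >= MAX_ATTEMPTS) {
                return output; // exhausted: this is where a recoverer would run
            }
            List<String> txBuffer = new ArrayList<>(); // fresh transaction per delivery
            try {
                listener(payload, attempt, txBuffer);
                output.addAll(txBuffer); // commit only this attempt's publishes
                return output;
            } catch (IllegalStateException e) {
                attemptsById.merge(messageId, 1, Integer::sum); // rollback, then redelivery
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("retry inside transaction: " + retryInsideTransaction("okAfterRetry"));
        System.out.println("stateful retry: " + statefulRetry("id-1", "okAfterRetry"));
    }
}
```

In this model the first variant commits two copies of OKAFTERRETRY, while the stateful variant commits one. The map keyed by message id is why each inbound message needs a unique message_id: without a stable key, the retry state could not be found again when the message is redelivered.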