微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

发生错误时,RabbitTransactionManager 不会在 ChainedTransactionManager 回滚

如何解决发生错误时,RabbitTransactionManager 不会在 ChainedTransactionManager 回滚

我正在尝试为 Rabbit 和 Kafka 使用一个事务管理器 (ChainedTransactionManager),将 RabbitTransactionManager 和 KafkaTransactionManager 链接起来。我们打算实现尽力而为的 1 阶段提交。

为了测试它,事务方法在 2 个操作(发送消息到 Rabbit 交换和 Kafka 中的发布和事件)后抛出异常。运行测试时,日志提示已启动回滚,但消息最终还是在 Rabbit 中。

  • 注意事项:
  • 我们使用 QPid 模拟内存中的 RabbitMQ 进行测试(版本 7.1.12)
  • 我们使用内存中的 Kafka 进行测试 (spring-kafka-test)
  • 其他相关框架/库:spring-cloud-stream

出现问题的方法如下:

    @Transactional
public void processMessageAndEvent() {
    Message<String> message = MessageBuilder
            .withPayload("Message to RabbitMQ")
            .build();
    outputToRabbitMQExchange.output().send(message);

    outputToKafkaTopic.output().send(
            withPayload("Message to Kafka")
                    .setHeader(KafkaHeaders.MESSAGE_KEY,"Kafka message key")
                    .build()
    );

    throw new RuntimeException("We want the prevIoUs changes to rollback");
}

这里是主要的 Spring-boot 应用配置:

    @SpringBootApplication
**@EnableTransactionManagement**
public class MyApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class,args);
    }
}

这是 TransactionManager 配置:

    @Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
    return new RabbitTransactionManager(cf);
}

@Bean(name = "transactionManager")
@Primary
public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm,BinderFactory binders) {
    ProducerFactory<byte[],byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka",MessageChannel.class))
            .getTransactionalProducerFactory();
    KafkaTransactionManager<byte[],byte[]> ktm = new KafkaTransactionManager<>(pf);
    ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return new ChainedKafkaTransactionManager<>(ktm,rtm);
}

最后是application.yml文件中的相关配置:

spring:
  application:
    name: my-application
  main:
    allow-bean-deFinition-overriding: true
  cloud:
    stream:
      bindings:
        source_outputToRabbitMQExchange:
          content-type: application/json
          destination: outputToRabbitMQExchange
          group: ${spring.application.name}
        sink_outputToKafkaTopic:
          content-type: application/json
          destination: outputToKafkaTopic
          binder: kafka
      rabbit:
        bindings:
          output_outputToRabbitMQExchange:
            producer:
              transacted: true
              routing-key-expression: headers.myKey
      kafka:
        bindings:
          sink_outputToKafkaTopic:
            producer:
              transacted: true
        binder:
          brokers: ${...kafka.hostname}
          transaction:
            transaction-id-prefix: ${CF_INSTANCE_INDEX}.${spring.application.name}.T
      default-binder: rabbit

  kafka:
    producer:
      properties:
        max.block.ms: 3000
        transaction.timeout.ms: 5000
        enable.idempotence: true
        retries: 1
        acks: all
    bootstrap-servers: ${...kafka.hostname}

当我们执行该方法时,尽管日志说要回滚事务,但我们可以看到消息仍在 Rabbit 中。

有什么我们可能遗漏或误解的吗?

解决方法

@EnableBinding 已被弃用,以支持更新的函数式编程模型。

也就是说,我几乎按原样复制了您的代码/配置(transacted 不是 kafka 生产者绑定属性),它对我来说很好用(引导 2.4.5,云 2020.0.2)。 ..

@SpringBootApplication
@EnableTransactionManagement
@EnableBinding(Bindings.class)
public class So67297869Application {

    public static void main(String[] args) {
        SpringApplication.run(So67297869Application.class,args);
    }

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

    @Bean(name = "transactionManager")
    @Primary
    public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm,BinderFactory binders) {
        ProducerFactory<byte[],byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka",MessageChannel.class))
                        .getTransactionalProducerFactory();
        KafkaTransactionManager<byte[],byte[]> ktm = new KafkaTransactionManager<>(pf);
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return new ChainedKafkaTransactionManager<>(ktm,rtm);
    }

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            foo.send("test");
        };
    }

}

interface Bindings {

    @Output("source_outputToRabbitMQExchange")
    MessageChannel rabbitOut();

    @Output("sink_outputToKafkaTopic")
    MessageChannel kafkaOut();

}

@Component
class Foo {

    @Autowired
    Bindings bindings;

    @Transactional
    public void send(String in) {
        bindings.rabbitOut().send(MessageBuilder.withPayload(in)
                .setHeader("myKey","test")
                .build());
        bindings.kafkaOut().send(MessageBuilder.withPayload(in)
                .setHeader(KafkaHeaders.MESSAGE_KEY,"test".getBytes())
                .build());
        throw new RuntimeException("fail");
    }

}
spring:
  application:
    name: my-application
  main:
    allow-bean-definition-overriding: true
  cloud:
    stream:
      bindings:
        source_outputToRabbitMQExchange:
          content-type: application/json
          destination: outputToRabbitMQExchange
          group: ${spring.application.name}
        sink_outputToKafkaTopic:
          content-type: application/json
          destination: outputToKafkaTopic
          binder: kafka
      rabbit:
        bindings:
          source_outputToRabbitMQExchange:
            producer:
              transacted: true
              routing-key-expression: headers.myKey
      kafka:
        binder:
          brokers: localhost:9092
          transaction:
            transaction-id-prefix: foo.${spring.application.name}.T
      default-binder: rabbit

  kafka:
    producer:
      properties:
        max.block.ms: 3000
        transaction.timeout.ms: 5000
        enable.idempotence: true
        retries: 1
        acks: all
    bootstrap-servers: localhost:9092

logging:
  level:
    org.springframework.transaction: debug
    org.springframework.kafka: debug
    org.springframework.amqp.rabbit: debug
2021-04-28 09:35:32.488 DEBUG 53253 --- [           main] o.s.a.r.t.RabbitTransactionManager       : Initiating transaction rollback
2021-04-28 09:35:32.489 DEBUG 53253 --- [           main] o.s.a.r.connection.RabbitResourceHolder  : Rolling back messages to channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2),conn: Proxy@3c770db4 Shared Rabbit Connection: SimpleConnection@1f736d00 [delegate=amqp://guest@127.0.0.1:5672/,localPort= 63439]
2021-04-28 09:35:32.490 DEBUG 53253 --- [           main] o.s.a.r.t.RabbitTransactionManager       : Resuming suspended transaction after completion of inner transaction
2021-04-28 09:35:32.490 DEBUG 53253 --- [           main] o.s.k.t.KafkaTransactionManager          : Initiating transaction rollback
2021-04-28 09:35:32.490 DEBUG 53253 --- [           main] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@38e83838] abortTransaction()

并且队列中没有消息表明我与 RK # 绑定到交换。

您使用的是什么版本?

编辑

这是删除弃用后的等效应用程序,使用功能模型和 StreamBridge(相同的 yaml):

@SpringBootApplication
@EnableTransactionManagement
public class So67297869Application {

    public static void main(String[] args) {
        SpringApplication.run(So67297869Application.class,rtm);
    }

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            foo.send("test");
        };
    }

}

@Component
class Foo {

    @Autowired
    StreamBridge bridge;

    @Transactional
    public void send(String in) {
        bridge.send("source_outputToRabbitMQExchange",MessageBuilder.withPayload(in)
                .setHeader("myKey","test")
                .build());
        bridge.send("sink_outputToKafkaTopic",MessageBuilder.withPayload(in)
                .setHeader(KafkaHeaders.MESSAGE_KEY,"test".getBytes())
                .build());
        throw new RuntimeException("fail");
    }

}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?