How to fix RabbitTransactionManager not rolling back inside a ChainedTransactionManager when an error occurs
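I am trying to use a single transaction manager (ChainedTransactionManager) for Rabbit and Kafka, chaining a RabbitTransactionManager and a KafkaTransactionManager. The goal is a best-effort 1-phase commit.
To test it, the transactional method throws an exception after the 2 operations (sending a message to a Rabbit exchange and publishing an event to Kafka). When the test runs, the logs indicate that a rollback was initiated, but the message still ends up in Rabbit.
Notes:
- We use QPid to simulate an in-memory RabbitMQ for testing (version 7.1.12)
- We use an in-memory Kafka for testing (spring-kafka-test)
- Other relevant frameworks/libraries: spring-cloud-stream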
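To make the expected behavior concrete, here is a minimal plain-Java sketch of how a chained, best-effort 1-phase commit is meant to behave: transactions are begun in the order the managers are listed and completed in reverse order, so an exception in the business method should roll back every resource. The `Resource` class and `run` method are hypothetical stand-ins for illustration only; this is not Spring's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChainedTxSketch {

    /** Hypothetical stand-in for a transactional resource (e.g. a Rabbit channel or a Kafka producer). */
    static final class Resource {
        private final String name;
        private final List<String> log;

        Resource(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        void begin()    { log.add("begin " + name); }
        void commit()   { log.add("commit " + name); }
        void rollback() { log.add("rollback " + name); }
    }

    /** Begins each resource in the given order, runs the work, then completes in reverse order. */
    static List<String> run(List<String> resourceNames, Runnable work) {
        List<String> log = new ArrayList<>();
        List<Resource> started = new ArrayList<>();
        for (String name : resourceNames) {
            Resource resource = new Resource(name, log);
            resource.begin();
            started.add(resource);
        }

        boolean failed = false;
        try {
            work.run();
        } catch (RuntimeException e) {
            failed = true; // any runtime exception marks the whole chain for rollback
        }

        // Reverse order: the last transaction started is the first one completed.
        for (int i = started.size() - 1; i >= 0; i--) {
            if (failed) {
                started.get(i).rollback();
            } else {
                started.get(i).commit();
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // Failing work: both resources roll back, rabbit first.
        System.out.println(run(Arrays.asList("kafka", "rabbit"), () -> {
            throw new RuntimeException("fail");
        }));
        // Successful work: both resources commit, rabbit first.
        System.out.println(run(Arrays.asList("kafka", "rabbit"), () -> { }));
    }
}
```

In the real setup there is no coordination between the two commits, so "best effort" means a failure between the first and second commit can still leave the resources inconsistent; a failure inside the method body, as in this test, should not.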
The method that shows the problem is the following:
@Transactional
public void processMessageAndEvent() {
    // Send a message to the RabbitMQ exchange
    Message<String> message = MessageBuilder
            .withPayload("Message to RabbitMQ")
            .build();
    outputToRabbitMQExchange.output().send(message);

    // Publish an event to the Kafka topic
    outputToKafkaTopic.output().send(
            MessageBuilder.withPayload("Message to Kafka")
                    .setHeader(KafkaHeaders.MESSAGE_KEY, "Kafka message key")
                    .build()
    );

    // Fail on purpose: both sends above are expected to roll back
    throw new RuntimeException("We want the previous changes to rollback");
}
Here is the main Spring Boot application configuration:
@SpringBootApplication
@EnableTransactionManagement
public class MyApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
}
This is the TransactionManager configuration:
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
    return new RabbitTransactionManager(cf);
}

@Bean(name = "transactionManager")
@Primary
public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm, BinderFactory binders) {
    // Reuse the transactional producer factory created by the Kafka binder
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka", MessageChannel.class))
            .getTransactionalProducerFactory();
    KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
    ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return new ChainedKafkaTransactionManager<>(ktm, rtm);
}
Finally, the relevant configuration in the application.yml file:
spring:
  application:
    name: my-application
  main:
    allow-bean-definition-overriding: true
  cloud:
    stream:
      bindings:
        source_outputToRabbitMQExchange:
          content-type: application/json
          destination: outputToRabbitMQExchange
          group: ${spring.application.name}
        sink_outputToKafkaTopic:
          content-type: application/json
          destination: outputToKafkaTopic
          binder: kafka
      rabbit:
        bindings:
          output_outputToRabbitMQExchange:
            producer:
              transacted: true
              routing-key-expression: headers.myKey
      kafka:
        bindings:
          sink_outputToKafkaTopic:
            producer:
              transacted: true
        binder:
          brokers: ${...kafka.hostname}
          transaction:
            transaction-id-prefix: ${CF_INSTANCE_INDEX}.${spring.application.name}.T
      default-binder: rabbit
  kafka:
    producer:
      properties:
        max.block.ms: 3000
        transaction.timeout.ms: 5000
        enable.idempotence: true
        retries: 1
        acks: all
    bootstrap-servers: ${...kafka.hostname}
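When we execute the method, we can see that the message is still in Rabbit even though the logs say the transaction is being rolled back.
Is there anything we might be missing or misunderstanding?
Solution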
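@EnableBinding is deprecated in favor of the newer functional programming model.
That said, I copied your code/config pretty much as-is (transacted is not a kafka producer binding property) and it works fine for me (Boot 2.4.5, cloud 2020.0.2)...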
@SpringBootApplication
@EnableTransactionManagement
@EnableBinding(Bindings.class)
public class So67297869Application {

    public static void main(String[] args) {
        SpringApplication.run(So67297869Application.class, args);
    }

    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

    @Bean(name = "transactionManager")
    @Primary
    public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm, BinderFactory binders) {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka", MessageChannel.class))
                .getTransactionalProducerFactory();
        KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return new ChainedKafkaTransactionManager<>(ktm, rtm);
    }

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            foo.send("test");
        };
    }

}

interface Bindings {

    @Output("source_outputToRabbitMQExchange")
    MessageChannel rabbitOut();

    @Output("sink_outputToKafkaTopic")
    MessageChannel kafkaOut();

}

@Component
class Foo {

    @Autowired
    Bindings bindings;

    @Transactional
    public void send(String in) {
        bindings.rabbitOut().send(MessageBuilder.withPayload(in)
                .setHeader("myKey", "test")
                .build());
        bindings.kafkaOut().send(MessageBuilder.withPayload(in)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "test".getBytes())
                .build());
        throw new RuntimeException("fail");
    }

}
spring:
  application:
    name: my-application
  main:
    allow-bean-definition-overriding: true
  cloud:
    stream:
      bindings:
        source_outputToRabbitMQExchange:
          content-type: application/json
          destination: outputToRabbitMQExchange
          group: ${spring.application.name}
        sink_outputToKafkaTopic:
          content-type: application/json
          destination: outputToKafkaTopic
          binder: kafka
      rabbit:
        bindings:
          source_outputToRabbitMQExchange:
            producer:
              transacted: true
              routing-key-expression: headers.myKey
      kafka:
        binder:
          brokers: localhost:9092
          transaction:
            transaction-id-prefix: foo.${spring.application.name}.T
      default-binder: rabbit
  kafka:
    producer:
      properties:
        max.block.ms: 3000
        transaction.timeout.ms: 5000
        enable.idempotence: true
        retries: 1
        acks: all
    bootstrap-servers: localhost:9092

logging:
  level:
    org.springframework.transaction: debug
    org.springframework.kafka: debug
    org.springframework.amqp.rabbit: debug
2021-04-28 09:35:32.488 DEBUG 53253 --- [ main] o.s.a.r.t.RabbitTransactionManager : Initiating transaction rollback
2021-04-28 09:35:32.489 DEBUG 53253 --- [ main] o.s.a.r.connection.RabbitResourceHolder : Rolling back messages to channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,2),conn: Proxy@3c770db4 Shared Rabbit Connection: SimpleConnection@1f736d00 [delegate=amqp://guest@127.0.0.1:5672/,localPort= 63439]
2021-04-28 09:35:32.490 DEBUG 53253 --- [ main] o.s.a.r.t.RabbitTransactionManager : Resuming suspended transaction after completion of inner transaction
2021-04-28 09:35:32.490 DEBUG 53253 --- [ main] o.s.k.t.KafkaTransactionManager : Initiating transaction rollback
2021-04-28 09:35:32.490 DEBUG 53253 --- [ main] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@38e83838] abortTransaction()
And there is no message in the queue that I bound to the exchange with RK #.
What version are you using?
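One thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis: a rollback can only discard publishes made on a transacted channel. The question's rabbit section places the producer properties under output_outputToRabbitMQExchange, while the binding itself is named source_outputToRabbitMQExchange (the name used in the working yaml). If transacted: true is not applied to the binding, the logs could still show a rollback while the message is delivered anyway. The sketch below models that difference with a hypothetical in-memory `Channel` class; it is a simplified illustration, not the RabbitMQ client API.

```java
import java.util.ArrayList;
import java.util.List;

public class TransactedChannelSketch {

    /** Hypothetical in-memory channel; a simplified model, not the RabbitMQ client API. */
    static final class Channel {
        private final boolean transacted;
        private final List<String> pending = new ArrayList<>();
        private final List<String> delivered;

        Channel(boolean transacted, List<String> delivered) {
            this.transacted = transacted;
            this.delivered = delivered;
        }

        /** Transacted publishes are held until commit; non-transacted ones are delivered at once. */
        void publish(String message) {
            if (transacted) {
                pending.add(message);
            } else {
                delivered.add(message);
            }
        }

        void commit() {
            delivered.addAll(pending);
            pending.clear();
        }

        /** Rollback can only discard what is still pending. */
        void rollback() {
            pending.clear();
        }
    }

    /** Publishes one message, rolls back, and returns what reached the queue. */
    static List<String> sendThenRollback(boolean transacted) {
        List<String> delivered = new ArrayList<>();
        Channel channel = new Channel(transacted, delivered);
        channel.publish("Message to RabbitMQ");
        channel.rollback();
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println("transacted:     " + sendThenRollback(true));
        System.out.println("non-transacted: " + sendThenRollback(false));
    }
}
```

With the transacted channel nothing is delivered after the rollback; with the non-transacted one the message is already in the queue and the rollback has nothing to undo.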
EDIT
Here is the equivalent application with the deprecations removed, using the functional model and StreamBridge (same yaml):
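@SpringBootApplication
@EnableTransactionManagement
public class So67297869Application {

    public static void main(String[] args) {
        SpringApplication.run(So67297869Application.class, args);
    }

    // Same transaction manager beans as in the previous listing
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

    @Bean(name = "transactionManager")
    @Primary
    public ChainedTransactionManager chainedTransactionManager(RabbitTransactionManager rtm, BinderFactory binders) {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka", MessageChannel.class))
                .getTransactionalProducerFactory();
        KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return new ChainedKafkaTransactionManager<>(ktm, rtm);
    }

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> {
            foo.send("test");
        };
    }

}

@Component
class Foo {

    @Autowired
    StreamBridge bridge;

    @Transactional
    public void send(String in) {
        bridge.send("source_outputToRabbitMQExchange", MessageBuilder.withPayload(in)
                .setHeader("myKey", "test")
                .build());
        bridge.send("sink_outputToKafkaTopic", MessageBuilder.withPayload(in)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "test".getBytes())
                .build());
        throw new RuntimeException("fail");
    }

}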