如何解决事务同步:如何使用 Reactor Kafka 和 R2DBC 创建 ChainedKafkaTransactionManager bean
我的 Spring Boot (WebFlux/R2DBC/Reactor Kafka) 应用程序中有以下使用者
@EventListener(ApplicationStartedEvent::class)
fun onMyEvent() {
kafkaReceiver
.receive()
.doOnNext { record ->
val myEvent = record.value()
myService.deleteSomethingFromDbById(myEvent.myId)
.thenEmpty {
record.receiverOffset().acknowledge()
}.subscribe()
}
.subscribe()
}
我想为 Kafka 和 DB 事务添加事务同步。阅读文档和一些 stakoverflow 问题后
- Transaction Synchronization in spring boot with Database+ kafka example
- Spring Kafka ChainedKafkaTransactionManager doesn't synchronize with JPA Spring-data transaction
- Transaction Synchronization in Spring Kafka
- Spring @Transactional with a transaction across multiple data sources
似乎 ChainedKafkaTransactionManager
是要走的路。
但是以下代码将不起作用,因为 ChainedKafkaTransactionManager 需要类型为 PlatformTransactionManager
的事务管理器。所以参数 r2dbcTransactionManager
不被接受。
@Bean(name = ["chainedTransactionManager"])
fun chainedTransactionManager(
r2dbcTransactionManager: R2dbcTransactionManager,kafkaTransactionManager: KafkaTransactionManager<*,*>
) = ChainedKafkaTransactionManager(kafkaTransactionManager,r2dbcTransactionManager)
还有其他方法可以实现吗?
解决方法
为 Kafka 消费者链接交易毫无意义。仅适用于发布者,即外发消息。
但是您应该确保不要多次处理相同的消息。
@EventListener(ApplicationStartedEvent::class)
fun onMyEvent() {
kafkaReceiver.receive()
// Make sure to have unique index on (topic,partition,offset)
// so you receive a ConstraintViolationException
.flatMap { r ->
val msg = ConsumedMessage(r.topic(),r.partition(),r.offset())
consumedMessagesRepository.save(msg).thenReturn(r)
}
.onErrorContinue {ex,r -> log.warn("Duplicate msg") }
.flatMap { r ->
myService.deleteSomethingFromDbById(r.value().myId)
.thenReturn(r)
}
.flatMap { r ->
r.receiverOffset().commit()
}
.subscribe()
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。