如何解决spring-kafka 如何在流 bean 上设置重试
我看到 spring-kafka 支持使用@RetryableTopic 的非阻塞重试。我只看到 @RetryableTopic 正在与 @kafkaListener 一起工作。但我想在我的流聚合上进行“重试”。如何通过 spring-kafka 做到这一点?
下面的代码示例是关于银行交易(流)和账户余额(状态)。
假设一笔银行交易是这样的:将 10 美元从账户 10001 转移到账户 10002。
我有下面的流代码,使用 reduce 函数从 10001 到 -10 和 +10 到 10002。
并且余额被物化到状态存储 BALANCE。
如果账户10001余额小于10,交易将无法完成。
但需要重试,因为存款交易可能会在短时间内到来。并存入10001后,余额>10,则本次交易完成。
这是我的流豆
@Bean
public KStream<String,BankTransaction> alphaBankKStream(StreamsBuilder streamsBuilder) {
JsonSerde<BankTransaction> valueSerde = new JsonSerde<>(BankTransaction.class);
KStream<String,BankTransaction> stream = streamsBuilder.stream(Topic.TRANSACTION_RAW,Consumed.with(Serdes.String(),valueSerde));
KStream<String,BankTransaction>[] branches = stream.branch(
(key,value) -> isBalanceEnough(value),(key,value) -> true /* all other records */
);
branches[0].flatMap((k,v) -> {
List<BankTransactionInternal> txInternals = BankTransactionInternal.splitBankTransaction(v);
List<KeyValue<String,BankTransactionInternal>> result = new LinkedList<>();
result.add(KeyValue.pair(v.getFromAccount(),txInternals.get(0)));
result.add(KeyValue.pair(v.getToAccount(),txInternals.get(1)));
return result;
}).filter((k,v) -> !Constants.EXTERNAL_ACCOUNT.equalsIgnoreCase(k))
.map((k,v) -> KeyValue.pair(k,v.getAmount()))
.groupBy((account,amount) -> account,Grouped.with(Serdes.String(),Serdes.Double()))
.reduce(Double::sum,Materialized.<String,Double,KeyValueStore<Bytes,byte[]>>as(StateStore.BALANCE).withValueSerde(Serdes.Double()));
return stream;
}
private boolean isBalanceEnough(BankTransaction bankTransaction) {
// read balance from state store BALANCE
return balance >= bankTransaction.amount
}
解决方法
KStream
不在 Spring for Apache Kafka 的范围内; spring 只参与建立拓扑;设置完成后,您将直接使用 kafka-streams。所有功能都由您设置的拓扑提供。
@RetrybleTopic
功能仅适用于 @KafkaListener
(或更具体地说是 kafka 列表容器)。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。