如何解决状态存储卡夫卡删除
我是 kafka 的新手。我定义了以下状态存储:
StoreBuilder<keyvalueStore<String,String>> storeBuilder=Stores.keyvalueStoreBuilder(
Stores.inMemorykeyvalueStore(storeName),Serdes.String(),Serdes.String());
builder.addStateStore(storeBuilder);
我想将记录删除到商店中,但我不能这样做。我曾尝试使用墓碑,但它会向我插入带有键和空值的新记录,而不是删除所有内容。我试过做 stream.cleanup() 但它甚至没有去。我的商店越来越多,我不知道该怎么做。 这是我使用商店尝试取消记录的一段代码:
private String checkAndUpdateStateStore(Vessel v) throws ParseException {
store.put(v.getVessel(),null);
//store.delete(v.getVessel());
return v.getVessel();
}
另一个问题是,在存储中,我希望对于同一个键,旧值必须替换为新值,而不是为同一个键创建新记录。 提前感谢那些让我摆脱困境的人
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。