如何解决KeyValueStore.get返回不一致的结果
stateStore.get()
在transform()
上的KStream
中使用时返回不一致的结果。即使相应的键值已put()
进入存储,它也会返回null。
有人可以解释KeyValueStore<>
的这种行为吗?
@Component
public class StreamProcessor {
@StreamListener
public void process(@Input(KStreamBindings.INPUT_STREAM) KStream<String,JsonNode> inputStream) {
KStream<String,JsonNode> joinedEvents = inputStream
.selectKey((key,value) -> computeKey(value))
.transform(
() -> new SelfJoinTransformer((v1,v2) -> join(v1,v2),"join_store"),"join_store"
);
joinedEvents
.foreach((key,value) -> System.out.format("%s,joined=%b\n",key,value.has("right")));
}
private JsonNode join(JsonNode left,JsonNode right) {
((ObjectNode) left).set("right",right);
return left;
}
}
public class SelfJoinTransformer implements Transformer<String,JsonNode,KeyValue<String,JsonNode>> {
private KeyValueStore<String,JsonNode> stateStore;
private ValueJoiner<JsonNode,JsonNode> valueJoiner;
private String storeName;
public SelfJoinTransformer(ValueJoiner<JsonNode,JsonNode> valueJoiner,String storeName) {
this.storeName = storeName;
this.valueJoiner = valueJoiner;
}
@Override
public void init(ProcessorContext context) {
this.stateStore = (KeyValueStore<String,JsonNode>) context.getStateStore(storeName);
}
@Override
public KeyValue<String,JsonNode> transform(String key,JsonNode value) {
JsonNode oldValue = stateStore.get(key);
if (oldValue != null) { //this condition rarely holds true
stateStore.delete(key);
System.out.format("%s,joined\n",key);
return KeyValue.pair(key,valueJoiner.apply(oldValue,value));
}
stateStore.put(key,value);
return null;
}
}
解决方法
原因似乎是消息消失了(假设标点符号没有将其删除)是因为您使用了KStream :: selectKey(...),它会更改密钥,但不会进行重新分区 而且您可能会在错误的分区中寻找密钥。
看下面的场景:
- 消息1:
k1
,v1
(partition0
) - 消息2:
k2
,v2
(partition1
)
假设消息放在不同的分区中(由于键)
在selectKey之后:k1 -> k
,k2 -> k
- 消息1:
k
,v1
- 消息2:
k
,v2
操作selectKey
是无状态的,因此消息不会发送到下游(主题),也不会发生重新分区。
对于第一条消息:将键-k的值放入存储区(partition0)
当第二条消息到达时:对于键-k,没有消息,因为它是不同的分区(partition1)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。