微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink 的 QueryableStateClient 抛出“FlinkRuntimeException: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 85”

如何解决Flink 的 QueryableStateClient 抛出“FlinkRuntimeException: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 85”

我正在尝试使用其 QueryableStateClient API 在 Flink 作业中持久化的自定义对象上查询 ValueState。我可以观察到持久性工作正常,但是,来自客户端应用程序的查询抛出 FlinkRuntimeException

以下是客户端应用程序中使用的代码片段:

ValueStateDescriptor<CustomObj> descriptor =new ValueStateDescriptor<>("StateDesc",Typeinformation.of(new TypeHint<>() {}).createSerializer(new ExecutionConfig()));

QueryableStateClient client = new QueryableStateClient(proxyHostName,proxyPort);
CompletableFuture<ValueState<CustomObj>> res =client.getKvstate(jobId,"state-query",key,BasicTypeInfo.STRING_TYPE_INFO,descriptor);

当我执行 res.get() 时,以下是我得到的错误

Caused by: java.util.concurrent.ExecutionException: org.apache.flink.util.FlinkRuntimeException: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 85
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
    ... 134 common frames omitted
Caused by: org.apache.flink.util.FlinkRuntimeException: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 85
    at org.apache.flink.queryablestate.client.QueryableStateClient.createState(QueryableStateClient.java:289)
    at org.apache.flink.queryablestate.client.QueryableStateClient.lambda$getKvstate$2(QueryableStateClient.java:274)
    at java.base/java.util.concurrent.CompletableFuture$uniapply.tryFire(CompletableFuture.java:642)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:326)

flink 作业中用于持久化状态的代码片段:

ValueStateDescriptor<CustomObj> descriptor =new ValueStateDescriptor<>("StateDesc",Typeinformation.of(new TypeHint<>() {}));
descriptor.setQueryable("state-query");
state = getRuntimeContext().getState(descriptor);

我也尝试过 Types.GENERIC(CustomObj.class) 使用描述符,但没有用。仅供参考,我在我的 flink 作业和客户端应用程序中都使用了我的 CustomObj 的精确副本。 如果我做错了什么,请强调。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。