如何解决我没有从与 java 集成的 ksql StreamQuery 获得结果当我为客户端打印日志时显示未完成
我使用的是融合的 kafka 6.o 版 从 https://www.confluent.io/download/
下载我指的是
- https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-clients/java-client/ 批评 .
- https://www.youtube.com/watch?v=85udigshlNI
使用 java 生产者代码,我可以将值发送到 ksql。但无法检索此值。 当我打印 streamQuery 结果的日志时,我收到未完成消息。
使用 Maven 依赖作为:
<dependencies>
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-api-client</artifactId>
<version>${ksqldb.version}</version>
</dependency>
</dependencies>
Java 代码:
public class ExampleApp {
public static String KsqlDB_SERVER_HOST = "localhost";
public static int KsqlDB_SERVER_HOST_PORT = 8088;
public static void main(String[] args) {
ClientOptions options = ClientOptions.create()
.setHost(KsqlDB_SERVER_HOST)
.setPort(KsqlDB_SERVER_HOST_PORT);
Client client = Client.create(options);
// Send requests with the client by following the other examples
// Terminate any open connections and close the client
client.close();
}
}public class ExampleApp {
public static String KsqlDB_SERVER_HOST = "localhost";
public static int KsqlDB_SERVER_HOST_PORT = 8088;
public static void main(String[] args) {
ClientOptions options = ClientOptions.create()
.setHost(KsqlDB_SERVER_HOST)
.setPort(KsqlDB_SERVER_HOST_PORT);
Client client = Client.create(options);
StreamedQueryResult streamedQueryResult = client.streamQuery("SELECT * FROM MY_STREAM EMIT CHANGES;").get();
for (int i = 0; i < 10; i++) {
// Block until a new row is available
Row row = streamedQueryResult.poll();
if (row != null) {
System.out.println("Received a row!");
System.out.println("Row: " + row.values());
} else {
System.out.println("Query has ended.");
}
}
client.close();
}
}
输出: 即使将值添加到主题等待中,get() 仍然等待很长时间,最终给出超时异常。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。