微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何获取ksqldb表的当前状态?

如何解决如何获取ksqldb表的当前状态?

阅读一些文章后,我了解到ksqdb表实际上是聚合流。意思是它显示当前业务状态。不是业务历史。业务历史包含流。但是当我在表中使用推式查询时,它显示的是历史列表而不是当前状态。我的过程定义如下。

流创建:

CREATE STREAM products2 (product_name VARCHAR KEY,cost DOUBLE)
WITH (kafka_topic='products2',partitions=1,value_format='json');

表创建:

CREATE TABLE products2_t (product_name VARCHAR PRIMARY KEY,value_format='json');

插入查询

insert into PRODUCTS2 (product_name,cost) values ('a',1); 
insert into PRODUCTS2 (product_name,2);
insert into PRODUCTS2 (product_name,cost) values ('b',1);
insert into PRODUCTS2 (product_name,2); 

输出没问题,

select * from products2 emit changes;

enter image description here


输出根据ksqldb表定义不正确:

enter image description here

期望的输出列表将是:

a 13 
b 5

所以请帮助我。这里哪里错了。
感谢所有人。

解决方法

该表将显示当前状态,但是当状态{em>更改时,使用EMIT CHANGES(即推送查询),您将收到输出。如果您在输入暂停时重新运行该查询,则只会看到当前状态。

如果要查询当前状态而不是后续更改,则可以运行请求查询,需要在表中具体化该状态。例如:

CREATE TABLE PRODUCTS_T AS
SELECT PRODUCT_NAME,LATEST_BY_OFFSET(COST) AS LATEST_COST
FROM PRODUCTS2
GROUP BY PRODUCT_NAME;

现在您可以直接查询它了-请注意查询返回并且不等待任何将来的状态更改:

ksql> SELECT LATEST_COST FROM PRODUCTS_T WHERE PRODUCT_NAME='a';
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|LATEST_COST                                                                                                                                                                            |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2.0                                                                                                                                                                                    |
Query terminated
ksql>

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?