微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink SQL单元测试:如何分配水印?

如何解决Flink SQL单元测试:如何分配水印?

我正在为使用match_recognize的Flink sql语句编写单元测试。我正在像这样设置测试数据

Table data = tEnv.fromValues(DataTypes.ROW(
  DataTypes.FIELD("event_time",DataTypes.TIMESTAMP(3)),DataTypes.FIELD("foobar",DataTypes.STRING()),....
  ),row(...),row(...)
);

我有两个问题,

  • 如何将event_time指定为加水印字段? (指示行时间)
  • 不太重要,给表创建一个有意义的名称吗?

FLINK版本:1.11

解决方法

您遇到了Table API的当前限制:无法结合forValues方法来定义水印和行时间属性;您需要一个连接器。有几种解决方法:

1。。请使用与csv堆叠在一起的VALUES连接器,如this example所示。

2。。使用内置的DataGen connector。由于您正在为CEP进行单元测试,因此我想您希望对生成的数据进行某种程度的控制,因此这可能不是一个可行的选择。还以为我会提起它。

注意:建议使用SQL DDL语法从Flink 1.10创建表。这将使您要尝试做的两件事(定义水印并命名表)更加简单:

tEnv.executeSql("CREATE TABLE table_name (\n" +
                "             event_time TIMESTAMP(3),\n" +
                "             foobar STRING \n" +
                "             WATERMARK FOR event_time AS event_time\n" +
                ") WITH (...)"
);

Table data = tEnv.from("table_name");

水印被声明为列,您可以选择使用多种水印策略。请检查this documentation page了解更多详情。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。