如何解决Flink Table API 中如何为表中的每一行分配唯一 ID?
我正在使用 Flink 来计算一系列操作。每个操作都会生成一个表,该表既用于下一个操作,也用于存储在 S3 中。这样就可以在计算中查看每个中间步骤的数据,并查看每个操作的效果。
我需要为每个表中的每一行分配一个唯一标识符,以便当该标识符在接下来的步骤中再次出现时(可能在不同的列中)我知道两行彼此关联。
第一个明显的候选似乎是 ROW_NUMBER()
函数,但是:
-
它似乎没有出现在表表达式 API 中的任何地方。我必须构造 sql 字符串吗?
-
我如何使用它?当我尝试此查询时:
SELECT *,ROW_NUMBER() OVER (ORDER BY f0) AS rn FROM inp
我收到此错误:
org.apache.flink.table.api.ValidationException: Over Agg: The window rank function without order by. please re-check the over window statement.
-
是否总是需要对表格进行排序?这似乎是我宁愿避免的开销。
下一个选项只是为每一行生成一个随机 UUID。但是当我尝试这个时,同一个 UUID 从来没有使用过两次,所以它完全没用。举个例子:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object SandBox {
def main(args: Array[String]): Unit = {
val env = StreamTableEnvironment.create(
StreamExecutionEnvironment.getExecutionEnvironment
)
val inp = env.fromValues(1.as("id"))
val out1 = inp.addColumns(uuid().as("u"))
val out2 = out1.addColumns($"u".as("u2"))
env.executesql("""
CREATE TABLE out1 ( id INTEGER,u VARCHAR(36) )
WITH ('connector' = 'print')
""")
env.executesql("""
CREATE TABLE out2 ( id INTEGER,u VARCHAR(36),u2 VARCHAR(36) )
WITH ('connector' = 'print')
""")
env.createStatementSet()
.addInsert("out1",out1)
.addInsert("out2",out2)
.execute()
// Equivalent to the createStatementSet method:
out1.executeInsert("out1")
out2.executeInsert("out2")
}
}
我得到的输出:
[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d)
[info] +I(1,55da264d-1e15-4c40-94d4-822e1cd5db9c,c9a78f93-580c-456d-9883-08bc998124ed)
我需要 out1
的 UUID 重新出现在 out2
的两列中,例如:
[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d,4e6008ad-868a-4f95-88b0-38ee7969067d)
我想这是由于 docs 中的这个注释:
此函数不是确定性的,这意味着将为每条记录重新计算值。
如何只计算一次 UUID 并使其“具体”,以便将相同的值发送到 out1
和 out2
?
class uuidUdf extends ScalarFunction {
def eval(): String = UUID.randomUUID().toString
}
val out1 = inp.addColumns(call(new uuidUdf()).as("u"))
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。