微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用现有的流式 DF 将有状态事件更新到 Delta Lake 表?

如何解决如何使用现有的流式 DF 将有状态事件更新到 Delta Lake 表?

我正在尝试将 Kafka 中的事件更新插入 Delta Lake 表中。我这样做with this。 新事件即将到来,增量表中的值会根据合并条件进行更新。现在,当我停止执行然后重新运行 upsert 脚本时,Delta Lake 似乎没有按照它们在脚本已经运行时进入时的相同顺序对我的流式 df 中的每一行执行 upsert。合并函数无法识别更新事件的键与增量表中应该已经存在的键之间的匹配,它只是插入每一行,即使键应该已经被前一个事件插入。

任何人都可以向我解释是否可以从一开始就将事件作为增量表中的 upsert 重放?如果是这样,你会怎么做?

我想要的:

  1. 带有键 a 的事件传入,a 的键和值作为新行插入
  2. 带有键 b 的事件传入,b 的键和值作为新行插入
  3. 带有键 a 的事件进入,a 的值被更新

当我重新开始读取流和 Delta Lake 合并函数时会发生什么:

  1. 带有键 a 的事件传入,a 的键和值作为新行插入
  2. 带有键 b 的事件传入,b 的键和值作为新行插入
  3. 带有键 a 的事件传入,a 的键和值作为新行插入

我希望发生的事情: writeStream with forEachBatch 从 Kafka 的第一个最早的 micro-batch 开始,然后 upsertToDelta 从头开始​​按顺序插入 micro-batch 中的行

我的代码

from delta.tables import *
from pyspark.sql.functions import *
from pyspark.sql.functions import udf
import json

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers","[HOST]") \
    .option("subscribe","[topic]") \
    .option("includeHeaders","true") \
    .option("startingOffsets","earliest") \
    [...].load()

dfgrouped = df.selectExpr("CAST(value AS STRING)")
records = (dfgrouped.withColumn("my_key",my_udf(dfgrouped["value"])))
# my_udf is a custom function to get a key based in the row value

deltaTable = DeltaTable.forName(spark,"mydeltable")

@udf
def get_updated_value(my_key,update_value,events_value):
    [...]
    return blob

@udf
def get_new_value(my_key,update_value):
    [...]
    return blob
    
def upsertToDelta(updatesDF,id):
    deltaTable.alias("events") \
    .merge(
        source = updatesDF.alias("updates"),condition = expr("events.my_key = updates.my_key") # It does not detect events.my_key for updates
    ) \
    .whenMatchedUpdate(set =
        {
        "value": get_updated_value(col("updates.my_key"),col("updates.value"),col("events.value"))
        }
    ) \
    .whenNotMatchedInsert(values =
        {
        "my_key": col("updates.my_key"),"value": get_new_value(col("updates.my_key"),col("updates.value"))
        }
    ) \
    .execute()
    
records.writeStream \
    .format("kafka") \
    .foreachBatch(upsertToDelta) \
    .outputMode("update") \
    .option("checkpointLocation","/delta/events/_checkpoints/[CHECKPOINT]") \
    .option("kafka.bootstrap.servers","[HOST]") \
    .option("topic","[SINK_TOPIC]") \
    [...].start()    

解决方法

Delta Lake 在充当流接收器时仅支持 Append Only 或 Complete 模式(即,将所有记录附加到表中或替换整个表) https://docs.delta.io/latest/delta-streaming.html#delta-table-as-a-sink

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。