微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何一次从三角洲湖泊表查询所有版本以跟踪对特定ID所做的更改

如何解决如何一次从三角洲湖泊表查询所有版本以跟踪对特定ID所做的更改

我有一个雇员表,其中有salaryemployee使用三角洲湖泊管理。

我可以使用delta lake支持的时间旅行功能,基于 version timestamp 查询表

SELECT *
FROM DELTA.`EMPLOYEE`
VERSION AS OF 3

但是我想知道在所有增量表版本中对雇员所做的所有更改的历史记录。像这样

SELECT *,timestamp -- From delta table,version -- From delta table
FROM DELTA.`EMPLOYEE`
WHERE EMPLOYEE = 'George'
WITHIN ALL VERSIONS --Never exists but just for understanding

解决方法

这是一个老问题,但今天我偶然发现了它,因为我有一些问题要解决。我认为 Delta (delta.io) 没有为此提供一种方法,因为 Delta 围绕时间旅行旋转到特定时间点而不是一段时间。

但如果我必须得到这个,我想一种方法是直接读取镶木地板文件(忽略增量日志),这将导致记录的所有过去版本/状态(将真空等放在一边)。

现在,如果需要获取每个记录创建的确切版本(这是我的要求),请使用类似

dataframe.withColumn("input_file",input_file_name()) 这将显示记录来自的确切文件名。

现在查询 .json _delta_log 事务文件,它会告诉我们哪个版本添加了哪个文件,就像这样

>>> details = spark.read.json('/data/gcs/delta/ingest/bigtable/_delta_log/*.json')
>>> details = details.select(col('add')['path'].alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL")
>>> details.show(5,100)
+-------------------------------------------------------------------+-------+
|                                                          file_path|version|
+-------------------------------------------------------------------+-------+
|part-00000-148c98cc-0db1-495e-bb67-0ba1cc4fd45e-c000.snappy.parquet|      4|
|part-00001-2caa89b7-c990-47e0-b7b0-92430b15b141-c000.snappy.parquet|      4|
|part-00002-1f900af7-d819-48e9-a048-ad22e5c7ce65-c000.snappy.parquet|      4|
|part-00003-e043f466-861b-47f0-a1cf-4b67e75a5ed2-c000.snappy.parquet|      4|
|part-00000-93cc0747-ca0b-46ef-ada4-b3fb18e48925-c000.snappy.parquet|      0|
+-------------------------------------------------------------------+-------+
only showing top 5 rows

在 file_path 上加入这两个数据帧,您将看到记录的每个状态/版本以及它在其中创建的增量版本。我的示例 -

parquet_table = spark.read.parquet('/data/gcs/delta/ingest/bigtable/*.parquet')

>>> parquet_table.printSchema()
root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Sales_Channel: string (nullable = true)


parquet_table = parquet_table.where(col("Order_ID")==913712584).\
            withColumn("input_file",38,1000)).\
           select(["Order_ID","Region","Country","Sales_Channel","input_file"]).\
            orderBy("Country")
            
>>> parquet_table.join(details,parquet_table.input_file == details.file_path).select("Order_ID","version").orderBy("version").show(100)
+---------+------------------+-------+-------------+-------+                    
| Order_ID|            Region|Country|Sales_Channel|version|
+---------+------------------+-------+-------------+-------+
|913712584|Sub-Saharan Africa|Lesotho|       Online|      0|
|913712584|Sub-Saharan Africa|Lesotho|       Online|      0|
|913712584|Sub-Saharan Africa|Lesotho|       Online|      0|
,

在 Databricks 上,从 Databricks Runtime 8.2 开始,有一个名为 Change Data Feed 的功能可以跟踪对表所做的更改,您可以以批处理或流的形式提取更改源以进行分析或实施更改数据捕获式处理。

在表上启用更改数据提要后,您可以使用批处理或流 API 读取数据,如下所示:

spark.read.format("delta") \
  .option("readChangeFeed","true") \
  .option("startingVersion",0) \
  .table("myDeltaTable")

并且您将获得带有 additional columns 的所有更改记录,这些记录描述了进行了哪些更改(插入/更新/删除)、何时发生(时间戳)以及在哪个版本中。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。