如何解决在 Pyspark 结构化流中仅捕获 CDC 的有效负载?
- 我正在尝试建立从 SQL Server 到 Pyspark 的管道以捕获 SQL Server 中的数据更改,我已准备好一切:
- 在 SQL Server 中启用 CDC
- 从 SQL Server 生成到 Kafka,并在 Pyspark 结构化流中从 Kafka 主题消费。
- 问题是:当我尝试使用控制台消费者检查数据更改是否通过 Kafka 时,它向我显示 JSON 格式的消息分为两条记录:Schema 和 Payload,在 Payload 内部有 Before 和 After您分别是更改前的数据和更改后的数据。
- 我只在有效载荷中受到了治疗-->在此 JSON 消息的一部分之后
- 因为当我像这样流式传输它时,在 Jupyter 命令行中我需要的字段上显示 null,我理解这是因为 JSON 格式很复杂
- 这是我的 pyspark 代码:
- 我只在有效载荷中受到了治疗-->在此 JSON 消息的一部分之后
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 pyspark-shell'
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
kafka_topic_name = "test-spark"
kafka_bootstrap_servers = '192.168.1.3:9092'
spark = SparkSession \
.builder \
.appName("PySpark Structured Streaming with Kafka and Message Format as JSON") \
.master("local[*]") \
.getOrCreate()
# Construct a streaming DataFrame that reads from TEST-SPARK
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers",kafka_bootstrap_servers) \
.option("subscribe",kafka_topic_name) \
.load()
print("Printing Schema of df: ")
df.printSchema()
df1 = df.selectExpr("CAST(value AS STRING)","timestamp")
df1.printSchema()
schema = StructType() \
.add("name",StringType()) \
.add("type",StringType())
df2 = df1\
.select(from_json(col("value"),schema)\
.alias("records"),"timestamp")
df3 = df2.select("records.*","timestamp")
print("Printing Schema of records_df3: ")
df3.printSchema()
records_write_stream = df3 \
.writeStream \
.trigger(processingTime='5 seconds') \
.outputMode("update") \
.option("truncate","false")\
.format("console") \
.start()
records_write_stream.awaitTermination()
print("Stream Data Processing Application Completed.")
- 这是一张图片,显示了到达 Kafka 的 CDC 消息:
- 如果有人知道如何只消耗有效负载-->在参与 Pyspark 结构化流媒体之后,请帮助我。
解决方法
您应该将您的 Debezeium 连接器修改为具有 value.converter.schemas.enabled=false
,然后您将只有 payload
字段可供使用。
否则,您可以为整个对象创建一个类/模式以及 from_json()
函数,或者将值保留为字符串并使用 get_json_object()
Spark 函数解析数据
同样相关 - 您可能想要提取 NewRecordState
,-搜索更多信息后,我发现了如何仅显示和捕获 CDC 消息的有效负载部分。
- 您需要将其添加到您的 Worker.properties 中:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。