
PySpark - Creating a PySpark DataFrame from Kafka JSON messages


I am using PySpark Structured Streaming to read data from a Kafka topic whose messages are in a complex JSON format.

I am reading the topic with Spark Structured Streaming's Kafka source, with the code below -

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType

spark = SparkSession.builder \
        .appName("PythonSparkStreamingKafka") \
        .getOrCreate()

kafkaStreamDF = spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers","localhost:9092") \
            .option("subscribe","main.test.mysql.main.test_bank_data") \
            .option("startingOffsets","earliest") \
            .load()

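# The Kafka value column arrives as binary; cast it to a readable JSON string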
kafkaStreamDF1 = kafkaStreamDF.selectExpr("CAST(value AS STRING)")

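# Debezium wraps each record in a Connect envelope; extract its 'payload' key as a raw string first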
message_schema = StructType().add("payload",StringType())
kafkaStreamDF2 = kafkaStreamDF1.select(from_json(col("value"),message_schema).alias("message"))

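# Write the parsed messages to the console to inspect the payload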
consoleOutput = kafkaStreamDF2.writeStream \
                .outputMode("append") \
                .format("console") \
                .option("truncate","false") \
                .start()

So far I have managed to extract the message down to the payload section of the Kafka JSON, and its output on the console looks like this -

|[{"before":null,"after":{"transaction_id":20,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL12071 ","value_date":18490,"withdrawal_amt":"AMTWoA==","deposit_amt":null,"balance_amt":"K6LiGA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|

|[{"before":null,"after":{"transaction_id":21,"transaction_details":"INDO GIBL Indiaforensic STL13071 ","withdrawal_amt":"AV741A==","balance_amt":"KkPpRA=="},"transaction":null}]|

Now I want to extract the data under the after section and read those fields into a DataFrame, like this -

transaction_id|account_no  |transaction_date|transaction_details              |value_date|withdrawal_amt|deposit_amt|balance_amt
20            |409000611074|16/08/2020      |INDO GIBL Indiaforensic STL12071 |16/08/2020|129000.00     |(null)     |7320950.00
21            |409000611074|16/08/2020      |INDO GIBL Indiaforensic STL13071 |16/08/2020|230013.00     |(null)     |7090937.00
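For reference, the raw Debezium values above map onto this expected output as follows: transaction_date and value_date are day counts since the Unix epoch (io.debezium.time.Date), and the amount fields are base64-encoded big-endian unscaled integers (org.apache.kafka.connect.data.Decimal, scale 2). A minimal plain-Python decoding sketch, assuming that encoding; the helper name decode_connect_decimal is illustrative:

import base64
from datetime import date, timedelta
from decimal import Decimal

# io.debezium.time.Date: integer days since the Unix epoch
print(date(1970, 1, 1) + timedelta(days=18490))  # 2020-08-16

# org.apache.kafka.connect.data.Decimal: base64 of a big-endian,
# signed, unscaled integer; shift by 10^-scale to recover the value
def decode_connect_decimal(b64, scale=2):
    unscaled = int.from_bytes(base64.b64decode(b64), byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)

print(decode_connect_decimal("AMTWoA=="))  # 129000.00  (withdrawal_amt)
print(decode_connect_decimal("K6LiGA=="))  # 7320950.00 (balance_amt)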

Please suggest how I can achieve this expected output in a PySpark DataFrame.

Edit: adding the exact value field of a Kafka message below -

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"transaction_id"},{"type":"int64","field":"account_no"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"transaction_date"},{"type":"string","field":"transaction_details"},"name":"io.debezium.time.Date","field":"value_date"},{"type":"bytes","name":"org.apache.kafka.connect.data.Decimal","parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"withdrawal_amt"},{"type":"bytes","name":"org.apache.kafka.connect.data.Decimal","connect.decimal.precision":"12"},"field":"deposit_amt"},{"type":"bytes","connect.decimal.precision":"12"},"field":"balance_amt"}],"name":"main.test.mysql.main.test_bank_data.Value","field":"before"},{"type":"struct","field":"transaction_id"},"name":"io.debezium.time.Date","field":"transaction_details"},"connect.decimal.precision":"12"},"optional":true,"name":"main.test.mysql.main.test_bank_data.Value","field":"after"},"fields":[{"type":"string","field":"version"},"field":"connector"},"field":"name"},"field":"ts_ms"},"optional":true,"name":"io.debezium.data.Enum","parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},"field":"db"},"field":"table"},"field":"server_id"},"field":"gtid"},"field":"file"},"field":"pos"},"field":"row"},"field":"thread"},"field":"query"}],"name":"io.debezium.connector.mysql.source","field":"source"},{"type":"string","field":"op"},"fields":[{"type":"string","field":"id"},"field":"total_order"},"field":"data_collection_order"}],"field":"transaction"}],"name":"main.test.mysql.main.test_bank_data.Envelope"},"payload":{"before":null,"after":{"transaction_id":146,"account_no":409000611076,"transaction_date":18652,"transaction_details":"TRF FROM Indiaforensic SERVICES","value_date":18652,"withdrawal_amt":"AA==","deposit_amt":"B6Eg","balance_amt":"B6Eg"},"source":{"version":"1.4.0-SNAPSHOT","ts_ms":1611587463000,"snapshot":"false","db":"main","server_id":19105,"pos":46195052,"thread":1604558,"ts_ms":1611587463181,"transaction":null}}

From here, I have already cast the value to a string as DF1 and extracted the payload part into DF2.

-- Final working update -- After adding a transform (SMT) to the Debezium MySQL connector on the Kafka Connect side, the message value I now receive from Kafka in PySpark Structured Streaming looks like this -

Value =
{"transaction_id":21,"transaction_date":"2020-08-22","transaction_details":"INDO GIBL Indiaforensic STL13071 ","value_date":"2020-08-22","withdrawal_amt":"230013.00","balance_amt":"7090937.00"}

from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType

message_schema = StructType([
    StructField('transaction_id', IntegerType(), True),
    StructField('account_no', LongType(), True),
    # the remaining fields arrive as plain strings after the unwrap transform
    StructField('transaction_date', StringType(), True),
    StructField('transaction_details', StringType(), True),
    StructField('value_date', StringType(), True),
    StructField('withdrawal_amt', StringType(), True),
    StructField('deposit_amt', StringType(), True),
    StructField('balance_amt', StringType(), True)
])
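A minimal sketch of how this schema might be applied to the unwrapped value, reusing the kafkaStreamDF1 defined earlier (flatDF is an illustrative name):

# Parse the flat JSON value with the schema above and expand
# every field into its own typed column
flatDF = kafkaStreamDF1.select(
    from_json(col("value"), message_schema).alias("message")
).select("message.*")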

Solution

You can pass the schema of the JSON string message to the from_json function.

Given messages like these:

#+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|value                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
#+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
#|[{"before":null,"after":{"transaction_id":20,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL12071 ","value_date":18490,"withdrawal_amt":"AMTWoA==","deposit_amt":null,"balance_amt":"K6LiGA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|
#|[{"before":null,"after":{"transaction_id":21,"transaction_details":"INDO GIBL Indiaforensic STL13071 ","withdrawal_amt":"AV741A==","balance_amt":"KkPpRA=="},"transaction":null}]|
#+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

You can modify your code to parse the after field of the JSON as a MapType, then select the keys you want as columns:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, MapType, StringType, LongType

message_schema = StructType([
    StructField('before', MapType(StringType(), StringType(), True), True),
    StructField('after', MapType(StringType(), StringType(), True), True),
    StructField('source', MapType(StringType(), StringType(), True), True),
    StructField('op', StringType(), True),
    StructField('ts_ms', LongType(), True),
    StructField('transaction', StringType(), True)
])

after_fields = [
    "account_no", "balance_amt", "deposit_amt", "transaction_date",
    "transaction_details", "transaction_id", "value_date", "withdrawal_amt"
]

# parse the json strings using from_json and select message.after.*
kafkaStreamDF.withColumn(
    "message", F.from_json(F.col("value").cast("string"), message_schema)
).select(
    *[F.col("message.after").getItem(f).alias(f) for f in after_fields]
).writeStream \
 .outputMode("append") \
 .format("console") \
 .option("truncate", "false") \
 .start() \
 .awaitTermination()
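One caveat: the sample values are JSON arrays ([{...}]) rather than single objects, and from_json with a plain StructType can return null for array input depending on the Spark version. A hedged variant that wraps the schema in an ArrayType and takes the first element (arrayDF is an illustrative name):

from pyspark.sql.types import ArrayType

# Parse each value as an array of Debezium envelopes and unpack the first one
arrayDF = kafkaStreamDF.withColumn(
    "message",
    F.from_json(F.col("value").cast("string"), ArrayType(message_schema)).getItem(0)
)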
