微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

以 JSONArray 作为输入的 Spark Streaming [Pyspark]

如何解决以 JSONArray 作为输入的 Spark Streaming [Pyspark]

我从 EventHubs(类似于 Kafka)以 JSONArray 的形式获取事件。
示例:

[
    {
        "guid": "001","userName": "Peter","timestamp": "2021-05-10T10:04:01Z"
    },{
        "guid": "002","userName": "Jane","timestamp": "2021-05-10T10:05:01Z"
    },{
        "guid": "003","userName": "Richard","timestamp": "2021-05-10T10:06:01Z"
    }
]

我的代码如下:

from pyspark.sql.types import *
from pyspark.sql.functions import *
import json

# defining the schema for the JSONArray
read_schema = ArrayType(StructType([
  StructField("guid",StringType(),True),StructField("userName",StructField("timestamp",TimestampType(),True)]))

# defining the raw dataframe from EventHubs
streamingInputDF = (spark
    .readStream                       
    .format("eventhubs")
    .options(**eh_conf)
    .load()
)

# defining the dataframe based on the prevIoUs one. Only need the body which contains the data
streamingBodyDF = (
  streamingInputDF
    .selectExpr("cast(Body as string) as json")
    .select(from_json("json",read_schema)
    .alias("data"))
)

streamingNewDF = (
  streamingBodyDF
    .select(explode_outer("data"))
)


query = (
  streamingNewDF 
    .writeStream
    .format("json")        
    .queryName("my_stream")     
    .outputMode("append")
    .option("path","/FileStore/sink_test")
    .option("checkpointLocation","/FileStore/chkpt_dir_test")
    .start()
)

流运行良好,图表显示了事件的情节。 过了一会儿,我停下来检查水槽。它说我有数千行,但没有列。我的接收器中的所有 json 都是 0 字节文件。 我想我对 read_schema 的定义和/或explode_outer 函数的使用的配置有问题。

这看起来很像 this question,但解决方案对我没有帮助。

感谢您的帮助。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。