如何解决以 JSONArray 作为输入的 Spark Streaming [Pyspark]
我从 EventHubs(类似于 Kafka)以 JSONArray 的形式获取事件。
示例:
[
{
"guid": "001","userName": "Peter","timestamp": "2021-05-10T10:04:01Z"
},{
"guid": "002","userName": "Jane","timestamp": "2021-05-10T10:05:01Z"
},{
"guid": "003","userName": "Richard","timestamp": "2021-05-10T10:06:01Z"
}
]
我的代码如下:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import json
# defining the schema for the JSONArray
read_schema = ArrayType(StructType([
StructField("guid",StringType(),True),StructField("userName",StructField("timestamp",TimestampType(),True)]))
# defining the raw dataframe from EventHubs
streamingInputDF = (spark
.readStream
.format("eventhubs")
.options(**eh_conf)
.load()
)
# defining the dataframe based on the prevIoUs one. Only need the body which contains the data
streamingBodyDF = (
streamingInputDF
.selectExpr("cast(Body as string) as json")
.select(from_json("json",read_schema)
.alias("data"))
)
streamingNewDF = (
streamingBodyDF
.select(explode_outer("data"))
)
query = (
streamingNewDF
.writeStream
.format("json")
.queryName("my_stream")
.outputMode("append")
.option("path","/FileStore/sink_test")
.option("checkpointLocation","/FileStore/chkpt_dir_test")
.start()
)
流运行良好,图表显示了事件的情节。 过了一会儿,我停下来检查水槽。它说我有数千行,但没有列。我的接收器中的所有 json 都是 0 字节文件。 我想我对 read_schema 的定义和/或explode_outer 函数的使用的配置有问题。
这看起来很像 this question,但解决方案对我没有帮助。
感谢您的帮助。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。