微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark结构化流使用pyspark将NULL输出到Hive

如何解决Spark结构化流使用pyspark将NULL输出到Hive

我在从Dstreams移植到结构化流媒体时遇到了麻烦,并且我已经设置了代码以将数据从比特币价格API输出到蜂巢中。

数据看起来像这样

[{"time": 1599859680,"open": "10328.0","high": "10330.8","low": "10328.0","close": "10330.8","vwap": "10330.8","volume": "0.00321565","count": 1},{"time": 1599859740,"open": "10330.8","high": "10331.9","low": "10330.8","close": "10331.0","vwap": "10331.5","volume": "0.92199459","count": 12},{"time": 1599859800,"open": "10331.0","high": "10331.0","low": "10331.0","vwap": "0.0","volume": "0.00000000","count": 0}]

我的设置包括以下内容

from pyspark.sql import SparkSession
from pyspark.sql.functions import col,from_json
from pyspark.sql.types import *

bootstrapServers = "localhost:9099"
topics = "kraken"
subscribeType = "subscribe"

spark = SparkSession\
    .builder\
    .appName("StructuredBitcoin")\
    .config("spark.sql.warehouse.dir","/user/hive/warehouse")\
    .config("hive.metastore.uris","thrift://localhost:9083")\
    .enableHiveSupport()\
    .getorCreate()

data = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers",bootstrapServers)\
    .option(subscribeType,topics)\
    .load()

我按如下方式定义架构

schema = StructType([
    StructField("time",TimestampType()),StructField("open",IntegerType()),StructField("high",StructField("low",StructField("close",StructField("vwap",StructField("volume",StructField("count",IntegerType())
    ])

然后我选择数据(我认为这是我要去的地方)

df = data.select(from_json(col("value").cast("string"),schema)\
    .alias("bitcoin")).selectExpr("bitcoin.*")

并写入Hive

def write(df,epoch_id):
    df.show()
    df.write.mode("append").saveAsTable("bitcoin.data")

之后,我编写流并等待终止

query = df.writeStream.foreachBatch(write).start()

query.awaitTermination()

输出结果只是一堆NULL值,这使我相信数据无法正确读取。

NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL

我在哪里错了?

感谢帮助

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。