How to fix Spark Structured Streaming writing NULLs to Hive with PySpark
I'm having trouble porting from DStreams to Structured Streaming. I've set up my code to output data from a Bitcoin price API into Hive.
The data looks like this:
[{"time": 1599859680,"open": "10328.0","high": "10330.8","low": "10328.0","close": "10330.8","vwap": "10330.8","volume": "0.00321565","count": 1},{"time": 1599859740,"open": "10330.8","high": "10331.9","low": "10330.8","close": "10331.0","vwap": "10331.5","volume": "0.92199459","count": 12},{"time": 1599859800,"open": "10331.0","high": "10331.0","low": "10331.0","vwap": "0.0","volume": "0.00000000","count": 0}]
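One way to check the shape of this payload without Spark is to parse a single Kafka message with Python's standard `json` module. The sketch below assumes the message value arrives as UTF-8 bytes (Kafka's `value` column is binary, which is why the query casts it to string). It shows three things that matter when writing a schema: the top-level value is a JSON array, not an object; the price and volume fields are quoted strings; and the third candle has no `close` key.

```python
import json

# One Kafka message value as raw bytes (the sample payload above).
raw_value = (
    b'[{"time": 1599859680,"open": "10328.0","high": "10330.8","low": "10328.0",'
    b'"close": "10330.8","vwap": "10330.8","volume": "0.00321565","count": 1},'
    b'{"time": 1599859740,"open": "10330.8","high": "10331.9","low": "10330.8",'
    b'"close": "10331.0","vwap": "10331.5","volume": "0.92199459","count": 12},'
    b'{"time": 1599859800,"open": "10331.0","high": "10331.0","low": "10331.0",'
    b'"vwap": "0.0","volume": "0.00000000","count": 0}]'
)

# Rough equivalent of col("value").cast("string") followed by JSON parsing.
payload = json.loads(raw_value.decode("utf-8"))

print(type(payload).__name__, len(payload))  # -> list 3
for candle in payload:
    # Show which JSON type each field actually has.
    print({name: type(value).__name__ for name, value in candle.items()})
```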
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,from_json
from pyspark.sql.types import *
bootstrapServers = "localhost:9099"
topics = "kraken"
subscribeType = "subscribe"
spark = SparkSession\
    .builder\
    .appName("StructuredBitcoin")\
    .config("spark.sql.warehouse.dir","/user/hive/warehouse")\
    .config("hive.metastore.uris","thrift://localhost:9083")\
    .enableHiveSupport()\
    .getOrCreate()
data = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers",bootstrapServers)\
    .option(subscribeType,topics)\
    .load()
I define the schema as follows:
schema = StructType([
    StructField("time",TimestampType()),
    StructField("open",IntegerType()),
    StructField("high",IntegerType()),
    StructField("low",IntegerType()),
    StructField("close",IntegerType()),
    StructField("vwap",IntegerType()),
    StructField("volume",IntegerType()),
    StructField("count",IntegerType())
])
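The NULLs are consistent with a mismatch between this schema and the payload. The plain-Python sketch below is a simplified model, not Spark's actual JSON parser. The hypothetical `parse_with_schema` helper returns `None` for the whole record when the top-level JSON value is not an object. Otherwise it returns `None` for any field whose JSON type differs from the declared one: a quoted `"10328.0"` is not an integer. The model mirrors the integer declarations in the schema (for example `open` and `count` as `IntegerType`), and for simplicity it treats `time` as an integer epoch. Under this model, applying an object schema to an array produces one row of eight NULLs, which is the shape of the output shown further down.

```python
import json

# Simplified stand-in for the schema: every field declared as an integer
# ("time" modelled as integer epoch seconds for simplicity).
DECLARED = {name: int for name in
            ("time", "open", "high", "low", "close", "vwap", "volume", "count")}


def parse_with_schema(text, declared):
    """Hypothetical helper: simplified model of schema-driven JSON parsing.

    Returns None for the whole record if the top-level value is not a JSON
    object; otherwise each field is None unless its JSON value has exactly
    the declared Python type.
    """
    value = json.loads(text)
    if not isinstance(value, dict):
        return None  # object schema applied to an array: no usable record
    row = {}
    for name, expected in declared.items():
        field = value.get(name)
        # bool is a subclass of int in Python, so exclude it explicitly.
        ok = isinstance(field, expected) and not isinstance(field, bool)
        row[name] = field if ok else None
    return row


def as_columns(record, declared):
    """Expand a parsed record into columns, like selectExpr("bitcoin.*")."""
    if record is None:
        return [None] * len(declared)
    return [record[name] for name in declared]


array_message = '[{"time": 1599859680, "open": "10328.0", "count": 1}]'
print(as_columns(parse_with_schema(array_message, DECLARED), DECLARED))
# -> [None, None, None, None, None, None, None, None]

object_message = '{"time": 1599859680, "open": "10328.0", "count": 1}'
print(as_columns(parse_with_schema(object_message, DECLARED), DECLARED))
# -> [1599859680, None, None, None, None, None, None, 1]
```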
Then I select the data (I think this is where I'm going wrong):
df = data.select(from_json(col("value").cast("string"),schema)\
.alias("bitcoin")).selectExpr("bitcoin.*")
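If the problem is the shape of the payload (an array of candles with quoted numbers), the select has to treat `value` as an array and produce one row per candle. In PySpark this would typically mean an `ArrayType(StructType([...]))` schema, `explode` applied to the `from_json` result, `StringType` for the quoted prices followed by a cast to double, and `LongType` for `time` followed by a cast to timestamp. The sketch below models those steps in plain Python so the intended result is easy to check. The `explode_candles` helper and the `PRICE_FIELDS` tuple are hypothetical names introduced for illustration.

```python
import json
from datetime import datetime, timezone

# Fields that arrive as quoted strings in the payload (hypothetical grouping).
PRICE_FIELDS = ("open", "high", "low", "close", "vwap", "volume")


def explode_candles(text):
    """Model of from_json with an array schema followed by explode:
    one output row per element of the top-level JSON array."""
    rows = []
    for candle in json.loads(text):
        row = {
            # Epoch seconds -> UTC timestamp (Spark's cast of a long to
            # timestamp also interprets the number as seconds since the epoch).
            "time": datetime.fromtimestamp(candle["time"], tz=timezone.utc),
            "count": candle.get("count"),
        }
        for name in PRICE_FIELDS:
            raw = candle.get(name)  # quoted string, or absent (e.g. "close")
            row[name] = float(raw) if raw is not None else None
        rows.append(row)
    return rows


message = ('[{"time": 1599859680,"open": "10328.0","high": "10330.8","low": "10328.0",'
           '"close": "10330.8","vwap": "10330.8","volume": "0.00321565","count": 1},'
           '{"time": 1599859800,"open": "10331.0","high": "10331.0","low": "10331.0",'
           '"vwap": "0.0","volume": "0.00000000","count": 0}]')
rows = explode_candles(message)
for row in rows:
    print(row["time"].isoformat(), row["open"], row["close"], row["count"])
```

A missing `close` becomes `None` (NULL in Hive) for that candle only, instead of turning the whole message into NULLs.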
Then I write each batch to Hive:
def write(df,epoch_id):
    df.show()
    df.write.mode("append").saveAsTable("bitcoin.data")
After that, I start the stream and wait for it to terminate:
query = df.writeStream.foreachBatch(write).start()
query.awaitTermination()
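`foreachBatch` calls the supplied function once per micro-batch with that batch's DataFrame and a batch id. This is why `write` can use the ordinary batch writer and `saveAsTable`. The Spark Structured Streaming guide notes that `foreachBatch` is at-least-once by default and that the batch id can be used to deduplicate output. The sketch below is a plain-Python model of that contract, not Spark code: `run_micro_batches`, `table` and `committed_epochs` are hypothetical stand-ins for the streaming engine and for the Hive table `bitcoin.data`.

```python
from typing import Callable, Dict, Iterable, List, Set, Tuple

Row = Dict[str, object]


def run_micro_batches(batches: Iterable[Tuple[int, List[Row]]],
                      write: Callable[[List[Row], int], None]) -> None:
    """Hypothetical driver loop: calls write(batch, epoch_id) once per
    micro-batch, the way foreachBatch hands each batch to the user function."""
    for epoch_id, batch in batches:
        write(batch, epoch_id)


# In-memory stand-ins for the Hive table and its record of written batches.
table: List[Row] = []
committed_epochs: Set[int] = set()


def write(batch: List[Row], epoch_id: int) -> None:
    # Skip a replayed epoch so a retried batch is not appended twice.
    if epoch_id in committed_epochs:
        return
    table.extend(batch)  # models df.write.mode("append").saveAsTable(...)
    committed_epochs.add(epoch_id)


# Epoch 1 is delivered twice, for example after a restart.
run_micro_batches(
    [(0, [{"open": 10328.0}]), (1, [{"open": 10330.8}]), (1, [{"open": 10330.8}])],
    write,
)
print(len(table))  # -> 2
```

In a real job the set of committed batch ids would need to live somewhere durable, such as the target table itself, because an in-memory set does not survive a restart.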
The output is just a bunch of NULL values, which makes me think the data isn't being read correctly:
NULL NULL NULL NULL NULL NULL NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL
Where am I going wrong?
Thanks for any help.