How to fix a failing updateStateByKey in a Spark Streaming job
I'm working on an assignment where I need to keep a running total across batches in a Python Spark Streaming job. I'm using updateStateByKey (near the end of the code below):
import sys
import os
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    # Create Spark Context
    sc = SparkContext(appName="PythonStreamingDirectKafkaCount")
    ssc = StreamingContext(sc, 1)

    # Checkpoint for backups
    ssc.checkpoint("file:///tmp/spark")

    brokers, topic = sys.argv[1:]
    print(brokers)
    print(topic)
    sc.setLogLevel("WARN")

    # Connect to Kafka
    kafkaParams = {"metadata.broker.list": brokers}
    kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)

    def parse_log_line(line):
        # Only the URL matters for counting clicks; emit (url, 1) per line
        (uuid, timestamp, url, user, region, browser, platform, cd, ttf) = line.strip().split(",")
        hour = timestamp[0:13]
        return (url, 1)

    lines = kafkaStream.map(lambda x: x[1])
    parsed_lines = lines.map(parse_log_line)
    clicks = parsed_lines.reduceByKey(lambda a, b: a + b)
    clicks.pprint()

    def countKeys(newValues, lastSum):
        # Running total: previous state (None for a new key) plus this batch's counts
        if lastSum is None:
            lastSum = 0
        return sum(newValues, lastSum)

    # The problem is here
    sum_clicks = clicks.updateStateByKey(countKeys)
    # I tried this but it didn't help
    # sum_clicks = clicks.updateStateByKey(countKeys, numPartitions=2)

    sum_clicks.pprint()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
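For reference, this is how I understand the update function being called per key per batch (a plain-Python illustration with made-up values, not part of the job itself): Spark passes in the list of new counts seen for a key in the current batch together with that key's previous state, which is None the first time the key appears.

# Illustration only, with hypothetical values
print(countKeys([3, 2], 10))   # existing key: 10 + 3 + 2 = 15
print(countKeys([4], None))    # new key: treated as 0 + 4 = 4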
The relevant part of the error message shows up when pprint() is called, but I think that's only because pprint() is what triggers evaluation. The error is:
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Checkpoint RDD has a different number of partitions from original RDD. Original RDD [ID: 195,num of partitions: 2]; Checkpoint RDD [ID: 267,num of partitions: 0].
It says the original RDD and the checkpoint RDD have different numbers of partitions, but I tried specifying numPartitions=2 and it made no difference.
Does anyone know what I'm doing wrong? Thanks!
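The only other thing I can think of is the checkpoint location: file:///tmp/spark is a local path, so if the job runs on more than one machine each executor would write and read its own /tmp/spark, and the recovered checkpoint could come back empty. A sketch of what I would try instead, assuming HDFS (or some other shared filesystem) is available; the hdfs:// path below is just a placeholder:

    # Assumption: a checkpoint directory on shared storage that every executor can reach
    ssc.checkpoint("hdfs:///tmp/spark-checkpoint")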