Spark Streaming updateStateByKey fails


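I'm working on an assignment in which I need to keep a running total of data across batches in a Python Spark Streaming job. I'm using updateStateByKey (at the end of the code below):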

```python
import os
import sys

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils  # Python Kafka API; removed in Spark 3.0

if __name__ == "__main__":
    # Create Spark Context with a 1-second batch interval
    sc = SparkContext(appName="PythonStreamingDirectKafkaCount")
    ssc = StreamingContext(sc, 1)

    # Checkpoint for backups (updateStateByKey requires a checkpoint directory)
    ssc.checkpoint("file:///tmp/spark")

    brokers, topic = sys.argv[1:]

    print(brokers)
    print(topic)

    sc.setLogLevel("WARN")

    # Connect to Kafka (the parameter name is lowercase: metadata.broker.list)
    kafkaParams = {"metadata.broker.list": brokers}
    kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)

    def parse_log_line(line):
        # Each record is a comma-separated log line; only the URL is counted
        (uuid, timestamp, url, user, region, browser, platform, cd, ttf) = line.strip().split(",")
        hour = timestamp[0:13]
        return (url, 1)

    # Kafka records arrive as (key, value) pairs; keep only the value
    lines = kafkaStream.map(lambda x: x[1])
    parsed_lines = lines.map(parse_log_line)

    # Per-batch click count for each URL
    clicks = parsed_lines.reduceByKey(lambda a, b: a + b)
    clicks.pprint()

    def countKeys(newValues, lastSum):
        # lastSum is None the first time a key is seen
        if lastSum is None:
            lastSum = 0
        return sum(newValues, lastSum)

    # The problem is here
    sum_clicks = clicks.updateStateByKey(countKeys)
    # I tried this but it didn't help
    # sum_clicks = clicks.updateStateByKey(countKeys, numPartitions=2)
    sum_clicks.pprint()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
```
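To make the intended running total concrete, the sketch below mimics locally what updateStateByKey does with countKeys, assuming the documented contract: on every batch the update function is called once per key, with the list of that key's new values (empty if the key did not appear in the batch) and the previous state (None the first time), and a None return drops the key. The helper run_batches is hypothetical and runs without Spark; it models only the per-key update, not partitioning or checkpointing, so it says nothing about the checkpoint error itself.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple


def countKeys(newValues: List[int], lastSum: Optional[int]) -> int:
    # Same logic as the question's update function: new keys start from 0.
    if lastSum is None:
        lastSum = 0
    return sum(newValues, lastSum)


def run_batches(batches: Iterable[List[Tuple[str, int]]]) -> List[Dict[str, int]]:
    """Hypothetical local stand-in for clicks.updateStateByKey(countKeys).

    For each batch, group the new values by key, then call countKeys once for
    every key that is in this batch or already in the state (keys with no new
    data get an empty list). A None return drops the key, as in Spark.
    """
    state: Dict[str, int] = {}
    snapshots: List[Dict[str, int]] = []
    for batch in batches:
        grouped: Dict[str, List[int]] = defaultdict(list)
        for key, value in batch:
            grouped[key].append(value)
        new_state: Dict[str, int] = {}
        # sorted() only makes the printed order deterministic
        for key in sorted(set(state) | set(grouped)):
            updated = countKeys(grouped.get(key, []), state.get(key))
            if updated is not None:
                new_state[key] = updated
        state = new_state
        snapshots.append(dict(state))
    return snapshots


if __name__ == "__main__":
    # Three batches of per-URL counts, shaped like the output of clicks
    for snapshot in run_batches([[("/a", 2), ("/b", 1)], [("/a", 3)], []]):
        print(snapshot)
    # prints {'/a': 2, '/b': 1}, then {'/a': 5, '/b': 1} twice
```

Because clicks is already reduced per batch, each newValues list in the real job holds a single per-batch count, which countKeys adds to the stored total.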

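The relevant part of the error message appears when pprint() is called, but I think that is only because pprint() is what triggers evaluation. The error is: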

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Checkpoint RDD has a different number of partitions from original RDD. Original RDD [ID: 195,num of partitions: 2]; Checkpoint RDD [ID: 267,num of partitions: 0].

It says the original RDD and the checkpoint RDD have different numbers of partitions, but specifying numPartitions=2 made no difference.

Does anyone know what I'm doing wrong? Thanks!
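A partition count of 0 for the checkpoint RDD is consistent with the driver finding no checkpoint files at the path it reads back. One possible cause worth ruling out, assuming the job runs on more than one machine: file:///tmp/spark resolves to each node's own disk, so files written by executors would not be visible to the driver, and the PySpark documentation for SparkContext.setCheckpointDir states that the directory must be an HDFS path when running on a cluster. The function below is a hypothetical pre-flight check (not a Spark API) that flags node-local checkpoint locations.

```python
from urllib.parse import urlparse


def checkpoint_is_node_local(checkpoint_dir: str) -> bool:
    """Hypothetical helper: True if checkpoint_dir points at a node-local path.

    A bare path or a file:// URI is read from the local disk of whichever
    machine opens it, so on a multi-node cluster the driver and the executors
    would each see a different /tmp.
    """
    scheme = urlparse(checkpoint_dir).scheme
    return scheme in ("", "file")


if __name__ == "__main__":
    print(checkpoint_is_node_local("file:///tmp/spark"))
    print(checkpoint_is_node_local("hdfs://namenode:8020/user/me/checkpoints"))
```

In the job above it could guard the ssc.checkpoint(...) call; on a cluster, the change to try would be a shared location such as an hdfs:// or s3a:// URI, whose exact form (host, port, path) depends on the cluster and is only an assumption here.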
