微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

PySpark 在流中向数据集添加行

如何解决PySpark 在流中向数据集添加行

我是 Spark 世界的新手,尤其是 pyspark。我有一个应用程序可以通过流式传输从 Kafka 获取数据,并且必须以某种方式处理这些数据。我需要的是将新的行添加到原始数据集,简而言之,我的应用程序中的数据集正在扩展。 这是我的尝试:

universe = spark_session.read.csv('./dataset/data_v4',header=True,inferSchema=True)
universe = universe.withColumn('id',monotonically_increasing_id())
universe.createOrReplaceTempView('universe')

universe.show()

ssc = StreamingContext(spark_session.sparkContext,5)

quiet_logs(spark_context)

brokers = [0]
topic = 'mytopic'
directKafkaStream = KafkaUtils.createDirectStream(ssc,[topic],kafkaParams={"bootstrap.servers": 'myAmazonServer'})

print('Attendo i dati...')
directKafkaStream.foreachRDD(compute_rdd)
ssc.start()
ssc.awaitTermination()

函数compute_rdd中我试图这样做:

def compute_rdd(time,rdd):

    if rdd.count() > 0:
       
        print('New data...')

        stream_data = rdd.collect()
        data = json.loads(stream_data[0][1])
        date_format = '%Y-%m-%dT%H:%M:%s'

        new_data = {
            'id': universe.select('id').collect()[-1]['id'] + 1,# cambiare con funzione random o qualche funzione di spark
            'timestamp': datetime.strptime(data['timestamp'],date_format),'vessel': str(data['vessel']),'veLocity': float(data['veLocity']),'distance': float(data['distance']),'drift_angle': float(data['drift_angle']),'decision': int(data['decision'])

        }

        
        times = {}
        if universe.where(f'vessel=="{new_data["vessel"]}"').rdd.isEmpty():

            print('Add data:',new_data)
            new_row = spark_session.createDataFrame([new_data],schema=universe.schema)

            universe = universe.union(new_row) # here the error

问题是我无法使用经典语法添加新行:

df = df.union(new_row)

引发的错误UnboundLocalError: local variable 'universe' referenced before assignment 我尝试将包含我的数据集的变量 universe 作为全局编写这些函数

def read_universe(datafile):
    if 'universe' not in globals():
        universe = get_spark_context().read.csv(datafile,inferSchema=True)
        universe = universe.withColumn(ID_COLUMN_NAME,monotonically_increasing_id())
        universe.createOrReplaceTempView('universe')

        globals()['universe'] = universe
        return globals()['universe']

    return globals()['universe']


def get_universe():
    return globals()['universe']

def universe_update(new_universe):
    globals()['universe'] = new_universe

    return globals()['universe']

通过这种方式,使用这些函数,我解决了我的问题,但这是在 pyspark 框架中进行推理的正确方法吗? spark环境中的全局变量如何工作?全局变量对 Spark 中的可扩展性和并行性有影响吗?

当然,我也在这里寻求建议和其他解决方案。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。