如何解决PySpark 在流中向数据集添加行
我是 Spark 世界的新手,尤其是 pyspark。我有一个应用程序可以通过流式传输从 Kafka 获取数据,并且必须以某种方式处理这些数据。我需要的是将新的行添加到原始数据集,简而言之,我的应用程序中的数据集正在扩展。 这是我的尝试:
universe = spark_session.read.csv('./dataset/data_v4',header=True,inferSchema=True)
universe = universe.withColumn('id',monotonically_increasing_id())
universe.createOrReplaceTempView('universe')
universe.show()
ssc = StreamingContext(spark_session.sparkContext,5)
quiet_logs(spark_context)
brokers = [0]
topic = 'mytopic'
directKafkaStream = KafkaUtils.createDirectStream(ssc,[topic],kafkaParams={"bootstrap.servers": 'myAmazonServer'})
print('Attendo i dati...')
directKafkaStream.foreachRDD(compute_rdd)
ssc.start()
ssc.awaitTermination()
def compute_rdd(time,rdd):
if rdd.count() > 0:
print('New data...')
stream_data = rdd.collect()
data = json.loads(stream_data[0][1])
date_format = '%Y-%m-%dT%H:%M:%s'
new_data = {
'id': universe.select('id').collect()[-1]['id'] + 1,# cambiare con funzione random o qualche funzione di spark
'timestamp': datetime.strptime(data['timestamp'],date_format),'vessel': str(data['vessel']),'veLocity': float(data['veLocity']),'distance': float(data['distance']),'drift_angle': float(data['drift_angle']),'decision': int(data['decision'])
}
times = {}
if universe.where(f'vessel=="{new_data["vessel"]}"').rdd.isEmpty():
print('Add data:',new_data)
new_row = spark_session.createDataFrame([new_data],schema=universe.schema)
universe = universe.union(new_row) # here the error
问题是我无法使用经典语法添加新行:
df = df.union(new_row)
引发的错误是 UnboundLocalError: local variable 'universe' referenced before assignment
我尝试将包含我的数据集的变量 universe
作为全局编写这些函数:
def read_universe(datafile):
if 'universe' not in globals():
universe = get_spark_context().read.csv(datafile,inferSchema=True)
universe = universe.withColumn(ID_COLUMN_NAME,monotonically_increasing_id())
universe.createOrReplaceTempView('universe')
globals()['universe'] = universe
return globals()['universe']
return globals()['universe']
def get_universe():
return globals()['universe']
def universe_update(new_universe):
globals()['universe'] = new_universe
return globals()['universe']
通过这种方式,使用这些函数,我解决了我的问题,但这是在 pyspark 框架中进行推理的正确方法吗? spark环境中的全局变量如何工作?全局变量对 Spark 中的可扩展性和并行性有影响吗?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。