如何解决集成Spark Streaming和Twitter API时,为什么总是将空RDD发送给Spark?
我是Spark Streaming的新手,我正在做一个小型个人项目来研究这项技术。我想使用Twitter API获取实时推文,然后使用Spark Streaming转换流数据以可视化流行标签。
我编写了一个Python脚本twitter_api.py
,以从Twitter API获取推文,然后通过TCP连接将数据发送到Spark。我认为这一步没有问题,因为我可以打印出获得的推文。
但是,在另一个脚本spark.py
中,我在处理RDD时总是得到'ValueError'
。它说我的RDD是空的。
我认为pyspark
在笔记本电脑上配置良好,因为我可以运行静态示例。
spark.py
脚本如下所示:
#!/usr/bin/env python3
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession,Row
from twitter_app import TCP_IP,TCP_PORT,KEYWORD
import seaborn as sns
import matplotlib.pyplot as plt
import time
import sys
def spark(TCP_IP,KEYWORD):
sc = SparkContext(appName='TwitterStreamingApp')
sc.setLogLevel('ERROR')
ssc = StreamingContext(sc,5)
ssc.checkpoint("checkpoint_Twitterapp")
data_stream = ssc.socketTextStream(TCP_IP,TCP_PORT)
lines = data_stream.window(20)
words = lines.flatMap(lambda x: x.split(' '))
hashtags = words.filter(lambda x: '#' in x)
def process_rdd(rdd):
try:
# initialization of SparkSession
spark = SparkSession.builder.config(conf=rdd.context.getConf()).getorCreate()
# map each rdd to each row
rdd_row = rdd.map(lambda x: Row(tag=x))
# create the dataframe
hashtags_df = spark.createDataFrame(rdd_row)
hashtags_df.createOrReplaceTempView('tags')
hashtags_count_df = spark.sql(
'SELECT tag,count(tag) FROM tags GROUP BY tag ORDER BY COUNT(tag) DESC LIMIT 10')
pd_df = hashtags_count_df.toPandas()
plt.figure(figsize=(10,8))
sns.barplot(x="total",y="word",data=pd_df.head(20))
plt.show()
except:
e = sys.exc_info()[0]
print("Error: %s" % e)
hashtags.foreachRDD(process_rdd)
ssc.start()
# wait for the streaming to finish
ssc.awaitTermination()
if __name__ == "__main__":
spark(TCP_IP,KEYWORD)
我真的不知道出什么问题了!
-
- 我可以使用
twitter_api.py
The Tweets I obtained 获取数据
- 我可以使用
-
- pyspark应该配置正确,因为我可以在笔记本电脑上运行静态Spark。 For example,even though I got warnings I could have the correct output using Spark
p.s。我正在将MacBook Pro 2018与Catalina 10.15.5、2.3 GHz四核,Intel Core i5配合使用
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。