如何解决如何在没有所有功能的情况下创建从 Twitter 到 PySpark 的结构化流管道?
我一直在尝试将以下 Tweet_listener.py 代码简化为更容易理解的内容,因为我需要将它教给没有深厚 Python 背景的初学者。我的目标是在没有类和函数的情况下(主要是)做同样的事情。 Tweet_Listener.py 是我到目前为止所拥有的,它确实流式传输推文,但我无法让数据出现在 PySpark(最后一个代码位)中。当我尝试显示数据时,在第一个 Tweet_Listener.py 代码工作的地方没有显示任何内容。我无法弄清楚我需要在 Tweet_Listener_simple.py 代码中更改什么才能实现这一点。任何建议将不胜感激!
Tweet_Listener.py
import tweepy
from tweepy import OAuthHandler # to authenticate Twitter API
from tweepy import Stream
from tweepy.streaming import StreamListener
import socket
import json
# Twitter developer Credentials to connect to twitter account
access_token = "get_your_own"
access_secret = "get_your_own"
consumer_key = "get_your_own" # API key
consumer_secret = "get_your_own" # API secret key
class TweetsListener(StreamListener):
# initialized the constructor
def __init__(self,csocket):
self.client_socket = csocket
def on_data(self,data):
try:
# read the Twitter data which comes as a JSON format
msg = json.loads(data)
# the 'text' in the JSON file contains the actual tweet.
# We will encode this with utf-8 which will clean out any emojis and stuff that may cause errors for us
# We can come back and change this later on if we need to
print(msg['text'].encode('utf-8'))
# the actual tweet data is sent to the client socket
self.client_socket.send(msg['text'].encode('utf-8'))
return True
except BaseException as e:
# Error handling
print("Ahh! Look what is wrong : %s" % str(e))
return True
# If there actually is an error,we will print the status
def on_error(self,status):
print(status)
return True
# Now let's set up our connection using the access tokens we got from twitter
def sendData(c_socket):
# authentication
auth = OAuthHandler(consumer_key,consumer_secret)
auth.set_access_token(access_token,access_secret)
# twitter_stream will get the actual live tweet data
# This is a stream object
twitter_stream = Stream(auth,TweetsListener(c_socket))
# filter the tweet Feeds related to "corona"
twitter_stream.filter(track=['corona'])
# in case you want to pass multiple criteria
# twitter_stream.filter(track=['DataScience','python','Iot'])
if __name__ == '__main__':
# create a socket object
s = socket.socket()
# Get local machine name : host and port
host = "127.0.0.1"
# You'll want to make sure this port is being used elsewhere,otherwise you'll get an error
port = 3350
# Bind to the port
s.bind((host,port))
print("Listening on port: %s" % str(port))
# Wait and Establish the connection with client.
s.listen(5)
# This sends us back a tuple with the data and the addresss where it came from
c,addr = s.accept()
# Let's print it so we can confirm that when we are at the command line
print("Received request from: " + str(addr))
# Keep the stream data available
sendData(c)
Tweet_Listener_simple.py
##### Twitter libraries #######
import tweepy
from tweepy import OAuthHandler # to authenticate Twitter API
from tweepy import Stream
from tweepy.streaming import StreamListener
###### socket library for testing ######
import socket
####### json library for handling the data ########
import json
#################################################################
########## Socket Basic set up (not specific to Twitter) ########
###################################################################
# create a socket object
s = socket.socket()#(socket.AF_INET,socket.soCK_STREAM) #do we need this?
# Get local machine name : host and port
host = "127.0.0.1"
# You'll want to make sure this port is being used elsewhere,otherwise you'll get an error
port = 3351
# Bind to the port
s.bind((host,port))
print("Listening on port: %s" % str(port))
# Okay so Now it will appear as if your code just hangs here when you run it
# Your code will not move to the next line until you establish a connection
# with the client in the next step,meaning your jupyter notebook needs to
# connect to the same address and post to kick off the listening.
# Wait and Establish the connection with client.
s.listen(5)
# This sends us back a tuple with the data and the address where it came from
client_socket,addr = s.accept()
# Let's print it so we can confirm that when we are at the command line
print("Received request from: " + str(addr)," connection created.")
############################################################
###### Now this next part is ALL Twitter Stuff ##########
############################################################
# This is logic from tweepy directly to check on the status of the stream
class MyStreamListener(tweepy.StreamListener):
def on_status(self,status):
print(status.text)
# Twitter developer Credentials to connect to twitter account
access_token = "get_your_own"
access_secret = "get_your_own"
consumer_key = "get_your_own" # API key
consumer_secret = "get_your_own" # API secret key
# Create your listen object
myStreamListener = MyStreamListener()
# authentication
auth = OAuthHandler(consumer_key,consumer_secret)
auth.set_access_token(access_token,access_secret)
# Create your stream object
twitter_stream = tweepy.Stream(auth = auth,listener=myStreamListener)
# You'll want to filter your results down otherwise you'll get EVERYTHING from twitter
# twitter_stream.filter(track=['corona'])
# in case you want to pass multiple criteria
twitter_stream.filter(track=['DataScience','Iot'])
# Now we can try to accept data!
# This part will change a bit depending on your data source,# But this shows you the essentials of what you'll need to do
while True:
try:
# read the Twitter data which comes as a JSON format
msg = json.loads(twitter_stream)
# the 'text' in the JSON file contains the actual tweet.
# We will encode this with utf-8 which will clean out any emojis and stuff that may cause errors for us
# We can come back and change this later on if we need to
print(msg['text'].encode('utf-8'))
# the actual tweet data is sent to the client socket
client_socket.send(msg['text'].encode('utf-8'))
except BaseException as e:
# Error handling
print("Ahh! Look what is wrong : %s" % str(e))
print(status.text)
client_socket.close()
break
Jupyter 笔记本中的 PySpark 代码:
# Import your dependecies
import pyspark # run after findspark.init() if you need it
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col,split
# Start up your pyspark session as always
spark = SparkSession.builder.appName("TwitterStream").getorCreate()
spark
# read the tweet data from socket
tweet_df = spark \
.readStream \
.format("socket") \
.option("host","127.0.0.1") \
.option("port",3351) \
.load()
# type cast the column value
tweet_df_string = tweet_df.selectExpr("CAST(value AS STRING)")
# Then split words based on space,filter out only hashtag (#) values and group them up.
tweets_tab = tweet_df_string.withColumn('word',explode(split(col('value'),' '))) \
.groupBy('word') \
.count() \
.sort('count',ascending=False). \
filter(col('word').contains('#'))
writeTweet = tweets_tab \
.writeStream \
.outputMode("complete") \
.format("memory") \
.queryName("tweetquery") \
.start()
print("----- streaming is running -------")
# This is the part where nothing shows up
spark.sql("select * from tweetquery").show()
选择 * 的输出:
+----+-----+
|word|count|
+----+-----+
+----+-----+
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。