How do I create a structured streaming pipeline from Twitter to PySpark without all the functions?

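I have been trying to simplify the Tweet_Listener.py code below into something easier to understand, because I need to teach it to beginners who do not have a strong Python background. My goal is to do the same thing (mostly) without classes and functions. Tweet_Listener_simple.py is what I have so far. It does stream the tweets, but I can't get the data to show up in PySpark (the last code block). When I try to display the data, nothing shows up, whereas with the first Tweet_Listener.py code it works. I can't figure out what I need to change in Tweet_Listener_simple.py to make this happen. Any suggestions would be greatly appreciated!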

Tweet_Listener.py

import tweepy 
from tweepy import OAuthHandler # to authenticate Twitter API
from tweepy import Stream 
from tweepy.streaming import StreamListener
import socket 
import json 


# Twitter developer Credentials to connect to twitter account
access_token = "get_your_own"
access_secret = "get_your_own"
consumer_key = "get_your_own" # API key
consumer_secret = "get_your_own" # API secret key


class TweetsListener(StreamListener):
    # constructor: keep the client socket that tweets will be sent to
    def __init__(self,csocket):
        self.client_socket = csocket

    def on_data(self,data):
        try:
            # read the Twitter data which comes as a JSON format
            msg = json.loads(data)

            # the 'text' field of the JSON payload contains the actual tweet.
            # We encode it as UTF-8 because a socket sends bytes, not str
            # (encoding does not strip emojis; it only converts the text to bytes)
            print(msg['text'].encode('utf-8'))

            # the actual tweet data is sent to the client socket
            self.client_socket.send(msg['text'].encode('utf-8'))
            return True

        except BaseException as e:
            # Error handling
            print("Ahh! Look what is wrong : %s" % str(e))
            return True
    # If there actually is an error,we will print the status
    def on_error(self,status):
        print(status)
        return True


# Now let's set up our connection using the access tokens we got from twitter
def sendData(c_socket):
    # authentication
    auth = OAuthHandler(consumer_key,consumer_secret)
    auth.set_access_token(access_token,access_secret)
    # twitter_stream will get the actual live tweet data
    # This is a stream object
    twitter_stream = Stream(auth,TweetsListener(c_socket))
    # filter the tweet Feeds related to "corona"
    twitter_stream.filter(track=['corona'])
    # in case you want to pass multiple criteria
    # twitter_stream.filter(track=['DataScience','python','Iot'])



if __name__ == '__main__':
    # create a socket object
    s = socket.socket()

    # Host and port to listen on (loopback address of the local machine)
    host = "127.0.0.1"
    # You'll want to make sure this port is not being used elsewhere, otherwise you'll get an error
    port = 3350

    # Bind to the port
    s.bind((host,port))
    print("Listening on port: %s" % str(port))

    # Wait and Establish the connection with client.
    s.listen(5)
    # accept() returns a tuple: a new socket for the connection and the address of the client
    c,addr = s.accept()

    # Let's print it so we can confirm that when we are at the command line
    print("Received request from: " + str(addr))

    # Keep the stream data available
    sendData(c) 

Tweet_Listener_simple.py

##### Twitter libraries #######
import tweepy 
from tweepy import OAuthHandler # to authenticate Twitter API
from tweepy import Stream 
from tweepy.streaming import StreamListener

###### socket library for testing ######
import socket 
####### json library for handling the data ########
import json 

#################################################################
########## Socket Basic set up (not specific to Twitter) ########
###################################################################

# create a socket object
s = socket.socket()  # (socket.AF_INET, socket.SOCK_STREAM)  # do we need this?

# Host and port to listen on (loopback address of the local machine)
host = "127.0.0.1"
# You'll want to make sure this port is not being used elsewhere, otherwise you'll get an error
port = 3351

# Bind to the port
s.bind((host,port))
print("Listening on port: %s" % str(port))
# Note: when you run this, the script will appear to hang at s.accept() below.
# It will not move on to the next line until a client connects,
# meaning your Jupyter notebook needs to connect to the same
# address and port to kick off the listening.

# Wait and Establish the connection with client.
s.listen(5)

# accept() returns a tuple: a new socket for the connection and the address of the client
client_socket,addr = s.accept()
# Let's print it so we can confirm that when we are at the command line
print("Received request from: " + str(addr)," connection created.")

############################################################
###### Now this next part is ALL Twitter Stuff ##########
############################################################
# This is logic from tweepy directly to check on the status of the stream
class MyStreamListener(tweepy.StreamListener):
    def on_status(self,status):
        print(status.text)

# Twitter developer Credentials to connect to twitter account
access_token = "get_your_own"
access_secret = "get_your_own"
consumer_key = "get_your_own" # API key
consumer_secret = "get_your_own" # API secret key

# Create your listener object
myStreamListener = MyStreamListener()

# authentication
auth = OAuthHandler(consumer_key,consumer_secret)
auth.set_access_token(access_token,access_secret)
# Create your stream object
twitter_stream = tweepy.Stream(auth = auth,listener=myStreamListener)
# You'll want to filter your results down otherwise you'll get EVERYTHING from twitter
# twitter_stream.filter(track=['corona'])
# in case you want to pass multiple criteria
twitter_stream.filter(track=['DataScience','Iot'])

# Now we can try to accept data! 
# This part will change a bit depending on your data source,
# but this shows you the essentials of what you'll need to do
while True:
    try:

        # read the Twitter data which comes as a JSON format
        msg = json.loads(twitter_stream)

        # the 'text' field of the JSON payload contains the actual tweet.
        # We encode it as UTF-8 because a socket sends bytes, not str
        # (encoding does not strip emojis; it only converts the text to bytes)
        print(msg['text'].encode('utf-8'))

        # the actual tweet data is sent to the client socket
        client_socket.send(msg['text'].encode('utf-8'))
    
    except BaseException as e:
        # Error handling
        print("Ahh! Look what is wrong : %s" % str(e))
        print(status.text)
        client_socket.close()
        break
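
A note on why the simple version sends nothing: with the blocking form of `twitter_stream.filter(...)` used here, the call does not return while the stream is open, so the `while True` loop after it is not reached. Even if it were, `json.loads(twitter_stream)` would be handed the Stream object rather than JSON text. In the meantime `MyStreamListener.on_status` only prints, so nothing is ever written to `client_socket`. The per-tweet work seems to belong in the callback that tweepy invokes. Below is a minimal sketch of that per-tweet step as one plain function. The name `forward_tweet_text` is hypothetical, `socket.socketpair()` merely stands in for the accepted client socket so the sketch runs offline, and the trailing newline is added on the assumption that the Spark socket source reads newline-delimited text.

```python
import json
import socket


def forward_tweet_text(raw_json, client_socket):
    """Parse one raw tweet payload and forward its text as a single line."""
    msg = json.loads(raw_json)
    # Some stream messages (for example delete notices) have no 'text' key; skip them.
    text = msg.get("text")
    if text is None:
        return False
    # Collapse embedded whitespace/newlines so one tweet stays one line, then
    # terminate with "\n" because a line-based reader waits for it.
    line = " ".join(text.split()) + "\n"
    client_socket.sendall(line.encode("utf-8"))
    return True


# Stand-in for the accepted client socket, so the sketch runs without Twitter or Spark.
server_side, reader_side = socket.socketpair()
sample = json.dumps({"text": "Learning #DataScience\nwith #Iot"})
forward_tweet_text(sample, server_side)
server_side.close()

# Read back one line the way a line-based consumer would.
received = reader_side.makefile("r", encoding="utf-8").readline()
reader_side.close()
print(repr(received))
```

In the listener, the body of `on_data(self, data)` could then shrink to a single call such as `forward_tweet_text(data, client_socket)`, which keeps the class to a few lines while the logic stays in one readable function.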

PySpark code in the Jupyter notebook

# Import your dependencies
import pyspark # run after findspark.init() if you need it
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split

# Start up your pyspark session as always
spark = SparkSession.builder.appName("TwitterStream").getOrCreate()
spark

# read the tweet data from socket
tweet_df = spark \
    .readStream \
    .format("socket") \
    .option("host","127.0.0.1") \
    .option("port",3351) \
    .load()

# type cast the column value
tweet_df_string = tweet_df.selectExpr("CAST(value AS STRING)")

# Then split each line into words on spaces, keep only hashtag (#) words, and count them.
tweets_tab = tweet_df_string.withColumn('word', explode(split(col('value'), ' '))) \
    .filter(col('word').contains('#')) \
    .groupBy('word') \
    .count() \
    .sort('count', ascending=False)

writeTweet = tweets_tab \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("tweetquery") \
    .start()
print("----- streaming is running -------")

# This is the part where nothing shows up
spark.sql("select * from tweetquery").show()

Output of select *

+----+-----+
|word|count|
+----+-----+
+----+-----+
