如何解决Spark Streaming抛出Socket数据流没有更多数据,并且在初始化时无法连接到所需的端口
我经历了很多类似的事情,其中大多数批准的答案是端口必须打开才能启动连接,因此spark无法连接。我已确保端口侦听文件在我的Spark流式传输文件流之前运行。有人可以帮忙吗!!
端口打开文件
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
import socket
import json
consumer_key=''
consumer_secret=''
access_token=''
access_secret=''
class TweetsListener(StreamListener):
def __init__(self,csocket):
self.client_socket = csocket
def on_data(self,data):
try:
msg=json.loads(data)
print(msg['text'].encode('utf-8'))
self.client_socket.send(msg['text'].encode('utf-8'))
return True
except BaseException as e:
print("Error on_data: %s" % str(e))
return True
def on_error(self,status):
print(status)
return True
def sendData(self,c_socket):
auth=OAuthHandler(consumer_key,consumer_secret)
auth.set_access_token(access_token,access_secret)
twitter_stream = Stream(auth,TweetsListener(c_socket))
twitter_stream.filter(language=['en'])
s = socket.socket()
host = "127.0.0.1"
port = 7777
s.bind((host,port))
print("Listening on port: %s" % str(port))
s.listen(5)
c,addr=s.accept()
print("Received request from: " +str(addr))
def sending():
TweetsListener.sendData(c)
火花流文件
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc=SparkContext(appName="StreamingTwitteranalysis")
sc.setLogLevel("ERROR")
ssc=StreamingContext(sc,10)
socket_stream = ssc.socketTextStream("127.0.0.1",7777)
lines = socket_stream.window( 60 )
hashtags = lines.flatMap(lambda text: text.split ( " " ))
sorted_dstream=hashtags.transform(lambda foo:foo.sortBy(lambda x:x[0].lower())
sorted_dstream.pprint()
ssc.start()
ssc.awaitTermination()
现在,我遇到的错误很奇怪
据我所知,密钥是正确的,第一个错误很奇怪,因为它说Twitter API中没有数据。第二个似乎是一个简单的端口未打开错误,但是当我运行第一个文件时请相信我,它说在端口号上监听。 7777。
任何人都可以帮忙吗?
我依次运行了Pycharm中的两个文件。后来我在pycharm中运行了第一个文件,然后在Windows中通过命令提示符运行了第二个文件。...
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。