
How can I read data from a static file using a socket text stream with a 10-second batch interval in Spark (Python)?


I have a static file (log_file) with about 10K records on my local drive (Windows). Its structure is as follows:

"date","time","size","r_version","r_arch","r_os","package","version","country","ip_id"
"2012-10-01","00:30:13",35165,"2.15.1","i686","linux-gnu","quadprog","1.5-4","AU",1

I want to read these log records using a socket text stream with a 10-second batch interval, and afterwards perform a few Spark operations on them using RDDs or DataFrames. I wrote the code below just to read the data at each interval, split it into fields, and print it as an RDD:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# Local Spark context with a 10-second batch interval
conf = SparkConf().setMaster("local[*]").setAppName("Assignment4")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)

# This is the call that fails -- see the error below
data = ssc.socketTextStream("file:///SparkL2/log_file.txt", 2222)

# Split each incoming line on commas and print the result per batch
linesrdd = data.map(lambda x: x.split(","))
linesrdd.pprint()

ssc.start()
ssc.awaitTermination()

I saved this code and ran spark-submit from the Anaconda command prompt. I get an error in the socketTextStream call, probably because I am not using it correctly.

(base) PS C:\Users\HP> cd c:\SparkL2
(base) PS C:\SparkL2> spark-submit Assignment5.py
20/09/09 21:42:42 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.net.UnknownHostException: file:///SparkL2/log_file.txt
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:196)
        at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)
        at java.net.Socket.connect(Socket.java:606)
        at java.net.Socket.connect(Socket.java:555)
        at java.net.Socket.<init>(Socket.java:451)
        at java.net.Socket.<init>(Socket.java:228)
        at org.apache.spark.streaming.dstream.SocketReceiver.onStart(SocketInputDStream.scala:61)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint.$anonfun$startReceiver$1(ReceiverTracker.scala:596)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint.$anonfun$startReceiver$1$adapted(ReceiverTracker.scala:586)
        at org.apache.spark.SparkContext.$anonfun$submitJob$1(SparkContext.scala:2242)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Can someone help me with this? I am new to PySpark and trying to learn it on my own.
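
A note on the likely cause: socketTextStream(hostname, port) expects a hostname and a TCP port, so Spark is trying to resolve the string "file:///SparkL2/log_file.txt" as a hostname, which is exactly the java.net.UnknownHostException in the trace. One way to keep the socket-based approach is to have a small TCP server replay the file line by line and point socketTextStream at it. The sketch below goes beyond the original post: the feeder script (feeder.py), the file path C:/SparkL2/log_file.txt, and the throttling delay are all illustrative assumptions.

# feeder.py -- minimal sketch of a TCP server that replays a static log
# file over a socket so that socketTextStream() has something to connect to.
# Assumes (hypothetically) the file is at C:/SparkL2/log_file.txt and port 2222 is free.
import socket
import time

HOST, PORT = "localhost", 2222

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((HOST, PORT))
    server.listen(1)
    print(f"Waiting for Spark on {HOST}:{PORT} ...")
    conn, _ = server.accept()
    with conn, open("C:/SparkL2/log_file.txt") as f:
        for line in f:
            conn.sendall(line.encode("utf-8"))
            time.sleep(0.01)  # throttle so lines spread across several 10s batches

With the feeder running first, the streaming job would then connect by host and port instead of a file URI:

data = ssc.socketTextStream("localhost", 2222)

Alternatively, for file input there is ssc.textFileStream(directory), which monitors a directory and picks up files that appear after the job starts (copying log_file.txt into the directory while the job runs would trigger a batch); no socket is involved in that case.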
