如何解决如何批量将csv文件加载到DStream中?
我对DStreams主题完全陌生-DStreams的基本抽象,用于批量接收数据流。我想做的是,我有一个包含1000多个记录的文本文件。我需要将文本文件发送到DStreams进行流处理。为此,我用python编写了一个创建DStream的代码,然后以10秒的间隔(批处理)将文本文件的路径传递到DStream中。批处理中没有任何数据。这是代码,
spark = SparkSession.builder.master("local[*]").appName("PysparkStreaming").getorCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc,10) #Batch duration 10 seconds
type(ssc)
lines = ssc.textFileStream('/home/Downloads/Dataset/data.txt') #create DStream
type(lines)
counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda x: (x,1))\
.reduceByKey(lambda a,b: a+b)
type(counts)
counts.pprint()
ssc.start()
ssc.awaitTermination()
我得到的输出是
-------------------------------------------
Time: 2020-10-02 13:57:40
-------------------------------------------
-------------------------------------------
Time: 2020-10-02 13:57:50
-------------------------------------------
-------------------------------------------
Time: 2020-10-02 13:58:00
-------------------------------------------
-------------------------------------------
Time: 2020-10-02 13:58:10
-------------------------------------------
-------------------------------------------
Time: 2020-10-02 13:58:20
-------------------------------------------
-------------------------------------------
Time: 2020-10-02 13:58:30
-------------------------------------------
-------------------------------------------
Time: 2020-10-02 13:58:40
-------------------------------------------
-------------------------------------------
Time: 2020-10-02 13:58:50
-------------------------------------------
-------------------------------------------
我需要的是,在时间间隔内,必须显示文本文件中的数据。但是,批次之间没有任何显示。拜托,我需要您的帮助才能解决此问题...任何人告诉我要批量获取数据我该怎么做。我需要python中的代码。谢谢你。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。