如何解决相同作业的Spark和Spark流输出差异
我正在为我的项目使用Spark和Spark流进行一些POC。因此,我要做的就是从Topic中读取文件名。从“ src / main / sresource”下载文件并执行通常的“ WordCount”频率应用程序。
@KafkaListener(topics = Constants.ABCWordTopic,groupId = Constants.ABC_WORD_COMSUMER_GROUP_ID)
public void processtask(@Payload String fileResourcePath) {
log.info("ABC Receiving task from WordProducer filepath {} at time {}",fileResourcePath,LocalDateTime.Now());
// Spark job
/*
* JavaRDD wordRDD =
* sparkContext.parallelize(Arrays.asList(extractFile(fileResourcePath).split(" ")));
* log.info("ABC Map Contents : {}",wordRDD.countByValue().toString());
* wordRDD.coalesce(1,* true).saveAsTextFile("ResultSparklog_"+ System.currentTimeMillis());
*/
// Spark Streaming job
JavaPairDStream wordPairstream = streamingContext
.textFileStream(extractFile(fileResourcePath))
.flatMap(line -> Arrays.asList(SPACE.split(line)).iterator())
.mapToPair(s -> new Tuple2(s,1)).reduceByKey((i1,i2) -> i1 + i2);
wordPairstream.foreachRDD(wordRDD -> {
// javaFunctions(wordTempRDD).writerBuilder("vocabulary","words",mapToRow(String.class))
// .savetoCassandra();
log.info("ABC Map Contents : {}",wordRDD.keys().countByValue().toString());
wordRDD.coalesce(1,true)
.saveAsTextFile("SparkStreamResultlog_" + System.currentTimeMillis());
});
streamingContext.start();
try {
streamingContext.awaitTerminationorTimeout(-1);
} catch (InterruptedException e) {
log.error("Terminated streaming context {}",e);
}
}
- 在上面的代码中,我正在从Kafka Topic(“ ABCtopic”)和
处理它。 “ Spark job”注释的代码工作得很好。
它会计算单词并给出预期的结果,但是“
流作业”代码的行为不符合预期,并且输出null。
- 第
log.info("ABC Map Contents : {}",wordRDD.keys().countByValue().toString());
行给出'{}'作为输出。 写入文件为空。成为Spark流媒体新手 鲜为人知的“火花流”是一个额外的库 从任何来源(例如 文件,主题等。
- 上面的代码中缺少用于火花流输出的内容 在突出显示的日志行以及输出数据文件中为“ null” 正在写入磁盘,而Spark作业执行相同的操作 工作很好。
解决方法
为可能停留在这一点上的其他人添加他的答案。乍一看似乎应该可以,但是在这里阅读Spark上的文档是结论。
“ streamingContext.textFileStream(..)” API不会从任何目录读取静态内容。因此,它无法从目录中读取文件或对其进行处理。它旨在读取流数据,因此必须在监视目录中添加或更新数据。因此,我在网上阅读的一个快速技巧是一旦程序开始执行(即StreamingContext.start已执行),便将文件移动或将文件更新到Windows目录(使用Windows 10的Iam)中。
请注意,即使尝试了所有这些技巧,我也无法执行它,但是鉴于这可能不是流媒体的正确用例(通过Spark作业可以轻松实现从文件夹读取和处理,这就是我代码演示),我必须保留它。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。