微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

相同作业的Spark和Spark流输出差异

如何解决相同作业的Spark和Spark流输出差异

我正在为我的项目使用Spark和Spark流进行一些POC。因此,我要做的就是从Topic中读取文件名。从“ src / main / sresource”下载文件并执行通常的“ WordCount”频率应用程序。

 
@KafkaListener(topics = Constants.ABCWordTopic,groupId = Constants.ABC_WORD_COMSUMER_GROUP_ID) 
public void processtask(@Payload String fileResourcePath) {
        log.info("ABC Receiving task from WordProducer filepath {} at time {}",fileResourcePath,LocalDateTime.Now());
        // Spark job
        /*
         * JavaRDD wordRDD =
         * sparkContext.parallelize(Arrays.asList(extractFile(fileResourcePath).split(" ")));
         * log.info("ABC Map Contents : {}",wordRDD.countByValue().toString());
         * wordRDD.coalesce(1,* true).saveAsTextFile("ResultSparklog_"+ System.currentTimeMillis());
         */
        // Spark Streaming job
        JavaPairDStream wordPairstream = streamingContext
                .textFileStream(extractFile(fileResourcePath))
                .flatMap(line -> Arrays.asList(SPACE.split(line)).iterator())
                .mapToPair(s -> new Tuple2(s,1)).reduceByKey((i1,i2) -> i1 + i2);
        wordPairstream.foreachRDD(wordRDD -> {
        //  javaFunctions(wordTempRDD).writerBuilder("vocabulary","words",mapToRow(String.class))
        //                  .savetoCassandra();
            log.info("ABC Map Contents : {}",wordRDD.keys().countByValue().toString());
            wordRDD.coalesce(1,true)
                    .saveAsTextFile("SparkStreamResultlog_" + System.currentTimeMillis());
        });
        streamingContext.start();
        try {
            streamingContext.awaitTerminationorTimeout(-1);
        } catch (InterruptedException e) {
            log.error("Terminated streaming context {}",e);
        }
    }
  • 在上面的代码中,我正在从Kafka Topic(“ ABCtopic”)和 处理它。 “ Spark job”注释的代码工作得很好。 它会计算单词并给出预期的结果,但是“ 流作业”代码的行为不符合预期,并且输出null。
  • log.info("ABC Map Contents : {}",wordRDD.keys().countByValue().toString());行给出'{}'作为输出。 写入文件为空。成为Spark流媒体新手 鲜为人知的“火花流”是一个额外的库 从任何来源(例如 文件主题等。
  • 上面的代码中缺少用于火花流输出内容 在突出显示的日志行以及输出数据文件中为“ null” 正在写入磁盘,而Spark作业执行相同的操作 工作很好。

解决方法

为可能停留在这一点上的其他人添加他的答案。乍一看似乎应该可以,但是在这里阅读Spark上的文档是结论。
“ streamingContext.textFileStream(..)” API不会从任何目录读取静态内容。因此,它无法从目录中读取文件或对其进行处理。它旨在读取流数据,因此必须在监视目录中添加或更新数据。因此,我在网上阅读的一个快速技巧是一旦程序开始执行(即StreamingContext.start已执行),便将文件移动或将文件更新到Windows目录(使用Windows 10的Iam)中。
请注意,即使尝试了所有这些技巧,我也无法执行它,但是鉴于这可能不是流媒体的正确用例(通过Spark作业可以轻松实现从文件夹读取和处理,这就是我代码演示),我必须保留它。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。