微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Apache Spark、带有自定义接收器的 Java 流挂起

如何解决Apache Spark、带有自定义接收器的 Java 流挂起

我已经使用 java 流 API 编写了一个自定义接收器。 它的目的是读取加密文件,对其进行解密,然后推送行以进行进一步详细说明。 问题是,即使文件被正确读取、解密并推送到内存存储,我也无法开始流式传输。 使流生成输出的唯一方法是优雅地停止火花流上下文。

这是我的 java 自定义接收器的一部分,我将解密的行放入商店:

BufferedReader br = new BufferedReader(new InputStreamReader(zsIn,StandardCharsets.UTF_8));
for (int i = 0; !this.isstopped() && (decompressedLine = br.readLine()) != null; ++i) {
                this.store(decompressedLine);
}

这里是我访问接收到的流并尝试执行一些操作的地方:

SparkConf sparkConf = new SparkConf().setAppName("MyJavaStreaming");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,Durations.minutes(1));
JavaReceiverInputDStream<String> customreceiverStream = ssc.receiverStream(new SparkDecodingReceiver(args[0],args[1]));
customreceiverStream.foreachRDD((rdd,time) -> {
    SparkSession singletonSpark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
    JavaRDD<InputFileFlatStructure> rowRDD = rdd.map(line -> {
        InputFileFlatStructure inputRecord = new InputFileFlatStructure();
        if (!line.isEmpty()) {
            inputRecord.setValue(line);
        }
        return inputRecord;
    });
    
    Dataset<Row> wordsDataFrame = singletonSpark.createDataFrame(rowRDD,InputFileFlatStructure.class);
    wordsDataFrame.coalesce(1).write().format("csv").option("sep","|").save("/nfs/output");

});

ssc.start();
ssc.awaitTermination();

使用此代码,进程会挂起并且永远不会刷新文件系统上接收到的流。 除了自定义接收器和火花存储的使用之外,代码与此示例非常相似: https://github.com/apache/spark/blob/v3.1.1/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java 我的代码中是否缺少某些内容?如何启动流处理并将数据集写入文件系统? 感谢任何帮助,谢谢。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。