微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Java中带有Spark文件流的检查点

如何解决Java中带有Spark文件流的检查点

我想在Spark文件流应用程序中实现检查点,以在任何情况下我的Spark流媒体应用程序停止/终止时处理hadoop中所有未处理的文件。我正在关注以下内容streaming programming guide,但未找到JavaStreamingContextFactory。请帮我该怎么办。

我的代码

public class StartAppWithCheckPoint {

    public static void main(String[] args) {
        
        try {
            
            String filePath = "hdfs://Master:9000/mmi_traffic/listenerTransaction/2020/*/*/*/"; 
            String checkpointDirectory = "hdfs://Mongo1:9000/probeAnalysis/checkpoint";
            SparkSession sparkSession = JavaSparkSessionSingleton.getInstance();

            JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
                  @Override public JavaStreamingContext create() {
                      
                    SparkConf sparkConf = new SparkConf().setAppName("ProbeAnalysis");
                    JavaSparkContext sc = new JavaSparkContext(sparkConf);  
                    JavaStreamingContext jssc = new JavaStreamingContext(sc,Durations.seconds(300));
                    JavaDStream<String> lines = jssc.textFileStream(filePath).cache();
                    
                    jssc.checkpoint(checkpointDirectory);
                    return jssc;
                  }
                };
                
            JavaStreamingContext context = JavaStreamingContext.getorCreate(checkpointDirectory,contextFactory);
            
            context.start();
            context.awaitTermination();
            context.close();
            sparkSession.close();
            
        } catch(Exception e) {
            e.printstacktrace();
        }   
    }
}

解决方法

您必须使用Checkpointing

对于检查点,请使用updateStateByKeyreduceByKeyAndWindow stateful 转换。 spark-examples中提供了很多示例,以及git-hub中的预构建spark和spark源。有关您的具体信息,请参见JavaStatefulNetworkWordCount.java;

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。