微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在 Spark 集群模式下在 DStream Lambda 闭包中使用变量时,Java Spark DStream 中的空指针异常

如何解决在 Spark 集群模式下在 DStream Lambda 闭包中使用变量时,Java Spark DStream 中的空指针异常

我已将一个广播数组列表定义为公共静态,并且此数组列表(数组列表的名称:“qList”)在作业处理程序方法中开始新作业时填充了新值,然后在 DStream lambda 闭包中使用此数组列表,但是当在火花集群上运行,作业失败并显示消息“空指针异常”:

Caused by: org.apache.spark.SparkException: Job aborted due to stage 失败:阶段 17.0 中的任务 1 失败 4 次,最近失败: 阶段 17.0 中丢失的任务 1.3(TID 40、192.168.1.97、执行者 0): java.lang.NullPointerException 在 QProcessing.lambda$3(QProcessing.java:345) ...

我的代码

@Override
    public void onBatchSubmitted(StreamingListenerBatchSubmitted arg0) {
        // Todo Auto-generated method stub

        QProcessing.qList.value().clear();
        for(int i = 0; i < 2; i++)
        try {
            QProcessing.qList.value().add(i,QProcessing.bufferedReader.readLine());
        } catch (IOException e) {
            // Todo Auto-generated catch block
            e.printstacktrace();
        }
    }
...

private static JavaPairDStream<Long,List<String>> distributeSerach(
            JavaPairDStream<Long,BPlusTree<Integer,String>> inputRDD,int role,int accessControlType,boolean topkAttach,int i) {
        return inputRDD.mapToPair(index -> {
            List<String> searchResult = null;
            Instant startdistributedBPTSearch = Instant.Now();
            searchResult = index._2.searchRange(Integer.parseInt(QProcessing.qList.value()[i].split(",")[0]),BPlusTree.RangePolicy.INCLUSIVE,Integer.parseInt(QProcessing.qList.value()[i].split(",")[1]),role,accessControlType,topkAttach);
            Instant enddistributedBPTSearch = Instant.Now();
            Duration timeElapseddistributedBPTSearch = Duration.between(startdistributedBPTSearch,enddistributedBPTSearch);
            Tuple2<Long,List<String>> tuple = new Tuple2<Long,List<String>>(
                    timeElapseddistributedBPTSearch.toMillis(),searchResult);
            return tuple;
        });
    }

解决方法

使用spark执行指令的地方存在差异。 RDD 的定义(仅它们的实例化,而不是它们的使用)是在驱动程序中进行的,而对 RDD 的修改和操作是在执行程序(您的 lambda)中进行的。

这些部分中的每一个都运行在具有不同 JVM 的不同机器上。如果修改一个静态属性,它只会改变本地JVM,因此在驱动程序属性上添加元素是不够的。

我认为最好的解决方案不是使用 lambda,而是拥有一个对象并添加广播变量。类似的东西。

  // You must define what are your A,B and C types
  public class MapToPairFunction extends PairFunction<A,B,C> {
      private Broadcast<List<String>> broadcast;

      public void setBroadcast(Broadcast<List<String>> broadcast) {
        this.broadcast = broadcast;
      }

      @Override
      public Tuple2<B,C> call(final A parameter) {
        // Here the code in the lambda
      }

  }

  private static JavaPairDStream<Long,List<String>> DistributeSerach(
    JavaPairDStream<Long,BPlusTree<Integer,String>> inputRDD,int role,int accessControlType,boolean topkAttach,int i) {
    PairFunction<A,C> pairFunction = new MapToPairFunction();
    pairFunction.setBroadcast(QProcessing.qList);
    return inputRDD.mapToPair(pairFunction);
  }

很久没接触过spark了,希望对你有帮助

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。