
java – Apache Spark lambda expressions – serialization issue

I am trying to use a lambda expression in a Spark job, and it throws a "java.lang.IllegalArgumentException: Invalid lambda deserialization" exception. The exception is thrown when the code looks like "transform(pRDD -> pRDD.map(t -> t._2))". The code snippet is below.
JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y)->x+y);
JavaDStream<Integer> con = aggregate.transform(
(Function<JavaPairRDD<String,Integer>,JavaRDD<Integer>>)pRDD-> pRDD.map( 
(Function<Tuple2<String,Integer>,Integer>)t->t._2));


JavaDStream<Integer> con = aggregate.transform(
(Function<JavaPairRDD<String,Integer>,JavaRDD<Integer>> & Serializable)pRDD-> pRDD.map(
(Function<Tuple2<String,Integer>,Integer> & Serializable)t->t._2));

Neither of the options above worked. However, when I pass the object "f" as the argument instead of the lambda expression "t -> t._2", it works.

Function f = new Function<Tuple2<String,Integer>,Integer>(){
@Override
public Integer call(Tuple2<String,Integer> paramT1) throws Exception {
return paramT1._2;
}
};

May I know what the correct format is for expressing that function as a lambda expression?

public static void main(String[] args) {

            Function f = new Function<Tuple2<String,Integer>,Integer>(){

                @Override
                public Integer call(Tuple2<String,Integer> paramT1) throws Exception {
                    return paramT1._2;
                }

            };

            JavaStreamingContext ssc = JavaStreamingFactory.getInstance();

            JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost",9999);
            JavaDStream<String> words =  lines.flatMap(s->{return Arrays.asList(s.split(" "));});
            JavaPairDStream<String,Integer> pairRDD =  words.mapToPair(x->new Tuple2<String,Integer>(x,1));
            JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y)->x+y);
            JavaDStream<Integer> con = aggregate.transform(
                    (Function<JavaPairRDD<String,Integer>,JavaRDD<Integer>>)pRDD-> pRDD.map( 
                            (Function<Tuple2<String,Integer>,Integer>)t->t._2));
          //JavaDStream<Integer> con = aggregate.transform(pRDD-> pRDD.map(f)); It works
            con.print();

            ssc.start();
            ssc.awaitTermination();


        }

Solution

I don't know why the lambda doesn't work. Perhaps the problem is that the lambda is nested inside another lambda. That seems to be acknowledged by the Spark documentation.
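The deserialization step that the exception message refers to can be illustrated without Spark at all. Below is a minimal sketch; `SerFunction`, `LambdaSerDemo`, and `roundTrip` are hypothetical names, with `SerFunction` standing in for Spark's `org.apache.spark.api.java.function.Function`, which likewise extends `java.io.Serializable`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LambdaSerDemo {

    // Hypothetical stand-in for Spark's Function interface,
    // which also extends java.io.Serializable.
    interface SerFunction<T, R> extends Serializable {
        R apply(T t);
    }

    // Serialize an object to bytes and read it back, roughly what Spark
    // does when it ships a closure to an executor.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            return (T) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // A lambda is serializable only because its target interface is
        // Serializable; deserializing it goes through a synthetic
        // $deserializeLambda$ method in the capturing class, which is the
        // machinery the "Invalid lambda deserialization" message refers to.
        SerFunction<Integer, Integer> inc = x -> x + 1;
        SerFunction<Integer, Integer> restored = roundTrip(inc);
        System.out.println(restored.apply(41)); // prints 42
    }
}
```

Within a single JVM this round trip succeeds; on a cluster, the deserializing side must be able to resolve the lambda back to the capturing class, which is where complex (e.g. nested) lambdas can fail.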

Compare the example from http://spark.apache.org/docs/latest/programming-guide.html#basics:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a,b) -> a + b);

with the example from http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation:

import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
final JavaPairRDD<String,Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);

JavaPairDStream<String,Integer> cleanedDStream = wordCounts.transform(
  new Function<JavaPairRDD<String,Integer>, JavaPairRDD<String,Integer>>() {
    @Override public JavaPairRDD<String,Integer> call(JavaPairRDD<String,Integer> rdd) throws Exception {
      rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
      ...
    }
  });

The second example uses a Function subclass instead of a lambda, possibly because of the same problem you discovered.
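One plausible reason the anonymous-class form is more robust: a plain (anonymous) class is serialized by its class name through ordinary Java serialization, with no `SerializedLambda` indirection involved. A minimal sketch without Spark, assuming the same hypothetical `SerFunction` stand-in as before (`AnonFuncDemo` and the helper names are illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class AnonFuncDemo {

    // Hypothetical stand-in for Spark's Function interface (extends Serializable).
    interface SerFunction<T, R> extends Serializable {
        R apply(T t);
    }

    static byte[] toBytes(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // An anonymous class is serialized by class name through plain Java
        // serialization; no $deserializeLambda$ bootstrap is needed to restore it.
        SerFunction<Integer, Integer> twice = new SerFunction<Integer, Integer>() {
            @Override
            public Integer apply(Integer x) {
                return x * 2;
            }
        };
        SerFunction<Integer, Integer> restored =
                (SerFunction<Integer, Integer>) fromBytes(toBytes(twice));
        System.out.println(restored.apply(21)); // prints 42
    }
}
```

Note that the anonymous class is declared in a static context, so it captures no enclosing instance; an anonymous class inside an instance method would also drag in `this`, which then has to be serializable too.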

I don't know whether this helps you, but nested lambdas certainly work in Scala. Consider the Scala version of the previous example:

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
})

Original source: https://www.jb51.cc/java/128832.html

