
java - Task not serializable on Spark

I have a transformation like this:

JavaRDD<Tuple2<String, Long>> mappedRdd = myRDD.values().map(
    new Function<Pageview, Tuple2<String, Long>>() {
      @Override
      public Tuple2<String, Long> call(Pageview pageview) throws Exception {
        String key = pageview.getUrl().toString();
        Long value = getDay(pageview.getTimestamp());
        return new Tuple2<>(key, value);
      }
    });

Pageview is a custom type: Pageview.java

Then I register this class with Spark like so:

Class[] c = new Class[1];
c[0] = Pageview.class;
sparkConf.registerKryoClasses(c);
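(A note on Kryo: registerKryoClasses only controls how RDD data is serialized; the closure passed to map() is serialized with Java serialization regardless, so JavaSerializer will still show up, as in the trace below. For comparison, a minimal sketch of a typical Kryo setup follows; the app name is hypothetical:)

import org.apache.spark.SparkConf;

// Sketch only: enable Kryo for data serialization and register the Gora bean.
SparkConf sparkConf = new SparkConf()
    .setAppName("gora-spark-example") // hypothetical app name
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.registerKryoClasses(new Class[] { Pageview.class });
// The closure shipped by map() still goes through the Java closure serializer,
// so the enclosing class of an anonymous Function must either stay out of the
// closure or be serializable itself.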

Running the job then fails with the following exception:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:89)
    at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
    at org.apache.gora.tutorial.log.ExampleSpark.run(ExampleSpark.java:100)
    at org.apache.gora.tutorial.log.ExampleSpark.main(ExampleSpark.java:53)
Caused by: java.io.NotSerializableException: org.apache.gora.tutorial.log.ExampleSpark
Serialization stack:
    - object not serializable (class: org.apache.gora.tutorial.log.ExampleSpark, value: org.apache.gora.tutorial.log.ExampleSpark@1a2b4497)
    - field (class: org.apache.gora.tutorial.log.ExampleSpark$1, name: this$0, type: class org.apache.gora.tutorial.log.ExampleSpark)
    - object (class org.apache.gora.tutorial.log.ExampleSpark$1, org.apache.gora.tutorial.log.ExampleSpark$1@4ab2775d)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 7 more

When I debug the code, I see JavaSerializer.scala being called, even though there is a class called KryoSerializer.

PS 1: I don't want to use the Java serializer, but implementing Serializable on Pageview does not solve the problem.

PS 2: This does not solve the problem either:

...
//String key = pageview.getUrl().toString();
//Long value = getDay(pageview.getTimestamp());
String key = "Dummy";
Long value = 1L;
return new Tuple2<>(key, value);
...

Solution:

I have run into this problem many times with Java code. Even when I was using Java serialization, I would either make the class that contains the code Serializable or, if you don't want to do that, make the Function a static member of that class.

Here is a code snippet with the solution:

import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

public class Test {
  // Declaring the Function as a static member means the anonymous class has no
  // enclosing instance (no this$0 field), so only the Function itself is serialized.
  private static final Function<Pageview, Tuple2<String, Long>> s =
      new Function<Pageview, Tuple2<String, Long>>() {

        @Override
        public Tuple2<String, Long> call(Pageview pageview) throws Exception {
          String key = pageview.getUrl().toString();
          Long value = getDay(pageview.getTimestamp()); // getDay must also be static (or otherwise reachable without the outer instance)
          return new Tuple2<>(key, value);
        }
      };
}
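The other option mentioned above, making the enclosing class Serializable, would look roughly like this. This is a sketch only: it assumes the driver class is the ExampleSpark class from the stack trace, that getDay takes and returns a Long, and the method name toUrlDayPairs and the getDay body are placeholders, since neither appears in the question.

import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

// Making the driver class Serializable lets Spark serialize the anonymous
// Function together with the this$0 reference that points back to this instance.
public class ExampleSpark implements Serializable {

  public JavaRDD<Tuple2<String, Long>> toUrlDayPairs(JavaRDD<Pageview> pageviews) {
    return pageviews.map(new Function<Pageview, Tuple2<String, Long>>() {
      @Override
      public Tuple2<String, Long> call(Pageview pageview) throws Exception {
        return new Tuple2<>(pageview.getUrl().toString(), getDay(pageview.getTimestamp()));
      }
    });
  }

  // Placeholder: the question never shows getDay, only that it returns a Long.
  private Long getDay(Long timestamp) {
    return timestamp / (24L * 60 * 60 * 1000);
  }
}

Note that this only works if every non-transient field of the class is itself serializable; a field such as a SparkContext or a Gora data store would break serialization again, which is why the static-member variant shown above is often the more robust choice.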

