
How to convert a JavaRDD&lt;Tuple2&lt;Object, long[]&gt;&gt; to a Spark Dataset&lt;Row&gt; in Java

In Java (not Scala!) with Spark 3.0.1, I have a JavaRDD instance named neighborIdsRDD whose type is JavaRDD&lt;Tuple2&lt;Object, long[]&gt;&gt;.

The part of my code that produces this JavaRDD is:

GraphOps<String,String> graphOps = new GraphOps<>(graph,stringTag,stringTag);
JavaRDD<Tuple2<Object,long[]>> neighborIdsRDD = graphOps.collectNeighborIds(EdgeDirection.Either()).toJavaRDD();

I have to call toJavaRDD() to obtain a JavaRDD, because collectNeighborIds returns an org.apache.spark.graphx.VertexRDD&lt;long[]&gt; (see the VertexRDD doc).
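For context, the snippets in this question use graph, stringTag and spark_session without showing their definitions. A minimal sketch of what that setup might look like in Java is shown below; the sample vertices/edges (chosen to match the neighbor lists printed later), the variable names and the use of Graph.apply are my own assumptions, not part of the original question.

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.graphx.Edge;
import org.apache.spark.graphx.Graph;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
import scala.reflect.ClassTag;

// Hypothetical setup: a SparkSession, a ClassTag for the String vertex/edge
// attributes, and a small Graph<String,String> built from in-memory data.
SparkSession spark_session = SparkSession.builder()
        .appName("neighbor-ids-example")
        .master("local[*]")
        .getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark_session.sparkContext());

// GraphX is a Scala API, so calling it from Java requires explicit ClassTags.
ClassTag<String> stringTag = scala.reflect.ClassTag$.MODULE$.apply(String.class);

// Vertices are (vertexId, attribute) pairs; edges carry a String attribute.
JavaRDD<Tuple2<Object, String>> vertices = jsc.parallelize(Arrays.asList(
        new Tuple2<>(1L, "A"), new Tuple2<>(2L, "B"), new Tuple2<>(3L, "C"),
        new Tuple2<>(4L, "D"), new Tuple2<>(5L, "E")));
JavaRDD<Edge<String>> edges = jsc.parallelize(Arrays.asList(
        new Edge<>(1L, 2L, "e"), new Edge<>(1L, 3L, "e"), new Edge<>(2L, 3L, "e"),
        new Edge<>(2L, 5L, "e"), new Edge<>(3L, 4L, "e"), new Edge<>(3L, 5L, "e")));

Graph<String, String> graph = Graph.apply(
        vertices.rdd(), edges.rdd(), "",
        StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(),
        stringTag, stringTag);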

However, in the rest of my application I need a Spark Dataset&lt;Row&gt; built from this collectNeighborIds output.

What are the possible ways, and the best way, to convert the JavaRDD&lt;Tuple2&lt;Object, long[]&gt;&gt; into a Dataset&lt;Row&gt;?


Update from the comments:

I adjusted the code following the comments:

        GraphOps<String,String> graphOps = new GraphOps<>(graph,stringTag,stringTag);
        JavaRDD<Tuple2<Object,long[]>> neighborIdsRDD = graphOps.collectNeighborIds(EdgeDirection.Either()).toJavaRDD();
        System.out.println("VertexRDD neighborIdsRDD is:");
        // Collect once instead of re-collecting the RDD on every iteration
        List<Tuple2<Object, long[]>> collected = neighborIdsRDD.collect();
        for (Tuple2<Object, long[]> t : collected) {
            System.out.println(t._1() + " -- " + Arrays.toString(t._2()));
        }

        Dataset<Row> dr = spark_session.createDataFrame(neighborIdsRDD.rdd(),Tuple2.class);
        System.out.println("converted Dataset<Row> is:");
        dr.show();

But I get an empty Dataset, presumably because createDataFrame(rdd, Class) infers columns from JavaBean-style getters, which scala.Tuple2 does not expose:

VertexRDD neighborIdsRDD is:
4 -- [3]
1 -- [2,3]
5 -- [3,2]
2 -- [1,3,5]
3 -- [1,2,5,4]
converted Dataset<Row> is:
++
||
++
||
||
||
||
||
++

Solution

I ran into the same situation as you, and luckily I found a solution to get the DataFrame back.

The solution code is annotated with steps [1], [2], [3].

GraphOps<String,String> graphOps = new GraphOps<>(graph,stringTag,stringTag);
System.out.println("VertexRDD neighborIdsRDD is:");
JavaRDD<Tuple2<Object,long[]>> neighborIdsRDD = graphOps.collectNeighborIds(EdgeDirection.Either()).toJavaRDD();
// Collect once instead of re-collecting the RDD on every iteration
List<Tuple2<Object, long[]>> collected = neighborIdsRDD.collect();
for (Tuple2<Object, long[]> t : collected) {
    System.out.println(t._1() + " -- " + Arrays.toString(t._2()));
}

// [1] Define encoding schema
StructType graphStruct = new StructType(new StructField[]{
        new StructField("father", DataTypes.LongType, false, Metadata.empty()),
        new StructField("children", DataTypes.createArrayType(DataTypes.LongType), false, Metadata.empty())
});

// [2] Build a JavaRDD<Row> from a JavaRDD<Tuple2<Object,long[]>>
JavaRDD<Row> dr = neighborIdsRDD.map(tupla -> RowFactory.create(tupla._1(),tupla._2()));
        
// [3] Finally build the required Dataset<Row>
Dataset<Row> dsr = spark_session.createDataFrame(dr.rdd(),graphStruct);

System.out.println("DATASET IS:");
dsr.show();

The printed output is:

VertexRDD neighborIdsRDD is:
4 -- [3]
1 -- [2,3]
5 -- [3,2]
2 -- [1,3,5]
3 -- [1,2,5,4]
DATASET IS:
+------+------------+
|father|    children|
+------+------------+
|     4|         [3]|
|     1|      [2, 3]|
|     5|      [3, 2]|
|     2|   [1, 3, 5]|
|     3|[1, 2, 5, 4]|
+------+------------+
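As a side note, instead of the explicit StructType in step [1], the tuples could probably also be mapped to a small JavaBean and the schema inferred from it. The sketch below assumes a hypothetical bean class named NeighborIds (public, serializable, with a no-arg constructor and getters/setters); it is not part of the original answer, just an alternative under that assumption.

import java.io.Serializable;

// Hypothetical bean; Spark infers the schema from its getters:
// father: bigint, children: array<bigint>
public class NeighborIds implements Serializable {
    private long father;
    private long[] children;

    public NeighborIds() { }
    public NeighborIds(long father, long[] children) {
        this.father = father;
        this.children = children;
    }
    public long getFather() { return father; }
    public void setFather(long father) { this.father = father; }
    public long[] getChildren() { return children; }
    public void setChildren(long[] children) { this.children = children; }
}

// Map each Tuple2 into a bean, then let createDataFrame use bean reflection.
JavaRDD<NeighborIds> beans = neighborIdsRDD.map(t -> new NeighborIds((Long) t._1(), t._2()));
Dataset<Row> beanDs = spark_session.createDataFrame(beans, NeighborIds.class);
beanDs.show();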
