Java spark使用reduceByKey aviod嵌套列表将对象连接成一个列表

如何解决Java spark使用reduceByKey aviod嵌套列表将对象连接成一个列表

我有一个带有 (key,value) 对的 java spark rdd (String,ArrayList)。 ArrayList 是一个 Object 数组(可以是 int、double、string 或 array 或任何其他类型)。

例如输入如下:

key1,[R1_Entry_1,R1_Entry_2,...,R1_Entry_n]
key1,[R2_Entry_1,R2_Entry_2,R2_Entry_n]
key1,[R3_Entry_1,R3_Entry_2,R3_Entry_n]
key2,[R4_Entry_1,R4_Entry_2,R4_Entry_n]
... 
keyJ,[RK_Entry_1,RK_Entry_2,RK_Entry_n]
... 
keyX,[RM_Entry_1,RM_Entry_2,RM_Entry_n]

那么 RI_entry_J(行 I 和条目 J)是一个 java.lang.Object,可以是 int、double、string、ArrayList 或任何其他类型。

我使用了 JavaRDD.reduceByKey() 来减少新列表中的条目。我想要的输出应该是(假设 key1 只有三行对应的行(前三行:R1、R2、R3)。)

key1,[[R1_Entry_1,R2_Entry_1,R3_Entry_1],[R1_Entry_2,R3_Entry_2],[R1_Entry_n,R2_Entry_n,R3_Entry_n]]
... 
keyJ,[[RK_Entry_1,R*_Entry_1,...],[RK_Entry_2,R*_Entry_2,[RK_Entry_n,R*_Entry_n,...]]
... 

我的代码如下:

JavaPairRDD<String,ArrayList> AdjJavaRDD =  JavaRDD.reduceByKey(new Function2<ArrayList,ArrayList,ArrayList>() {
@Override
public ArrayList call(ArrayList v1,ArrayList v2) throws Exception {

    int v1Len = v1.size();
    int v2Len = v2.size();
    if (v1Len != v2Len) {
        System.out.println(" \n The input size is incorrect. Please check! \n  ");
        System.exit(0);
    }

    List<Object> obj = new ArrayList<Object>(v1Len);
    for (int i =0; i < v1Len; i++)
    {
        List<Object> obj_i = new ArrayList<>();
        Object v1i = v1.get(i);
        Object v2i = v2.get(i);
        obj_i.add(v1i);
        obj_i.add(v2i);
        obj.add(i,obj_i);
    }
    return new ArrayList(obj);
}
});

我得到的结果如下: key1,[[[R1_Entry_1,R2_Entry_1],[[R1_Entry_2,R2_Entry_2],[[R1_Entry_n,R2_Entry_n],R3_Entry_n]]

即条目嵌套在列表中,而不是不嵌套地写入列表。

例如,如果 Entry_1 是整数,并且 KeyJ 有五个对应的行,其中五个 Entry_11,2,3,4,5KeyJ 使用我的代码的结果是 KeyJ,[[[[[1,2],3],4],5],...](在实际计算中,整数的顺序是随机的)。 然而,我想要的是 KeyJ,[[1,...]

有什么想法可以在 Java 的 Spark 代码中避免这种嵌套列表吗?

解决方法

我有解决办法。上面代码不起作用的原因是 Object v1i = v1.get(i); 将返回一个特定的对象,例如v1i 作为 String (或任何其他数据类型)首次调用时。但是,在之后调用时,v1i 变成了 ArrayList,这就是问题中的代码返回嵌套列表的原因。

更新后的解决方案如下:

  JavaPairRDD<Tuple2,ArrayList> JavaRDDColumns = adjJavaRDD.reduceByKey(new Function2<ArrayList,ArrayList,ArrayList>() {
            @Override
            public ArrayList call(ArrayList v1,ArrayList v2) throws Exception {
                int v1Len = v1.size();
                int v2Len = v2.size();

                Object[] objArr = new Object[v1Len];
                for (int i = 0; i < v1Len; i++) {
                    ArrayList<Object> obj_i = new ArrayList<Object>();
                    Object obj1i = v1.get(i);
                    Object obj2i = v2.get(i);
                    List<Object> obj1Arr = new ArrayList<>();
                    List<Object> obj2Arr = new ArrayList<>();
                    if (obj1i instanceof List<?>) {
                        obj1Arr = (List) obj1i;
                    } else {
                        obj1Arr = Arrays.asList(obj1i);  // not a Arraylist,convert to a Arraylist
                    }

                    if (obj2i instanceof List<?>) {
                        obj2Arr = (List) obj2i;
                    } else {
                        obj2Arr = Arrays.asList(obj2i);
                    }
                    List<Object> combineList = ListUtils.union(obj1Arr,obj2Arr);
                    objArr[i] = combineList;
                }
                ArrayList<Object> obj = new ArrayList<>(Arrays.asList(objArr));
                return obj;
            }
        });

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?