如何解决Java spark使用reduceByKey aviod嵌套列表将对象连接成一个列表
我有一个带有 (key,value) 对的 java spark rdd (String,ArrayList)。 ArrayList 是一个 Object 数组(可以是 int、double、string 或 array 或任何其他类型)。
例如输入如下:
key1,[R1_Entry_1,R1_Entry_2,...,R1_Entry_n]
key1,[R2_Entry_1,R2_Entry_2,R2_Entry_n]
key1,[R3_Entry_1,R3_Entry_2,R3_Entry_n]
key2,[R4_Entry_1,R4_Entry_2,R4_Entry_n]
...
keyJ,[RK_Entry_1,RK_Entry_2,RK_Entry_n]
...
keyX,[RM_Entry_1,RM_Entry_2,RM_Entry_n]
那么 RI_entry_J
(行 I 和条目 J)是一个 java.lang.Object,可以是 int、double、string、ArrayList 或任何其他类型。
我使用了 JavaRDD.reduceByKey()
来减少新列表中的条目。我想要的输出应该是(假设 key1
只有三行对应的行(前三行:R1、R2、R3)。)
key1,[[R1_Entry_1,R2_Entry_1,R3_Entry_1],[R1_Entry_2,R3_Entry_2],[R1_Entry_n,R2_Entry_n,R3_Entry_n]]
...
keyJ,[[RK_Entry_1,R*_Entry_1,...],[RK_Entry_2,R*_Entry_2,[RK_Entry_n,R*_Entry_n,...]]
...
我的代码如下:
JavaPairRDD<String,ArrayList> AdjJavaRDD = JavaRDD.reduceByKey(new Function2<ArrayList,ArrayList,ArrayList>() {
@Override
public ArrayList call(ArrayList v1,ArrayList v2) throws Exception {
int v1Len = v1.size();
int v2Len = v2.size();
if (v1Len != v2Len) {
System.out.println(" \n The input size is incorrect. Please check! \n ");
System.exit(0);
}
List<Object> obj = new ArrayList<Object>(v1Len);
for (int i =0; i < v1Len; i++)
{
List<Object> obj_i = new ArrayList<>();
Object v1i = v1.get(i);
Object v2i = v2.get(i);
obj_i.add(v1i);
obj_i.add(v2i);
obj.add(i,obj_i);
}
return new ArrayList(obj);
}
});
我得到的结果如下:
key1,[[[R1_Entry_1,R2_Entry_1],[[R1_Entry_2,R2_Entry_2],[[R1_Entry_n,R2_Entry_n],R3_Entry_n]]
即条目嵌套在列表中,而不是不嵌套地写入列表。
例如,如果 Entry_1
是整数,并且 KeyJ
有五个对应的行,其中五个 Entry_1
为 1,2,3,4,5
。 KeyJ
使用我的代码的结果是
KeyJ,[[[[[1,2],3],4],5],...]
(在实际计算中,整数的顺序是随机的)。
然而,我想要的是
KeyJ,[[1,...]
有什么想法可以在 Java 的 Spark 代码中避免这种嵌套列表吗?
解决方法
我有解决办法。上面代码不起作用的原因是 Object v1i = v1.get(i);
将返回一个特定的对象,例如v1i
作为 String
(或任何其他数据类型)首次调用时。但是,在之后调用时,v1i
变成了 ArrayList
,这就是问题中的代码返回嵌套列表的原因。
更新后的解决方案如下:
JavaPairRDD<Tuple2,ArrayList> JavaRDDColumns = adjJavaRDD.reduceByKey(new Function2<ArrayList,ArrayList,ArrayList>() {
@Override
public ArrayList call(ArrayList v1,ArrayList v2) throws Exception {
int v1Len = v1.size();
int v2Len = v2.size();
Object[] objArr = new Object[v1Len];
for (int i = 0; i < v1Len; i++) {
ArrayList<Object> obj_i = new ArrayList<Object>();
Object obj1i = v1.get(i);
Object obj2i = v2.get(i);
List<Object> obj1Arr = new ArrayList<>();
List<Object> obj2Arr = new ArrayList<>();
if (obj1i instanceof List<?>) {
obj1Arr = (List) obj1i;
} else {
obj1Arr = Arrays.asList(obj1i); // not a Arraylist,convert to a Arraylist
}
if (obj2i instanceof List<?>) {
obj2Arr = (List) obj2i;
} else {
obj2Arr = Arrays.asList(obj2i);
}
List<Object> combineList = ListUtils.union(obj1Arr,obj2Arr);
objArr[i] = combineList;
}
ArrayList<Object> obj = new ArrayList<>(Arrays.asList(objArr));
return obj;
}
});
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。