如何解决Spark 左外连接和 RDD 上的重复键
我有两个(键,值)的 RDD。我的第二个 RDD 比我的第一个 RDD 短。我想将我的第一个 RDD 的每个值与第二个 RDD 中的对应值相关联,关于键。
val (rdd1: RDD[(key,A)])
val (rdd2: RDD[(key,B)])
val (rdd3: RDD[R])
with rdd1.count() >> rdd2.count(),并且rdd1的多个元素具有相同的key。
现在,我知道当在 rdd2 中找不到相应的键时,我想对 b 使用 常量 值。我认为 leftOuterJoin 将是在这里使用的自然方法:
val rdd3 = rdd1.leftOuterJoin(rdd2).map{
case (key,(a,None)) => R(a,c)
case (key,Some(b)) => R(a,b)
}
这里有什么可能让你觉得错的吗?加入这样的元素时,我得到了意想不到的结果。
解决方法
不完全确定您的问题是什么,但这里是:
方法一
val rdd1 = sc.parallelize(Array((1,100),(2,200),(3,300) ))
val rdd2 = sc.parallelize(Array((1,100)))
object Example {
val c = -999
def myFunc = {
val enclosedC = c
val rdd3 = rdd1.leftOuterJoin(rdd2)
val rdd4 = rdd3.map ( x => x match {
case (x._1,(x._2._1,None)) => (x._1,(Some(x._2._1),Some(enclosedC)))
case _ => (x._1,x._2._2 ))
}).sortByKey()
//rdd4.foreach(println)
}
}
Example.myFunc
方法 2
val rdd1 = sc.parallelize(Array((1,100)))
object Example {
val c = -999
def myFunc = {
val enclosedC = c
val rdd3 = rdd1.leftOuterJoin(rdd2)
val rdd4 = rdd3.map(x => { if (x._2._2 == None) ( (x._1,Some(enclosedC)) )) else ( (x._1,x._2._2)) ) }).sortByKey()
//rdd4.foreach(println)
}
}
Example.myFunc
方法 3
val rdd1 = sc.parallelize(Array((1,100)))
object Example extends Serializable {
val c = -999
val rdd3 = rdd1.leftOuterJoin(rdd2)
val rdd4 = rdd3.map(x => { if (x._2._2 == None) ( (x._1,Some(c)) )) else ( (x._1,x._2._2)) ) }).sortByKey()
//rdd4.collect
//rdd4.foreach(println)
}
Example
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。