微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark 左外连接和 RDD 上的重复键

如何解决Spark 左外连接和 RDD 上的重复键

我有两个(键,值)的 RDD。我的第二个 RDD 比我的第一个 RDD 短。我想将我的第一个 RDD 的每个值与第二个 RDD 中的对应值相关联,关于键。

val (rdd1: RDD[(key,A)])
val (rdd2: RDD[(key,B)])
val (rdd3: RDD[R])

with rdd1.count() >> rdd2.count(),并且rdd1的多个元素具有相同的key。

现在,我知道当在 rdd2 中找不到相应的键时,我想对 b 使用 常量 值。我认为 leftOuterJoin 将是在这里使用的自然方法

val rdd3 = rdd1.leftOuterJoin(rdd2).map{
case (key,(a,None)) => R(a,c)
case (key,Some(b)) => R(a,b)
}

这里有什么可能让你觉得错的吗?加入这样的元素时,我得到了意想不到的结果。

解决方法

不完全确定您的问题是什么,但这里是:

方法一

val rdd1 = sc.parallelize(Array((1,100),(2,200),(3,300) ))
val rdd2 = sc.parallelize(Array((1,100)))

object Example {
  
  val c = -999
  def myFunc = {
    val enclosedC = c
    val rdd3 = rdd1.leftOuterJoin(rdd2)
    val rdd4 = rdd3.map ( x => x match {
          case (x._1,(x._2._1,None)) => (x._1,(Some(x._2._1),Some(enclosedC)))   
          case _                       => (x._1,x._2._2  ))
        }).sortByKey()
    //rdd4.foreach(println)
  }
}
Example.myFunc

方法 2

val rdd1 = sc.parallelize(Array((1,100)))

object Example {
  
  val c = -999
  def myFunc = {
    val enclosedC = c
    val rdd3 = rdd1.leftOuterJoin(rdd2)
    val rdd4 = rdd3.map(x => { if (x._2._2 == None) ( (x._1,Some(enclosedC)) )) else (  (x._1,x._2._2)) ) }).sortByKey() 
    //rdd4.foreach(println)
  }
}
Example.myFunc

方法 3

val rdd1 = sc.parallelize(Array((1,100)))

object Example extends Serializable {
  
val c = -999

val rdd3 = rdd1.leftOuterJoin(rdd2)
val rdd4 = rdd3.map(x => { if (x._2._2 == None) ( (x._1,Some(c)) )) else (  (x._1,x._2._2)) ) }).sortByKey() 

//rdd4.collect
//rdd4.foreach(println)
}
Example

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。