微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何在 spark scala 中加入 2 rdd

如何解决如何在 spark scala 中加入 2 rdd

我有 2 个 RDD 如下

val rdd1 = spark.sparkContext.parallelize(Seq((123,List(("000000011119",20),("000000011120",30),("000000011121",50))),(234,50)))))
val rdd2 = spark.sparkContext.parallelize(Seq((123,List("000000011119","000000011120")),List("000000011121","000000011120"))))

我想根据 rdd2 中的密钥对执行 rdd1 中的值相加。

需要输出

RDD[(123,50),80)]

任何帮助将不胜感激。

解决方法

实际上这是行的第一个元素和每个内容的第一个元素的连接。

所以我会把它分解成多行并以这种方式加入

val flat1 = rdd1.flatMap(r => r._2.map(e => ((r._1,e._1),e._2))) // looks like ((234,000000011119),20)
val flat2 = rdd2.flatMap(r => r._2.map(e => ((r._1,e),true))) // looks like ((234,000000011121),true)

val res =  flat1.join(flat2)
  .map(r => (r._1._1,r._2._1))  // looks like (123,30)
  .reduceByKey(_ + _)  // total each key group

带有 .foreach(println) 的结果

scala> :pas
// Entering paste mode (ctrl-D to finish)

flat1.join(flat2)
  .map(r => (r._1._1,30)
  .reduceByKey(_ + _)  // total each key group
  .foreach(println)

// Exiting paste mode,now interpreting.

(123,50)
(234,80)

像往常一样,使用 Dataset 会更简单,所以这将是我对未来的建议。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。