微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用 Python Core API (Apache Spark) 加入三个 RDD?

如何解决如何使用 Python Core API (Apache Spark) 加入三个 RDD?

我正在尝试使用 Python Core API(通过 Apache Spark)将这些 RDD 连接在一起;但是,我没有运气尝试完成此操作。

目前,我有这三个具有共同属性的 RDD:

  • users_rdd:user_id
  • reviews_rdd:review_id、company_id 和 user_id
  • company_rdd:company_id

现在,当将两个 RDD 连接在一起时,它可以正常工作,没有任何问题:

user_rev_rdd = (users_rdd
  .keyBy(lambda user: user['user_id'])
  .join(
      reviews_rdd.keyBy(lambda rev: rev['user_id'])
  )
)

虽然,为了将所有三个连接在一起,我已经尝试过这个,但由于某种原因它对我根本不起作用:

user_rev_com_rdd = (users_rdd
  .keyBy(lambda user: user['user_id'])
  .join(
      reviews_rdd.keyBy(lambda rev: rev['user_id'])
  )
 .join(
      companies_rdd.keyBy(lambda com: com['company_id'])
  )
)

关于如何将我的三个 RDD 连接在一起的任何帮助都会非常有帮助,因为我不确定如何正确地做这样的事情。谢谢。

解决方法

第一次加入后,键是user_id,但是你加入到companies_rdd,键是company_id,所以加入键不正确。您需要将密钥更改为 company_id,例如

user_rev_com_rdd = (users_rdd
    .keyBy(lambda user: user['user_id'])
    .join(
        reviews_rdd.keyBy(lambda rev: rev['user_id'])
    )
    .map(lambda r: (r[1][1]['company_id'],r[1]))
    .join(
        companies_rdd.keyBy(lambda com: com['company_id'])
    )
)

要合并三个RDD中的元素并在join后删除join键,可以在末尾添加一个map

user_rev_com_rdd = (users_rdd
    .keyBy(lambda user: user['user_id'])
    .join(
        reviews_rdd.keyBy(lambda rev: rev['user_id'])
    )
    .map(lambda r: (r[1][1]['company_id'],r[1]))
    .join(
        companies_rdd.keyBy(lambda com: com['company_id'])
    )
    .map(lambda r: (*r[1][0],r[1][1]))
)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。