微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

双重重新分区时的Spark shuffle

如何解决双重重新分区时的Spark shuffle

我正在尝试在 Spark 中加入一些数据集,并且我尝试在没有 shuffle 的情况下做到这一点。

不幸的是,在运行一些测试后,我看到了一些奇怪的东西。

假设我在 S3 中有数据集 A 作为明文(Json 字符串)。

首先,我读取数据集 A 并按特定字段对其进行重新分区。这是一个虚拟代码作为示例:

val rddA = sc.textFile(s3InputPath)
             .map(x => objectMapper.readValue(x,classOf[APojo]))
             .map(x => (x.getId(),x)) // "id" field is type of String
             .partitionBy(new HashPartitioner(10))
             .map(x => objectMapper.writeValueAsstring(x._2))

rddA.saveAsTextFile(s3OutputPath)

然后我尝试读取之前的数据集并运行完全相同的重新分区:

val rddAClone = sc.textFile(s3OutputPath)
                  .map(x => objectMapper.readValue(x,classOf[APojo]))
                  .map(x => (x.getId(),x))
                  .partitionBy(new HashPartitioner(10))
                  .map(x => objectMapper.writeValueAsstring(x._2))

rddAClone.collect //any operation to force the spark to process the RDD

我的期望是,因为我之前通过“id”重新分区了我的数据集,当我再次尝试重新分区时,操作将在没有任何shuffle的情况下完成。不幸的是,我仍然在 Spark 的 Application Monitor 中看到了一些 shuffle(

你们中有人遇到过同样的问题吗?这种行为是有意为之还是我的假设/期望有问题?

谢谢!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。