如何解决双重重新分区时的Spark shuffle
我正在尝试在 Spark 中加入一些数据集,并且我尝试在没有 shuffle 的情况下做到这一点。
不幸的是,在运行一些测试后,我看到了一些奇怪的东西。
假设我在 S3 中有数据集 A 作为明文(Json 字符串)。
首先,我读取数据集 A 并按特定字段对其进行重新分区。这是一个虚拟代码作为示例:
val rddA = sc.textFile(s3InputPath)
.map(x => objectMapper.readValue(x,classOf[APojo]))
.map(x => (x.getId(),x)) // "id" field is type of String
.partitionBy(new HashPartitioner(10))
.map(x => objectMapper.writeValueAsstring(x._2))
rddA.saveAsTextFile(s3OutputPath)
然后我尝试读取之前的数据集并运行完全相同的重新分区:
val rddAClone = sc.textFile(s3OutputPath)
.map(x => objectMapper.readValue(x,classOf[APojo]))
.map(x => (x.getId(),x))
.partitionBy(new HashPartitioner(10))
.map(x => objectMapper.writeValueAsstring(x._2))
rddAClone.collect //any operation to force the spark to process the RDD
我的期望是,因为我之前通过“id”重新分区了我的数据集,当我再次尝试重新分区时,操作将在没有任何shuffle的情况下完成。不幸的是,我仍然在 Spark 的 Application Monitor 中看到了一些 shuffle(
你们中有人遇到过同样的问题吗?这种行为是有意为之还是我的假设/期望有问题?
谢谢!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。