sortWithinPartitions 如何排序?

如何解决sortWithinPartitions 如何排序?

在将 sortWithinPartitions 应用于 df 并将输出写入表后,我得到了一个我不知道如何解释的结果。

df
.select($"type",$"id",$"time")
.sortWithinPartitions($"type",$"time")

结果文件看起来有点像

1 a 5
2 b 1
1 a 6
2 b 2
1 a 7
2 b 3
1 a 8
2 b 4

它实际上不是随机的,但也不是像我期望的那样排序。即,首先按类型,然后是 id,然后是时间。 如果我在排序之前尝试使用重新分区,那么我会得到我想要的结果。但由于某种原因,文件的重量增加了 5 倍(100gb 对 20gb)。

我正在写入一个压缩设置为 snappy 的 hive orc 表。

有谁知道为什么它是这样排序的,为什么重新分区的顺序正确,但尺寸更大?

使用 spark 2.2。

解决方法

sortWithinPartition 的文档说明

返回一个新的数据集,每个分区按给定的表达式排序

考虑这个函数的最简单方法是想象第四列(分区 id)用作主要排序标准。函数 spark_partition_id() 打印分区。

例如,如果您只有一个大分区(您作为 Spark 用户永远不会这样做!),sortWithinPartition 可以正常排序:

df.repartition(1)
  .sortWithinPartitions("type","id","time")
  .withColumn("partition",spark_partition_id())
  .show();

印刷品

+----+---+----+---------+
|type| id|time|partition|
+----+---+----+---------+
|   1|  a|   5|        0|
|   1|  a|   6|        0|
|   1|  a|   7|        0|
|   1|  a|   8|        0|
|   2|  b|   1|        0|
|   2|  b|   2|        0|
|   2|  b|   3|        0|
|   2|  b|   4|        0|
+----+---+----+---------+

如果有更多的分区,结果只在每个分区内排序:

df.repartition(4)
  .sortWithinPartitions("type",spark_partition_id())
  .show();

印刷品

+----+---+----+---------+
|type| id|time|partition|
+----+---+----+---------+
|   2|  b|   1|        0|
|   2|  b|   3|        0|
|   1|  a|   5|        1|
|   1|  a|   6|        1|
|   1|  a|   8|        2|
|   2|  b|   2|        2|
|   1|  a|   7|        3|
|   2|  b|   4|        3|
+----+---+----+---------+

为什么要使用 sortWithPartition 而不是 sortsortWithPartition 不会触发 shuffle,因为数据仅在执行程序内移动。 sort 但是会触发随机播放。因此 sortWithPartition 执行得更快。如果数据按有意义的列进行分区,则在每个分区内排序可能就足够了。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?