如何解决orderby在Spark SQL中未给出正确的结果
我有大约60列和3000行的数据集。 我正在使用orderby对数据集中的行进行排序并写入文件 但是它并没有给出正确的结果。
dataset.orderBy(new Column(col_name).desc())
.coalesce(4)
.write()
.format("com.databricks.spark.csv")
.option("delimiter",",")
.option("header","false")
.mode(SaveMode.Overwrite)
.save("hdfs://" + filePath);
请让我知道我在这里想念的东西
Row[] rows = dataset.take(3000);
for ( Row row : rows){
// here i am writing in a file row by row
System.out.println(row);
}
解决方法
问题在于coalesce
将以未排序的方式合并您现有的分区(不会,coalesce
不会造成混乱)。
如果要4个文件并在文件内排序,则需要在spark.sql.suffle.partitions
之前更改orderBy
,这将导致随机播放具有4个分区。
spark.sql("set spark.sql.shuffle.partitions=4")
dataset.orderBy(new Column(col_name).desc())
.write()
.format("com.databricks.spark.csv")
.option("delimiter",",")
.option("header","false")
.mode(SaveMode.Overwrite)
.save("hdfs://" + filePath);
如果您只关心文件内的排序,则也可以使用sortWithinPartitions(new Column(col_name).desc())
因为您的.coalesce(4)
拖延了数据框顺序
先行然后排序。
dataset
.coalesce(4)
.orderBy(new Column(col_name).desc())
.write()
.format("com.databricks.spark.csv")
.option("delimiter",")
.option("header","false")
.mode(SaveMode.Overwrite)
.save("hdfs://" + filePath);
在火花上下文中,您还应该将spark.sql.suffle.partitions
设置为4,因为按顺序还可以使用先驱顺序。
根据您在注释中的澄清,您需要将ordered
输出包含在一个文件中。
只有火花,只有spark.sql("set spark.sql.shuffle.partitions=1")
后跟orderBy
并写入才有可能。但是缺点是won't scale
不能用于大数据,因为它不会并行化。
一种解决方法是:
- 通过最大程度的并行化工作(例如,不要
orderBy
或coalesce
)来"set spark.sql.shuffle.partitions=1"
,并拥有n
个文件。 - 在文件合并代码中添加一些额外的逻辑处理
- 列出所有文件,获取
col_name
的值并维护[(col_name value),filepath]
的映射 - 通过键(
col_name
的值)对地图进行排序 - 然后执行合并
这将保持您的订购。
想法是,合并部分将大部分为单线程,至少以分布式方式进行排序:)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。