如何解决默认情况下,Spark Dataframe 是如何分区的? 默认并行度
我知道使用 HashPartitioner 根据键值对 RDD 进行分区。但是 Spark Dataframe 默认是如何分区的,因为它没有 key/value 的概念。
解决方法
Dataframe 的分区取决于运行以创建它的任务数量。
没有应用“默认”分区逻辑。以下是如何设置分区的一些示例:
- 通过
val df = Seq(1 to 500000: _*).toDF()
创建的 Dataframe 将只有一个分区。 - 通过
val df = spark.range(0,100).toDF()
创建的 Dataframe 具有与可用内核数量一样多的分区(例如,当您的主服务器设置为local[4]
时为 4)。另外,请参阅下面关于“默认并行性”的备注,该操作对没有父 RDD 的parallelize
等操作生效。 - 从 RDD (
spark.createDataFrame(rdd,schema)
) 派生的 Dataframe 将具有与底层 RDD 相同数量的分区。就我而言,由于我在本地有 6 个内核,因此创建的 RDD 有 6 个分区。 - 从 Kafka 主题消费的数据帧将具有与主题的分区匹配的分区数量,因为它可以使用与主题有分区一样多的内核/插槽来消费主题。
- 通过读取文件创建的数据框,例如除非必须根据默认为 128MB 的
spark.sql.files.maxPartitionBytes
将单个文件拆分为多个分区,否则来自 HDFS 的分区数量将与文件匹配。 - 从需要 shuffle 的转换派生的 Dataframe 将具有由
spark.sql.shuffle.partitions
设置的可配置分区数量(默认为 200)。 - ...
RDD 和 Structured API 之间的主要区别之一是,您对分区的控制不如 RDD 多,您甚至可以定义自定义分区器。这对于 Dataframes 是不可能的。
默认并行度
Execution Behavior 配置 spark.default.parallelism
的文档说明:
对于没有父级 RDD 的并行化操作,这取决于集群管理器:
本地模式:本地机器上的内核数
Mesos 细粒度模式:8
其他:所有执行器节点上的内核总数或 2 个,以较大者为准
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。