Spark physical plan: false/true in Exchange partitioning

Running

repartitionedDF.explain

shows this physical plan:

== Physical Plan ==
Exchange hashpartitioning(purchase_month#25, 10), false, [id=#6]
+- LocalTableScan [item#23, price#24, purchase_month#25]

I have noticed that in some cases the "false" can also be "true".

What does it mean? I used to know, but I have forgotten.

Solution

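After some digging, I believe it refers to the noUserSpecifiedNumPartition variable. If you do a repartition, as you did here with an explicit number of partitions, this boolean is false. Otherwise it is true: try a simple orderBy and I think you should get true.

I figured this out with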

println(df.repartition('series).orderBy('series).queryExecution.executedPlan.prettyJson)

Inspired by this answer. It produces the following output (truncated to the relevant parts):

{
  "class" : "org.apache.spark.sql.execution.exchange.ShuffleExchangeExec","num-children" : 1,"outputPartitioning" : [ {
    "class" : "org.apache.spark.sql.catalyst.plans.physical.RangePartitioning","ordering" : [ 0 ],"numPartitions" : 200
  },{
    "class" : "org.apache.spark.sql.catalyst.expressions.SortOrder","child" : 0,"direction" : {
      "object" : "org.apache.spark.sql.catalyst.expressions.Ascending$"
    },"nullOrdering" : {
      "object" : "org.apache.spark.sql.catalyst.expressions.NullsFirst$"
    },"sameOrderExpressions" : {
      "object" : "scala.collection.immutable.Set$EmptySet$"
    }
  },{
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference","num-children" : 0,"name" : "series","dataType" : "string","nullable" : true,"metadata" : { },"exprId" : {
      "product-class" : "org.apache.spark.sql.catalyst.expressions.ExprId","id" : 16,"jvmId" : "35ee1aa5-f899-4fca-a8a6-a06c3eaabe5c"
    },"qualifier" : [ ]
  } ],"noUserSpecifiedNumPartition" : true
},{
  "class" : "org.apache.spark.sql.execution.exchange.ShuffleExchangeExec","outputPartitioning" : [ {
    "class" : "org.apache.spark.sql.catalyst.plans.physical.HashPartitioning","expressions" : [ 0 ],"noUserSpecifiedNumPartition" : false
}

The true and false values there correspond nicely to the physical plan:

df.repartition('series).orderBy('series).explain
== Physical Plan ==
*(1) Sort [series#16 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(series#16 ASC NULLS FIRST, 200), true, [id=#192]
   +- Exchange hashpartitioning(series#16, 200), false, [id=#190]
      +- FileScan csv [series#16,timestamp#17,value#18] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/tmp/df.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<series:string,timestamp:string,value:string>
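If you would rather check the flag directly than read the JSON, one option is to collect the shuffle nodes from the executed plan. This is a minimal sketch, assuming Spark 3.0.x (where ShuffleExchangeExec still carries a noUserSpecifiedNumPartition field) and adaptive query execution left at its 3.0 default of off. The object name ExchangeFlags, the method shuffleFlags and the sample rows are hypothetical, made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Hypothetical helper. Assumes Spark 3.0.x: the noUserSpecifiedNumPartition
// field may not exist in other releases. Also assumes AQE is disabled
// (the 3.0 default); otherwise the exchanges sit inside AdaptiveSparkPlanExec.
object ExchangeFlags {
  // Returns (partitioning, flag) for every shuffle in the executed physical plan.
  def shuffleFlags(df: DataFrame): Seq[(String, Boolean)] =
    df.queryExecution.executedPlan.collect {
      case e: ShuffleExchangeExec =>
        (e.outputPartitioning.toString, e.noUserSpecifiedNumPartition)
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("exchange-flags")
      .getOrCreate()
    import spark.implicits._

    // Made-up rows with the same columns as the question's LocalTableScan.
    val df = Seq(("apple", 1.0, 1), ("pear", 2.0, 2), ("plum", 3.0, 1))
      .toDF("item", "price", "purchase_month")

    // Compare an explicit repartition with an exchange Spark adds for a global sort.
    Seq(
      "repartition(10, col)" -> df.repartition(10, $"purchase_month"),
      "repartition(col)"     -> df.repartition($"purchase_month"),
      "orderBy(col)"         -> df.orderBy($"purchase_month")
    ).foreach { case (label, d) => println(s"$label: ${shuffleFlags(d)}") }

    spark.stop()
  }
}
```

On the version the answer appears to use, the repartition(10, ...) case should report false and the orderBy case true, matching the explain output above. The column-only repartition is worth checking on your exact release, since how this flag is set may differ between versions.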

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。