How to resolve: Spark 3.0 aggregate function gives a different expression string than expected
/Downloads/spark-3.0.1-bin-hadoop2.7/bin$ ./spark-shell
20/09/23 10:58:45 WARN Utils: Your hostname, byte-nihal resolves to a loopback address: 127.0.1.1; using 192.168.2.103 instead (on interface enp2s0)
20/09/23 10:58:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/09/23 10:58:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.2.103:4040
Spark context available as 'sc' (master = local[*], app id = local-1600838949311).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.0.1
/_/
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_265)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> println(countDistinct("x"))
count(x)
scala> println(sumDistinct("x"))
sum(DISTINCT x)
scala> println(sum("x"))
sum(x)
scala> println(count("x"))
count(x)
Problem:
- for sumDistinct the expression is -> sum(DISTINCT x)
- but for countDistinct the expression is -> count(x)
Is this some kind of bug, or a feature?
Note: countDistinct produced the correct expression, count(DISTINCT x), in earlier Spark versions (before 3.x).
Solution
As @Shaido mentioned in the comment section, I verified a few things, and they point to a change in the toString code in the latest Spark version (whether it is a bug or a feature, I am not sure).
Observed in Spark 3.0.1:
import org.apache.spark.sql.functions._
println(countDistinct("x")) ---> gives output as count(x)
If we look specifically at the source code of countDistinct("x") in earlier Spark versions (< 3.x):
def countDistinct(columnName: String, columnNames: String*): Column =
  countDistinct(Column(columnName), columnNames.map(Column.apply) : _*)

def countDistinct(expr: Column, exprs: Column*): Column = {
  withAggregateFunction(Count.apply((expr +: exprs).map(_.expr)), isDistinct = true)
}
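The two overloads above follow a common delegation pattern: the String-based overload wraps the names into Column objects and forwards them to the Column-based overload. A minimal, Spark-free sketch of that pattern, using a hypothetical MockColumn stand-in rather than Spark's real Column class:

```scala
// MockColumn is a simplified stand-in for Spark's Column, used only to
// illustrate the String -> Column delegation; it is not Spark's class.
case class MockColumn(name: String)

// String-based overload: wrap every name into a MockColumn and forward.
def countDistinctCols(columnName: String, columnNames: String*): List[MockColumn] =
  countDistinctCols(MockColumn(columnName), columnNames.map(MockColumn.apply): _*)

// Column-based overload: here it just collects the columns into a list.
def countDistinctCols(expr: MockColumn, exprs: MockColumn*): List[MockColumn] =
  (expr +: exprs).toList

println(countDistinctCols("x", "y"))  // List(MockColumn(x), MockColumn(y))
```

In the real API the second overload goes on to build an aggregate expression; the point here is only that both overloads end up in the same place, with isDistinct decided by the Column-based one.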
As can be seen, the second overloaded method applies the Count aggregate function with isDistinct = true, so distinct values are counted.
private def withAggregateFunction(
    func: AggregateFunction, isDistinct: Boolean = false): Column = {
  Column(func.toAggregateExpression(isDistinct))
}
If you look specifically at the signature of withAggregateFunction, it returns a Column, and Column's toString method delegates to:
def toPrettySQL(e: Expression): String = usePrettyExpression(e).sql
which calls the .sql method on the AggregateExpression.
AggregateExpression in turn calls back into the aggregateFunction's sql method, per the code below:
override def sql: String = aggregateFunction.sql(isDistinct)
In our case, the AggregateFunction is Count.
def sql(isDistinct: Boolean): String = {
  val distinct = if (isDistinct) "DISTINCT " else ""
  s"$prettyName($distinct${children.map(_.sql).mkString(", ")})"
}
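The string-building step above can be reproduced without Spark. The sketch below mimics the sql(isDistinct) logic with a hypothetical Col stand-in for Spark's expression children, showing how the DISTINCT keyword is injected into the rendered string:

```scala
// Col is a hypothetical stand-in for a child expression; its sql method
// just returns the column name, mimicking Expression.sql.
case class Col(name: String) { def sql: String = name }

// Mirrors AggregateFunction.sql for Count: prepend "DISTINCT " only when
// the isDistinct flag is set.
def countSql(isDistinct: Boolean, children: Seq[Col]): String = {
  val distinct = if (isDistinct) "DISTINCT " else ""
  s"count($distinct${children.map(_.sql).mkString(", ")})"
}

println(countSql(isDistinct = true, Seq(Col("x"))))   // count(DISTINCT x)
println(countSql(isDistinct = false, Seq(Col("x"))))  // count(x)
```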
According to the code above, it should return count(DISTINCT x).
Now, in Spark versions >= 3.x, I checked the source code, and the toString behavior is slightly different.
@scala.annotation.varargs
def countDistinct(expr: Column, exprs: Column*): Column =
  // For usage like countDistinct("*"), we should let analyzer expand star and
  // resolve function.
  Column(UnresolvedFunction("count", (expr +: exprs).map(_.expr), isDistinct = true))
It now uses UnresolvedFunction instead of withAggregateFunction.
In UnresolvedFunction, the toString method is very simple, as shown below:
override def toString: String = s"'$name(${children.mkString(", ")})"
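A Spark-free mock of that toString makes the problem visible: the isDistinct flag never appears in the string, so DISTINCT is dropped from the printed expression (the leading apostrophe marks the function as unresolved). MockUnresolvedFunction below is a hypothetical stand-in, not Spark's class:

```scala
// Mimics UnresolvedFunction's toString: note that isDistinct is stored
// but never consulted when rendering the string.
case class MockUnresolvedFunction(name: String, children: Seq[String], isDistinct: Boolean) {
  override def toString: String = s"'$name(${children.mkString(", ")})"
}

println(MockUnresolvedFunction("count", Seq("x"), isDistinct = true))  // 'count(x)
```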
It simply prints the function name and its children, without the DISTINCT keyword, which is why the output is count(x). Note that only the printed string is affected: isDistinct = true is still passed to the analyzer, so the actual aggregation still counts distinct values.