如何解决Spark UDF重载
我有一个必须将Spark UDF重载的要求,我知道Spark不支持UDF重载。因此,为了克服这种火花限制,我尝试创建一个可以接受任何类型的UDF,并在UDF内找到实际的数据类型,并调用各自的方法进行计算并相应地返回值。这样做的时候我得到了一个错误
Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type Any is not supported
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:789)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:724)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:723)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:720)
at org.apache.spark.sql.UDFRegistration.register(UDFRegistration.scala:213)
at com.experian.spark_jobs.Test$.main(Test.scala:9)
at com.experian.spark_jobs.Test.main(Test.scala)
下面是示例代码:
import org.apache.spark.sql.SparkSession
object Test {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
spark.udf.register("testudf",testudf _)
spark.sql("create temporary view testView as select testudf(1,2) as a").show()
spark.sql("select testudf(a,5) from testView").show()
}
def testudf(a: Any,b: Any) = {
if (a.isInstanceOf[Integer] && b.isInstanceOf[Integer]) {
add(a.asInstanceOf[Integer],b.asInstanceOf[Integer])
} else if (a.isInstanceOf[java.math.BigDecimal] && b.isInstanceOf[java.math.BigDecimal]) {
add(a.asInstanceOf[java.math.BigDecimal],b.asInstanceOf[java.math.BigDecimal])
}
}
def add(decimal: java.math.BigDecimal,decimal1: java.math.BigDecimal): java.math.BigDecimal = {
decimal.add(decimal1)
}
def add(integer: Integer,integer1: Integer): Integer = {
integer + integer1
}
}
是否可以使上述要求成为可能?如果没有,请给我建议一种更好的方法。
注意: Spark版本-2.4.0
解决方法
使用Dataframe(untyped)的问题在于,在编译时执行某种多态性之类的工作非常痛苦。理想情况下,拥有列类型将允许使用特定的“添加函数”实现来构建udfs,就像使用Monoids一样。但是Spark Dataframe API与这个世界相距甚远。使用数据集或使用Frameless会有很大帮助。
在您的示例中,要在运行时检查类型,您将需要AnyRef而不是Any。应该可以。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。