如何解决将 Scala 案例类转换为 PySpark 模式
给定一个像这样的简单 Scala 案例类:
package com.foo.storage.schema
case class Person(name: String,age: Int)
import org.apache.spark.sql._
import com.foo.storage.schema.Person
val schema = Encoders.product[Person].schema
我想知道是否可以从 Python/PySpark 中的案例类访问架构。我希望做这样的事情[Python]:
jvm = sc._jvm
py4j_class = jvm.com.foo.storage.schema.Person
jvm.org.apache.spark.sql.Encoders.product(py4j_class)
这会引发错误 com.foo.storage.schema.Person._get_object_id does not exist in the JVM
。 Encoders.product
是 Scala 中的泛型,我不完全确定如何使用 Py4J 指定类型。有没有办法使用 case 类来创建 PySpark 模式?
解决方法
我发现使用泛型没有干净/简单的方法来做到这一点,也不是作为纯 Scala 函数。我最终做的是为可以获取模式的案例类创建一个伴随对象。
解决方案
package com.foo.storage.schema
case class Person(name: String,age: Int)
object Person {
def getSchema = Encoders.product[Person].schema
}
这个函数可以从 Py4J 调用,但是会返回一个 JavaObject
。它可以用这样的辅助函数进行转换:
from pyspark.sql.types import StructType
import json
def java_schema_to_python(j_schema):
json_schema = json.loads(ddl.json())
return StructType.fromJson(json_schema)
最后,我们可以提取我们的架构:
j_schema = jvm.com.foo.storage.Person.getSchema()
java_schema_to_python(j_schema)
替代解决方案
我发现还有一种方法可以做到这一点,但我更喜欢第一种。您可以创建一个通用函数来推断 Scala 中参数的类型,并使用它来推断类型:
object SchemaConverter {
def getSchemaFromType[T <: Product: TypeTag](obj: T): StructType = {
Encoders.product[T].schema
}
}
可以这样调用:
val schema = SchemaConverter.getSchemaFromType(Person("Joe",42))
我不喜欢这种方法,因为它要求您创建案例类的虚拟实例。没有测试过,但我认为上面的函数也可以使用 Py4J 调用。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。