微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

从动态生成的案例类加载数据集

如何解决从动态生成的案例类加载数据集

所需条件:

数据库中表的数量正在快速变化,因此我不想编辑案例类,因此我通过SCALA代码动态生成它们并将其放入包中。但是现在无法动态读取它。如果可以,那么我将解析“ com.example.datasources.fileSystemSource.schema。{}”作为循环中的对象架构成员

已经完成的事情:

我有一些案例类是根据数据库表的架构动态生成的,如下所示:

object schema{
case class Users(name: String,favorite_color: String,favorite_numbers: Array[Int])

case class UserData(registration_dttm: Timestamp,id: Int,first_name: String,last_name: String,email: String,gender: String,ip_address: String,cc: String,country: String,birthdate: String,salary: Double,title: String,comments: String)
}

然后我将它们用作动态类型,以读取Loader.scala中的Load [T]函数,如下所示:

import org.apache.spark.sql.{Dataset,Encoder,SparkSession}

class Load[T <: Product: Encoder](val tableName: String,val inputPath: String,val spark: SparkSession,val saveMode: String,val outputPath: String,val Metadata: Boolean)
    extends Loader[T] {

  val fileSystemSourceInstance: FileSystem[T] =
    new FileSystem[T](inputPath,spark,saveMode,tableName)

  override def Load: Dataset[T] =
    fileSystemSourceInstance.provideData(Metadata,outputPath).as[T]

}

现在,通过使用reflect.api,我可以为案例类获取TypeTag。

def stringToTypeTag[A](name: String): TypeTag[A] = {
    val c = Class.forName(name)
    val mirror = runtimeMirror(c.getClassLoader)
    val sym = mirror.staticclass(name)
    val tpe = sym.selfType
    TypeTag(mirror,new api.TypeCreator {
      def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) =

        if (m eq mirror) tpe.asInstanceOf[U # Type]
        else throw new IllegalArgumentException(s"Type tag defined in $mirror cannot be migrated to other mirrors.")
    })
  }

因此,如果我现在打印我的案例类类型标签,则会得到:

val typetagDynamic = stringToTypeTag("com.example.datasources.fileSystemSource.schema.Users")
println(typetags)
TypeTag[com.example.datasources.fileSystemSource.schema.Users]

问题:

需要阅读这些TypeTag或动态生成的案例类,以对我的数据集进行如下编码:

new Load[typetagDynamic](tableName,inputPath,outputPath + tableName,Metadata)(Encoders.product[typetagDynamic]).Load 

这给了我错误无法解析符号typetagDynamic

如果这样使用:

new Load[typetagDynamic.type](tableName,Metadata)(Encoders.product[typetagDynamic.type]).Load 

这给我一个错误类型参数[T]不符合方法产品的类型参数范围[T

解决方法

如果仅在运行时知道类型schema.Users,请尝试替换

new Load[schema.Users](tableName,inputPath,spark,saveMode,outputPath + tableName,metadata).Load

使用

import scala.reflect.runtime
import scala.reflect.runtime.universe._

val currentMirror = runtime.currentMirror

val loadType = typeOf[Load[_]]
val classSymbol = loadType.typeSymbol.asClass
val classMirror = currentMirror.reflectClass(classSymbol)
val constructorSymbol = loadType.decl(termNames.CONSTRUCTOR).asMethod
val constructorMirror = classMirror.reflectConstructor(constructorSymbol)
  
import scala.tools.reflect.ToolBox
val toolbox = ToolBox(currentMirror).mkToolBox()
val encoderType = appliedType(
  typeOf[Encoder[_]].typeConstructor.typeSymbol,currentMirror.staticClass("com.example.datasources.fileSystemSource.schema.Users").toType
)
val encoderTree = toolbox.inferImplicitValue(encoderType,silent = false)
val encoderInstance = toolbox.eval(toolbox.untypecheck(encoderTree))

constructorMirror(tableName,metadata,encoderInstance).asInstanceOf[Load[_]].Load

scala.tools.reflect.ToolBoxError:隐式搜索失败

您需要:

  1. 在其伴随对象中为org.apache.spark.sql.Encoder定义类型为Users的实例(这样该实例将处于隐式范围内)

    object Users {
      implicit val usersEnc: Encoder[Users] = spark.implicits.newProductEncoder[Users]
    }
    

  1. 可以通过Encoder为案例类导入import spark.implicits._的实例,但是您需要将它们不是导入到当前本地范围,而是导入到工具箱生成的本地范围中,因此在这种情况下,您应该替换

    val encoderTree = toolbox.inferImplicitValue(encoderType,silent = false)
    val encoderInstance = toolbox.eval(toolbox.untypecheck(encoderTree))
    

    使用

    val className = "com.example.datasources.fileSystemSource.schema.Users"
    val classType = currentMirror.staticClass(className).toType
    val encoderInstance = toolbox.eval(
      q"""import path.to.spark.implicits._
          import org.apache.spark.sql.Encoder
          implicitly[Encoder[$classType]]""")
    

查看完整代码: https://gist.github.com/DmytroMitin/2cad52c27f5360ae9b1e7503d6f6cd00

https://groups.google.com/g/scala-internals/c/ta-vbUT6JE8

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。