
Dynamically parsing JSON in a Flink map function

How can I dynamically parse JSON in a Flink map function?

I am using Flink to analyze JSON data whose schema is only known at runtime: I keyBy a configurable set of columns and sum another configured column. In my map function I convert each JSON record into an instance of a dynamically generated case class, but the resulting stream fails at the keyBy call with Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<java.lang.Object>) cannot be used as key. My code looks like this:

//conf.properties
columns=a:String,b:Int,c:String,d:Long
declusteringColumns=a,c
statsColumns=b

//main function
stream.map(new MapFunc)
      .keyBy(declusteringColumns(0), declusteringColumns.drop(1).toSeq: _*)
      .sum(statsColumns)
class MapFunc extends RichMapFunction[String, Any] {
  var clazz: Class[_] = _

  override def open(parameters: Configuration): Unit = {
    import scala.reflect.runtime.universe
    import scala.tools.reflect.ToolBox
    val tb = universe.runtimeMirror(universe.getClass.getClassLoader).mkToolBox()
    // compile a case class at runtime and keep its runtime Class
    clazz = tb.compile(tb.parse(
      """|case class Test(a: String, d: Long) {}
         |scala.reflect.classTag[Test].runtimeClass""".stripMargin
    )).apply().asInstanceOf[Class[_]]
  }

  override def map(value: String): Any = {
    val tmp = JSON.parseObject(value)
    // read each configured column from the JSON object, typed per the config
    val values = Utils.loadProperties("columns").split(",").map { y =>
      val name = y.substring(0, y.indexOf(":"))
      val tpe = y.substring(y.indexOf(":") + 1)
      tpe.toLowerCase match {
        case "string" => tmp.getString(name)
        case "int"    => tmp.getInteger(name)
        case "long"   => tmp.getLong(name)
        case _        => null
      }
    }.toSeq
    // instantiate the generated case class via its first constructor
    clazz.getConstructors()(0).newInstance(values.map(_.asInstanceOf[AnyRef]): _*)
  }
}

How can I convert the JSON into a case class or a tuple?

Solution

Actually, the exception

org.apache.flink.api.common.InvalidProgramException:
This type (GenericType<Test>) cannot be used as key

persists even for an ordinary case class (one not generated via reflection):

case class Test(a: String, b: Int, c: String, d: Long)

The first issue is that this case class is not a POJO:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#pojos

POJOs

Flink recognizes Java and Scala classes as a special POJO data type if they fulfill the following requirements:

  • The class must be public.

  • It must have a public constructor without arguments (a default constructor).

  • All fields are either public or accessible through getter and setter functions. For a field named foo, the getter and setter must be named getFoo() and setFoo().

  • The type of every field must be supported by a registered serializer.

So you should replace

case class Test(a: String, d: Long)

with

import scala.beans.BeanProperty

case class Test(
  @BeanProperty var a: String,
  @BeanProperty var b: Int,
  @BeanProperty var c: String,
  @BeanProperty var d: Long) {
  def this() = {
    this(null, 0, null, 0L)
  }
}
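
To sanity-check that the class now qualifies, you can ask Flink for its TypeInformation: a POJO-compliant class is reported as a PojoType with named fields (which is what keyBy needs), while a non-compliant one falls back to GenericType. A minimal sketch, assuming the Test class above is on the classpath; the object name is illustrative and the printed form may vary by Flink version:

import org.apache.flink.api.common.typeinfo.TypeInformation

object PojoCheck extends App {
  // PojoType allows "by-name" field references such as keyBy("a", "c");
  // GenericType does not, hence the original exception.
  println(TypeInformation.of(classOf[Test]))
  // e.g. PojoType<Test, fields = [a: String, b: Integer, c: String, d: Long]>
}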

The second issue is probably that Flink accepts inner-class POJOs only if they are static inner classes, while the reflective toolbox generates a local class nested inside a method:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#rules-for-pojo-types

Rules for POJO types

Flink recognizes a data type as a POJO type (and allows "by-name" field referencing) if the following conditions are fulfilled:

  • The class is public and standalone (no non-static inner class).
  • The class has a public no-argument constructor.
  • All non-static, non-transient fields in the class (and all superclasses) are either public (and non-final) or have public getter and setter methods that follow the Java beans naming conventions.

Here is a decompiled version of the code the toolbox generates:

public final class __wrapper$1$a077cb72a4ee423291aac7dfb47454b9$ {

   public Object wrapper() {
      new LazyRef();

      class Test$1 implements Product,Serializable {
         private String a;
         private int b;
         private String c;
         private long d;

         ...
      }

      return scala.reflect.package..MODULE$.classTag(scala.reflect.ClassTag..MODULE$.apply(Test$1.class)).runtimeClass();
   }

   ...
}

Full decompiled code:

https://gist.github.com/DmytroMitin/f1554ad833ea1bb9eb97947ae872d220

So if it really is necessary to generate a class for Flink, it should be generated manually rather than via the toolbox (see the links below and the sketch after them):

https://www.reddit.com/r/scala/comments/gfcmul/compile_scala_source_from_string_and/

https://www.reddit.com/r/scala/comments/jckld2/is_there_a_way_to_load_scala_code_at_runtime/

How to eval code that uses InterfaceStability annotation (that fails with "illegal cyclic reference involving class InterfaceStability")?

How do I programmatically compile and instantiate a Java class?

Dynamic compilation of multiple Scala classes at runtime

Tensorflow in Scala reflection
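
A minimal sketch of that approach, assuming Scala 2 with scala-compiler on the classpath (ManualCompile, compileToClass, and the temp-directory layout are illustrative, not a fixed API): write the source to a file, run the real compiler on it, and load the resulting top-level class, which is standalone and therefore passes Flink's POJO rules.

import java.net.URLClassLoader
import java.nio.file.Files
import scala.tools.nsc.{Global, Settings}

object ManualCompile {
  // Hypothetical helper: `src` must define a top-level class named `className`.
  def compileToClass(src: String, className: String): Class[_] = {
    val outDir = Files.createTempDirectory("flink-gen").toFile
    val srcFile = new java.io.File(outDir, s"$className.scala")
    Files.write(srcFile.toPath, src.getBytes("UTF-8"))

    val settings = new Settings
    settings.usejavacp.value = true                // reuse the application classpath
    settings.outdir.value = outDir.getAbsolutePath // where .class files are emitted

    // run the actual Scala compiler on the source file
    val global = new Global(settings)
    new global.Run().compile(List(srcFile.getAbsolutePath))

    // load the freshly compiled class; unlike a toolbox-generated one,
    // it is a standalone top-level class
    val loader = new URLClassLoader(Array(outDir.toURI.toURL), getClass.getClassLoader)
    loader.loadClass(className)
  }
}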

However, even the code with a manually generated class

https://gist.github.com/DmytroMitin/e33cd244b37f9b33b67f7ac3e6609d39

still throws This type (GenericType<java.lang.Object>) cannot be used as key.

I guess the reason is the following (and this is the third issue).

The code with an ordinary (non-generated) case class seems to work fine:

https://gist.github.com/DmytroMitin/af426d4578dd5e76c9e0d344e6f079ce

But if we replace the type Test with Any, it throws This type (GenericType<java.lang.Object>) cannot be used as key:

https://gist.github.com/DmytroMitin/a23e45a546790630e838e60c7206adcd

And with reflection we cannot return anything more specific than Any.
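
This matters because the Scala DataStream API derives the stream's TypeInformation implicitly from the static return type of the user function. A minimal sketch of the mechanism (the object name is illustrative; the printed form may differ slightly across Flink versions):

import org.apache.flink.api.scala._

object WhyAnyFails extends App {
  // The createTypeInformation macro inspects the static type.
  // For Any there is no structure to analyze, so Flink falls back to
  // GenericType<java.lang.Object>, which keyBy rejects as a key.
  println(createTypeInformation[Any])
}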


Now I create TypeInformation[Test] inside the generated code. This seems to fix This type (GenericType<java.lang.Object>) cannot be used as key, but then I get

org.apache.flink.api.common.InvalidProgramException: UTF-8 is not serializable.
The object probably contains or references non serializable fields.

https://gist.github.com/DmytroMitin/16d312dbafeae54518f7ac2c490426b0


I fixed InvalidProgramException: UTF-8 is not serializable by annotating the fields of MapFunc with @transient:

https://gist.github.com/DmytroMitin/f2f859273075370c4687a30e0c3a2431
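
A quick way to catch this class of error before submitting a job is to push the function through Java serialization yourself, which is what Flink does when shipping user functions to the cluster. A minimal sketch; SerializationCheck and assertSerializable are hypothetical helpers:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializationCheck {
  // Throws java.io.NotSerializableException if `func` holds a
  // non-@transient field that Java serialization cannot handle.
  def assertSerializable(func: AnyRef): Unit = {
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    try oos.writeObject(func)
    finally oos.close()
  }
}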


Actually, it turned out that if we create the TypeInformation inside the generated code, the toolbox is enough:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.reflect.runtime
import scala.reflect.runtime.universe._
import scala.tools.reflect.ToolBox

object App {
  val toolbox = ToolBox(runtime.currentMirror).mkToolBox()

  class MapFunc extends RichMapFunction[String,Any] {
    var typeInfo: TypeInformation[_] = _
    @transient var classSymbol: ClassSymbol = _

    override def open(parameters: Configuration): Unit = {
      val code =
        """|case class Test(
           |                 @scala.beans.BeanProperty var a: String,|                 @scala.beans.BeanProperty var b: Int,|                 @scala.beans.BeanProperty var c: String,|                 @scala.beans.BeanProperty var d: Long) {
           |  def this() = {
           |    this(null,0)
           |  }
           |}""".stripMargin

      val tree = toolbox.parse(code)
      classSymbol = toolbox.define(tree.asInstanceOf[ImplDef]).asClass
      typeInfo = toolbox.eval(
        q"org.apache.flink.api.common.typeinfo.TypeInformation.of(classOf[${classSymbol.toType}])"
      ).asInstanceOf[TypeInformation[_]]
    }

    override def map(value: String): Any = {
      val values = Seq("aaa",1,"ccc",2L) //hardcoded for now
      createClassInstance(classSymbol,values: _*)
    }
  }


  def main(args: Array[String]): Unit = {
    val func = new MapFunc
    func.open(new Configuration)
    val classInstance = func.map("""{a: "aaa",b: 1,c: "ccc",d: 2}""")
    println(classInstance) //Test(aaa,1,ccc,2)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("localhost",9999)
    val typeInfo = func.typeInfo.asInstanceOf[TypeInformation[Any]]
    println(typeInfo)//PojoType<__wrapper$1$75434c8e32f541f7a87513a2ad2aa0ce.Test,fields = [a: String,b: Integer,c: String,d: Long]>
    val res = stream.map(func)(typeInfo).keyBy("a","c").sum("b")
    println(res)//org.apache.flink.streaming.api.scala.DataStream@5927f904
  }

  def createClassInstance(classSymbol: ClassSymbol,args: Any*): Any = {
    val runtimeMirror = toolbox.mirror
    val classType = classSymbol.typeSignature
    val constructorSymbol = classType.decl(termNames.CONSTRUCTOR).alternatives.head.asMethod
    val classMirror = runtimeMirror.reflectClass(classSymbol)
    val constructorMirror = classMirror.reflectConstructor(constructorSymbol)
    constructorMirror(args: _*)
  }
}
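
Note that main above only builds the pipeline; to actually run it, add a sink and execute the environment. A short sketch, assuming netcat feeds the socket and the job name is arbitrary:

// Terminal: nc -lk 9999, then paste JSON lines.
// At the end of main(), after building `res`:
res.print()                       // a sink, so the job has something to do
env.execute("dynamic-json-keyby") // submits the job; nothing runs without it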
