
How to solve Spark's "requirement failed: Literal must have a value corresponding to string, but class String found"

I have a Spark 2.4.6 DataFrame write that looks like this:

import org.apache.spark.sql.functions.{column, struct}

df
  // pack all columns into a single struct column named "value"
  .select(struct(df.columns.map(column): _*).alias("value"))
  .write
  .format("kafka")
  .options(........)
  .save()

But after spark-submitting the same jar (built against 2.4.6) on version 3.0.2, it throws:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Literal must have a value corresponding to string, but class String found.
    at scala.Predef$.require(Predef.scala:281)
    at org.apache.spark.sql.catalyst.expressions.Literal$.validateLiteralValue(literals.scala:215)
    at org.apache.spark.sql.catalyst.expressions.Literal.<init>(literals.scala:292)
    at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$validateQuery$2(KafkaWriter.scala:55)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:50)
    at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:86)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:255)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:126)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:962)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:962)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:414)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:398)
    at part4integrations.IntegratingKafka$.nimbleWrite(IntegratingKafka.scala:145)
    at part4integrations.IntegratingKafka$.main(IntegratingKafka.scala:154)
    at part4integrations.IntegratingKafka.main(IntegratingKafka.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

About the validation check
This validation exists in 3.0.2: https://github.com/apache/spark/blob/648457905c4ea7d00e3d88048c63f360045f0714/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L292

but not in the earlier version: https://github.com/apache/spark/blob/v2.4.6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
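To illustrate the difference, here is a minimal sketch (my own illustration, not code from the question; the topic name is made up): in 3.x, validateLiteralValue requires a StringType Literal to be backed by a UTF8String, whereas constructing one from a plain java.lang.String passed silently on 2.4.6. This matches the KafkaWriter.scala:55 frame in the trace above, where the connector builds a Literal for the topic name.

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.StringType
import org.apache.spark.unsafe.types.UTF8String

// Accepted on both versions: the value is already a UTF8String.
val ok = Literal(UTF8String.fromString("some-topic"), StringType)

// Also accepted on both: Literal.create first converts the Scala value
// through CatalystTypeConverters (String -> UTF8String).
val alsoOk = Literal.create("some-topic", StringType)

// Accepted on 2.4.6 but throws on 3.0.2:
// java.lang.IllegalArgumentException: requirement failed: Literal must have
// a value corresponding to string, but class String found.
val bad = Literal("some-topic", StringType)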

Is there any way to fix this without upgrading the Spark version in the build?
Rebuilding the jar against 3.0.2 does work fine.
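My reading of the trace (an assumption, not confirmed in the question): the spark-sql-kafka-0-10 connector is bundled inside the application jar while the catalyst classes come from the cluster at runtime, so a 2.4.6 connector running against a 3.0.2 catalyst hits the new check. A sketch of keeping the two in lock-step in sbt, with the version and layout shown as assumptions about the build:

// build.sbt (sketch)
val sparkVersion = "3.0.2" // match the cluster's Spark version

libraryDependencies ++= Seq(
  // supplied by the cluster at runtime, so not bundled into the fat jar
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  // bundled into the fat jar; must match the runtime catalyst version
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)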
