微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

kafkashaded.org.apache.kafka.common.errors.TimeoutException:60000 毫秒后无法更新元数据

如何解决kafkashaded.org.apache.kafka.common.errors.TimeoutException:60000 毫秒后无法更新元数据

我目前正在研究用例,我正在将 pyspark 数据帧写入 confluent-kafka 主题

def write_data(rows):
rows.selectExpr("to_json(struct(*)) AS value") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers","xxx.aws.confluent.cloud:9092") \
.option("topic","test_topic") \
.save()
dataframe.foreachPartition(write_data)

以下是我遇到的错误

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
      File "<command-2315>",line 32,in write_data
    AttributeError: 'itertools.chain' object has no attribute 'selectExpr'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:514)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:650)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:633)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:468)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)

主题启用的身份验证是`SASL PLAIN。我想知道,我将数据帧写入 confluent-kafka 主题方法是否正确?还是我还需要添加其他配置。

我是新来的火花。任何帮助将不胜感激。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。