如何解决使用Scala中的Spark Structured Streaming从kafka流式传输到kafka
我正在尝试运行来自官方 spark 教程和一本书“spark Streaming in action”中示例的简单变体。
异常的内容很奇怪。我的代码有什么问题?
首先我启动kafka zookeeper,服务器,生产者和2个消费者。然后我运行以下代码:
// read from kafka
val df = sparkService.sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("subscribe",topic1)
.load()
// write to kafka
import sparkService.sparkSession.implicits._
val query = df.selectExpr("CAST(key as STRING)","CAST(value as STRING)")
.writeStream
.outputMode(OutputMode.Append())
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("topic",topic2)
.option("checkpointLocation","/home/pt/Dokumenty/tmp/")
.option("failOnDataLoss","false") // only when testing
.start()
query.awaitTermination(30000)
写入 kafka 时发生错误:
线程“main” org.apache.spark.sql.streaming.StreamingQueryException 中的异常:预期,例如{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}},得到 1 1609627750463
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。