微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用Scala中的Spark Structured Streaming从kafka流式传输到kafka

如何解决使用Scala中的Spark Structured Streaming从kafka流式传输到kafka

我正在尝试运行来自官方 spark 教程和一本书“spark Streaming in action”中示例的简单变体。
异常的内容很奇怪。我的代码有什么问题?

首先我启动kafka zookeeper,服务器,生产者和2个消费者。然后我运行以下代码

// read from kafka
val df = sparkService.sparkSession
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers","localhost:9092")
    .option("subscribe",topic1)
    .load()

// write to kafka
import sparkService.sparkSession.implicits._

val query = df.selectExpr("CAST(key as STRING)","CAST(value as STRING)")
    .writeStream
    .outputMode(OutputMode.Append())
    .format("kafka")
    .option("kafka.bootstrap.servers","localhost:9092")
    .option("topic",topic2)
    .option("checkpointLocation","/home/pt/Dokumenty/tmp/")
    .option("failOnDataLoss","false") // only when testing
    .start()

query.awaitTermination(30000)

写入 kafka 时发生错误

线程“main” org.apache.spark.sql.streaming.StreamingQueryException 中的异常:预期,例如{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}},得到 1 1609627750463

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。