微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark Streaming 无法消费来自 Kafka 特定偏移量的数据

如何解决Spark Streaming 无法消费来自 Kafka 特定偏移量的数据

主题有超过 100000 条消息。当我运行我的程序时,spark Streaming 只在第一个 RDD 中有数据,后面的 RDD 什么都没有,这让我很困惑。

另外,我发现即使我手动更改偏移量,流式传输也总是在第一个 RDD 打印相同的消息。

感谢任何建议或帮助!!!

代码如下:

object Main
{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("XXX")
      .setMaster("local[2]")

    val ssc = new StreamingContext(conf,Seconds(3))
    ssc.sparkContext.setLogLevel("ERROR")


    val topics = List("rd_cyg")
    val offSet = RedisUtil.getoffsetInfo(topics(0))
    val kafkaParams: Map[String,Object] = Map(
      "bootstrap.servers" -> "XXX","group.id" -> "XXX","key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer","value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer","serializer.class" -> "kafka.serializer.StringEncoder","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val stream = KafkaUtils.createDirectStream(
      ssc,locationStrategy = LocationStrategies.PreferConsistent,consumerStrategy = ConsumerStrategies.Subscribe[String,String](topics,kafkaParams,offSet)
    )

    stream.map(_.value())
      .print(2)

    ssc.start()
    ssc.awaitTermination()
    ssc.stop(true,true)

  }

}

具体偏移位置为

Map(XXX-0 -> 223,XXX-2 -> 224,XXX-1 -> 223)

控制台输出

-------------------------------------------
Time: 1623136371000 ms
-------------------------------------------
first-message
second-message
...

-------------------------------------------
Time: 1623136374000 ms
-------------------------------------------

-------------------------------------------
Time: 1623136377000 ms
-------------------------------------------

部分敏感信息被XXX替换。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。