如何解决Spark Streaming 无法消费来自 Kafka 特定偏移量的数据
该主题有超过 100000 条消息。当我运行我的程序时,spark Streaming 只在第一个 RDD 中有数据,后面的 RDD 什么都没有,这让我很困惑。
另外,我发现即使我手动更改偏移量,流式传输也总是在第一个 RDD 打印相同的消息。
感谢任何建议或帮助!!!
代码如下:
object Main
{
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("XXX")
.setMaster("local[2]")
val ssc = new StreamingContext(conf,Seconds(3))
ssc.sparkContext.setLogLevel("ERROR")
val topics = List("rd_cyg")
val offSet = RedisUtil.getoffsetInfo(topics(0))
val kafkaParams: Map[String,Object] = Map(
"bootstrap.servers" -> "XXX","group.id" -> "XXX","key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer","value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer","serializer.class" -> "kafka.serializer.StringEncoder","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean)
)
val stream = KafkaUtils.createDirectStream(
ssc,locationStrategy = LocationStrategies.PreferConsistent,consumerStrategy = ConsumerStrategies.Subscribe[String,String](topics,kafkaParams,offSet)
)
stream.map(_.value())
.print(2)
ssc.start()
ssc.awaitTermination()
ssc.stop(true,true)
}
}
具体偏移位置为
Map(XXX-0 -> 223,XXX-2 -> 224,XXX-1 -> 223)
控制台输出是
-------------------------------------------
Time: 1623136371000 ms
-------------------------------------------
first-message
second-message
...
-------------------------------------------
Time: 1623136374000 ms
-------------------------------------------
-------------------------------------------
Time: 1623136377000 ms
-------------------------------------------
部分敏感信息被XXX替换。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。