如何解决使用checkpointLocation偏移量从Kafka主题读取流的正确方法
我正在尝试开发一个小型Spark应用程序(使用Scala)以从Kafka(汇合)中读取消息,并将其(插入)写入Hive表中。除一项重要功能外,一切都按预期方式运行-重新启动(提交)应用程序时管理偏移。这让我感到困惑。
def main(args: Array[String]): Unit = {
val sparkSess = SparkSession
.builder
.appName("Kafka_to_Hive")
.config("spark.sql.warehouse.dir","/user/hive/warehouse/")
.config("hive.metastore.uris","thrift://localhost:9083")
.config("hive.exec.dynamic.partition","true")
.config("hive.exec.dynamic.partition.mode","nonstrict")
.enableHiveSupport()
.getorCreate()
sparkSess.sparkContext.setLogLevel("ERROR")
// don't consider this code block please,it's just a part of Confluent avro message deserializing adventures
sparkSess.udf.register("deserialize",(bytes: Array[Byte]) =>
DeserializerWrapper.deserializer.deserialize(bytes)
)
val kafkaDataFrame = sparkSess
.readStream
.format("kafka")
.option("kafka.bootstrap.servers",'localhost:9092')
.option("group.id",'kafka-to-hive-1')
// ------> which Kafka options do I need to set here for starting from last right offset to ensure completenes of data and "exactly once" writing? <--------
.option("failOnDataLoss",(false: java.lang.Boolean))
.option("subscribe",'some_topic')
.load()
import org.apache.spark.sql.functions._
// don't consider this code block please,it's just a part of Confluent avro message deserializing adventures
val valueDataFrame = kafkaDataFrame.selectExpr("""deserialize(value) AS message""")
val df = valueDataFrame.select(
from_json(col("message"),sparkSchema.dataType).alias("parsed_value"))
.select("parsed_value.*")
df.writeStream
.foreachBatch((batchDataFrame,batchId) => {
batchDataFrame.createOrReplaceTempView("`some_view_name`")
val sqlText = "SELECT * FROM `some_view_name` a where some_field='some value'"
val batchDataFrame_view = batchDataFrame.sparkSession.sql(sqlText);
batchDataFrame_view.write.insertInto("default.some_hive_table")
})
.option("checkpointLocation","/user/some_user/tmp/checkpointLocation")
.start()
.awaitTermination()
}
问题(这些问题彼此相关):
- 我需要为每个
readStream.format("kafka")
申请哪个Kafka选项,以便在每次提交Spark应用程序时从最后一个右偏移量开始? - 我是否需要手动读取checkpointLocation / offsets / latest_batch文件的第三行以查找要从Kafka读取的最后一个偏移量?我的意思是这样的:
readStream.format("kafka").option("startingOffsets","""{"some_topic":{"2":35079,"5":34854,"4":35537,"1":35357,"3":35436,"0":35213}}""")
- 从Kafka(汇合)主题中读取流的正确/便捷方式是什么? (我不考虑Kafka的偏移量存储引擎)
解决方法
“在每次提交Spark应用时,我需要在readStream.format(“ kafka”)上应用哪些Kafka选项以从最后一个右偏移量开始?”
您需要设置startingOffsets=latest
并清理检查点文件。
“我需要手动读取checkpointLocation / offsets / latest_batch文件的第三行以找到要从Kafka读取的最后一个偏移量吗?我的意思是这样:readStream.format(” kafka“)。option(” startingOffsets“,” “” {“ some_topic”:{“ 2”:35079,“ 5”:34854,“ 4”:35537,“ 1”:35357,“ 3”:35436,“ 0”:35213}}“”“”“
类似于第一个问题,如果将startingOffsets设置为json字符串,则需要删除检查点文件。否则,spark应用程序将始终获取存储在检查点文件中的信息,并覆盖startingOffsets
选项中给出的设置。
“从Kafka(汇合)主题中读取流的正确/便捷方式是什么?(我不考虑使用Kafka的偏移量存储引擎)”
提出“正确的方法”可能会导致基于意见的答案,因此在Stackoverflow上是不合时宜的。无论如何,根据我的经验,使用Spark结构化流媒体已经是一种成熟且可用于生产的方法。但是,始终值得研究KafkaConnect。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。