如何解决Spark Structured Streaming:如何在处理 Kafka 主题及其在 S3 上的备份时达到相同的顺序
问题
如何在处理 Kafka 主题和使用 Spark Structured Streaming 的 S3 备份之间实现幂等性(相同的事件顺序)?
请参阅下面代码示例中的 # QUESTION:
注释。
用例
假设您有与 Uber 完全相同的用例 - 用于两者的流处理:低延迟和历史分析。有关实际示例,请参阅 Designing a Production-Ready Kappa Architecture for Timely Data Stream Processing 的“动机”部分:
您将数据存储在 Kafka 中,使用 Kafka Connect AWS S3 Sink 将它们存档到 S3。流处理由 Spark Structured Streaming 完成。您决定使用 "Combined approach" 部分解决问题:通过 S3(Kafka 备份主题)以后台回填模式运行代码。
代码示例
def some_processing(spark: SparkSession,stream: DataFrame,backfilling_mode: bool):
input_stream = read(spark,'input_topic',backfilling_mode)
processed_stream = input_stream \
.withWatermark('event_time','10 seconds') \
.groupBy(window('event_time','10 seconds','10 seconds'),'user_id') \
.agg(sum(col('price'))) \
.join(...)
write(processed_stream,'output_topic',backfilling_mode)
def read(spark: SparkSession,topic_name: str,backfilling_mode: bool) -> DataFrame:
"""Reads data from Kafka topic or its S3 backup"""
# QUESTION: how Spark achieve the same order on backfilling_mode=True and backfilling_mode=False
if backfilling_mode:
stream = spark \
.readStream \
.schema(input_schema) \
.format('com.databricks.spark.avro') \
.option('maxFilesPerTrigger',20) \
.load('/s3_base_bath/{}'.format(topic_name))
else:
stream = spark \
.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers','host1:port1,host2:port2') \
.option('subscribe',topic_name) \
.load()
def write(stream: DataFrame,backfilling_mode: bool):
"""Writes data to Kafka topic or its S3 backup"""
if backfilling_mode:
stream \
.writeStream \
.trigger(processingTime='60 seconds') \
.format('com.databricks.spark.avro') \
.option('path','/s3_base_bath/{}'.format(topic_name)) \
.start() \
.awaitTermination()
else:
stream \
.writeStream \
.trigger(processingTime='10 seconds') \
.option('kafka.bootstrap.servers',host2:port2') \
.option('topic','output_topic')
.format('kafka') \
.start() \
.awaitTermination()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。