微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark Structured Streaming:如何在处理 Kafka 主题及其在 S3 上的备份时达到相同的顺序

如何解决Spark Structured Streaming:如何在处理 Kafka 主题及其在 S3 上的备份时达到相同的顺序

问题

如何在处理 Kafka 主题和使用 Spark Structured Streaming 的 S3 备份之间实现幂等性(相同的事件顺序)?

请参阅下面代码示例中的 # QUESTION: 注释。

用例

假设您有与 Uber 完全相同的用例 - 用于两者的流处理:低延迟和历史分析。有关实际示例,请参阅 Designing a Production-Ready Kappa Architecture for Timely Data Stream Processing 的“动机”部分:

您将数据存储在 Kafka 中,使用 Kafka Connect AWS S3 Sink 将它们存档到 S3。流处理由 Spark Structured Streaming 完成。您决定使用 "Combined approach" 部分解决问题:通过 S3(Kafka 备份主题)以后台回填模式运行代码

代码示例

def some_processing(spark: SparkSession,stream: DataFrame,backfilling_mode: bool):
  input_stream = read(spark,'input_topic',backfilling_mode)
  processed_stream = input_stream \
    .withWatermark('event_time','10 seconds') \
    .groupBy(window('event_time','10 seconds','10 seconds'),'user_id') \
    .agg(sum(col('price'))) \
    .join(...)
  write(processed_stream,'output_topic',backfilling_mode)

def read(spark: SparkSession,topic_name: str,backfilling_mode: bool) -> DataFrame:
  """Reads data from Kafka topic or its S3 backup"""
  # QUESTION: how Spark achieve the same order on backfilling_mode=True and backfilling_mode=False
  if backfilling_mode:
    stream = spark \
      .readStream \
      .schema(input_schema) \ 
      .format('com.databricks.spark.avro') \
      .option('maxFilesPerTrigger',20) \
      .load('/s3_base_bath/{}'.format(topic_name)) 
  else:
    stream = spark \
      .readStream \
      .format('kafka') \
      .option('kafka.bootstrap.servers','host1:port1,host2:port2') \
      .option('subscribe',topic_name) \
      .load() 


def write(stream: DataFrame,backfilling_mode: bool):
  """Writes data to Kafka topic or its S3 backup"""
  if backfilling_mode:
    stream \
      .writeStream \
      .trigger(processingTime='60 seconds') \
      .format('com.databricks.spark.avro') \
      .option('path','/s3_base_bath/{}'.format(topic_name)) \
      .start() \
      .awaitTermination()
  else:
    stream \
      .writeStream \
      .trigger(processingTime='10 seconds') \
      .option('kafka.bootstrap.servers',host2:port2') \
      .option('topic','output_topic')
      .format('kafka') \
      .start() \
      .awaitTermination()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?