微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

发生故障时,Spark中止流并且需要帮助来恢复故障

如何解决发生故障时,Spark中止流并且需要帮助来恢复故障

我正在运行一个流程,该流程利用以下sudo代码从kafka中读取内容,然后发布到弹性搜索中。

try {
  val outStreamES = spark.readStream
    .format("kafka")
    .option("subscribe",topics.keys.mkString(","))
    .options(kafkaConfig)
    .load()
    .select($"key".cast(StringType),$"value".cast(StringType),$"topic")
    // Convert untyped dataframe to dataset
    .as[(String,String,String)]
    // Merge all manifests for vehicle in minibatch
    .groupByKey(_._1)
    //Start of merge
    .flatMapGroupsWithState(OutputMode.Append,GroupStateTimeout.ProcessingTimeTimeout)(mergeGroup)

    // .select($"key".cast(StringType),from_json($"value",schema).as("manifest"))
    .select($"_1".alias("key"),$"_2".alias("manifest"))
    val inStreamManifestMain = outStreamES
     inStreamManifestMain
    .select("key","manifest.*")
    // Convert timestamp columns to strings - avoids conversion to longs otherwise
    .writeStream
    .outputMode("append")
    .format("org.elasticsearch.spark.sql")
    .trigger(Trigger.ProcessingTime(conf.getString("spark.trigger")))
    .option("mode","DROPMALFORMED")
    .options(configToMap(conf.getobject("esConf")))
    .start()

在mergeGroup内,我可以尝试/捕获与架构不匹配的任何不良记录。有没有办法拒绝与架构不匹配的不良记录,而不是杀死整个火花流?

我正在使用的try / catch的sudo代码,一条记录导致流连续失败,唯一清除记录的是清除整个主题

val manifests = rows.map(r => (
  try {
    read[ProductManifestDocument](r._2)
  } catch {
    case ex: MappingException => throw MappingException(ex.msg + "\n" + r._2  + "vRECORD Failed TO MAPv ",ex)
  },//all topics
  topics(r._3)
))
  .toList

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。