微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Apache Flink JobListener无法正常工作

如何解决Apache Flink JobListener无法正常工作

我在flink 1.11.1中编写了一个flink批处理作业。工作成功完成后,我想执行一些操作,例如调用http服务。

添加一个简单的作业侦听器以挂钩作业状态。问题是当卡夫卡接收器运算符抛出错误时,作业监听器不会触发。我希望当我的工作失败时,它会触发我的工作监听器并打印失败日志。

如何确定作业是否成功完成?

任何帮助将不胜感激。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerJobListener(new JobListener {
      override def onJobSubmitted(jobClient: JobClient,throwable: Throwable): Unit = {
        if (throwable == null) {
          log.info("SUCCESS")
        } else {
          log.info("FAIL")
        }
      }

      override def onJobExecuted(jobExecutionResult: JobExecutionResult,throwable: Throwable): Unit = {

        if (throwable == null) {
          log.info("SUCCESS")
        } else {
          log.info("FAIL")
        }
      }
    })

    env.createInput(input)
      .filter(r => Option(r.token).getorElse("").nonEmpty)
      .addSink(kafkaProducer)

解决方法

如果尝试在群集上运行作业,则可以使用作业ID在控制台中查看记录器消息和标准输出。请参阅所附的屏幕截图,

如果您在本地群集上运行,则默认URL可能为http:// localhost:8081。

同样,以下不是检查您的工作是否成功的正确方法。

if (throwable == null) {
          log.info("SUCCESS")
        } else {
          log.info("FAIL")
        }

enter image description here

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。