气流传感器 - 超时

如何解决气流传感器 - 超时

tl;dr,问题框架:

假设我有一个timeout = 24*60*60 戳的传感器。由于连接偶尔会超时,因此必须允许 retries。如果传感器现在重试,则 timeout 变量将应用于初始 24*60*60 的每次新尝试,因此,任务不会在 24 小时后按预期超时。

问题:

有没有办法限制任务的最大时间——比如元超时?

气流版本:1.10.14

详细介绍:

BASE_DIR = "/some/base/dir/"
FILE_NAME = "some_file.xlsx"
VOL_BASE_DIR = "/some/mounted/vol/"

default_args = {
    "owner": "airflow","depends_on_past": False,"start_date": "2020-11-01","retries": 2,"retry_delay": timedelta(minutes=5),}

dag = DAG(
    "supplier",default_args=default_args,description="ETL Process for supplier",schedule_interval=None,catchup=False,max_active_runs=1,)

file_sensor =  FileSensor(
    task_id="file_sensor",poke_interval=60*60,timeout=24*60*60,retries=4,mode="reschedule",filepath=os.path.join(BASE_DIR,FILE_NAME)
    fs_conn_id='conn_filesensor',dag=dag,)

clean_docker_vol = InitCleanProcFolderOperator(
    task_id="clean_docker_vol",folder=VOL_BASE_DIR,)

....

此 DAG 应运行并检查文件是否存在。如果存在,它应该继续。有时,由于文件提供得太晚(或者说,连接错误),传感器任务可能会被重新安排。 dag 的 MAX 总体“运行时间”不应超过 24 小时。但是,由于重试,如果任务失败并正在重新安排,时间确实会超过 24 小时超时。

示例:

  1. 运行 4 小时(应该还剩 18 小时)
  2. 失败
  3. up_for_retry
  4. 以 24 小时的超时时间重新开始,而不是 18 小时。

因为我需要允许重试,所以不能只将重试设置为 0 来避免这种行为。我宁愿寻找气流的元超时变量,提示如何在相关类或任何其他解决方法中实现这一点。

非常感谢。

解决方法

您可以使用 poke_interval parameter 配置预定义超时内的戳频率。像这样:MySensor(...,retries=0,timeout=24*60*60,poke_interval=60*60)。在这个例子中,传感器每小时会戳一次,如果一天内没有成功,它就会失败。

,

我实施了一个相当笨拙的解决方案,但对我有用。

  • 向传感器类添加了一个新函数:
    def _apply_meta_timeout(self,context):

        if not self.meta_task_timeout:
            return None
        elif self.meta_task_timeout and self.retries == 0:
            raise ValueError("'Meta_task_timeout' cannot be applied if 'retries' are set to 0. Use 'timeout' instead.")

        if isinstance(self.meta_task_timeout,datetime.timedelta):
            self.meta_task_timeout = meta_task_timeout.seconds
        if not isinstance(self.meta_task_timeout,(int,float)):
            raise ValueError("Cannot covert 'meta_task_timeout' to type(int) or type(float).")

        if self.meta_task_timeout < self.timeout:
             raise ValueError("'meta_task_timeout' cannot be less than 'timeout' variable.")
        
        logging.info(f"Get current dagrun params: {context['ti'].task_id},{context['ti'].dag_id},{context['ti'].execution_date},{context['ti'].try_number}" )
        pg_hook = PostgresHook(postgres_conn_id="airflow-metadata-db")
        pg_cur = pg_hook.get_cursor()
        if not context['ti'].try_number == 1:
            try:  
                query = f"""
                select start_date from task_fail
                    where task_id='{context['ti'].task_id}' 
                    and dag_id='{context['ti'].dag_id}' 
                    and execution_date ='{context['ti'].execution_date}' 
                    order by start_date asc 
                    LIMIT 1;"""
                pg_cur.execute(query)
                init_start_timestamp = pg_cur.fetchone()[0] #.isoformat()
            except Exception as e:
                raise ConnectionError("Connection failed with error: " + str(e) )
            finally:
                pg_cur.close(),pg_hook.get_conn().close()
        else:
            init_start_timestamp = context['ti'].start_date #.isoformat()

        logging.info(f"Initial dag startup: {init_start_timestamp}")


        if (timezone.utcnow() - init_start_timestamp).total_seconds() > self.meta_task_timeout:
            if self.soft_fail:
                self._do_skip_downstream_tasks(context)
            raise AirflowSkipException('Snap. Maximal task runtime is UP.')

        logging.info(f"Time left until 'meta_time_out' applies: {self.meta_task_timeout - (timezone.utcnow() - init_start_timestamp).total_seconds()} second(s).
  • 覆盖/添加到 poke 功能:
 def poke(self,context):
...
...
        # check for meta-time-out
        self._apply_meta_timeout(context)
  • 将气流数据库连接添加为: airflow-metadata-db

  • 使用附加参数调用传感器操作员:

    dummy_sensor = FileSensor(
        task_id="file_sensor",remote_path=os.path.join(REMOTE_INPUT_PATH,REMOTE_INPUT_FILE),do_xcom_push=False,timeout= 60,retries=2,mode="reschedule",meta_task_timeout=5*60,soft_fail=True,#context=True,)

必须应用此变通方法的主要问题是,气流似乎覆盖了每个 DAG 尝试的初始 start_date

请随时添加任何改进建议。谢谢

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?