如何解决气流传感器 - 超时
tl;dr,问题框架:
假设我有一个用 timeout = 24*60*60
戳的传感器。由于连接偶尔会超时,因此必须允许 retries
。如果传感器现在重试,则 timeout
变量将应用于初始 24*60*60
的每次新尝试,因此,任务不会在 24 小时后按预期超时。
问题:
有没有办法限制任务的最大时间——比如元超时?
气流版本:1.10.14
详细介绍:
BASE_DIR = "/some/base/dir/"
FILE_NAME = "some_file.xlsx"
VOL_BASE_DIR = "/some/mounted/vol/"
default_args = {
"owner": "airflow","depends_on_past": False,"start_date": "2020-11-01","retries": 2,"retry_delay": timedelta(minutes=5),}
dag = DAG(
"supplier",default_args=default_args,description="ETL Process for supplier",schedule_interval=None,catchup=False,max_active_runs=1,)
file_sensor = FileSensor(
task_id="file_sensor",poke_interval=60*60,timeout=24*60*60,retries=4,mode="reschedule",filepath=os.path.join(BASE_DIR,FILE_NAME)
fs_conn_id='conn_filesensor',dag=dag,)
clean_docker_vol = InitCleanProcFolderOperator(
task_id="clean_docker_vol",folder=VOL_BASE_DIR,)
....
此 DAG 应运行并检查文件是否存在。如果存在,它应该继续。有时,由于文件提供得太晚(或者说,连接错误),传感器任务可能会被重新安排。 dag 的 MAX 总体“运行时间”不应超过 24 小时。但是,由于重试,如果任务失败并正在重新安排,时间确实会超过 24 小时超时。
示例:
- 运行 4 小时(应该还剩 18 小时)
- 失败
- up_for_retry
- 以 24 小时的超时时间重新开始,而不是 18 小时。
因为我需要允许重试,所以不能只将重试设置为 0 来避免这种行为。我宁愿寻找气流的元超时变量,提示如何在相关类或任何其他解决方法中实现这一点。
非常感谢。
解决方法
您可以使用 poke_interval
parameter 配置预定义超时内的戳频率。像这样:MySensor(...,retries=0,timeout=24*60*60,poke_interval=60*60)
。在这个例子中,传感器每小时会戳一次,如果一天内没有成功,它就会失败。
我实施了一个相当笨拙的解决方案,但对我有用。
- 向传感器类添加了一个新函数:
def _apply_meta_timeout(self,context):
if not self.meta_task_timeout:
return None
elif self.meta_task_timeout and self.retries == 0:
raise ValueError("'Meta_task_timeout' cannot be applied if 'retries' are set to 0. Use 'timeout' instead.")
if isinstance(self.meta_task_timeout,datetime.timedelta):
self.meta_task_timeout = meta_task_timeout.seconds
if not isinstance(self.meta_task_timeout,(int,float)):
raise ValueError("Cannot covert 'meta_task_timeout' to type(int) or type(float).")
if self.meta_task_timeout < self.timeout:
raise ValueError("'meta_task_timeout' cannot be less than 'timeout' variable.")
logging.info(f"Get current dagrun params: {context['ti'].task_id},{context['ti'].dag_id},{context['ti'].execution_date},{context['ti'].try_number}" )
pg_hook = PostgresHook(postgres_conn_id="airflow-metadata-db")
pg_cur = pg_hook.get_cursor()
if not context['ti'].try_number == 1:
try:
query = f"""
select start_date from task_fail
where task_id='{context['ti'].task_id}'
and dag_id='{context['ti'].dag_id}'
and execution_date ='{context['ti'].execution_date}'
order by start_date asc
LIMIT 1;"""
pg_cur.execute(query)
init_start_timestamp = pg_cur.fetchone()[0] #.isoformat()
except Exception as e:
raise ConnectionError("Connection failed with error: " + str(e) )
finally:
pg_cur.close(),pg_hook.get_conn().close()
else:
init_start_timestamp = context['ti'].start_date #.isoformat()
logging.info(f"Initial dag startup: {init_start_timestamp}")
if (timezone.utcnow() - init_start_timestamp).total_seconds() > self.meta_task_timeout:
if self.soft_fail:
self._do_skip_downstream_tasks(context)
raise AirflowSkipException('Snap. Maximal task runtime is UP.')
logging.info(f"Time left until 'meta_time_out' applies: {self.meta_task_timeout - (timezone.utcnow() - init_start_timestamp).total_seconds()} second(s).
- 覆盖/添加到 poke 功能:
def poke(self,context):
...
...
# check for meta-time-out
self._apply_meta_timeout(context)
-
将气流数据库连接添加为:
airflow-metadata-db
-
使用附加参数调用传感器操作员:
dummy_sensor = FileSensor(
task_id="file_sensor",remote_path=os.path.join(REMOTE_INPUT_PATH,REMOTE_INPUT_FILE),do_xcom_push=False,timeout= 60,retries=2,mode="reschedule",meta_task_timeout=5*60,soft_fail=True,#context=True,)
必须应用此变通方法的主要问题是,气流似乎覆盖了每个 DAG 尝试的初始 start_date。
请随时添加任何改进建议。谢谢
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。