如何解决:Airflow 2 中在函数运行(Run)之后动态创建 DAG
大家好,我正在使用 Airflow,下面是我想要解决的场景:我想在某个函数运行之后动态创建 DAG。
try:
    import os
    import sys
    from datetime import timedelta, datetime

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.email_operator import EmailOperator
    from airflow.utils.trigger_rule import TriggerRule
    from airflow.utils.task_group import TaskGroup
    import pandas as pd

    print("All Dag modules are ok ......")
except Exception as e:
    print("Error {} ".format(e))
# ===============================================
# Default arguments handed to the "project" DAG below.
default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
    'email': ['shahsoumil519@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
}

# NOTE(review): the name ``dag`` is bound again further down by a
# ``with DAG(dag_id="project", ...) as dag`` statement, so this object is
# replaced once the whole module has been executed.
dag = DAG(
    dag_id="project",
    schedule_interval="@once",
    default_args=default_args,
    catchup=False,
)
# ================================================
class XcomHelper(object):
    """Best-effort wrapper around the XCom calls of a task instance.

    The keyword arguments given to the constructor are stored unchanged as
    ``self.context``; the object stored under the ``"ti"`` key is the one
    whose ``xcom_pull`` / ``xcom_push`` methods are called.
    """

    def __init__(self, **context):
        self.context = context

    def get(self, key=None):
        """Return the value pulled for ``key``.

        Any failure (including a missing ``"ti"`` entry) is swallowed and
        reported by returning the string ``"Error"``; callers therefore
        cannot tell a failure apart from a stored value equal to ``"Error"``.
        """
        try:
            return self.context.get("ti").xcom_pull(key=key)
        except Exception:
            return "Error"

    def push(self, key=None, value=None):
        """Push ``value`` under ``key``.

        Returns ``True`` when the push call completed and ``False`` when it
        raised (including a missing ``"ti"`` entry).
        """
        try:
            self.context['ti'].xcom_push(key=key, value=value)
            return True
        except Exception:
            return False
def create_dag(dag_id, schedule, dag_number, default_args):
    """Build a DAG that contains a single "Hello World" Python task.

    Args:
        dag_id: Id of the DAG; also used as the task id of its only task.
        schedule: Value passed to the DAG as ``schedule_interval``.
        dag_number: Not used by this function; kept so existing callers
            that pass it keep working.
        default_args: Value passed to the DAG as ``default_args``.

    Returns:
        The newly created ``DAG`` object.
    """

    def hello_world_py():
        print('Hello World')

    dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)
    with dag:
        # Created inside the ``with dag`` block so the task belongs to ``dag``;
        # the operator object itself is not needed afterwards.
        PythonOperator(task_id=dag_id, python_callable=hello_world_py)
    return dag
def simple_task(**context):
    """Create one ``hello_world_<n>`` DAG per entry of ``DATA``.

    Each DAG is built with ``create_dag`` and stored in this module's
    ``globals()`` under its dag id. A failure for one entry is printed and
    does not stop the remaining entries.

    NOTE(review): when this function runs as a task callable, the DAG objects
    are only added to the globals of the process executing the task.
    Presumably the scheduler, which discovers DAGs by importing the DAG file,
    will not see them -- confirm against the Airflow documentation on
    dynamic DAG generation.

    Args:
        **context: Keyword arguments supplied by the caller; not used here.
    """
    DATA = ["soumil", "Shah"]
    # Start at index 0 so that every entry of DATA gets a DAG.
    for n in range(len(DATA)):
        try:
            dag_id = 'hello_world_{}'.format(str(n))
            print("DAG ID : {} ".format(dag_id))
            # datetime() needs year, month AND day.
            default_args = {'owner': 'airflow', 'start_date': datetime(2018, 1, 1)}
            schedule = '@daily'
            dag_number = n
            # create_dag() takes four positional arguments.
            globals()[dag_id] = create_dag(dag_id, schedule, dag_number, default_args)
        except Exception as e:
            print("Error : {} ".format(e))
# Entry-point DAG whose single task calls simple_task().
# The module-level default_args are passed in so the task gets a start_date,
# owner and retry settings.
with DAG(
    dag_id="project",
    schedule_interval="@once",
    default_args=default_args,
    catchup=False,
) as dag:
    # NOTE: this rebinds the module-level name ``simple_task`` from the
    # function to the operator; the function is captured as python_callable
    # before the rebinding happens.
    simple_task = PythonOperator(
        task_id="simple_task",
        python_callable=simple_task,
        provide_context=True,
    )
    simple_task
我想根据 DATA 变量的长度(len)来创建这些 DAG;实际的数据来自数据库。
我查阅了以下资料:
- https://www.astronomer.io/guides/dynamically-generating-dags
- Can an Airflow task dynamically generate a DAG at runtime?
- https://medium.com/@flavio.mtps/making-use-of-python-globals-to-dynamically-create-airflow-dags-124e556b704e
任何帮助都会很棒
修订代码:
try:
    import os
    import sys
    from datetime import timedelta, datetime
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    # from airflow.operators.email_operator import EmailOperator
    # from airflow.utils.trigger_rule import TriggerRule
    # from airflow.utils.task_group import TaskGroup
    # import pandas as pd
    print("All Dag modules are ok ......")
except Exception as e:
    print("Error {} ".format(e))


def create_dag(dag_id, default_args, schedule='@daily'):
    """Build a DAG that contains a single "Hello World" Python task.

    Args:
        dag_id: Id of the DAG; also used as the task id of its only task.
        default_args: Value passed to the DAG as ``default_args``.
        schedule: Value passed to the DAG as ``schedule_interval``.

    Returns:
        The newly created ``DAG`` object.
    """

    def hello_world_py():
        print('Hello World')

    dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)
    with dag:
        PythonOperator(task_id=dag_id, python_callable=hello_world_py)
    return dag


def simple_task():
    """Create one ``hello_world_<n>`` DAG per entry of ``DATA``.

    Each DAG is stored in this module's ``globals()`` under its dag id.
    A failure for one entry is printed and the loop continues.
    """
    DATA = ["soumil", "Shah", "Shah2"]
    for n in range(len(DATA)):
        try:
            dag_id = 'hello_world_{}'.format(str(n))
            print("DAG ID : {} ".format(dag_id))
            default_args = {'owner': 'airflow', 'start_date': datetime(2018, 1, 1)}
            globals()[dag_id] = create_dag(dag_id, default_args)
        except Exception as e:
            print("Error : {} ".format(e))


def trigger_function(**context):
    """Task callable: print a marker and run ``simple_task``.

    Args:
        **context: Accepted because the operator below is created with
            ``provide_context=True``; not used here.
    """
    print("HEREE")
    simple_task()


# NOTE(review): the start_date below was unreadable in the original listing;
# datetime(2021, 1, 1) is an assumed placeholder -- confirm the intended date.
with DAG(
    dag_id="project",
    default_args={'owner': 'airflow', 'start_date': datetime(2021, 1, 1)},
    catchup=False,
) as dag:
    # NOTE: this rebinds the module-level name ``trigger_function`` from the
    # function to the operator; the function is captured as python_callable
    # before the rebinding happens.
    trigger_function = PythonOperator(
        task_id="trigger_function",
        python_callable=trigger_function,
        provide_context=True,
    )
    trigger_function
解决方法
我从您的代码中删除了几行,以使答案切题。以下代码将根据 DATA 的内容生成类似 hello_world_0、hello_world_1... 的 DAG。
编辑 - 我使用的是 Airflow v1.10.x,但这段代码应该同样适用于 v2.x
建议:
- 使任务名称与 DAG 名称不同。
- dag_number 变量当前未被使用,可以将其删除。
DAG 将如下所示 -
# Import everything the DAG file needs; a failure is printed instead of
# aborting the module.
try:
    import os
    import sys
    from datetime import timedelta, datetime
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    # from airflow.operators.email_operator import EmailOperator
    # from airflow.utils.trigger_rule import TriggerRule
    # from airflow.utils.task_group import TaskGroup
    # import pandas as pd
    print("All Dag modules are ok ......")
except Exception as e:
    print("Error {} ".format(e))
def create_dag(dag_id, schedule, dag_number, default_args):
    """Build a DAG that contains a single "Hello World" Python task.

    Args:
        dag_id: Id of the DAG; also used as the task id of its only task.
        schedule: Value passed to the DAG as ``schedule_interval``.
        dag_number: Not used by this function; kept so existing callers
            that pass it keep working.
        default_args: Value passed to the DAG as ``default_args``.

    Returns:
        The newly created ``DAG`` object.
    """

    def hello_world_py():
        print('Hello World')

    dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)
    with dag:
        # Created inside the ``with dag`` block so the task belongs to ``dag``;
        # the operator object itself is not needed afterwards.
        PythonOperator(task_id=dag_id, python_callable=hello_world_py)
    return dag
def simple_task():
    """Create one ``hello_world_<n>`` DAG per entry of ``DATA``.

    Each DAG is built with ``create_dag`` and stored in this module's
    ``globals()`` under its dag id. A failure for one entry is printed and
    does not stop the remaining entries.
    """
    DATA = ["soumil", "Shah", "Shah2"]
    for n in range(len(DATA)):
        try:
            dag_id = 'hello_world_{}'.format(str(n))
            print("DAG ID : {} ".format(dag_id))
            default_args = {'owner': 'airflow', 'start_date': datetime(2018, 1, 1)}
            schedule = '@daily'
            dag_number = n
            # create_dag() takes four positional arguments; passing only
            # (dag_id, default_args) raises a TypeError on every iteration.
            globals()[dag_id] = create_dag(dag_id, schedule, dag_number, default_args)
        except Exception as e:
            print("Error : {} ".format(e))
# Called at module top level, i.e. every time this file is executed/imported,
# so the DAG objects that simple_task() stores in globals() exist as
# module-level names right after import.
simple_task()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。