Airflow2 在函数 Run

如何解决Airflow2 在函数 Run

大家好,我正在处理气流,这是我正在尝试解决的场景 我想在函数运行后动态创建 DAG

try:
    import os
    import sys

    from datetime import timedelta,datetime
    from airflow import DAG

    from airflow.operators.python_operator import Pythonoperator
    from airflow.operators.email_operator import EmailOperator
    from airflow.utils.trigger_rule import TriggerRule
    from airflow.utils.task_group import TaskGroup
    import pandas as pd

    print("All Dag modules are ok ......")

except Exception as e:
    print("Error  {} ".format(e))


# ===============================================
default_args = {
    "owner": "airflow","start_date": datetime(2021,1,1),"retries": 1,"retry_delay": timedelta(minutes=1),'email': ['shahsoumil519@gmail.com'],'email_on_failure': True,'email_on_retry': False,}
dag = DAG(dag_id="project",schedule_interval="@once",default_args=default_args,catchup=False)
# ================================================


class XcomHelper(object):

    def __init__(self,**context):
        self.context = context

    def get(self,key=None):
        """ Get the Value from XCOM"""
        try:
            return self.context.get("ti").xcom_pull(key=key)
        except Exception as e: return "Error"

    def push(self,key=None,value=None):

        """Push the value on session """
        try:
            self.context['ti'].xcom_push(key=key,value=value)
            return True
        except Exception as e: return False



def create_dag(dag_id,schedule,dag_number,default_args):

    def hello_world_py():
        print('Hello World')

    dag = DAG(dag_id,schedule_interval=schedule,default_args=default_args)

    with dag:
        t1 = Pythonoperator(task_id=dag_id,python_callable=hello_world_py)

    return dag


def simple_task(**context):

    DATA = ["soumil","Shah"]
    
    for n in range(1,len(DATA)):
        try:
            dag_id = 'hello_world_{}'.format(str(n))
            print("DAG ID : {} ".format(dag_id))
            default_args = {'owner': 'airflow','start_date': datetime(2018,1)}
            schedule = '@daily'
            dag_number = n
            globals()[dag_id] = create_dag(dag_id,default_args)
        except Exception as e:
            print("Error : {} ".format(e))

with DAG(dag_id="project",catchup=False) as dag:

    simple_task = Pythonoperator(task_id="simple_task",python_callable=simple_task,provide_context=True)


simple_task


我想根据 DATA 变量的 len 创建这些 dag 数据来自数据库

我试着调查

任何帮助都会很棒

修订代码

try:
    import os
    import sys

    from datetime import timedelta,datetime
    from airflow import DAG

    from airflow.operators.python_operator import Pythonoperator

    # from airflow.operators.email_operator import EmailOperator
    # from airflow.utils.trigger_rule import TriggerRule
    # from airflow.utils.task_group import TaskGroup
    # import pandas as pd

    print("All Dag modules are ok ......")

except Exception as e:
    print("Error  {} ".format(e))


def create_dag(dag_id,default_args):
    def hello_world_py():
        print('Hello World')

    dag = DAG(dag_id,python_callable=hello_world_py)

    return dag


def simple_task():
    DATA = ["soumil","Shah","Shah2"]

    for n in range(0,default_args)
        except Exception as e:
            print("Error : {} ".format(e))

    def trigger_function():
        print("HEREE")
        simple_task()

    with DAG(dag_id="project",default_args={'owner': 'airflow',1)},catchup=False) as dag:


        trigger_function = Pythonoperator(task_id="trigger_function",python_callable=trigger_function,provide_context=True,)


    trigger_function

解决方法

我从您的代码中删除了几行,以使答案切题。以下代码将根据 hello_world_0,hello_world_1... 的内容生成类似 DATA 的 DAG。

编辑 - 我使用了气流 v1.10.x,但代码应该适用于 v2.x

建议:

  1. 使任务名称与 DAG 名称不同。
  2. dag_number 变量当前未被使用。那可以取下来。

DAG 将如下所示 -

enter image description here

try:
    import os
    import sys

    from datetime import timedelta,datetime
    from airflow import DAG

    from airflow.operators.python_operator import PythonOperator

    # from airflow.operators.email_operator import EmailOperator
    # from airflow.utils.trigger_rule import TriggerRule
    # from airflow.utils.task_group import TaskGroup
    # import pandas as pd

    print("All Dag modules are ok ......")

except Exception as e:
    print("Error  {} ".format(e))


def create_dag(dag_id,schedule,dag_number,default_args):
    def hello_world_py():
        print('Hello World')

    dag = DAG(dag_id,schedule_interval=schedule,default_args=default_args)

    with dag:
        t1 = PythonOperator(task_id=dag_id,python_callable=hello_world_py)

    return dag


def simple_task():
    DATA = ["soumil","Shah","Shah2"]

    for n in range(0,len(DATA)):
        try:
            dag_id = 'hello_world_{}'.format(str(n))
            print("DAG ID : {} ".format(dag_id))
            default_args = {'owner': 'airflow','start_date': datetime(2018,1,1)}
            schedule = '@daily'
            dag_number = n
            globals()[dag_id] = create_dag(dag_id,default_args)
        except Exception as e:
            print("Error : {} ".format(e))


simple_task()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?