如何解决气流:从另一个操作员呼叫一个操作员
def func(**kwargs):
print(config['rds_conn_id'])
rds = PostgresHooklog(postgres_conn_id=config['rds_conn_id'])
rds.set_autocommit(rds.get_conn(),True)
conn = rds
print(conn)
# a = []
prevIoUs_e = None
for i in range(0,1):
a = TriggerDagRunoperator(
task_id='test_trigger_dagrun'+str(i),trigger_dag_id="example1_trigger_target_dag",provide_context=True,params={'message': 'Hello world'},trigger_rule="all_done",dag=dag,)
if prevIoUs_e:
prevIoUs_e >> a
a
prevIoUs_e = a
# a.execute(dict({ 'message': 'Hello world'}))
# if i not in [0]:
# a[i - 1] >> a[i]
default_args={"owner": "airflow","start_date": days_ago(2)}
# Define the DAG
dag = DAG(
dag_id="example1_trigger_controller_dag",default_args={"owner": "airflow","start_date": days_ago(2)},schedule_interval="@once",tags=['example']
)
with dag:
# Define the single task in this controller example DAG
bhuvitest = Pythonoperator(
task_id='python_task',python_callable=func,dag=dag)
我想通过函数调用 Pythonoperator 中的 TriggerDagRunoperator,这将从数据库中获取一些记录。根据记录遍历循环并调用 TriggerDagRunoperator。
- 如何使每次迭代顺序而不是并行。在这种情况下它不起作用,即使它没有调用 TriggerDagRunoperator。
- 在这种情况下,我想将 TriggerDagRunoperator 中的一些参数传递给目标 dag 变量消息作为参数,但这也不起作用。
非常感谢您的帮助。 谢谢
解决方法
Airflow 中不允许像您那样使用运算符。所有操作符都必须存在于 DAG 上下文中。如 Airflow official tutorial 中所述,DAG 定义“需要快速评估(秒,而不是分钟),因为调度程序将定期执行它以反映更改(如果有)”。因此,不建议通过查询数据库来“即时”创建 DAG。
话虽如此,您可以使用 Variables
动态定义 DAG,甚至是工作人员的本地文件(如果您有多个工作人员,那就更复杂了)。两者都不是“好的做法”,但您仍然可以使用它。对于 Variables
选项,您可以有两个 DAG,一个用于查询数据库并设置配置变量,另一个用于读取它、遍历它并创建任务。这也不是很推荐,因为您每次执行 Variable.get("variable_name")
时都会创建到元数据数据库的连接。
回答您的问题:
- 要使其连续,只需使用
for idx,val in enumerate(iterable)
进行迭代并执行以下操作:
if idx > 0:
iterable[idx - 1] >> iterable[idx]
- 要传递一些参数,您可以使用
TriggerDagRunOperator
中的conf
参数。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。