#coding=utf-8 from datetime import datetime,timedelta from airflow import DAG from airflow.operators.python_operator import Pythonoperator import airflow.utils # 定义默认参数 default_args = { ‘owner‘: ‘airflow‘,# 拥有者名称 ‘start_date‘: airflow.utils.dates.days_ago(1),# 第一次开始执行的时间,为格林威治时间,为了方便测试,一般设置为当前时间减去执行周期 ‘email‘: [‘[email protected]‘],# 接收通知的email列表 ‘email_on_failure‘: True,# 是否在任务执行失败时接收邮件 ‘email_on_retry‘: True,# 是否在任务重试时接收邮件 ‘retries‘: 3,# 失败重试次数 ‘retry_delay‘: timedelta(seconds=5) # 失败重试间隔 } # 定义DAG dag = DAG( dag_id=‘hello_world_args‘,# dag_id default_args=default_args,# 指定默认参数 # schedule_interval="00,*,*" # 执行周期,依次是分,时,天,月,年,此处表示每个整点执行 # schedule_interval=timedelta(minutes=1) # 执行周期,表示每分钟执行一次 ) # 定义要执行的Python函数1 def hello_world_args_1(**context): current_time = str(datetime.today()) with open(‘/tmp/hello_world_args_1.txt‘,‘a‘) as f: f.write(‘%s\n‘ % current_time) assert 1 == 1 # 可以在函数中使用assert断言来判断执行是否正常,也可以直接抛出异常 context[‘task_instance‘].xcom_push(key=‘sea1‘,value="seaseseseaaa") return "t2" # 定义要执行的Python函数2 def hello_world_args_2(**context): sea = context[‘task_instance‘].xcom_pull(key="sea1") current_time = str(datetime.today()) with open(‘/tmp/hello_world_args_2.txt‘,‘a‘) as f: f.write(‘%s\n‘ % current_time+sea) # 定义要执行的task 1 t1 = Pythonoperator( task_id=‘hello_world_args_1‘,# task_id python_callable=hello_world_args_1,# 指定要执行的函数 dag=dag,# 指定归属的dag retries=2,# 重写失败重试次数,如果不写,则默认使用dag类中指定的default_args中的设置 provide_context=True,) # 定义要执行的task 2 t2 = Pythonoperator( task_id=‘hello_world_args_2‘,# task_id python_callable=hello_world_args_2,# 指定归属的dag provide_context=True,) t1>>t2
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。