如何解决在 Apache Airflow 中管理分支和合并的最佳方式
每当有人注册时,Web 应用程序都会在数据库中记录身份验证代码的电子邮件请求,并重定向到下一个屏幕,用户必须在其中输入发送到他/她的电子邮件的六位数代码。
这是我设想的 DAG:
我的 DAG 代码:
default_args = {
"owner":"airflow","start_date": datetime(2021,4,2),"depends_on_past": False,"email_on_failure": False,"email_on_retry": False,"email":"rhonaldmoses@gmail.com","retries":1,"retries_delay":timedelta(minutes=5)
}
def email_request_exists(response):
json_response_text = json.loads(response.text)
#by default,email requests do not exists
requests_exists = False
if json_response_text['status']==200:
if json_response_text['requests_exists'] > 0:
requests_exists = True
email_request_status = Variable.set('request_exists',requests_exists)
return email_request_status
def get_request_processor():
switch = Variable.get('request_exists')
if switch:
return 'anp_process_email_requests'
else:
return 'anp_no_email_to_process'
def check_is_request_processed(response):
print(response.text)
json_response_text = json.loads(response.text)
print(json_response_text)
if json_response_text['status']==200:
if json_response_text['request_processed'] > 0:
return 'anp_check_orphan_emails'
return 'anp_email_request_done'
with DAG(dag_id="artnpics_auth_email_manager",schedule_interval="* * * * *",default_args=default_args,catchup=False) as dag:
anp_is_email_request_available = HttpSensor(
task_id="anp_is_email_request_available",method='GET',http_conn_id='artnpics_api_calls',endpoint='commsemail/api-email-request-processor/',response_check=lambda response: True if email_request_exists(response) is True else False,poke_interval=5,timeout=20
)
anp_has_emails_to_process = BranchPythonoperator(
task_id='anp_has_emails_to_process',python_callable=get_request_processor,trigger_rule="one_success"
#trigger_rule="all_success"
)
#execute this when the email request queue check returned success
anp_process_email_requests = HttpSensor(
task_id="anp_process_email_requests",endpoint='commsemail/api-email-process-requests/',response_check=lambda response: True if check_is_request_processed(response) is True else False,timeout=20
)
#execute this when the email request queue check returned failure
anp_no_email_to_process = DummyOperator(
task_id='anp_no_email_to_process',#trigger_rule="all_success"
)
#check records that are ommitted for a very long time and reprioritize them
anp_check_orphan_emails = DummyOperator(
task_id='anp_check_orphan_emails',trigger_rule="one_success"
)
anp_email_request_done = DummyOperator(
task_id='anp_email_request_done',trigger_rule="one_success"
)
anp_is_email_request_available >> anp_has_emails_to_process >> anp_no_email_to_process >> anp_email_request_done
anp_is_email_request_available >> anp_has_emails_to_process >> anp_process_email_requests >> anp_email_request_done
当我运行时, anp_is_email_request_available 任务被执行,然后它停止。但是,它不会转到下一个(分支运算符)。
基本上,这就是我想要完成的:
执行 anp_is_email_request_available 并查看 API 是否返回 true/false(如果存在 status=1 的电子邮件请求,则返回 True 否则返回 False)。
如果anp_is_email_request_available中的返回值为True,则执行anp_process_email_requests。否则,处理 anp_email_request_done。
如果anp_is_email_request_available中的retuend值为False,则执行anp_email_request_done
API 运行良好(我删除了所有条件分支并使其按顺序运行,并且运行完美)。
解决方法
尝试检查 anp_is_email_request_available
的日志以了解更多详细信息,但我认为您在 callable
的 lambda
中使用的 response_check
函数可能有问题。
response_check=lambda response: True if email_request_exists(response) is True else False,
email_request_exists
返回 None
而不是 True
使 response_check
失败。稍后 check_is_request_processed
也会发生同样的情况。您可以更改这些函数的返回值或更改三元运算符以计算该值的truthy。
response_check=lambda response: True if email_request_exists(response) else False,
您可以在此 answer 上找到可调用的示例。希望对你有用!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。