在 Apache Airflow 中管理分支和合并的最佳方式

如何解决在 Apache Airflow 中管理分支和合并的最佳方式

我有一个问题要解决

每当有人注册时,Web 应用程序都会在数据库中记录身份验证代码的电子邮件请求,并重定向到下一个屏幕,用户必须在其中输入发送到他/她的电子邮件的六位数代码

这是我设想的 DAG:

enter image description here

我的 DAG 代码

default_args = {
    "owner":"airflow","start_date": datetime(2021,4,2),"depends_on_past": False,"email_on_failure": False,"email_on_retry": False,"email":"rhonaldmoses@gmail.com","retries":1,"retries_delay":timedelta(minutes=5)
}
 
def email_request_exists(response):
    json_response_text = json.loads(response.text)
    
    #by default,email requests do not exists
    requests_exists = False
    
    if json_response_text['status']==200:
        if json_response_text['requests_exists'] > 0:
            requests_exists = True
 
    email_request_status = Variable.set('request_exists',requests_exists)
    return email_request_status
 
def get_request_processor():
    switch = Variable.get('request_exists')
    
    if switch:
        return 'anp_process_email_requests'
    else:
        return 'anp_no_email_to_process'
 
def check_is_request_processed(response):
    print(response.text)
    
    json_response_text = json.loads(response.text)
    print(json_response_text)
    
    if json_response_text['status']==200:
        if json_response_text['request_processed'] > 0:
            return 'anp_check_orphan_emails'
 
    return 'anp_email_request_done'
    
 
with DAG(dag_id="artnpics_auth_email_manager",schedule_interval="* * * * *",default_args=default_args,catchup=False) as dag:
    
    anp_is_email_request_available = HttpSensor(
        task_id="anp_is_email_request_available",method='GET',http_conn_id='artnpics_api_calls',endpoint='commsemail/api-email-request-processor/',response_check=lambda response: True if email_request_exists(response) is True else False,poke_interval=5,timeout=20
    )
    
    anp_has_emails_to_process = BranchPythonoperator(
        task_id='anp_has_emails_to_process',python_callable=get_request_processor,trigger_rule="one_success"
        #trigger_rule="all_success"
    )
    
    #execute this when the email request queue check returned success
    anp_process_email_requests = HttpSensor(
        task_id="anp_process_email_requests",endpoint='commsemail/api-email-process-requests/',response_check=lambda response: True if check_is_request_processed(response) is True else False,timeout=20
    )
    
    #execute this when the email request queue check returned failure
    anp_no_email_to_process = DummyOperator(
        task_id='anp_no_email_to_process',#trigger_rule="all_success"
    )
    
    #check records that are ommitted for a very long time and reprioritize them
    anp_check_orphan_emails = DummyOperator(
        task_id='anp_check_orphan_emails',trigger_rule="one_success"
    )
    
    anp_email_request_done = DummyOperator(
        task_id='anp_email_request_done',trigger_rule="one_success"
    )
    
    anp_is_email_request_available >> anp_has_emails_to_process >> anp_no_email_to_process >> anp_email_request_done
    anp_is_email_request_available >> anp_has_emails_to_process >> anp_process_email_requests >> anp_email_request_done

当我运行时, anp_is_email_request_available 任务被执行,然后它停止。但是,它不会转到下一个(分支运算符)。

基本上,这就是我想要完成的:

执行 anp_is_email_request_available 并查看 API 是否返回 true/false(如果存在 status=1 的电子邮件请求,则返回 True 否则返回 False)。

如果anp_is_email_request_available中的返回值为True,则执行anp_process_email_requests。否则,处理 anp_email_request_done。

如果anp_is_email_request_available中的retuend值为False,则执行anp_email_request_done

API 运行良好(我删除了所有条件分支并使其按顺序运行,并且运行完美)。

解决方法

尝试检查 anp_is_email_request_available 的日志以了解更多详细信息,但我认为您在 callablelambda 中使用的 response_check 函数可能有问题。

response_check=lambda response: True if email_request_exists(response) is True else False,

email_request_exists 返回 None 而不是 True 使 response_check 失败。稍后 check_is_request_processed 也会发生同样的情况。您可以更改这些函数的返回值或更改三元运算符以计算该值的truthy

response_check=lambda response: True if email_request_exists(response) else False,

您可以在此 answer 上找到可调用的示例。希望对你有用!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?