微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何强制 celery 应用动态时间链任务?

如何解决如何强制 celery 应用动态时间链任务?

假设我有以下代码,我需要依赖于其他任务的任务,但有些任务是长时间运行的,并且需要某种类似的 .join asyncio,它会等待一个任务结束才能继续下一个任务,但我的工作流程如下,但一切都需要与入口点函数/任务异步,在那里它将成为我所有链接任务的调用者或另一种方法,以保持动态时间始终从其内部永远运行

daily_cleanup > runs each minute
fix_unkNown_emplids runs each  5 minutes
get_role_people runs each  minute
import_grads runs each 10 minute

示例

beat_schedule = {
    'daily_cleanup': {
        'task': 'celery_app.tasks.daily_cleanup','schedule': timedelta(seconds=60),},'fix_unkNown_emplids': {
        'task': 'celery_app.tasks.fix_unkNown_emplids','schedule': timedelta(seconds=300),'get_role_people': {
        'task': 'celery_app.tasks.get_role_people','import_grads': {
        'task': 'celery_app.tasks.import_grads','schedule': timedelta(seconds=600),}
}

def chain_demo():
    tasks = [
        daily_cleanup.si(),# 1 min
        fix_unkNown_emplids.si(),# 5 min
        get_role_people.si(),# 1 min
        import_grads.si(),# 10 min
    ]

    chain(*tasks).apply_async()

@app.task
def daily_cleanup():
    time.sleep(60)
    return '60'


@app.task
def fix_unkNown_emplids():
    time.sleep(300)
    return '300'


@app.task
def get_role_people():
    time.sleep(60)
    return '60'

@app.task
def import_grads():
    time.sleep(600)
    return '600'

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。