微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

优雅地等待redis队列中的一个工作完成,不用忙等待?

如何解决优雅地等待redis队列中的一个工作完成,不用忙等待?

我正在尝试在当前系统中实现 redis queuejob 将被发送到另一个模块,它应该等到作业完成并返回结果 job.result,然后继续:

with Connection(redis_connection):
    job = job_queue.enqueue(worker_func,func_input1,func_input2)

print("waiting for result")
print(datetime.datetime.Now())
while job.result is None:
    pass
print(datetime.datetime.Now())
print("got result")

# next step
next_step_func(job.result)

...

在这里面临两个问题:

  1. 忙碌的等待,while job.result is None 需要很长时间。我在 worker_func 中的处理时间大约为 2-3 秒,这涉及调用另一台服务器上的 API,但繁忙的等待 while job.result is None 本身又需要 >= 3 秒,总共需要 >= 5 秒。我确定等待发生在 while job.result is None 执行之后,因为我为 worker_funcwhile job.result is None 添加了日志:
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT start work
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:57.601189
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:59.075137
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT end work
...
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT waiting for result
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:53.704891
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:59.096009

如上所示,忙等待 while 循环发生在 worker_func 完成之后。

2,这里有没有其他优雅的方式来实现这个同步等待而不是繁忙的循环?我认为这里的busy loop绝对不是最好的实现,因为它会消耗大量的cpu资源。

谢谢!

-- 编辑我上面的代码以提供更清晰的上下文

我需要从调用 next_step_func(job.result) 的位置返回 job_queue.enqueue 的值。所以更清晰的结构是:

def endpoint():
    with Connection(redis_connection):
        job = job_queue.enqueue(worker_func,func_input2)

    print("waiting for result")
    print(datetime.datetime.Now())
    while job.result is None:
        pass
    print(datetime.datetime.Now())
    print("got result")

    # next step
    return next_step_func(job.result)

...

所以痛点是我需要能够在 job.result 中返回 endpoint(),但作业回调会将我的作业带到 on_success 处的不同上下文。

解决方法

文档建议使用 job callbacks 作为选项:

def job_succeeded(job,connection,result,*args,**kwargs):
    next_step_func(job.result)

def job_failed(job,type,value,traceback):
    # react to the error
    pass

with Connection(redis_connection):
    args = (func_input1,func_input2)
    job_queue.enqueue(worker_func,args=args,on_success=job_succeeded,on_failure=job_failed)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。