微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在python的ThreadPoolExecutor中添加调度

如何解决在python的ThreadPoolExecutor中添加调度

我想同时运行它的功能很少。因此,使用 from concurrent.futures import ThreadPoolExecutor 适合我。因为它是一种异步方法,通过允许不同的函数进入不同的线程来同时运行不同的函数

注意到所有的函数都会是一个死循环函数(while True),因为我想继续运行它。

之后,我想检查所有这些功能是否运行良好,例如每小时每 05 分钟检查一次(10:05、11:05、12:05 等) 因此,from apscheduler.schedulers.blocking import BlockingScheduler 和 cron 方法听起来很适合我。

但是,有人知道将 scheduler.start() 添加ThreadPoolExecutor

的正确方法是什么吗?

我的代码是这样的:

from concurrent.futures import ThreadPoolExecutor
from apscheduler.schedulers.blocking import BlockingScheduler

def function_1():
        while true:
                // do sth for example:
                print ("HI")

def function_2():
        while true:
                // do sth for example:
                print ("bye")

def checker(futures):
        for future in futures:
                if future.running():
                        ...
                        ...
                        ...
                        ...

futures = []

pool = ThreadPoolExecutor(max_workers = 10)
future_1 = pool.submit(function_1(),1)
futures.append(future_1)
future_2 = pool.submit(function_1(),1)
futures.append(future_2)

scheduler = BackgroundScheduler()
scheduler.add_job(job,'cron',minutes=5)
pool.submit(scheduler.start())

上面的代码似乎不像我预期的那样工作,有人可以帮助我吗?谢谢!!

解决方法

对于简单的代码流,您可以尝试 simple-scheduler。虽然在每个函数中都不需要 while 循环,但调度程序会在内部为您执行此操作。此外,您不需要 ThreadPoolExecutor,因为简单调度程序使用多处理,这比池线程提供更好的结果。

from simple_scheduler.recurring import recurring_scheduler
from simple_scheduler.event import event_scheduler

def function_1():
    print ("HI")
def function_2():
    print ("bye")
def are_all_functions_running():
    # find out pid(s) of all jobs through job_summary()
    for p in jobs:
        if p.is_alive():
            print(f"{p.name} is running")
        else:
            print(f"{p.name} has stopped")

recurring_scheduler.add_job(target=function_1,period_in_seconds=5*60,job_name="function_1")
recurring_scheduler.add_job(target=function_2,job_name="function_2")
event_scheduler.add_job(target= are_all_functions_running,when = ["*|**:05"],tz = "Asia/Kolkata",job_name = "are_all_functions_running")
recurring_scheduler.job_summary()
recurring_scheduler.run()
event_scheduler.run()
recurring_scheduler.job_summary()

要检查函数是否正在运行,您可以调用 are_all_functions_running()。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。