如何解决捕获作业执行期间发生的异常
import functools
import aioschedule as schedule
import asyncio
def catch_exceptions(cancel_on_failure=False):
def catch_exceptions_decorator(job_func):
@functools.wraps(job_func)
def wrapper(*args,**kwargs):
try:
return job_func(*args,**kwargs)
except:
import traceback
if cancel_on_failure:
return schedule.CancelJob
return wrapper
return catch_exceptions_decorator
@catch_exceptions(cancel_on_failure=True)
async def bad_task():
print(1 / 0)
def main():
schedule.every().minute.do(bad_task)
loop = asyncio.get_event_loop()
while True:
loop.run_until_complete(schedule.run_pending())
if __name__ == '__main__':
main()
我想捕获作业中将发生的异常并停止作业。
这里我使用 aioschedule aioschedule。
此解决方案选自exception handling。
运行上述代码后 ZeroDivisionError: division by zero
错误不断发生。
当我使用 schedule schedule 而不是 aioschedule 上面的代码工作得很好。
但是我的方法是使用 await 的异步方法。
任何帮助将不胜感激。
谢谢
解决方法
我在 await
的末尾添加了一个 bad_task()
语句,它将等待您的函数调用完成。因此,如果您的函数因错误而终止,则结果将在 while 循环中捕获并且程序将终止。
async def bad_task():
print(1/0)
await asyncio.sleep(0.1) # waits for your function to execute completely
def main():
loop = asyncio.get_event_loop()
while True:
loop.run_until_complete(bad_task())
if __name__ == '__main__':
main()
更新:这里有一个更好的方法,您可以避免在函数末尾使用 await
语句。
async def bad_task():
print(1/0)
def main():
loop = asyncio.get_event_loop()
while True:
# collect the promises
result = asyncio.gather(bad_task())
loop.run_until_complete(result)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。