微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

会话范围的夹具和异步的pytest问题

如何解决会话范围的夹具和异步的pytest问题

我有多个测试文件,每个文件都有一个如下所示的异步设备:


@pytest.fixture(scope="module")
def event_loop(request):
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()


@pytest.fixture(scope="module")
async def some_fixture():
    return await make_fixture()

我正在使用xdist进行并行化。另外,我有这个装饰器:

@toolz.curry
def throttle(limit,f):
    semaphore = asyncio.Semaphore(limit)

    @functools.wraps(f)
    async def wrapped(*args,**kwargs):
        async with semaphore:
            return await f(*args,**kwargs)

    return wrapped

我有一个函数使用它:

@throttle(10)
def f():
    ...

现在从多个测试文件调用f,但我收到一个异常消息,告诉我不能使用来自不同事件循环的信号灯。

我尝试使用会话级事件循环夹具:



@pytest.fixture(scope="session",autouse=True)
def event_loop(request):
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()

但这只给了我

ScopeMismatch:您尝试使用涉及工厂的“模块”范围的请求对象访问“功能”范围的固定装置“ event_loop”

甚至有可能让xdist +异步夹具+信号灯一起工作吗?

解决方法

最终使用以下conftest.py使它起作用:

import asyncio

import pytest


@pytest.fixture(scope="session")
def event_loop():
    return asyncio.get_event_loop()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。