微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

grahql-ws订阅aiohttp:如何允许订阅?

如何解决grahql-ws订阅aiohttp:如何允许订阅?

全部! 我尝试使用lib graphql-ws https://github.com/graphql-python/graphql-ws 在网络套接字上创建订阅。 我需要在执行自定义Observable的subscription()时将执行选项allow_subscriptions设置为True

class Subscription(graphene.ObjectType):
solution = graphene.Field(
    SolutionType
)

def resolve_solution(self,info):
    return SolutionObserver()   
    
graphql_schema = graphene.Schema(query=Query,mutation=Mutations,subscription=Subscription)

class SolutionObserver(Observable):
is_ready = False
solution = None

async def subscribe(self,observer):
    counter = 0
    while True:
        if SolutionObserver.is_ready:
            logger.info("SolutionObserver: is ready")
            return SolutionObserver.solution
        elif counter > 2000:
            logger.error("SolutionObserver: no solution was created,timeout err")
            return None
        else:
            logger.info("SolutionObserver: zzz")
            await asyncio.sleep(10)
            counter += 1


@classmethod
def reset(cls):
    logger.info("SolutionObserver: reset solution")
    cls.is_ready = False
    cls.solution = None

@classmethod
def set_solution(cls,solution):
    logger.info("SolutionObserver: set solution")
    cls.is_ready = True
    cls.solution = solution

我尝试在Chrome中使用graphiql。

subscription{solution {
id}}

但成为

{ “错误”:[ { “ message”:“不允许订阅。您将需要使用订阅功能或通过allow_subscriptions = True” } ], “数据”:空 }

在服务器启动时任务中将调用订阅功能

async def subscribe_on_create_solution(solution_observer):
 await solution_observer.subscribe(None)

async def on_startup(app):
 import asyncio
 solution_observer = SolutionObserver()
 asyncio.get_event_loop().create_task(subscribe_on_create_solution(solution_observer))

有任何想法吗?

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。