如何解决如何使用Connexion处理AioHttp上的错误
我想在我的python网络api中使用AioHttp和Connexion处理错误,就像Flask通过@app.errorhandler(Exception)
所做的一样
换句话说,假设我的服务引发了SomethingalreadyExists,并且我想返回409冲突,而不是在我的所有api中添加以下代码:
try:
myservice.create_something(..)
except SomethingalreadyExists as error: # Repeated code -> DRY
return json_response({"message": str(error)},status=409)
我只想在API层中调用myservice.create_something(..)
,错误处理将为SomethingalreadyExists
异常返回409或为SomethingNotFound
返回404。
注意: 在Flask土地上,如下图所示:
import connexion
def create_api_app(version='api'):
connexion_app = connexion.FlaskApp(__name__,specification_dir='../api/')
connexion_app.add_api('openapi.yaml',validate_responses=True)
app = connexion_app.app
# It intercepts the specific exception and returns the respective status_code
@app.errorhandler(InvalidValueException)
def bad_request_handler(error):
return 'Bad Request: {}'.format(str(error)),400
@app.errorhandler(NotFoundException)
def not_found_handler(error):
return 'Not found: {}'.format(str(error)),404
@app.errorhandler(AlreadyExistsException)
def conflict_handler(error):
return 'Conflict: {}'.format(str(error)),409
# my_service.py
def get_model(i):
model = get_model_or_none(id)
if btask is None:
raise NotFoundException(f"Model id:{id} not found.")
...
# api.py
def get_model(id):
model = my_service.get_model(id)
# handle errors not required ;)
return btask.to_dict()
我想在我的AioHttp连接应用程序中做同样的事情:
from connexion import AioHttpApp
def create_app():
connexion_app = AioHttpApp(__name__,port=8000,specification_dir="../",only_one_api=True)
connexion_app.add_api("openapi.yaml",pass_context_arg_name="request")
# Do something here.. like
# web.Application(middlewares=[handle_already_exists_errors]) --> doesn't work
# OR
# connex_app.add_error_handler(
# AlreadyExistsException,handle_already_exists_errors) --> doesn't work too
return connexion_app
干杯,我将不胜感激!
罗杰
解决方法
我正在研究connexion和aiohttp代码,并找到了一种使用中间件的方法:
import json
from aiohttp.web import middleware
from connexion.lifecycle import ConnexionResponse
from connexion import AioHttpApp
from .exceptions import NotFoundException,AlreadyExistsException
def create_app():
connexion_app = AioHttpApp(
__name__,port=8000,specification_dir="../",only_one_api=True
)
connexion_app.add_api("openapi.yaml",pass_context_arg_name="request")
connexion_app.app.middlewares.append(errors_handler_middleware)
return connexion_app
@middleware
async def errors_handler_middleware(request,handler):
""" Handle standard errors returning response following the connexion style messages"""
try:
response = await handler(request)
return response
except NotFoundException as error:
return json_error(message=str(error),title='Not Found',status_code=404)
except AlreadyExistsException as error:
return json_error(message=str(error),title='Conflict',status_code=409)
def json_error(message,title,status_code):
return ConnexionResponse(
body=json.dumps({
'title': title,'detail': message,'status': status_code,}).encode('utf-8'),status_code=status_code,content_type='application/json'
)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。