如何将grpc.ServerInterceptor转换为grpc.aio.ServerInterceptor
我正在尝试实现异步ServerInterceptor [grpc.aio.ServerInterceptor]。我当前的同步ServerInterceptor看起来像这样:https://github.com/zhyon404/python-grpc-prometheus/blob/master/python_grpc_prometheus/prometheus_server_interceptor.py#L48。当我尝试改用grpc.aio.ServerInterceptor并启动服务器时,出现了下面的错误。我的服务器代码如下:
from grpc_opentracing import open_tracing_server_interceptor
from grpc_opentracing.grpcext import intercept_server
import PromServerInterceptor
class MyServicer():
    """Servicer that can host itself on an asyncio gRPC server.

    The server is created with ``PromServerInterceptor`` installed and a
    Prometheus HTTP exporter is started next to it.
    """

    async def _start_async_server(self, tracer=None, service=None, grpc_port=8083, http_port=8080):
        """Start the aio gRPC server and the Prometheus exporter, then wait.

        Args:
            tracer: Tracer handed to ``open_tracing_server_interceptor``.
            service: Servicer registered on the server. Defaults to ``self``
                (assumption: this class is the servicer implementation --
                TODO confirm against the generated ``my_service_pb2_grpc``).
            grpc_port: Port the gRPC server listens on (all interfaces).
            http_port: Port of the Prometheus metrics HTTP endpoint.
        """
        if service is None:
            service = self
        tracing_interceptor = open_tracing_server_interceptor(tracer)
        server = aio.server(interceptors=(PromServerInterceptor(),))
        # NOTE(review): grpc_opentracing's intercept_server appears to target
        # the synchronous server API; the reported AttributeError
        # ('_ServicerContext' has no attribute 'time_remaining') is raised from
        # inside it. Confirm whether an aio-compatible tracing interceptor is
        # needed here.
        server = intercept_server(server, tracing_interceptor)
        my_service_pb2_grpc.add_MyServicer_to_server(service, server)
        server.add_insecure_port("[::]:" + str(grpc_port))
        await server.start()
        # Start the exporter first so the log line is only emitted on success.
        prometheus_client.start_http_server(http_port)
        logger.info("Started prometheus server at port %s", http_port)
        await server.wait_for_termination()

    def async_serve(self, http_port=8080, service=None, tracer=None, grpc_port=8083):
        """Schedule the server on the event loop and run the loop forever.

        Args:
            http_port: Port of the Prometheus metrics HTTP endpoint.
            service: Servicer to register; ``None`` lets the server default it.
            tracer: Tracer forwarded to the tracing interceptor.
            grpc_port: Port the gRPC server listens on.
        """
        loop = asyncio.get_event_loop()
        # Keyword arguments keep each value bound to the intended parameter.
        loop.create_task(
            self._start_async_server(
                tracer=tracer,
                service=service,
                grpc_port=grpc_port,
                http_port=http_port,
            )
        )
        loop.run_forever()
The following are the library versions I am using:
grpcio==1.32.0
grpcio-opentracing==1.1.4
I see the following error:
File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi",line 646,in grpc._cython.cygrpc._handle_exceptions
File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi",line 745,in _handle_rpc
File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi",line 511,in _handle_unary_unary_rpc
File "src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi",line 368,in _finish_handler_with_unary_response
File "prometheus_server_interceptor.py",line 93,in new_behavior
rsp = await behavior(request_or_iterator,service_context)
File "/anaconda3/lib/python3.7/site-packages/grpc_opentracing/grpcext/_interceptor.py",line 272,in adaptation
_UnaryServerInfo(self._method),handler)
File "/anaconda3/lib/python3.7/site-packages/grpc_opentracing/_server.py",line 145,in intercept_unary
timeout=servicer_context.time_remaining(),
AttributeError: 'grpc._cython.cygrpc._ServicerContext' object has no attribute 'time_remaining'
以下是我的PromServerInterceptor实现:
from grpc import aio
import grpc
from timeit import default_timer
from python_grpc_prometheus.server_metrics import (SERVER_HANDLED_LATENCY_SECONDS,SERVER_HANDLED_COUNTER,SERVER_STARTED_COUNTER,SERVER_MSG_RECEIVED_TOTAL,SERVER_MSG_SENT_TOTAL)
from python_grpc_prometheus.util import type_from_method
from python_grpc_prometheus.util import code_to_string
def _wrap_rpc_behavior(handler, fn):
    """Return a copy of *handler* whose behavior callable is wrapped by *fn*.

    *fn* is called as ``fn(behavior, request_streaming, response_streaming)``
    and its result becomes the behavior of the new handler. The request
    deserializer and response serializer are carried over unchanged.
    Returns ``None`` when *handler* is ``None``.
    """
    if handler is None:
        return None

    # "stream" / "unary" for each direction gives the handler attribute name
    # (e.g. ``unary_stream``) and the matching ``grpc`` factory name.
    request_kind = "stream" if handler.request_streaming else "unary"
    response_kind = "stream" if handler.response_streaming else "unary"
    arity = request_kind + "_" + response_kind

    original_behavior = getattr(handler, arity)
    build_handler = getattr(grpc, arity + "_rpc_method_handler")

    wrapped_behavior = fn(
        original_behavior,
        handler.request_streaming,
        handler.response_streaming,
    )
    return build_handler(
        wrapped_behavior,
        request_deserializer=handler.request_deserializer,
        response_serializer=handler.response_serializer,
    )
def split_call_details(handler_call_details, minimum_grpc_method_path_items=3):
    """Split ``handler_call_details.method`` into service and method names.

    The method path is split on ``"/"``. When it yields at least
    ``minimum_grpc_method_path_items`` pieces, the pieces after the first
    (up to that count) are returned as ``(service, method, True)``;
    otherwise ``('', '', False)`` is returned.
    """
    segments = handler_call_details.method.split("/")
    if len(segments) >= minimum_grpc_method_path_items:
        # Index 0 is the text before the leading slash, so skip it.
        service_name, method_name = segments[1:minimum_grpc_method_path_items]
        return service_name, method_name, True
    return '', '', False
class PromServerInterceptor(aio.ServerInterceptor):
    """Asyncio server interceptor recording Prometheus metrics for unary RPCs.

    Streaming handlers and calls whose method path cannot be split into
    service and method are passed through without instrumentation.
    """

    @staticmethod
    def _status_code(service_context):
        """Return the status code recorded on *service_context*, or ``None``.

        The private ``_state.code`` attribute is read when the context has
        one. Otherwise a callable ``code`` attribute is used if present.
        ``None`` means no code could be determined.
        """
        state = getattr(service_context, "_state", None)
        if state is not None:
            return getattr(state, "code", None)
        code_getter = getattr(service_context, "code", None)
        if not callable(code_getter):
            return None
        raw = code_getter()
        if raw is None or isinstance(raw, grpc.StatusCode):
            return raw
        # NOTE(review): presumably some grpcio releases return the integer
        # status value here instead of a grpc.StatusCode member -- confirm.
        for member in grpc.StatusCode:
            if member.value[0] == raw:
                return member
        return None

    async def intercept_service(self, continuation, handler_call_details):
        """Resolve the next handler and wrap unary-unary ones with metrics.

        Args:
            continuation: Awaitable callable yielding the next handler.
            handler_call_details: Call details; ``.method`` holds the path.

        Returns:
            The original handler when it is ``None``, streaming, or the method
            path cannot be parsed; otherwise a handler built by
            ``_wrap_rpc_behavior`` around the instrumented behavior.
        """
        handler = await continuation(handler_call_details)
        if handler is None:
            return handler
        # only support unary
        if handler.request_streaming or handler.response_streaming:
            return handler
        # split_call_details returns (service, method, ok).
        grpc_service, grpc_method, ok = split_call_details(handler_call_details)
        if not ok:
            # The handler is already resolved; do not call continuation again.
            return handler
        grpc_type = type_from_method(handler.request_streaming, handler.response_streaming)
        SERVER_STARTED_COUNTER.labels(
            grpc_type=grpc_type, grpc_service=grpc_service, grpc_method=grpc_method).inc()

        # NOTE(review): the metrics below are labelled without grpc_service
        # while SERVER_STARTED_COUNTER includes it -- verify against the label
        # names declared in python_grpc_prometheus.server_metrics.
        def latency_wrapper(behavior, request_streaming, response_streaming):
            async def new_behavior(request_or_iterator, service_context):
                start = default_timer()
                SERVER_MSG_RECEIVED_TOTAL.labels(
                    grpc_type=grpc_type, grpc_method=grpc_method
                ).inc()
                # default
                code = code_to_string(grpc.StatusCode.UNKNOWN)
                try:
                    rsp = await behavior(request_or_iterator, service_context)
                    status = self._status_code(service_context)
                    # No recorded code after a normal return counts as OK.
                    if status is None:
                        code = code_to_string(grpc.StatusCode.OK)
                    else:
                        code = code_to_string(status)
                    SERVER_MSG_SENT_TOTAL.labels(
                        grpc_type=grpc_type, grpc_method=grpc_method
                    ).inc()
                    return rsp
                except grpc.RpcError as e:
                    if isinstance(e, grpc.Call):
                        code = code_to_string(e.code())
                    # Bare raise keeps the original traceback intact.
                    raise
                finally:
                    SERVER_HANDLED_COUNTER.labels(
                        grpc_type=grpc_type, grpc_method=grpc_method, grpc_code=code
                    ).inc()
                    SERVER_HANDLED_LATENCY_SECONDS.labels(
                        grpc_type=grpc_type, grpc_method=grpc_method
                    ).observe(max(default_timer() - start, 0))
            return new_behavior

        return _wrap_rpc_behavior(handler, latency_wrapper)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。