拦截器
1、服务端拦截器
服务端一元拦截器类型为 grpc.UnaryServerInterceptor;流拦截器类型为 StreamServerInterceptor; 下面实现一个服务端一元拦截器:在调用服务前后各打印一条日志
func HelloInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {
log.Println("Hello")
resp, err = handler(ctx, req)
log.Println("Bye bye")
return
}
}
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(interceptors.HelloInterceptor()),
}
server := grpc.NewServer(opts...)
2、使用多个拦截器
gRPC 默认只支持设置单个拦截器,设置过个拦截器会painc
panic: The unary server interceptor was already set and may not be reset.
设置多个拦截器可以使用 grpc-ecosystem/go-grpc-middleware
go get -u github.com/grpc-ecosystem/go-grpc-middleware@latest
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
interceptors.HelloInterceptor(),
interceptors.DurationInterceptor(),
)),
}
3、客户端拦截器 超时控制实现
超时拦截器
func TimeoutInterceptor(second) grpc.UnaryClientInterceptor {
return func(ctx context.Context,
method string,
req,
reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption) error {
if _, ok := ctx.Deadline(); !ok {
timeout := 3 * time.Second
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
return invoker(ctx, method, req, reply, cc, opts...)
}
}
设置客户端拦截器
opt := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(interceptors.TimeoutInterceptor(time.Second)),
}
conn, _ := grpc.Dial(":"+port, opt...)
4、服务端超时拦截器
服务端拦截器可以参考 go-zero 中拦截器的实现代码:定义done, paincChan 两个channel;单独开一个goroutine执行 handler 逻辑并处理panic;之后通过select 语句阻塞监听 ctx.Done;done 和panicChan;
func UnaryTimeoutInterceptor(timeout time.Duration) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error) {
var cancel context.CancelFunc
if _, ok := ctx.Deadline(); !ok {
ctx, cancel = context.WithTimeout(ctx, timeout)
}
if cancel != nil {
defer cancel()
}
var resp interface{}
var err error
var lock sync.Mutex
done := make(chan struct{})
// create channel with buffer size 1 to avoid goroutine leak
panicChan := make(chan interface{}, 1)
go func() {
defer func() {
if p := recover(); p != nil {
// attach call stack to avoid missing in different goroutine
panicChan <- fmt.Sprintf("%+v\n\n%s", p, strings.Trimspace(string(debug.Stack())))
}
}()
lock.Lock()
defer lock.Unlock()
resp, err = handler(ctx, req)
close(done)
}()
select {
case p := <-panicChan:
panic(p)
case <-done:
lock.Lock()
defer lock.Unlock()
return resp, err
case <-ctx.Done():
err := ctx.Err()
if err == context.Canceled {
err = status.Error(codes.Canceled, err.Error())
} else if err == context.DeadlineExceeded {
err = status.Error(codes.DeadlineExceeded, err.Error())
}
return nil, err
}
}
Metadata
在gRPC中,Metadata实际上就是-一个 map结构:
type MD map [string][] string
本质.上也可以通过Header来传递数据
1、设置Metadata
md := Metadata.New(map[string]string{
"auth": "golang",
})
mdCtx := Metadata.NewOutgoingContext(ctx, md)
2、读取Metadata
md, b := Metadata.FromIncomingContext(ctx)
if b {
log.Printf("Metadata:%v", md)
if auth, ok := md["auth"]; ok {
log.Printf("auth:%v", auth)
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。