微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

gRPC快速体验3:拦截器 与 Metadata

拦截

根据RPC调用类型可以将gRPC拦截器分为两种:

1、服务端拦截

服务端一元拦截器类型为 grpc.UnaryServerInterceptor;流拦截器类型为 StreamServerInterceptor; 下面实现一个服务端一元拦截器:在调用服务前后各打印一条日志

func HelloInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler) (resp interface{}, err error) {
		log.Println("Hello")
		resp, err = handler(ctx, req)
		log.Println("Bye bye")
		return
	}
}

在服务端代码中配置拦截器;

opts := []grpc.ServerOption{
		grpc.UnaryInterceptor(interceptors.HelloInterceptor()),
	}
server := grpc.NewServer(opts...)

2、使用多个拦截

gRPC 认只支持设置单个拦截器,设置过个拦截器会painc

panic: The unary server interceptor was already set and may not be reset.

设置多个拦截器可以使用 grpc-ecosystem/go-grpc-middleware

go get -u github.com/grpc-ecosystem/go-grpc-middleware@latest

opts := []grpc.ServerOption{
		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
			interceptors.HelloInterceptor(),
			interceptors.DurationInterceptor(),
		)),
	}

3、客户端拦截器 超时控制实现

超时拦截

func TimeoutInterceptor(second) grpc.UnaryClientInterceptor {
	return func(ctx context.Context,
		method string,
		req,
		reply interface{},
		cc *grpc.ClientConn,
		invoker grpc.UnaryInvoker,
		opts ...grpc.CallOption) error {
		if _, ok := ctx.Deadline(); !ok {
			timeout := 3 * time.Second
			var cancel context.CancelFunc
			ctx, cancel = context.WithTimeout(ctx, timeout)
			defer cancel()
		}
		return invoker(ctx, method, req, reply, cc, opts...)
	}
}

设置客户端拦截



opt := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithUnaryInterceptor(interceptors.TimeoutInterceptor(time.Second)),
	}
	conn, _ := grpc.Dial(":"+port, opt...)

4、服务端超时拦截

服务端拦截器可以参考 go-zero拦截器的实现代码:定义done, paincChan 两个channel;单独开一个goroutine执行 handler 逻辑并处理panic;之后通过select 语句阻塞监听 ctx.Done;done 和panicChan;

func UnaryTimeoutInterceptor(timeout time.Duration) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler) (interface{}, error) {
		var cancel context.CancelFunc
		if _, ok := ctx.Deadline(); !ok {
			ctx, cancel = context.WithTimeout(ctx, timeout)
		}
		if cancel != nil {
			defer cancel()
		}

		var resp interface{}
		var err error
		var lock sync.Mutex
		done := make(chan struct{})
		// create channel with buffer size 1 to avoid goroutine leak
		panicChan := make(chan interface{}, 1)
		go func() {
			defer func() {
				if p := recover(); p != nil {
					// attach call stack to avoid missing in different goroutine
					panicChan <- fmt.Sprintf("%+v\n\n%s", p, strings.Trimspace(string(debug.Stack())))
				}
			}()

			lock.Lock()
			defer lock.Unlock()
			resp, err = handler(ctx, req)
			close(done)
		}()

		select {
		case p := <-panicChan:
			panic(p)
		case <-done:
			lock.Lock()
			defer lock.Unlock()
			return resp, err
		case <-ctx.Done():
			err := ctx.Err()
			if err == context.Canceled {
				err = status.Error(codes.Canceled, err.Error())
			} else if err == context.DeadlineExceeded {
				err = status.Error(codes.DeadlineExceeded, err.Error())
			}
			return nil, err
		}
	}

Metadata

在gRPC中,Metadata实际上就是-一个 map结构:

type MD map [string][] string

本质.上也可以通过Header来传递数据
1、设置Metadata

md := Metadata.New(map[string]string{
		"auth": "golang",
	})
mdCtx := Metadata.NewOutgoingContext(ctx, md)

2、读取Metadata

md, b := Metadata.FromIncomingContext(ctx)
if b {
    log.Printf("Metadata:%v", md)
    if auth, ok := md["auth"]; ok {
        log.Printf("auth:%v", auth)
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐