摘要
跟我一起学Go系列产品:gRPC,解决全局数据传输和请求超时问题。使用Go SDK的Context包在多个GoRoutine之间传递数据。但是,gRPC应用是跨进程的数据传输,需要在启用链上传递上下文。
正文
gRPC 在好几个 GoRoutine 中间传递数据应用的是 Go SDK 给予的 Context 包。有关 Context 的应用能够 看着我以前的一篇文章:Context 应用。
可是 Context 的应用情景是同一个过程内,gRPC 应用全是跨过程的数据传输,假如在某一启用链上 A 服务项目当今要启用 B 服务项目传送一些前后文主要参数而且也期待 B 服务项目再次向下传送该怎样完成呢?
跨过程的全局性传输数据
再度追忆一下 gRPC 是根据 HTTP/2 协议书的。那大家是否能够 再请求头里将这一部分数据信息 set 进来,而不是放到数据文件里边。
gRPC 也是这般完成的。过程间传送界定了一个 metadata 目标,该目标放到 Request-Headers 内:
Requests
Request → Request-Headers *Length-Prefixed-Message EOS
Request-Headers are delivered as HTTP2 headers in HEADERS CONTINUATION frames.
Request-Headers → Call-Definition *Custom-Metadata
Call-Definition → Method Scheme Path TE [Authority] [Timeout] Content-Type [Message-Type] [Message-Encoding] [Message-Accept-Encoding] [User-Agent]
Method → ":method POST"
Scheme → ":scheme " ("http" / "https")
Path → ":path" "/" Service-Name "/" {method name} # But see note below.
Service-Name → {IDL-specific service name}
......
......
......
Custom-Metadata → Binary-Header / ASCII-Header
......
Custom-Metadata 字段名内即是我们要传送的全局性目标。实际文本文档能够 看看吧:PROTOCOL-HTTP2。
因此根据 metadata 我们可以将上一个过程中的全局性目标透传入下一个被启用的过程。查询源代码能够 发觉 metadata 內部事实上是根据一个 map 阿里云oss数据信息:
type MD map[string][]string
metadata 和 Context 一起并用的应用方法以下:
推送方假如想推送一些全局性字段名给接受方,最先从自身端 metadata set 数据信息:
//set 数据信息到 metadata
md := metadata.Pairs("key", "val")
// 新创建一个有 metadata 的 context
ctx := metadata.NewOutgoingContext(context.Background(), md)
留意上边的 NewOutgoingContext() 方式,取名很品牌形象,向外輸出 Context。那麼对端接受的情况下毫无疑问有一个相匹配的方式,大家再次往下看。这一新的 Context 就可以用于推送出来 ,例如或是大家上原文中的实例方式:
//set 数据信息到 metadata
md := metadata.Pairs("key", "val")
// 新创建一个有 metadata 的 context
ctx := metadata.NewOutgoingContext(context.Background(), md)
c = NewTokenServiceClient(conn)
hello, err := c.SayHello(ctx, &PingMessage{Greeting: "hahah"})
if err != nil {
fmt.Printf("could not greet: %v", err)
}
针对接受方而言,只不过便是分析 metadata 中的数据信息。gRPC 早已帮大家将数据信息分析到 context 中,因此必须 从 Context 中取下 MD 目标。
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
fmt.Printf("get metadata error")
}
if t, ok := md["key"]; ok {
fmt.Printf("key from metadata:\n")
for i, e := range t {
fmt.Printf(" %d. %s\n", i, e)
}
}
这儿取数的逻辑性应用了 metadata 的 FromIncomingContext() 方式。跟存数据信息的 NewOutgoingContext() 方式息息相通。
跨过程的请求超时终止
同过程下跨 Goroutine 大家或是能够 应用 Context 来设定当今 Context 管理方法下子 Goroutine 的有效期限:
//请求超时截至
context.WithTimeout(context.Background(), 100*time.Millisecond)
//限定截至
deadline, c2 := context.WithDeadline(context.Background(), deadline time.Time)
gRPC 中一样完成了这一作用,即跨过程间的 Context 传送完成过程间的 Context 生命期管理方法。大家看一个简易的事例:
服务器端:
package normal
import (
"context"
"fmt"
"Google.golang.org/grpc"
"google.golang.org/grpc/reflection"
pb "gorm-demo/models/pb"
"net"
"testing"
"time"
)
type server struct{}
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
time.Sleep(3 * time.Second)
return &pb.HelloReply{Message: "Hello " in.Name}, nil
}
//拦截器 - 打印出日志
func LoggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error) {
fmt.Printf("gRPC method: %s, %v", info.FullMethod, req)
resp, err := handler(ctx, req)
fmt.Printf("gRPC method: %s, %v", info.FullMethod, resp)
return resp, err
}
func TestGrpcServer(t *testing.T) {
// 监视当地的8972端口号
lis, err := net.Listen("tcp", ":8972")
if err != nil {
fmt.Printf("failed to listen: %v", err)
return
}
//申请注册拦截器
s := grpc.NewServer(grpc.UnaryInterceptor(LoggingInterceptor)) // 建立gRPC网络服务器
pb.RegisterGreeterServer(s, &server{}) // 在gRPC服务器端申请注册服务项目
reflection.Register(s) //在给出的gRPC网络服务器上申请注册网络服务器反射面服务项目
// Serve方式在lis上接纳传到联接,为每一个联接建立一个ServerTransport和server的goroutine。
// 该goroutine载入gRPC要求,随后启用已申请注册的程序处理来回应他们。
err = s.Serve(lis)
if err != nil {
fmt.Printf("failed to serve: %v", err)
return
}
}
服务器端编码我们在 SayHello() 方式中提升了 3s 的sleep。手机客户端编码以下:
package normal
import (
"fmt"
"testing"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
pb "gorm-demo/models/pb"
)
func TestGrpcClient(t *testing.T) {
// 连接网络
conn, err := grpc.Dial(":8972", grpc.WithInsecure())
if err != nil {
fmt.Printf("faild to connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
//timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second*2)
//defer cancelFunc()
m, _ := time.ParseDuration("1s")
result := time.Now().Add(m)
deadline, c2 := context.WithDeadline(context.Background(), result)
defer c2()
// 启用服务器端的SayHello
r, err := c.SayHello(deadline, &pb.HelloRequest{Name: "CN"})
if err != nil {
fmt.Printf("could not greet: %v", err)
}
fmt.Printf("Greeting: %s !\n", r.Message)
}
对于二种情景的请求超时:
//timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second*2)
//defer cancelFunc()
m, _ := time.ParseDuration("1s")
result := time.Now().Add(m)
deadline, c2 := context.WithDeadline(context.Background(), result)
defer c2()
各自干了检测,大伙儿能够 运作一下编码看一下实际效果。都是会见到出错信息内容:
code = DeadlineExceeded desc = context deadline exceeded
因此请求超时操纵能够 根据 Context 来实际操作,无须你自己再去附加敲代码。
关注不迷路
扫码下方二维码,关注宇凡盒子公众号,免费获取最新技术内幕!
评论0