gRPC(Go)入门教程(八)—使用context进行超时控制
1. 概述
gRPC 系列相关代码见 Github
通过 ctx 完成 cancel 和 deadline 功能。
Go 语言中可以通过 ctx 来控制各个 Goroutine,调用 cancel 函数,则该 ctx 上的各个子 Goroutine 都会被一并取消。
gRPC 中同样实现了该功能,在调用方法的时候可以传入 ctx 参数。
gRPC 会通过 HTTP2 HEADERS Frame 来传递相关信息。
2. deadline
gRPC 提倡TL;DR: Always set a deadline
Deadlines 允许gRPC 客户端设置自己等待多长时间来完成 RPC 操作,直到出现这个错误 DEADLINE_EXCEEDED。但是在正常情况下默认设置是一个很大的数值。
如果不设置截止日期时,如果出现阻塞,那么所有的请求可能在最大请求时间过后才超时,最终可能导致资源被耗尽。
由于类似的问题,在高并发的时候导致了一次事故,具体看数据库连接池该设置多大?记一次由连接池引发的事故
Server
如果客户端传来的消息时 delay 则 sleep 两秒,如果是带[propagate me]前缀的消息则由服务端在延迟 800ms 后发起一次 RPC 调用。
func (s *server) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) { message := req.Message if strings.HasPrefix(message, "[propagate me]") { time.Sleep(800 * time.Millisecond) message = strings.TrimPrefix(message, "[propagate me]") return s.client.UnaryEcho(ctx, &pb.EchoRequest{Message: message}) } if message == "delay" { time.Sleep(2 * time.Second) } return &pb.EchoResponse{Message: req.Message}, nil } func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error { for { req, err := stream.Recv() if err == io.EOF { return status.Error(codes.InvalidArgument, "request message not received") } if err != nil { return err } message := req.Message if strings.HasPrefix(message, "[propagate me]") { time.Sleep(800 * time.Millisecond) message = strings.TrimPrefix(message, "[propagate me]") res, err := s.client.UnaryEcho(stream.Context(), &pb.EchoRequest{Message: message}) if err != nil { return err } stream.Send(res) } if message == "delay" { time.Sleep(2 * time.Second) } stream.Send(&pb.EchoResponse{Message: message}) } }
Client
客户端则是为每次 RPC 调用都指定超时时间为 1秒。
package main
import ( "context" "flag" "fmt" "log" "time" pb "github.com/lixd/grpc-go-example/features/proto/echo" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) var addr = flag.String("addr", "localhost:50051", "the address to connect to") func unaryCall(c pb.EchoClient, requestID int, message string, want codes.Code) { // 每次都指定1秒超时 ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() req := &pb.EchoRequest{Message: message} _, err := c.UnaryEcho(ctx, req) got := status.Code(err) fmt.Printf("[%v] wanted = %v, got = %v\n", requestID, want, got) } func streamingCall(c pb.EchoClient, requestID int, message string, want codes.Code) { // 每次都指定1秒超时 ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() stream, err := c.BidirectionalStreamingEcho(ctx) if err != nil { log.Printf("Stream err: %v", err) return } err = stream.Send(&pb.EchoRequest{Message: message}) if err != nil { log.Printf("Send error: %v", err) return } _, err = stream.Recv() got := status.Code(err) fmt.Printf("[%v] wanted = %v, got = %v\n", requestID, want, got) } func main() { flag.Parse() conn, err := grpc.Dial(*addr, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewEchoClient(conn) unaryCall(c, 1, "world", codes.OK) unaryCall(c, 2, "delay", codes.DeadlineExceeded) unaryCall(c, 3, "[propagate me]world", codes.OK) unaryCall(c, 4, "[propagate me][propagate me]world", codes.DeadlineExceeded) streamingCall(c, 5, "[propagate me]world", codes.OK) streamingCall(c, 6, "[propagate me][propagate me]world", codes.DeadlineExceeded) }
Run
lixd@17x:~/17x/projects/grpc-go-example/features/deadline/server$ go run main.go server listening at port [::]:50051
lixd@17x:~/17x/projects/grpc-go-example/features/deadline/client$ go run main.go [1] wanted = OK, got = OK [2] wanted = DeadlineExceeded, got = DeadlineExceeded [3] wanted = OK, got = OK [4] wanted = DeadlineExceeded, got = DeadlineExceeded [5] wanted = OK, got = OK [6] wanted = DeadlineExceeded, got = DeadlineExceeded
其中请求 2 是传递的 delay 消息服务端会 sleep 两秒,所以触发 deadline,请求4和6 由于有两个[propagate me]前缀,所以会传递两轮,每次 sleep 800ms,再第二轮的时候也会触发 deadline。
请求1为正常请求,请求3和5只传递一轮,只 sleep 800ms 所以没有触发 deadline。
3. cancel
除了等待 deadline 超时之外,客户端还可以主动调用 cancel 取消本次请求。
比如在某次调用中,客户端某个环节报错导致本次请求已经可以直接返回了,这时候在等待服务端返回已经没有意义了。此时就可以直接调用 cancel 取消本次请求,而不是让服务端一直等待到超时才返回。
Server
package main import ( "flag" "fmt" "io" "log" "net" pb "github.com/lixd/grpc-go-example/features/proto/echo" "google.golang.org/grpc" ) var port = flag.Int("port", 50051, "the port to serve on") type server struct { pb.UnimplementedEchoServer } func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error { for { in, err := stream.Recv() if err != nil { fmt.Printf("server: error receiving from stream: %v\n", err) if err == io.EOF { return nil } return err } fmt.Printf("echoing message %q\n", in.Message) stream.Send(&pb.EchoResponse{Message: in.Message}) } } func main() { flag.Parse() lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) if err != nil { log.Fatalf("failed to listen: %v", err) } fmt.Printf("server listening at port %v\n", lis.Addr()) s := grpc.NewServer() pb.RegisterEchoServer(s, &server{}) s.Serve(lis) }
Client
package main import ( "context" "flag" "fmt" "log" "time" pb "github.com/lixd/grpc-go-example/features/proto/echo" "google.golang.org/grpc" ) var addr = flag.String("addr", "localhost:50051", "the address to connect to") func sendMessage(stream pb.Echo_BidirectionalStreamingEchoClient, msg string) error { fmt.Printf("sending message %q\n", msg) return stream.Send(&pb.EchoRequest{Message: msg}) } func recvMessage(stream pb.Echo_BidirectionalStreamingEchoClient) { res, err := stream.Recv() if err != nil { fmt.Printf("stream.Recv() returned error %v\n", err) return } fmt.Printf("received message %q\n", res.GetMessage()) } func main() { flag.Parse() // 建立连接 conn, err := grpc.Dial(*addr, grpc.WithInsecure()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewEchoClient(conn) // 初始化一个带取消功能的ctx ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) stream, err := c.BidirectionalStreamingEcho(ctx) if err != nil { log.Fatalf("error creating stream: %v", err) } // 正常发送消息 if err := sendMessage(stream, "hello"); err != nil { log.Fatalf("error sending on stream: %v", err) } if err := sendMessage(stream, "world"); err != nil { log.Fatalf("error sending on stream: %v", err) } // 正常接收消息 recvMessage(stream) recvMessage(stream) // 这里调用cancel方法取消 ctx fmt.Println("cancelling context") cancel() // 再次发送消息 这里是否会报错取决于ctx是否检测到前面发送的取消命令(cancel()) if err := sendMessage(stream, "world"); err != nil { log.Printf("error sending on stream: %v", err) } // 这里一定会报错 recvMessage(stream) }
4. 小结
不管是 cancel 和 deadline 都只需调用方传递对应的 ctx 即可。gRPC 中已经做了对应的实现,所以使用起来和在 Goroutine 中传递 ctx 没有太大的区别。
ctx 可以使用context.WithDeadline()或者context.WithTimeout(),二者效果类似,只是传递的参数不一样。
timeout 只能设置在某一段时间后超时,比如3秒后超时,deadline 则可以设置到具体某个时间点,比如在8点10分20秒的时候返回。类似于 Redis 中的 Expire 和 ExpireAt。