GRPC (2): 四种通信模式
上篇文章我们介绍了gRPC 的基本概念,今天实现一个完整的 gRPC 服务,包括 proto 文件的定义,客户端和服务端代码的生成以及业务逻辑代码的补充。
GRPC 四种通信模式
普通模式(unary RPC)
假如我们需要构建一个订单管理系统,这个系统为用户提供了订单查询的接口,每次用户输入订单号,便会返回对应的订单信息。每个请求独立,且响应和请求一一对应,这就是简单的 RPC 模式,对于大多数业务场景均可以适用。
service MysqlService {
…
rpc SelectRecord(MysqlReq) returns (MysqlRes) {};
…
}
服务端流模式(server-streaming RPC)
假如我们要创建一个订单的信息的缓存池,实现订单的高效查询,缓存服务启动的时候就需要请求服务端,同步服务端的订单信息。这种情况下,我们只需要请求一次,服务端便会持续的为我们提供同步订单信息,这就是服务端流 RPC 的场景。
和简单的 RPC 模式不同,服务端流 RPC 模式,客户端发起一个 RPC 请求,服务端会返回一系列的响应结果,发送完所有的响应后,服务端在流结尾标记服务状态详情作为结束的元数据。
service MysqlService {
…
rpc DeleteRecord(MysqlReq) returns (stream MysqlRes) {}
…
}
客户端流模式(client-streaming RPC)
假设以下场景,有一个服务每天需要定时和订单管理系统更新订单的最终状态,服务端只需要在最后告诉该服务,最终的处理结果(哪些更新成功,哪些失败)。不需要频繁和服务端建立和断开连接。从而可以降低服务端的并发节省连接资源。
与服务端流模式类似,客户端流会发送连续的更新请求给服务端,服务端在收到请求后不会立马给到客户端响应结果,直到请求结束了,才会返回一个单独的响应。
service MysqlService {
…
rpc InsertRecord(stream MysqlReq) returns (MysqlRes) {};
…
}
双向流模式(bidirectional-streaming RPC)
同样以上面的订单更新的服务为例子,每天定时向管理系统推送一批订单进行更新,假如这个批订单有上千万的量,而且订单更新服务之后还需要和其他系统进行数据同步,这个时候使用客户端流模式显然不是那么合适,我们可以采用双向流模式,不断地推送订单请求给管理系统,服务端在收到请求,处理完立马返回处理结果给客户端,直到所有的请求处理结束。这种场景就是双向流模式。
service MysqlService {
…
rpc UpdateRecord(stream MysqlReq) returns (stream MysqlRes) {};
…
}
一个简单的 GRPC 服务实现
这个服务为客户端提供 MYSQL 增删改查的 gRPC 接口。
项目目录结构:
_01_grpc_demo:
-> conf # 数据库连接配置
-> cmd # 测试样例
-> pb # proto && pb.go 文件
-> client.go # grpc 客户端代码
-> server.go # grpc 服务端代码
-> Makefile # 命令脚本
代码实现
mysql.proto
syntax = “proto3”;
package _01_grpc_demo;
option go_package = “./;pb”;
message MysqlReq {
string sql = 1;
}
message MysqlRes {
uint32 code = 1;
string msg = 2;
string data = 3;
}
service MysqlService {
rpc SelectRecord(MysqlReq) returns (MysqlRes) {};
rpc InsertRecord(stream MysqlReq) returns (MysqlRes) {};
rpc DeleteRecord(MysqlReq) returns (stream MysqlRes) {}
rpc UpdateRecord(stream MysqlReq) returns (stream MysqlRes) {};
}
server.go
package _01_grpc_demo
import (
“context”
“encoding/json”
“errors”
“fmt”
“io”
“log”
“_01_grpc_demo/conf”
“_01_grpc_demo/pb”
)
type MysqlServer struct {}
func NewMysqlServer() *MysqlServer {
return &MysqlServer{}
}
func (*MysqlServer) SelectRecord(ctx context.Context, req *pb.MysqlReq) (*pb.MysqlRes, error) {
sql := req.GetSql()
log.Printf(“receives a select record request with sql : %s”, sql)
// test sleep
// time.Sleep(time.Second * 5)
if err := ctx.Err(); err != nil {
return nil, err
}
users, res := make([]*conf.User, 0), &pb.MysqlRes{}
if err := conf.DB.Raw(sql).Scan(&users).Error; err != nil {
return nil, err
}
rows, err := json.Marshal(users)
if err != nil {
return nil, err
}
res.Code = 200
res.Msg = “select success”
res.Data = string(rows)
return res, nil
}
func (*MysqlServer) InsertRecord(stream pb.MysqlService_InsertRecordServer) error {
var sql string
for {
req, err := stream.Recv()
if err == io.EOF || req == nil{
log.Printf(“read ends”)
break
}
if len(sql) == 0 {
sql = req.Sql
} else {
sql = fmt.Sprintf(“%s;%s”, sql, req.Sql)
}
}
log.Printf(“receives a insert record request with sql : %s”, sql)
// test sleep
// time.Sleep(time.Second * 5)
res := &pb.MysqlRes{
Code: 200,
Msg: fmt.Sprintf(“insert records sql [%s] succ”, sql),
Data: _empty,
}
if err := conf.DB.Exec(sql).Error; err != nil {
res.Code = 500
res.Msg = fmt.Sprintf(“exec %s err [%s]”, sql, err)
return err
}
// send res
if err := stream.SendAndClose(res); err != nil {
errs := fmt.Sprintf(“send res [%s] err [%s]”, res.Msg, err)
return errors.New(errs)
}
return nil
}
func (*MysqlServer) DeleteRecord(
req *pb.MysqlReq,
stream pb.MysqlService_DeleteRecordServer,
) error {
sql := req.GetSql()
log.Printf(“receives a delete record request with sql : %s”, sql)
// test sleep
// time.Sleep(time.Second * 5)
res := &pb.MysqlRes{
Code: 200,
Msg: fmt.Sprintf(“delete records sql [%s] succ”, sql),
Data: _empty,
}
if err := conf.DB.Exec(sql).Error; err != nil {
res.Code = 500
res.Msg = fmt.Sprintf(“exec %s err [%s]”, sql, err)
}
// send res
if err := stream.Send(res); err != nil {
resBytes, _ := json.Marshal(res)
return fmt.Errorf(“send res [%s] to stream err [%s]”, string(resBytes), err)
}
return nil
}
func (*MysqlServer) UpdateRecord(stream pb.MysqlService_UpdateRecordServer) error {
var sql string
for {
req, err := stream.Recv()
if err == io.EOF || req == nil{
log.Printf(“read ends\n”)
break
}
if len(sql) == 0 {
sql = req.Sql
} else {
sql = fmt.Sprintf(“%s;%s”, sql, req.Sql)
}
}
log.Printf(“receives a update record request with sql : %s\n”, sql)
res := &pb.MysqlRes{
Code: 200,
Msg: fmt.Sprintf(“update records sql [%s] succ”, sql),
Data: _empty,
}
if err := conf.DB.Exec(sql).Error; err != nil {
res.Code = 500
res.Msg = fmt.Sprintf(“exec %s err [%s]”, sql, err)
}
if err := stream.Send(res); err != nil {
resBytes, _ := json.Marshal(res)
return fmt.Errorf(“send res [%s] to stream err [%s]”, string(resBytes), err)
}
return nil
}
client.go
package _01_grpc_demo
import (
“context”
“encoding/json”
“fmt”
“io”
“log”
“time”
“google.golang.org/grpc”
“_01_grpc_demo/pb”
)
const (
_empty = “”
)
type MysqlClient struct {
service pb.MysqlServiceClient
}
func NewMysqlClient(cc *grpc.ClientConn) *MysqlClient {
service := pb.NewMysqlServiceClient(cc)
return &MysqlClient{service}
}
func (mysqlClient *MysqlClient) SelectRecord(sql string) string {
req := &pb.MysqlReq{
Sql: sql,
}
// set timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
res, err := mysqlClient.service.SelectRecord(ctx, req)
if err != nil {
log.Fatal(“select record err “, err)
return _empty
}
return fmt.Sprintf(“select record success %s”, res.GetData())
}
func (mysqlClient *MysqlClient) InsertRecord(sql string) string {
req := &pb.MysqlReq{
Sql: sql,
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
stream, err := mysqlClient.service.InsertRecord(ctx)
if err != nil {
log.Fatal(“insert record err “, err)
return _empty
}
if err = stream.Send(req); err != nil {
log.Fatal(“send req err “, err)
return _empty
}
res, err := stream.CloseAndRecv()
if err != nil {
log.Fatal(“receive res err “, err)
return _empty
}
resBytes, _ := json.Marshal(res)
return string(resBytes)
}
func (mysqlClient *MysqlClient) DeleteRecord(sql string) string {
req := &pb.MysqlReq{
Sql: sql,
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
stream, err := mysqlClient.service.DeleteRecord(ctx, req)
if err != nil {
log.Fatal(“delete record err “, err)
return _empty
}
resBytes := make([]byte, 0)
for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(“cannot receive response: “, err)
return _empty
}
resBytes, _ = json.Marshal(res)
}
return string(resBytes)
}
func (mysqlClient *MysqlClient) UpdateRecord(sql string) string {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
stream, err := mysqlClient.service.UpdateRecord(ctx)
if err != nil {
log.Fatal(“update record err “, err)
return _empty
}
waitResponse := make(chan error)
// go routine to receive responses
go func() {
for {
res, err := stream.Recv()
if err == io.EOF {
log.Print(“no more responses”)
waitResponse <- nil
return
}
if err != nil {
waitResponse <- fmt.Errorf("cannot receive stream response: %v", err)
return
}
log.Print("received response: ", res)
}
}()
req := &pb.MysqlReq{
Sql: sql,
}
err = stream.Send(req)
if err != nil {
return fmt.Sprintf("send stream err: %s", err)
}
err = stream.CloseSend()
if err != nil {
return fmt.Sprintf("cannot close send: %s", err)
}
err = <-waitResponse
return fmt.Sprintf("%s0000", err)
}
makeFile
clean:
rm pb/*.go
gen:
protoc --plugin=protoc-gen-go=/d/workspace/golang/bin/protoc-gen-go.exe --go_out=plugins=grpc:pb --proto_path=pb pb/*.proto
server:
go run cmd/server/main.go -port 8080
client:
go run cmd/client/main.go -address 0.0.0.0:8080