通过使用流 stream ,我们可以向服务器或者客户端发送批量数据,服务器和客户端在接收这些数据的时候,可以不必等所有的消息全接收后才开始响应,而是接收到第一条消息的时候就可以及时的响应。
1. 服务端流 RPC
在简单 RPC 模式中, gRPC 服务器端和 gRPC 客户端在通信时始终只有一个请求和一个响应。在服务器端流 RPC 模式中,服务器端在接收到客户端的请求消息后,会发回一个响应的序列。这种多个响应所组成的序列也被称为“流”。在将所有的服务器端响应发送完毕之后,服务器端会以 trailer 元数据的形式将其状态发送给客户端,从而标记流的结束。
假设需要实现一个订单搜索功能,利用该功能,只要提供一个搜索词就能得到匹配的结果,如图所 示。订单管理服务不会将所有匹配的订单一次性地发送给客户端,而是在找到匹配的订单时,逐步将其发送出去。这意味着当订单服务的客户端发出一个请求之后,会接收到多条响应消息。
2. 客户端流 RPC
在客户端流 RPC 模式中,客户端会发送多个请求给服务器端,而不再是单个请求。服务器端则会发送一个响应给客户端。但是,服务器端不一定要等到从客户端接收到所有消息后才发送响应。基于这样的逻辑,我们可以在接收到流中的一条消息或几条消息之后就发送响应,也可以在读取完流中的所有消息之后再发送响应。
假设希望在订单管理服务中添加新的 updateOrders 方法,从而更新一个订单集合,如图所示。在这 里,我们想以消息流的形式发送订单列表到服务器端,服务器端会处理这个流并发送一条带有已更新订单状态的消息给客户端。
3. 双向流 RPC
在双向流 RPC 模式中,客户端以消息流的形式发送请求到服务器端,服务器端也以消息流的形式进行响应。调用必须由客户端发起,但在此之后,通信完全基于 gRPC 客户端和服务器端的应用程序逻辑。
下面通过一个示例来进一步了解双向流 RPC 模式。如图所示,在订单管理服务服务用例中,假设需要一个订单处理功能,通过该功能,用户可以发送连续的订单集合(订单流),并根据投递地址对它们进行组合发货,也就是说,订单要根据投递目的地进行组织和发货。
双向流 RPC 和客户端流 RPC 调用的区别在于:
- 客户端流
RPC 调用会先将客户端的请求以流的形式发送完毕,再获得服务端的响应; - 双向流
RPC 调用中,客户端发送一个请求数据后,立马就可以获取对应的服务器端响应;
4. 目录结构
wohu@wohu-dev:~/goProject/rpcDemo$ tree
.
├── bin
│ ├── client
│ └── server
├── client
│ └── client.go
├── go.mod
├── go.sum
├── pb
│ ├── run.sh
│ ├── string.pb.go
│ └── string.proto
├── server
│ └── server.go
└── service
└── service.go
5 directories, 10 files
wohu@wohu-dev:~/goProject/rpcDemo$
5. proto 定义
string.proto 定义:
syntax = "proto3";
package pb;
option go_package = "./;pb";
service StringService{
rpc Concat(StringRequest) returns (StringResponse) {}
rpc Diff(StringRequest) returns (StringResponse) {}
rpc LotsOfServerStream(StringRequest) returns (stream StringResponse) {}
rpc LotsOfClientStream(stream StringRequest) returns (StringResponse) {}
rpc LotsOfServerAndClientStream(stream StringRequest) returns (stream StringResponse) {}
}
message StringRequest {
string A = 1;
string B = 2;
}
message StringResponse {
string Ret = 1;
string err = 2;
}
生成代码命令
#!/bin/bash
protoc -I=./ --go_out=plugins=grpc:. ./string.proto
6. 服务端代码
server.go 内容
package main
import (
"flag"
"log"
"rpcDemo/pb"
"rpcDemo/service"
"net"
"google.golang.org/grpc"
)
func main() {
flag.Parse()
lis, err := net.Listen("tcp", "127.0.0.1:1234")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
stringService := new(service.StringService)
pb.RegisterStringServiceServer(grpcServer, stringService)
grpcServer.Serve(lis)
}
service.go 内容
package service
import (
"context"
"errors"
"io"
"log"
"strings"
"time"
stream_pb "rpcDemo/pb"
)
const (
StrMaxSize = 1024
)
var (
ErrMaxSize = errors.New("maximum size of 1024 bytes exceeded")
ErrStrValue = errors.New("maximum size of 1024 bytes exceeded")
)
type StringService struct{}
func (s *StringService) Concat(ctx context.Context, req *stream_pb.StringRequest) (*stream_pb.StringResponse, error) {
if len(req.A)+len(req.B) > StrMaxSize {
response := stream_pb.StringResponse{Ret: ""}
return &response, nil
}
response := stream_pb.StringResponse{Ret: req.A + req.B}
return &response, nil
}
func (s *StringService) Diff(ctx context.Context, req *stream_pb.StringRequest) (*stream_pb.StringResponse, error) {
if len(req.A) < 1 || len(req.B) < 1 {
response := stream_pb.StringResponse{Ret: ""}
return &response, nil
}
res := ""
if len(req.A) >= len(req.B) {
for _, char := range req.B {
if strings.Contains(req.A, string(char)) {
res = res + string(char)
}
}
} else {
for _, char := range req.A {
if strings.Contains(req.B, string(char)) {
res = res + string(char)
}
}
}
response := stream_pb.StringResponse{Ret: res}
return &response, nil
}
func (s *StringService) LotsOfServerStream(req *stream_pb.StringRequest, qs stream_pb.StringService_LotsOfServerStreamServer) error {
response := stream_pb.StringResponse{Ret: req.A + req.B}
for i := 0; i < 10; i++ {
time.Sleep(2 * time.Second)
qs.Send(&response)
}
return nil
}
func (s *StringService) LotsOfClientStream(qs stream_pb.StringService_LotsOfClientStreamServer) error {
var params []string
for {
in, err := qs.Recv()
if err == io.EOF {
qs.SendAndClose(&stream_pb.StringResponse{Ret: strings.Join(params, "-")})
return nil
}
if err != nil {
log.Printf("failed to recv: %v", err)
return err
}
params = append(params, in.A, in.B)
}
}
func (s *StringService) LotsOfServerAndClientStream(qs stream_pb.StringService_LotsOfServerAndClientStreamServer) error {
for {
in, err := qs.Recv()
if err == io.EOF {
return nil
}
if err != nil {
log.Printf("failed to recv %v", err)
return err
}
qs.Send(&stream_pb.StringResponse{Ret: in.A + in.B})
}
}
7. 客户端代码
client.go 代码
package main
import (
"context"
"fmt"
"io"
"log"
"strconv"
stream_pb "rpcDemo/pb"
"google.golang.org/grpc"
)
func main() {
serviceAddress := "127.0.0.1:1234"
conn, err := grpc.Dial(serviceAddress, grpc.WithInsecure())
if err != nil {
panic("connect error")
}
defer conn.Close()
sendClientAndServerStreamRequest(conn)
}
func simpleRPC(conn *grpc.ClientConn) {
bookClient := stream_pb.NewStringServiceClient(conn)
stringReq := &stream_pb.StringRequest{A: "A", B: "B"}
reply, _ := bookClient.Concat(context.Background(), stringReq)
fmt.Printf("StringService Concat : %s concat %s = %s\n", stringReq.A, stringReq.B, reply.Ret)
}
func sendServerStreamRequest(conn *grpc.ClientConn) {
fmt.Printf("test sendServerStreamRequest \n")
stringClient := stream_pb.NewStringServiceClient(conn)
stringReq := &stream_pb.StringRequest{A: "A", B: "B"}
stream, _ := stringClient.LotsOfServerStream(context.Background(), stringReq)
for {
item, stream_error := stream.Recv()
if stream_error == io.EOF {
break
}
if stream_error != nil {
log.Printf("failed to recv: %v", stream_error)
}
fmt.Printf("StringService Concat : %s concat %s = %s\n", stringReq.A, stringReq.B, item.GetRet())
}
}
func sendClientStreamRequest(conn *grpc.ClientConn) {
fmt.Printf("test sendClientStreamRequest \n")
client := stream_pb.NewStringServiceClient(conn)
stream, err := client.LotsOfClientStream(context.Background())
for i := 0; i < 10; i++ {
if err != nil {
log.Printf("failed to call: %v", err)
break
}
stream.Send(&stream_pb.StringRequest{A: strconv.Itoa(i), B: strconv.Itoa(i + 1)})
}
reply, err := stream.CloseAndRecv()
if err != nil {
fmt.Printf("failed to recv: %v", err)
}
log.Printf("sendClientStreamRequest ret is : %s", reply.Ret)
}
func sendClientAndServerStreamRequest(conn *grpc.ClientConn) {
fmt.Printf("test sendClientAndServerStreamRequest \n")
var err error
client := stream_pb.NewStringServiceClient(conn)
stream, err := client.LotsOfServerAndClientStream(context.Background())
if err != nil {
log.Printf("failed to call: %v", err)
return
}
var i int
for {
err1 := stream.Send(&stream_pb.StringRequest{A: strconv.Itoa(i), B: strconv.Itoa(i + 1)})
if err1 != nil {
log.Printf("failed to send: %v", err)
break
}
reply, err2 := stream.Recv()
if err2 != nil {
log.Printf("failed to recv: %v", err)
break
}
log.Printf("sendClientAndServerStreamRequest Ret is : %s", reply.Ret)
i++
}
}
8. 编译运行
编译运行服务端
$ go build -o ./bin/server ./server/
$ ./bin/server
编译运行客户端
$ go build -o ./bin/client ./client/
$ ./bin/client
|