引言
gRPC应用程序共有四种基础的通信模式,分别为:一元RPC,服务端流RPC,客户端流RPC以及双向流RPC 注:阅读本文,如果没有gRPC基础的同学可以看我之前关于如何简单构建gRPC客户端和服务端的文章:
gRPC开发:go语言构建简单的服务端和客户端
一元RPC模式
服务定义:
syntax="proto3";
import "google/protobuf/wrappers.proto";
package ecommerce;
option go_package = "../ecommerce";
service OrderManagement{
rpc getOrder(google.protobuf.StringValue) returns (Order);
}
message Order{
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}
服务端实现
package main
import (
"context"
wrapper "github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"log"
"net"
pb "orderServiceUnaryGRPC/service/ecommerce"
)
type server struct {
orderMap map[string]*pb.Order
}
const (
port = ":50051"
)
var orderMap = make(map[string]pb.Order)
func initSampleData() {
orderMap["102"] = pb.Order{Id: "102", Items: []string{"Google Pixel 3A", "Mac Book Pro"}, Destination: "Mountain View, CA", Price: 1800.00}
orderMap["103"] = pb.Order{Id: "103", Items: []string{"Apple Watch S4"}, Destination: "San Jose, CA", Price: 400.00}
orderMap["104"] = pb.Order{Id: "104", Items: []string{"Google Home Mini", "Google Nest Hub"}, Destination: "Mountain View, CA", Price: 400.00}
orderMap["105"] = pb.Order{Id: "105", Items: []string{"Amazon Echo"}, Destination: "San Jose, CA", Price: 30.00}
orderMap["106"] = pb.Order{Id: "106", Items: []string{"Amazon Echo", "Apple iPhone XS"}, Destination: "Mountain View, CA", Price: 300.00}
}
func (s *server) GetOrder(ctx context.Context, orderId *wrapper.StringValue) (*pb.Order, error) {
ord, exists := orderMap[orderId.Value]
if exists {
return &ord, status.New(codes.OK, "").Err()
}
return nil, status.Errorf(codes.NotFound, "Order does not exist.:", orderId)
}
func main() {
initSampleData()
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen:%v", err)
}
log.Printf("Starting gRPC listener on port:%s", port)
s := grpc.NewServer()
pb.RegisterOrderManagementServer(s, &server{})
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve:%v", err)
}
}
客户端实现
package main
import (
"context"
wrapper "github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
pb "orderServiceUnaryGRPC/client/ecommerce"
"time"
)
const (
address = "localhost:50051"
)
func main() {
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("Did not connect:%v", err)
}
defer conn.Close()
client := pb.NewOrderManagementClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
ord, err := client.GetOrder(ctx, &wrapper.StringValue{Value: "105"})
log.Printf("GetOrder Response ->:%v", ord)
}
服务器端流RPC模式
在一元 RPC 模式中,gRPC 服务器端和 gRPC 客户端在通信时始终只有 一个请求和一个响应。在服务器端流 RPC 模式中,服务器端在接收到 客户端的请求消息后,会发回一个响应的序列。这种多个响应所组成的 序列也被称为“流”。在将所有的服务器端响应发送完毕之后,服务器端 会以 trailer 元数据的形式将其状态发送给客户端,从而标记流的结束。
服务定义
syntax = "proto3";
import "google/protobuf/wrappers.proto";
package ecommerce;
option go_package = "../ecommerce";
service OrderManagement{
rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
}
message Order{
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}
服务端实现
package main
import (
"fmt"
wrapper "github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/grpc"
"log"
"net"
pb "orderServeStream/service/ecommerce"
"strings"
)
type serve struct {
orderMap map[string]*pb.Order
}
const (
port = ":50051"
)
var orderMap = make(map[string]pb.Order)
func initSampleData() {
orderMap["102"] = pb.Order{Id: "102", Items: []string{"Google Pixel 3A", "Mac Book Pro"}, Destination: "Mountain View, CA", Price: 1800.00}
orderMap["103"] = pb.Order{Id: "103", Items: []string{"Apple Watch S4"}, Destination: "San Jose, CA", Price: 400.00}
orderMap["104"] = pb.Order{Id: "104", Items: []string{"Google Home Mini", "Google Nest Hub"}, Destination: "Mountain View, CA", Price: 400.00}
orderMap["105"] = pb.Order{Id: "105", Items: []string{"Amazon Echo"}, Destination: "San Jose, CA", Price: 30.00}
orderMap["106"] = pb.Order{Id: "106", Items: []string{"Amazon Echo", "Apple iPhone XS"}, Destination: "Mountain View, CA", Price: 300.00}
}
func (s *serve) SearchOrders(searchQuery *wrapper.StringValue, stream pb.OrderManagement_SearchOrdersServer) error {
for key, order := range orderMap {
log.Print(key, order)
for _, itemStr := range order.Items {
log.Print(itemStr)
if strings.Contains(itemStr, searchQuery.Value) {
err := stream.Send(&order)
if err != nil {
return fmt.Errorf("error sending message to stream:%v", err)
}
log.Printf("Matching Order Found : %v", key)
break
}
}
}
return nil
}
func main() {
initSampleData()
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen:%v", err)
}
s := grpc.NewServer()
pb.RegisterOrderManagementServer(s, &serve{})
log.Printf("Starting gRPC listener on port:%v", port)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve:%v", err)
}
}
客户端实现
package main
import (
"context"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
pb "orderServeStream/client/ecommerce"
"time"
)
const (
address = "localhost:50051"
)
func main() {
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("Did not connect:%v", err)
}
defer conn.Close()
client := pb.NewOrderManagementClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
ordStream, err := client.SearchOrders(ctx, &wrappers.StringValue{Value: "Google"})
for {
searchOrder, err := ordStream.Recv()
if err != nil {
break
}
log.Print("Search Result:", searchOrder)
}
}
客户端流RPC模式
在客户端流 RPC 模式中,客户端会发送多个请求给服务器端,而不再 是单个请求。服务器端则会发送一个响应给客户端。但是,服务器端不 一定要等到从客户端接收到所有消息后才发送响应。基于这样的逻辑, 我们可以在接收到流中的一条消息或几条消息之后就发送响应,也可以 在读取完流中的所有消息之后再发送响应。
服务定义
syntax = "proto3";
import "google/protobuf/wrappers.proto";
package ecommerce;
option go_package = "../ecommerce";
service OrderManagement{
rpc updateOrders(stream Order) returns(google.protobuf.StringValue);
}
message Order{
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}
服务端实现
package main
import (
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/grpc"
"io"
"log"
"net"
pb "orderClientStream/service/ecommerce"
)
const (
port = ":50051"
)
type serve struct {
orderMap map[string]*pb.Order
}
var orderMap = make(map[string]pb.Order)
func initSampleData() {
orderMap["102"] = pb.Order{Id: "102", Items: []string{"Google Pixel 3A", "Mac Book Pro"}, Destination: "Mountain View, CA", Price: 1800.00}
orderMap["103"] = pb.Order{Id: "103", Items: []string{"Apple Watch S4"}, Destination: "San Jose, CA", Price: 400.00}
orderMap["104"] = pb.Order{Id: "104", Items: []string{"Google Home Mini", "Google Nest Hub"}, Destination: "Mountain View, CA", Price: 400.00}
orderMap["105"] = pb.Order{Id: "105", Items: []string{"Amazon Echo"}, Destination: "San Jose, CA", Price: 30.00}
orderMap["106"] = pb.Order{Id: "106", Items: []string{"Amazon Echo", "Apple iPhone XS"}, Destination: "Mountain View, CA", Price: 300.00}
}
func (s *serve) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
ordersStr := "Updated Order IDs :"
for {
order, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&wrappers.StringValue{Value: "Orders processed " + ordersStr})
}
if err != nil {
return err
}
orderMap[order.Id] = *order
log.Printf("Order ID : %s - %s", order.Id, "Updated")
ordersStr += order.Id + ","
}
}
func main() {
initSampleData()
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen:%v", err)
}
s := grpc.NewServer()
pb.RegisterOrderManagementServer(s, &serve{})
log.Printf("Starting gRPC listener on port:%v", port)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve:%v", err)
}
}
客户端实现
package main
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
pb "orderClientStream/client/ecommerce"
"time"
)
const (
address = "localhost:50051"
)
func main() {
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("Did not connect:%v", err)
}
defer conn.Close()
client := pb.NewOrderManagementClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
updOrder1 := pb.Order{Id: "102", Items: []string{"Google Pixel 3A11", "Google Pixel Book11"}, Destination: "Mountain View, CA", Price: 1100.00}
updOrder2 := pb.Order{Id: "103", Items: []string{"Apple Watch S411", "Mac Book Pro", "iPad Pro11"}, Destination: "San Jose, CA", Price: 2800.00}
updOrder3 := pb.Order{Id: "104", Items: []string{"Google Home Mini11", "Google Nest Hub", "iPad Mini11"}, Destination: "Mountain View, CA", Price: 2200.00}
updateStream, err := client.UpdateOrders(ctx)
if err != nil {
log.Fatalf("%v.UpdateOrders(_) = _, %v", client, err)
}
if err := updateStream.Send(&updOrder1); err != nil {
log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder1, err)
}
if err := updateStream.Send(&updOrder2); err != nil {
log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder2, err)
}
if err := updateStream.Send(&updOrder3); err != nil {
log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder3, err)
}
updateResp, err := updateStream.CloseAndRecv()
log.Printf("Updates Orders Res: %s", updateResp)
}
双向流RPC模式
在双向流 RPC 模式中,客户端以消息流的形式发送请求到服务器端, 服务器端也以消息流的形式进行响应。调用必须由客户端发起,但在此 之后,通信完全基于 gRPC 客户端和服务器端的应用程序逻辑。
服务定义
syntax = "proto3";
import "google/protobuf/wrappers.proto";
option go_package = "../ecommerce";
service OrderManagement{
rpc processOrders(stream google.protobuf.StringValue) returns(stream CombinedShipment);
}
message Order {
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}
message CombinedShipment {
string id = 1;
string status = 2;
repeated Order ordersList = 3;
}
服务端实现
package main
import (
"google.golang.org/grpc"
"io"
"log"
"net"
pb "service/ecommerce"
)
type server struct {
orderMap map[string]*pb.Order
}
const (
port = ":50051"
orderBatchSize = 3
)
var orderMap = make(map[string]pb.Order)
func initSampleData() {
orderMap["102"] = pb.Order{Id: "102", Items: []string{"Google Pixel 3A", "Mac Book Pro"}, Destination: "Mountain View, CA", Price: 1800.00}
orderMap["103"] = pb.Order{Id: "103", Items: []string{"Apple Watch S4"}, Destination: "San Jose, CA", Price: 400.00}
orderMap["104"] = pb.Order{Id: "104", Items: []string{"Google Home Mini", "Google Nest Hub"}, Destination: "Mountain View, CA", Price: 400.00}
orderMap["105"] = pb.Order{Id: "105", Items: []string{"Amazon Echo"}, Destination: "San Jose, CA", Price: 30.00}
orderMap["106"] = pb.Order{Id: "106", Items: []string{"Amazon Echo", "Apple iPhone XS"}, Destination: "Mountain View, CA", Price: 300.00}
}
func (s *server) ProcessOrders(stream pb.OrderManagement_ProcessOrdersServer) error {
batchMark := 1
var combinedShipmentMap = make(map[string]pb.CombinedShipment)
for {
orderId, err := stream.Recv()
log.Printf("Reading Proc order:%v", orderId)
if err == io.EOF {
log.Printf("EOF:%v", orderId)
for _, shipment := range combinedShipmentMap {
if err := stream.Send(&shipment); err != nil {
return err
}
}
return nil
}
if err != nil {
log.Println(err)
return err
}
destination := orderMap[orderId.GetValue()].Destination
shipment, found := combinedShipmentMap[destination]
if found {
ord := orderMap[orderId.GetValue()]
shipment.OrdersList = append(shipment.OrdersList, &ord)
combinedShipmentMap[destination] = shipment
} else {
comShip := pb.CombinedShipment{Id: "cmb - " + (orderMap[orderId.GetValue()].Destination), Status: "Processed!"}
ord := orderMap[orderId.GetValue()]
comShip.OrdersList = append(comShip.OrdersList, &ord)
combinedShipmentMap[destination] = comShip
log.Print(len(comShip.OrdersList), comShip.GetId())
}
if batchMark == orderBatchSize {
for _, comb := range combinedShipmentMap {
log.Printf("Shipping:%v", len(comb.OrdersList))
if err := stream.Send(&comb); err != nil {
return nil
}
}
batchMark = 1
combinedShipmentMap = make(map[string]pb.CombinedShipment)
} else {
batchMark++
}
}
}
func main() {
initSampleData()
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen:%v", err)
}
s := grpc.NewServer()
pb.RegisterOrderManagementServer(s, &server{})
log.Printf("Starting gRPC listener on port:%v", port)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve:%v", err)
}
}
客户端实现
package main
import (
pb "client/ecommerce"
"context"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"io"
"log"
"time"
)
const (
address = "localhost:50051"
)
func asncClientBidirectionalRPC(streamProcOrder pb.OrderManagement_ProcessOrdersClient, c chan struct{}) {
for {
combinedShipment, errProcOrder := streamProcOrder.Recv()
if errProcOrder == io.EOF {
break
}
log.Printf("Combined shipment : %v", combinedShipment.OrdersList)
}
<-c
}
func main() {
conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("Did not connect:%v", err)
}
defer conn.Close()
client := pb.NewOrderManagementClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
streamProcOrder, _ := client.ProcessOrders(ctx)
if err := streamProcOrder.Send(&wrappers.StringValue{Value: "102"}); err != nil {
log.Fatalf("%v.Send(%v)=%v", client, "102", err)
}
if err := streamProcOrder.Send(&wrappers.StringValue{Value: "103"}); err != nil {
log.Fatalf("%v.Send(%v)=%v", client, "103", err)
}
if err := streamProcOrder.Send(&wrappers.StringValue{Value: "104"}); err != nil {
log.Fatalf("%v.Send(%v)=%v", client, "104", err)
}
channel := make(chan struct{})
go asncClientBidirectionalRPC(streamProcOrder, channel)
time.Sleep(time.Millisecond * 1000)
if err := streamProcOrder.Send(&wrappers.StringValue{Value: "101"}); err != nil {
log.Fatalf("%v.Send(%v)=%v", client, "101", err)
}
if err := streamProcOrder.CloseSend(); err != nil {
log.Fatal(err)
}
<-channel
}
|