IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> gRPC 笔记(05)— gRPC 通信模式之服务器端流 RPC、客户端流 RPC 、双向流 RPC的代码实现 -> 正文阅读

[网络协议]gRPC 笔记(05)— gRPC 通信模式之服务器端流 RPC、客户端流 RPC 、双向流 RPC的代码实现

通过使用流 stream,我们可以向服务器或者客户端发送批量数据,服务器和客户端在接收这些数据的时候,可以不必等所有的消息全接收后才开始响应,而是接收到第一条消息的时候就可以及时的响应。

1. 服务端流 RPC

在简单 RPC 模式中, gRPC 服务器端和 gRPC 客户端在通信时始终只有一个请求和一个响应。在服务器端流 RPC 模式中,服务器端在接收到客户端的请求消息后,会发回一个响应的序列。这种多个响应所组成的序列也被称为“流”。在将所有的服务器端响应发送完毕之后,服务器端会以 trailer 元数据的形式将其状态发送给客户端,从而标记流的结束。

假设需要实现一个订单搜索功能,利用该功能,只要提供一个搜索词就能得到匹配的结果,如图所
示。订单管理服务不会将所有匹配的订单一次性地发送给客户端,而是在找到匹配的订单时,逐步将其发送出去。这意味着当订单服务的客户端发出一个请求之后,会接收到多条响应消息。

服务器流

2. 客户端流 RPC

在客户端流 RPC 模式中,客户端会发送多个请求给服务器端,而不再是单个请求。服务器端则会发送一个响应给客户端。但是,服务器端不一定要等到从客户端接收到所有消息后才发送响应。基于这样的逻辑,我们可以在接收到流中的一条消息或几条消息之后就发送响应,也可以在读取完流中的所有消息之后再发送响应。

假设希望在订单管理服务中添加新的 updateOrders 方法,从而更新一个订单集合,如图所示。在这
里,我们想以消息流的形式发送订单列表到服务器端,服务器端会处理这个流并发送一条带有已更新订单状态的消息给客户端。

客户端流

3. 双向流 RPC

在双向流 RPC 模式中,客户端以消息流的形式发送请求到服务器端,服务器端也以消息流的形式进行响应。调用必须由客户端发起,但在此之后,通信完全基于 gRPC 客户端和服务器端的应用程序逻辑。

下面通过一个示例来进一步了解双向流 RPC 模式。如图所示,在订单管理服务服务用例中,假设需要一个订单处理功能,通过该功能,用户可以发送连续的订单集合(订单流),并根据投递地址对它们进行组合发货,也就是说,订单要根据投递目的地进行组织和发货。

双向流
双向流 RPC 和客户端流 RPC 调用的区别在于:

  • 客户端流 RPC 调用会先将客户端的请求以流的形式发送完毕,再获得服务端的响应;
  • 双向流 RPC 调用中,客户端发送一个请求数据后,立马就可以获取对应的服务器端响应;

4. 目录结构

wohu@wohu-dev:~/goProject/rpcDemo$ tree
.
├── bin
│   ├── client
│   └── server
├── client
│   └── client.go
├── go.mod
├── go.sum
├── pb
│   ├── run.sh
│   ├── string.pb.go
│   └── string.proto
├── server
│   └── server.go
└── service
    └── service.go

5 directories, 10 files
wohu@wohu-dev:~/goProject/rpcDemo$

5. proto 定义

string.proto 定义:

syntax = "proto3";

package pb;

option go_package = "./;pb"; 

service StringService{
    rpc Concat(StringRequest) returns (StringResponse) {}
    rpc Diff(StringRequest) returns (StringResponse) {}
    // 通过使用 returns (stream StringResponse) 将返回参数指定为流形式
    rpc LotsOfServerStream(StringRequest) returns (stream StringResponse) {}
    rpc LotsOfClientStream(stream StringRequest) returns (StringResponse) {}
    rpc LotsOfServerAndClientStream(stream StringRequest) returns (stream StringResponse) {}
}

message StringRequest {
    string A = 1;
    string B = 2;
}

message StringResponse {
    string Ret = 1;
    string err = 2;
}

生成代码命令

#!/bin/bash
protoc -I=./ --go_out=plugins=grpc:. ./string.proto 

6. 服务端代码

server.go 内容

package main

import (
	"flag"
	"log"
	"rpcDemo/pb"
	"rpcDemo/service"

	"net"

	"google.golang.org/grpc"
)

func main() {
	flag.Parse()
	lis, err := net.Listen("tcp", "127.0.0.1:1234")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	grpcServer := grpc.NewServer()
	stringService := new(service.StringService)
	pb.RegisterStringServiceServer(grpcServer, stringService)
	grpcServer.Serve(lis)
}

service.go 内容

package service

import (
	"context"
	"errors"
	"io"
	"log"
	"strings"
	"time"

	stream_pb "rpcDemo/pb"
)

const (
	StrMaxSize = 1024
)

// Service errors
var (
	ErrMaxSize = errors.New("maximum size of 1024 bytes exceeded")

	ErrStrValue = errors.New("maximum size of 1024 bytes exceeded")
)

type StringService struct{}

func (s *StringService) Concat(ctx context.Context, req *stream_pb.StringRequest) (*stream_pb.StringResponse, error) {
	if len(req.A)+len(req.B) > StrMaxSize {
		response := stream_pb.StringResponse{Ret: ""}
		return &response, nil
	}
	response := stream_pb.StringResponse{Ret: req.A + req.B}
	return &response, nil
}

func (s *StringService) Diff(ctx context.Context, req *stream_pb.StringRequest) (*stream_pb.StringResponse, error) {
	if len(req.A) < 1 || len(req.B) < 1 {
		response := stream_pb.StringResponse{Ret: ""}
		return &response, nil
	}
	res := ""
	if len(req.A) >= len(req.B) {
		for _, char := range req.B {
			if strings.Contains(req.A, string(char)) {
				res = res + string(char)
			}
		}
	} else {
		for _, char := range req.A {
			if strings.Contains(req.B, string(char)) {
				res = res + string(char)
			}
		}
	}
	response := stream_pb.StringResponse{Ret: res}
	return &response, nil
}

/* 
StringService_LotsOfServerStreamServer 是流的引用对象,可以写入多个响应。
这里的业务逻辑是找到匹配的订单,并通过流将其依次发送出去。当找到新的订单时,
使用流引用对象的 Send(...) 方法将其写入流。一旦所有响应都写到了流中,
就可以通过返回 nil 来标记流已经结束,服务器端的状态和其他 trailer 元数据会发送给客户端
*/
func (s *StringService) LotsOfServerStream(req *stream_pb.StringRequest, qs stream_pb.StringService_LotsOfServerStreamServer) error {
	response := stream_pb.StringResponse{Ret: req.A + req.B}
	for i := 0; i < 10; i++ {
		time.Sleep(2 * time.Second)
		// 服务端流式响应可以通过 Send 方法返回多个 StringResponse 结构体,对象流序列化后流式返回
		qs.Send(&response)
	}
	return nil
}

func (s *StringService) LotsOfClientStream(qs stream_pb.StringService_LotsOfClientStreamServer) error {
	var params []string
	for {
		in, err := qs.Recv()
		if err == io.EOF {
			qs.SendAndClose(&stream_pb.StringResponse{Ret: strings.Join(params, "-")})
			return nil
		}
		if err != nil {
			log.Printf("failed to recv: %v", err)
			return err
		}
		params = append(params, in.A, in.B)
	}
}
func (s *StringService) LotsOfServerAndClientStream(qs stream_pb.StringService_LotsOfServerAndClientStreamServer) error {
	for {
		in, err := qs.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			log.Printf("failed to recv %v", err)
			return err
		}
		qs.Send(&stream_pb.StringResponse{Ret: in.A + in.B})
	}
}

7. 客户端代码

client.go 代码

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"strconv"

	stream_pb "rpcDemo/pb"

	"google.golang.org/grpc"
)

func main() {
	serviceAddress := "127.0.0.1:1234"
	conn, err := grpc.Dial(serviceAddress, grpc.WithInsecure())
	if err != nil {
		panic("connect error")
	}
	defer conn.Close()

	// simpleRPC(conn)

	// sendServerStreamRequest(conn)

	// sendClientStreamRequest(conn)

	sendClientAndServerStreamRequest(conn)
}

func simpleRPC(conn *grpc.ClientConn) {
	bookClient := stream_pb.NewStringServiceClient(conn)
	stringReq := &stream_pb.StringRequest{A: "A", B: "B"}
	reply, _ := bookClient.Concat(context.Background(), stringReq)
	fmt.Printf("StringService Concat : %s concat %s = %s\n", stringReq.A, stringReq.B, reply.Ret)
}

/*
客户端的远程方法调用和一元 RPC 模式中的非常类似。但是,因为服
务器端往流中写入了多个响应,所以这里必须处理多个响应。因此,我
们在 gRPC 客户端的 Go 语言实现中使用 Recv 方法从客户端流中检索
消息,并且持续检索,直到流结束为止

*/
func sendServerStreamRequest(conn *grpc.ClientConn) {
	fmt.Printf("test sendServerStreamRequest \n")

	stringClient := stream_pb.NewStringServiceClient(conn)
	stringReq := &stream_pb.StringRequest{A: "A", B: "B"}
	stream, _ := stringClient.LotsOfServerStream(context.Background(), stringReq)
	for {
		item, stream_error := stream.Recv()
		// 当发现流结束的时候,Recv 会返回 io.EOF
		if stream_error == io.EOF {
			break
		}
		if stream_error != nil {
			log.Printf("failed to recv: %v", stream_error)
		}
		fmt.Printf("StringService Concat : %s concat %s = %s\n", stringReq.A, stringReq.B, item.GetRet())
	}
}

func sendClientStreamRequest(conn *grpc.ClientConn) {
	fmt.Printf("test sendClientStreamRequest \n")
	client := stream_pb.NewStringServiceClient(conn)
	stream, err := client.LotsOfClientStream(context.Background())
	for i := 0; i < 10; i++ {
		if err != nil {
			log.Printf("failed to call: %v", err)
			break
		}
		// 使用它的 Send 方法可以以流式的方式向服务器端发送请求
		stream.Send(&stream_pb.StringRequest{A: strconv.Itoa(i), B: strconv.Itoa(i + 1)})
	}
	// 发送完成后,调用其 CloseAndRecv() 方法来获得服务器端响应
	reply, err := stream.CloseAndRecv()
	if err != nil {
		fmt.Printf("failed to recv: %v", err)
	}
	log.Printf("sendClientStreamRequest ret is : %s", reply.Ret)
}

func sendClientAndServerStreamRequest(conn *grpc.ClientConn) {
	fmt.Printf("test sendClientAndServerStreamRequest \n")
	var err error
	client := stream_pb.NewStringServiceClient(conn)
	stream, err := client.LotsOfServerAndClientStream(context.Background())
	if err != nil {
		log.Printf("failed to call: %v", err)
		return
	}
	var i int
	for {
		err1 := stream.Send(&stream_pb.StringRequest{A: strconv.Itoa(i), B: strconv.Itoa(i + 1)})
		if err1 != nil {
			log.Printf("failed to send: %v", err)
			break
		}
		reply, err2 := stream.Recv()
		if err2 != nil {
			log.Printf("failed to recv: %v", err)
			break
		}
		log.Printf("sendClientAndServerStreamRequest Ret is : %s", reply.Ret)
		i++
	}
}

8. 编译运行

编译运行服务端

$ go build -o ./bin/server ./server/
$ ./bin/server

编译运行客户端

$ go build -o ./bin/client ./client/
$ ./bin/client
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-05-18 17:59:21  更:2022-05-18 18:00:25 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/29 11:10:42-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计