IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> [golang]grpc与流式rpc简介 -> 正文阅读

[网络协议][golang]grpc与流式rpc简介


gRPC是一个语言中立、平台中立、高性能、通用的开源RPC框架;基于ProtoBuf(Protocol Buffers)序列化协议开发,且支持众多开发语言。

gRPC安装

安装protoc

从 https://github.com/google/protobuf/releases下载预编译的“protoc编译器”,用于生成gRPC服务代码。

解压zip文件,并将protoc二进制文件所在bin目录添加到PATH环境变量中。

安装golang插件

安装好golang后,设定环境:

go env -w GO111MODULE=auto
go env -w GOPROXY=https://goproxy.cn

安装proto-go,命令如下:

go get -u github.com/golang/protobuf/proto
go get -u github.com/golang/protobuf/protoc-gen-go

因被墙,可无法直接安装:

git clone https://github.com/golang/protobuf.git
# 进入protobuf/protoc-gen-go目录
go build
go install
# 此时,在$GOPATH下bin目录下会有protoc-gen-go  (把此目录添加到PATH环境变量)

生成

要生成grpc,有三步:

  • 编写.proto文件
  • 利用工具将.proto文件生成对应语言的代码
  • 根据生成的代码编写服务端和客户端的代码
// 在proto文件中,需要设定包名
option go_package = "./;mytest";  //路径和包名

# 编写好后,通过protoc生成对应代码():
./protoc --go_out=plugins=grpc:proto -I proto/ ./proto/product.proto

命令参数说明:

  • -I:指定proto文件存放目录;若文件不在当前目录下,则是必须的;
  • --go_out=plugins=grpc::指定生成go的代码,并且冒号后指定go代码的存放目录;

简单gRPC

gRPC 里客户端应用可以像调用本地对象一样直接调用远端服务上的应用方法,使得创建分布式应用和服务更简单。其基于以下理念:

  • 定义一个服务,指定其能够被远程调用的方法(包含参数和返回类型);
  • 在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用;
  • 在客户端拥有一个存根(像服务端一样的方法),代理服务端对应的接口。

定义proto

先定义个简单的请求与相应示例

syntax = "proto3";

package service;

option go_package = "./;product";

service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
string name = 1;
}

message HelloReply {
string message = 1;
}

把示例文件放在proto目录下,执行protoc命令:

./protoc --go_out=plugins=grpc:proto -I proto/ ./proto/product.proto

会生成product.pb.go文件,里面包含服务端与客户端的代码。

服务端

要实现proto中的服务,需要实现GreeterServer接口:

type GreeterServer interface {
	SayHello(context.Context, *HelloRequest) (*HelloReply, error)
}

先实现一个对应服务:

type server struct {
}

func (s *server) SayHello(ctx context.Context, in *product.HelloRequest) (*product.HelloReply, error) {
	log.Println("Input:", in)
	return &product.HelloReply{Message: "Hello " + in.Name}, nil
}

然后创建gRPC服务:

func main() {
	// 创建 Tcp 连接
	listen, err := net.Listen("tcp", ":5901")
	if err != nil {
		log.Println("Listen failed:", err)
		return
	}
	
	rpcServer := grpc.NewServer()
	
	// 注册服务实现者
	// 此函数在.pb.go中,自动生成
	product.RegisterGreeterServer(rpcServer, &server{})

	// 在 gRPC 服务上注册反射服务
	// 以便可通过grpcurl工具来查询服务列表或调用grpc方法
	// reflection.Register(grpcServer)

	err = grpcServer.Serve(listener)
	if err != nil {
		log.Fatalf("Start RPC Server fail: %v", err)
	}
}

客户端

客户端可通过Dial方便地连接,若要做一些参数配置,则需要通过DialContext:

func main() {
	//conn, err := grpc.Dial(":5901", grpc.WithInsecure())
	conn, err := grpc.DialContext(context.Background(), ":5901",
		grpc.WithInsecure(),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                10 * time.Second, // send pings every 10 seconds if there is no activity
			Timeout:             2 * time.Second,  // wait 2 second for ping ack before considering the connection dead
			PermitWithoutStream: true,  // send pings even without active streams
		}))
	if err != nil {
		log.Println("Connect fail:", err)
		//return
	}
	defer conn.Close()

	count := 0
	client := product.NewGreeterClient(conn)
	for {
		time.Sleep(5 * time.Second)
		count++
		log.Println("Try", count)

		reply, err := client.SayHello(context.Background(), &product.HelloRequest{Name: "Mike" + strconv.Itoa(count)})

		if err != nil {
			log.Println("Call fail:", err)
			continue
		}

		log.Println("Reply:", reply)
	}
}

流式gRPC

相比于简单gRPC,流式(stream)gRPC实现发送/接收大量数据,或不断传输数据的场景。可分为:

  • ServerStream,服务端推送流:客户端发送一个单独的请求,服务端返回流式数据,客户端读取流式数据直到EOF
    • 服务端处理完成后(推流完所有数据),return nil代表响应完成;
    • 客户端通过err == io.EOF判断服务端是否响应完成;
  • ClientStream,客户端推送流:客户端写入流式数据,写入完成后等待服务端返回结果
    • 客户端发送完毕通过**CloseAndRecv**关闭stream 并接收服务端响应;
    • 服务端通过err == io.EOF判断客户端是否发送完毕,完毕后使用**SendAndClose**关闭 stream并返回响应;
  • BidirectionalStream,双向推送流:这两个流是独立运行的,因此客户端和服务器可以按照自己需要的顺序进行读写
    • 客户端服务端都通过stream向对方推送数据,推送完成后通过**CloseSend**关闭流;通过err == io.EOF判断服务端是否响应完成(接收完数据);
    • 服务端通过err == io.EOF判断客户端是否响应完成,通过return nil表示已经完成响应;

定义proto

定义数据结构:

syntax = "proto3";
option go_package = ".;proto";

//三个流式rpc
//GetStream服务器返回流
//PutStream客户端上传流
//DiStream双向流
service Stream{
  rpc GetStream(MsgRequest)returns(stream MsgReply){}
  rpc PutStream(stream MsgRequest)returns(MsgReply){}
  rpc BiStream(stream MsgRequest)returns(stream MsgReply){}
}

message MsgRequest {
  string data = 1;
}

message MsgReply {
  string data = 1;
}

服务端

定义服务,实现对应流接口:

type server struct {
}

//服务端->客户端 单向流
func (*server) GetStream(req *proto.MsgRequest, getServer proto.Stream_GetStreamServer) error {
	log.Println("GetServer Start.")
	i := 0
	for i < 10 {
		i++
		getServer.Send(&proto.MsgReply{Data: req.Data + ":" + fmt.Sprintf("%v", time.Now().Unix())})
		log.Println("Get Res Send.")
		time.Sleep(1 * time.Second)
	}
	log.Println("GetServer Start.")
	return nil
}

//客户端->服务端 单向流
func (*server) PutStream(putServer proto.Stream_PutStreamServer) error {
	log.Println("PutServer Start.")
	var cliStr strings.Builder
	for {
		if putReq, err := putServer.Recv(); err == nil {
			log.Println("Put Req: " + putReq.Data)
			cliStr.WriteString(putReq.Data)
		} else {
			putServer.SendAndClose(&proto.MsgReply{Data: "Finish. Your Data is: " + cliStr.String()})
			break
		}
	}
	log.Println("PutServer Done.")
	return nil
}

//双向流
func (*server) BiStream(biServer proto.Stream_BiStreamServer) error {
	log.Println("BiServer Start.")
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		for {
			biReq, err := biServer.Recv()
			if err != nil {
				if err == io.EOF {
					log.Printf("[INFO] recv end")
				}
				break
			} else {
				log.Println("Bi Req: " + biReq.Data)
			}
		}
		wg.Done()
	}()

	go func() {
		for {
			err := biServer.Send(&proto.MsgReply{Data: "ok"})
			if err != nil {
				break
			} else {
				log.Println("Bi Res: ok")
				time.Sleep(time.Second)
			}
		}
		wg.Done()
	}()

	wg.Wait()
	log.Println("BiServer Done.")
	return nil
}

启动服务:

func main() {
	//监听端口
	lis, err := net.Listen("tcp", ":5902")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	
	//创建一个grpc 服务器
	s := grpc.NewServer()
	//注册事件
	proto.RegisterStreamServer(s, &server{})
	
	// 注册服务端反射服务
	// reflection.Register(s)
	
	//处理链接
	s.Serve(lis)
}

客户端

调用对应接口成功后,会返回对应流对象:

func main() {
	//新建grpc连接
	grpcConn, err := grpc.Dial(":5902", grpc.WithInsecure())
	if err != nil {
		log.Fatalln(err)
	}
	defer grpcConn.Close()

	//通过连接 生成一个client对象。
	c := proto.NewStreamClient(grpcConn)

	//设置超时
	//ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	//defer cancel()
	ctx := context.Background()

	
	//调用服务端推送流,获取服务端流数据
	log.Println("GetStream:")
	getClient, err := c.GetStream(ctx, &proto.MsgRequest{Data: "Get Time"})
	if err != nil {
		log.Fatalln(err)
		return
	}
	for {
		aa, err := getClient.Recv()
		if err != nil {
			if err == io.EOF {
				log.Printf("[INFO] recv end")
			}
			break
		}
		log.Println("Get Res Data: " + aa.Data)
	}

	
	//客户端推送流
	log.Println("PutStream:")
	putClient, err := c.PutStream(ctx)
	if err != nil {
		log.Fatalln(err)
		return
	}
	i := 1
	for i < 4 {
		i++
		var putData = proto.MsgRequest{Data: "Put " + strconv.Itoa(i) + " "}
		log.Println("Put Req Data: " + putData.Data)
		putClient.Send(&putData)
		time.Sleep(time.Second)
	}
	putRes, err := putClient.CloseAndRecv()
	if err != nil {
		log.Fatalln(err)
	}
	log.Printf("Put Done. Res is %v", putRes.Data)

	
	//双向流
	log.Println("BiStream:")
	//设置结束等待
	done := make(chan struct{})
	biClient, err := c.BiStream(ctx)
	if err != nil {
		log.Fatalln(err)
		return
	}
	go func() {
		for {
			biRes, err := biClient.Recv()
			if err != nil {
				return
			} else {
				log.Println("Bi Res Data: " + biRes.Data)
			}
		}
	}()

	go func() {
		i := 1
		for i < 4 {
			i++
			biReq := proto.MsgRequest{Data: "send " + strconv.Itoa(i) + " "}
			log.Println("Bi Req Data: " + biReq.Data)
			biClient.Send(&biReq)
			time.Sleep(time.Second)
		}
        biClient.CloseSend()
		done <- struct{}{}
	}()

	<-done
	log.Println("All Done.")
}

proto3

proto3用于构建 protocol buffer 数据:包括 .proto 文件语法和如何基于该 .proto 文件生成数据访问类。

所有数据结构以message来定义:

  • 每个字段必须指定具体类型(可以是简单类型,也可以是message);
  • 每个字段都需分配唯一的标识编号,投入到使用后不应该被更改:
    • 对于1~15编号的字段,只需要一个字节编码;
    • 对于16~2047编号的字段,则需要两个字节编码;
    • 字段号的范围为:1~536870911(2^29-1)。其中19000~19999为ProtocolBuffer自己预留(reserved)的字段号不能使用。
  • 字段可以使用两种规则描述:
    • 单一的(singular):0个或1个,不用在字段定义中指出;
    • 重复的(repeated):0个到多个,需要在字段定义中指出。
  • 默认值,消息被解析时,若字段并没有被赋值,将会被设为默认值:
    • string:空串
    • bytes:空的bytes序列
    • bool:false
    • 数字类型:0
    • 枚举类型:默认值为枚举类型中定义的第一个值,也就是0
    • 消息类型(message):取决于所编译的语言; 对于repeated,为空的list。

数据类型

消息标量字段可以是以下类型之一:

.protoC++JavaPythonGoC#Notes
doubledoubledoublefloatfloat64double
floatfloatfloatfloatfloat32float
int32int32intintint32int使用变长编码,对于负值的效率很低,如果你的域有可能有负值,请使用sint64替代
uint32uint32intint/longuint32uint使用变长编码
uint64uint64longint/longuint64ulong使用变长编码
sint32int32intintint32int使用变长编码,这些编码在负值时比int32高效的多
sint64int64longint/longint64long使用变长编码,有符号的整型值。编码时比通常的int64高效。
fixed32uint32intintuint32uint总是4个字节,如果数值总是比总是比228大的话,这个类型会比uint32高效。
fixed64uint64longint/longuint64ulong总是8个字节,如果数值总是比总是比256大的话,这个类型会比uint64高效。
sfixed32int32intintint32int总是4个字节
sfixed64int64longint/longint64long总是8个字节
boolboolbooleanboolboolbool
stringstringStringstr/unicodestringstring一个字符串必须是UTF-8编码或者7-bit ASCII编码的文本。
bytesstringByteStringstr[]byteByteString可能包含任意顺序的字节数据。
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-04-18 18:20:11  更:2022-04-18 18:20:28 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/26 4:35:35-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码