IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> grpc-python学习 -> 正文阅读

[网络协议]grpc-python学习

一、grpc-python介绍

RPC(Remote Procedure Call)远程过程调用,简单的理解是一个节点请求另一个节点提供的服务。如果 http 就是一种远程调用的协议。这里提到了远程过程调用,那么对于远程就应该有本地过程调用那么什么是本地过程调用,也就是调用本地方法对本地的对象进行操作。
例如调用 Add 方法向列表中添加一个对象。而远程过程调用,向列表添加对象方法并不在本地,调用远程提供 Add() 来实现该操作。

在这里插入图片描述

gRPC 是一个现代开源的高性能远程过程调用 (RPC) 框架,可以在任何环境中运行。它可以通过对负载平衡、跟踪、健康检查和身份验证的可插拔支持有效地连接数据中心内和跨数据中心的服务。它也适用于分布式计算的最后一英里,将设备、移动应用程序和浏览器连接到后端服务。
其由Google主要面向移动应用开发并基于http/2协议标准而设计,基于ProtoBuf( Protocol Buffers)序列化协议开发,且支持众多开发语言(python,golang,javascript,c,c++,php,ruby等)。
gRPC提供了一种简单的方法来精确地定义服务和为ios,android和后台支持服务自动生成可靠性很强的客户端功能库。客户端充分利用高级流和链接功能,从而有助于节省带宽,降低的TCP链接次数,节省CPU使用、和电池寿命。

grpc与restful的对比

gRPCREST
full name(全名)gRPC Remote Procedure CallsRepresentational State Transfer
payload (有效载荷)ProtobufJSON
unreadable binary data (不可读二进制流)readable data(可读文件)
HTTP协议HTTP/2HTTP1.1/HTTP/2
Performance(性能)faster(较快)一般
Safe (安全性)Type Safe (更安全)一般
Cross Language (跨语言)支持支持
clinet(是否需要客户端)需要一个不需要
mode(使用模式)Any functionGET/POST/PUT/DELETE/…

什么是protobuf?

  • protobuf (protocol buffer) 是google提供一个具有高效的协议数据交换格式工具库(类似json),但相比于json,protobuf有更高的转化效率,时间效率和空间效率都是JSON的3-5倍数。
  • 具有跨语言性: 支持 python golang java c++ javascript等等。

二、 环境准备

环境:
Python3.8.5
Ubuntu20.04

grpc官网:
https://www.grpc.io/
官方操作文档:
https://www.grpc.io/docs/languages/python/quickstart/

模块安装

安装必要模块包

pip install grpcio-tools
pip install grpc

1. 创建proto文件,以及参数详解

创建helloworld.proto文件并添加以下代码:

syntax = "proto3";  //指定版本

package test;     // 包的命名名称

// 创建服务对象
service grpc_test {
	// 定义一个rpc函数
	rpc 
	Hello_grpc(Hello_grpc_req)		// 请求函数
	returns ( Hello_grpc_res){}		// 反回函数
}

message Hello_grpc_req {
	string name = 1;
	int age = 2;
}
message Hello_grpc_res{
	string result = 1;
}

2. 文件编译命令

使用 protoc 编译 helloworld.proto 文件, 生成 python 语言的实现

python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. helloworld.proto
#编译 proto 文件:
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. helloworld.proto


python -m grpc_tools.protoc: python 下的 protoc 编译器通过 python 模块(module) 实现, 所以说这一步非常省心
--python_out=. : 编译生成处理 protobuf 相关的代码的路径, 这里生成到当前目录
--grpc_python_out=. : 编译生成处理 grpc 相关的代码的路径, 这里生成到当前目录
-I. helloworld.proto : proto 文件的路径, 这里的 proto 文件在当前目录

Makefile的使用

创建Makefile文件
Makefile内容:

proto:
	python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. helloworld.proto

命令执行:

make proto

创建了Makefile文件, 命令"make proto" 和 "python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. helloworld.proto"效果是一样的。

3. 编译后生成的代码

helloworld_pb2.py: 用来和 protobuf 数据进行交互

  • 每一个message对应的信息存储,比如我们的request与response在这里被定义extension(延展)

helloworld_pb2_grpc.py: 用来和 grpc 进行交互

  • 用来存储每一个服务的server与客户端以及注册server的工具
  • 客户端名为: service_name + Stub
  • 服务端名为: service_name + Service
  • 注册服务为: add_服务端名_to_server

4. 创建服务端和客户端

服务端代码 server.py:

import time
import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc

from concurrent import futures # 进程线程包


class grpc_test(pb2_grpc.grpc_testServicer):
    def Hello_grpc(self, request, context):
        name = request.name
        age = request.age

        result = f'my name is {name}, i am {age} years old'
        return pb2.Hello_grpc_res(result=result)

def run():
    grpc_server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=4)   # 指定线程数
    )
    pb2_grpc.add_grpc_testServicer_to_server(grpc_test(), grpc_server)  # 注册服务到grpc_server里
    grpc_server.add_insecure_port('0.0.0.0:6000')                       # 指定ip、端口号
    print('server will start at 0.0.0.0:6000')
    grpc_server.start()

    # 防止启动后直接结束添加阻塞,在其他语言不会出现这样的问题
    try:
        while 1:
            time.sleep(3600)
    except Exception as e:
        grpc_server.stop(0)
        print(e)
if __name__ == '__main__':
    run()

客户端 client.py:

import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc


def run():
    # 定义一个频道 绑定ip、端口号
    conn = grpc.insecure_channel('0.0.0.0:6000')
    
    # 生成客户端
    client = pb2_grpc.grpc_testStub(channel=conn)
    
    # 传入参数获取返回值
    response = client.Hello_grpc(pb2.Hello_grpc_req(
        name='yanilo',
        age=18
    ))
    print(response.result)

if __name__ == '__main__':
    run()

反回结果:

my name is yanilo, i am 18 years old

三、常用的protobuf数据类型

类型说明
string字符串类型,要求是utf-8或者7-bit与ascii编码的字符串
bytes比特类型
bool布尔类型
int3232整型
int6464位整型
float浮点类型
repeated数组(列表)repeated string data = 1;
map字典类型(python叫法) map<string,string> data = 1;

四、常用的protobuf的特殊字符

类型说明
package包名称
syntaxprotobuf版本
service定义服务
rpc定义服务中的方法
stream定义的方法传输为流传输
message定义消息体 message User{}
extend扩展消息体 extend User{}
import导入一些插件
//注释

五、不同数据类型示例

1. helloworld.proto:

syntax = "proto3";  //指定版本

package test;     // 包的命名名称

service grpc_test {
	// 定义一个rpc函数
	rpc Hello_grpc(Hello_grpc_req) returns (Hello_grpc_res){}
	rpc helle_test(hello_requests) returns (hello_response){}
}


message Hello_grpc_req {
	string name = 1;
	int32 age = 2;
}
message Hello_grpc_res{
	string result = 1;
}
message hello_requests{
	string name = 1;
	int64 age = 2;
	repeated string  data = 3;
	map<string,int64> number_1 = 4; // string,int32
	map<string,hello_requests_number_value> number = 5;
}

message hello_requests_number_value{
	string name = 1;
	int32 age = 2;
	bool is_active = 3;


}


message hello_response{
	string result = 1;
}


2. server.py:

import time

import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc


from concurrent import futures # 进程线程包


class grpc_test(pb2_grpc.grpc_testServicer):
    def Hello_grpc(self, request, context):
        name = request.name
        age = request.age

        result = f'my name is {name}, i am {age} years old'
        return pb2.Hello_grpc_res(result=result)

    def helle_test(self, request, context):
        name = request.name
        age = request.age
        data = request.data
        number_1 = request.number_1
        number= request.number
        result = f"{name}\n{age}\n{data}\n{number_1}\n{number}"
        return pb2.hello_response(result=result)

def run():
    grpc_server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=4)   # 指定线程数
    )
    pb2_grpc.add_grpc_testServicer_to_server(grpc_test(), grpc_server)  # 注册服务到grpc_server里
    grpc_server.add_insecure_port('0.0.0.0:50010')                       # 指定ip、端口号
    print('server will start at 0.0.0.0:50010')
    grpc_server.start()

    # 防止启动后直接结束添加阻塞,在其他语言不会出现这样的问题
    try:
        while 1:
            time.sleep(3600)
    except Exception as e:
        grpc_server.stop(0)
        print(e)
if __name__ == '__main__':
    run()

3. client.py:

import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc


def run():
    # 定义一个频道 绑定ip、端口号
    conn = grpc.insecure_channel('127.0.0.1:50010')

    # 生成客户端
    client = pb2_grpc.grpc_testStub(channel=conn)

    # 传入参数获取返回值
    response = client.Hello_grpc(pb2.Hello_grpc_req(
        name='yanilo',
        age=18
    ))
    print(response.result)
    print('*****************')
    response1 = client.helle_test(pb2.hello_requests(
        name='yanilo',
        age=100,
        data=["qwq"],
        number_1={'k': 11,'k2': 22,'k3': 33},
        number={'k': pb2.hello_requests_number_value(
            age=123,
            name='yanilo_value',
            is_active=True
        )},

    ))
    print(response1.result)


if __name__ == '__main__':
    run()

4. 反回结果

服务端:

在这里插入图片描述
客户端:
在这里插入图片描述

六、grpc basic: 四种通信方式

1. unary 单程

  • 两者都是非流形式,客服端一次请求, 服务器一次应答,(上面的不同数据类型示例就是非流形式)
  • 客服端一次请求, 服务器一次应答

2. stream

单向流和双向流都是客户端与服务端建立长链接

  • 单向流:
    1. 一种是客户端不停给服务端发送流数据,服务器端只负责接收;
      客服端多次请求(流式), 服务器一次应答

    2. 一种是服务器端不停给客户端发送流数据,客户端只负责接收;
      客服端一次请求, 服务器多次应答(流式)

  • 双向流: 客户端与服务端建立长链接,双方发送和接收流数据

proto 中想要表示流式传输, 只需要在proto文件中服务(service )的rpc函数里添加 stream 关键字即可
例:

service grpc_test {
	// 定义一个rpc函数
	rpc
			// 客户端接收流形式数据,但不发送流形式数据
			test_client_stream(test_client_stream_requests)		
			// stream 服务端以流的形式传输数据
			returns(stream test_client_stream_response){}
}



3. 单向流和双向流示例

3.1 单向流1

客户端每一秒给服务端端发送流数据,服务端接收流数据,但不传输流数据。
helloworld.proto

syntax = "proto3";  //指定版本

package test;     // 包的命名名称

service grpc_test {
	// 定义一个rpc函数
	rpc
			// 客户端接收非流形式数据,发送流形式数据
			test_client(stream test_client_requests)
			// 服务端发送非流形式数据,接受流形式数据
			returns( test_client_response){}
}
message test_client_requests{
	string data = 1;
}
message test_client_response{
	string result = 1;
}

执行命令:

make proto

server.py

import time
import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc
from concurrent import futures   # 进程线程包

class grpc_test(pb2_grpc.grpc_testServicer):

    def test_client(self, request_iterator, context):
        # context.is_active()     # 检测客户端是否还活着
        # 接收来自客户端流数据
        index = 0
        for request in request_iterator:
            data = request.data
            index += 1
            result = f'receive {data},index:{index}'
            print(result)
            time.sleep(1)
            # 条件终止连接
            if index == 10:
                break
        return pb2.test_client_response(result='ok')

def run():
    grpc_server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=4)   # 指定线程数
    )
    pb2_grpc.add_grpc_testServicer_to_server(grpc_test(), grpc_server)  # 注册服务到grpc_server里
    grpc_server.add_insecure_port('0.0.0.0:50010')                       # 指定ip、端口号
    print('server will start at 0.0.0.0:50010')
    grpc_server.start()

    # 防止启动后直接结束添加阻塞,在其他语言不会出现这样的问题
    try:
        while 1:
            time.sleep(3600)
    except Exception as e:
        grpc_server.stop(0)
        print(e)
if __name__ == '__main__':
    run()

client.py

import random
import time
import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc

def test():
    """
    循环发送流数据
    每一秒中向服务端发送流数据
    :return: 
    """
    index = 0
    while True:
        index += 1
        time.sleep(1)
        str_random = str(random.random())
        yield pb2.test_client_requests(
            data=str_random
        )
        # 条件终止连接
        if index == 15:
            break
        
def run():
    # 定义一个频道 绑定ip、端口号
    conn = grpc.insecure_channel('127.0.0.1:50010')

    # 生成客户端
    client = pb2_grpc.grpc_testStub(channel=conn)

    # 传入参数获取返回值
    response = client.test_client(test())
    print(response)
  
  
if __name__ == '__main__':
    run()

结果:
在这里插入图片描述

3.2 单向流2

服务器每一秒给客户端发送流数据,客户端接收流数据,但不传输流数据。
helloworld.proto:

syntax = "proto3";  //指定版本

package test;     // 包的命名名称

service grpc_test {
	// 定义一个rpc函数
	rpc
			// 客户端接收流形式数据,但不发送流形式数据
			test_client_stream(test_client_stream_requests)		
			// stream 服务端以流的形式传输数据
			returns(stream test_client_stream_response){}
}
message test_client_stream_requests{
	string data = 1;
}
message test_client_stream_response{
	string result = 1;
}

执行命令:

make proto

server.py

import time
import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc


from concurrent import futures # 进程线程包


class grpc_test(pb2_grpc.grpc_testServicer):
    def test_client_stream(self, request, context):
        # context.is_active()     # 检测客户端是否还活着

        # 每一秒中向客户发送数据
        index = 0
        while context.is_active():
            data = request.data
            index += 1
            result = f'send {data},index:{index}'
            print(result)
            time.sleep(1)
            yield pb2.test_client_stream_response(result=result)



def run():
    grpc_server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=4)   # 指定线程数
    )
    pb2_grpc.add_grpc_testServicer_to_server(grpc_test(), grpc_server)  # 注册服务到grpc_server里
    grpc_server.add_insecure_port('0.0.0.0:50010')                       # 指定ip、端口号
    print('server will start at 0.0.0.0:50010')
    grpc_server.start()

    # 防止启动后直接结束添加阻塞,在其他语言不会出现这样的问题
    try:
        while 1:
            time.sleep(3600)
    except Exception as e:
        grpc_server.stop(0)
        print(e)
if __name__ == '__main__':
    run()

client.py

import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc

def run():
    # 定义一个频道 绑定ip、端口号
    conn = grpc.insecure_channel('127.0.0.1:50010')

    # 生成客户端
    client = pb2_grpc.grpc_testStub(channel=conn)

    # 传入参数获取返回值
    response = client.test_client_stream(pb2.test_client_stream_requests(
            data='yanilo'
    ))
    for i in response:
        print(i.result)

if __name__ == '__main__':
    run()

结果:
在这里插入图片描述

3.2 双向流

客服端和服务端都以流形式传输数据
helloworld.proto

syntax = "proto3";  //指定版本

package test;     // 包的命名名称

service grpc_test {
	// 定义一个rpc函数
	rpc
			// 客服端和服务端都以流形式传输数据
			test_client(stream test_client_requests)
			returns( stream test_client_response){}
}
message test_client_requests{
	string data = 1;
}
message test_client_response{
	string result = 1;
}

执行命令:

make proto

server.py

import time
import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc
from concurrent import futures   # 进程线程包

class grpc_test(pb2_grpc.grpc_testServicer):

    def test_client(self, request_iterator, context):
        # context.is_active()     # 检测客户端是否还活着

        # 接收来自客户端流数据, 且向客户端发送数据
        index = 0
        for request in request_iterator:
            data = request.data
            index += 1
            result = f'receive {data},index:{index}'
            print(result)
            time.sleep(1)
            # 条件终止连接
            if index == 10:
               # context.cancel()        # 强制终止连接,会引起客户端抛出异常
                break
            yield pb2.test_client_response(result=result)

def run():
    grpc_server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=4)   # 指定线程数
    )
    pb2_grpc.add_grpc_testServicer_to_server(grpc_test(), grpc_server)  # 注册服务到grpc_server里
    grpc_server.add_insecure_port('0.0.0.0:50010')                       # 指定ip、端口号
    print('server will start at 0.0.0.0:50010')
    grpc_server.start()

    # 防止启动后直接结束添加阻塞,在其他语言不会出现这样的问题
    try:
        while 1:
            time.sleep(3600)
    except Exception as e:
        grpc_server.stop(0)
        print(e)
if __name__ == '__main__':
    run()

client.py

import random
import time

import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc


def test():
    """
    循环发送流数据
    每一秒中向服务端发送流数据
    :return:
    """
    index = 0
    while True:
        index += 1
        time.sleep(1)
        str_random = str(random.random())
        yield pb2.test_client_requests(
            data=str_random
        )
        # 条件终止连接
        if index == 15:
            break



def run():
    # 定义一个频道 绑定ip、端口号
    conn = grpc.insecure_channel('127.0.0.1:50010')

    # 生成客户端
    client = pb2_grpc.grpc_testStub(channel=conn)

    # 传入参数获取返回值
    response = client.test_client(test(), timeout=10)       # timeout超出时长抛出异常

    for i in response:
        print(i)
if __name__ == '__main__':
    run()

结果:
在这里插入图片描述

七、错误码与服务端客户端发送与接收错误信息

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-04-01 00:30:35  更:2022-04-01 00:32:00 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/2 2:26:43-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码