一、grpc-python介绍
RPC(Remote Procedure Call)远程过程调用,简单的理解是一个节点请求另一个节点提供的服务。如果 http 就是一种远程调用的协议。这里提到了远程过程调用,那么对于远程就应该有本地过程调用那么什么是本地过程调用,也就是调用本地方法对本地的对象进行操作。 例如调用 Add 方法向列表中添加一个对象。而远程过程调用,向列表添加对象方法并不在本地,调用远程提供 Add() 来实现该操作。
gRPC 是一个现代开源的高性能远程过程调用 (RPC) 框架,可以在任何环境中运行。它可以通过对负载平衡、跟踪、健康检查和身份验证的可插拔支持有效地连接数据中心内和跨数据中心的服务。它也适用于分布式计算的最后一英里,将设备、移动应用程序和浏览器连接到后端服务。 其由Google主要面向移动应用开发并基于http/2协议标准而设计,基于ProtoBuf( Protocol Buffers)序列化协议开发,且支持众多开发语言(python,golang,javascript,c,c++,php,ruby等)。 gRPC提供了一种简单的方法来精确地定义服务和为ios,android和后台支持服务自动生成可靠性很强的客户端功能库。客户端充分利用高级流和链接功能,从而有助于节省带宽,降低的TCP链接次数,节省CPU使用、和电池寿命。
grpc与restful的对比
| gRPC | REST |
---|
full name(全名) | gRPC Remote Procedure Calls | Representational State Transfer | payload (有效载荷) | Protobuf | JSON | | unreadable binary data (不可读二进制流) | readable data(可读文件) | HTTP协议 | HTTP/2 | HTTP1.1/HTTP/2 | Performance(性能) | faster(较快) | 一般 | Safe (安全性) | Type Safe (更安全) | 一般 | Cross Language (跨语言) | 支持 | 支持 | clinet(是否需要客户端) | 需要一个 | 不需要 | mode(使用模式) | Any function | GET/POST/PUT/DELETE/… |
什么是protobuf?
- protobuf (protocol buffer) 是google提供一个具有高效的协议数据交换格式工具库(类似json),但相比于json,protobuf有更高的转化效率,时间效率和空间效率都是JSON的3-5倍数。
- 具有跨语言性: 支持 python golang java c++ javascript等等。
二、 环境准备
环境: Python3.8.5 Ubuntu20.04
grpc官网: https://www.grpc.io/ 官方操作文档: https://www.grpc.io/docs/languages/python/quickstart/
模块安装
安装必要模块包
pip install grpcio-tools
pip install grpc
1. 创建proto文件,以及参数详解
创建helloworld.proto 文件并添加以下代码:
syntax = "proto3";
package test;
service grpc_test {
rpc
Hello_grpc(Hello_grpc_req)
returns ( Hello_grpc_res){}
}
message Hello_grpc_req {
string name = 1;
int age = 2;
}
message Hello_grpc_res{
string result = 1;
}
2. 文件编译命令
使用 protoc 编译 helloworld.proto 文件, 生成 python 语言的实现
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. helloworld.proto
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. helloworld.proto
python -m grpc_tools.protoc: python 下的 protoc 编译器通过 python 模块(module) 实现, 所以说这一步非常省心
--python_out=. : 编译生成处理 protobuf 相关的代码的路径, 这里生成到当前目录
--grpc_python_out=. : 编译生成处理 grpc 相关的代码的路径, 这里生成到当前目录
-I. helloworld.proto : proto 文件的路径, 这里的 proto 文件在当前目录
Makefile的使用
创建Makefile 文件 Makefile 内容:
proto:
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. helloworld.proto
命令执行:
make proto
创建了Makefile文件, 命令"make proto " 和 "python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. helloworld.proto "效果是一样的。
3. 编译后生成的代码
helloworld_pb2.py : 用来和 protobuf 数据进行交互
- 每一个message对应的信息存储,比如我们的request与response在这里被定义extension(延展)
helloworld_pb2_grpc.py : 用来和 grpc 进行交互
- 用来存储每一个服务的server与客户端以及注册server的工具
- 客户端名为: service_name + Stub
- 服务端名为: service_name + Service
- 注册服务为: add_服务端名_to_server
4. 创建服务端和客户端
服务端代码 server.py :
import time
import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc
from concurrent import futures
class grpc_test(pb2_grpc.grpc_testServicer):
def Hello_grpc(self, request, context):
name = request.name
age = request.age
result = f'my name is {name}, i am {age} years old'
return pb2.Hello_grpc_res(result=result)
def run():
grpc_server = grpc.server(
futures.ThreadPoolExecutor(max_workers=4)
)
pb2_grpc.add_grpc_testServicer_to_server(grpc_test(), grpc_server)
grpc_server.add_insecure_port('0.0.0.0:6000')
print('server will start at 0.0.0.0:6000')
grpc_server.start()
try:
while 1:
time.sleep(3600)
except Exception as e:
grpc_server.stop(0)
print(e)
if __name__ == '__main__':
run()
客户端 client.py :
import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc
def run():
conn = grpc.insecure_channel('0.0.0.0:6000')
client = pb2_grpc.grpc_testStub(channel=conn)
response = client.Hello_grpc(pb2.Hello_grpc_req(
name='yanilo',
age=18
))
print(response.result)
if __name__ == '__main__':
run()
反回结果:
my name is yanilo, i am 18 years old
三、常用的protobuf数据类型
类型 | 说明 |
---|
string | 字符串类型,要求是utf-8或者7-bit与ascii编码的字符串 | bytes | 比特类型 | bool | 布尔类型 | int32 | 32整型 | int64 | 64位整型 | float | 浮点类型 | repeated | 数组(列表)repeated string data = 1; | map | 字典类型(python叫法) map<string,string> data = 1; |
四、常用的protobuf的特殊字符
类型 | 说明 |
---|
package | 包名称 | syntax | protobuf版本 | service | 定义服务 | rpc | 定义服务中的方法 | stream | 定义的方法传输为流传输 | message | 定义消息体 message User{} | extend | 扩展消息体 extend User{} | import | 导入一些插件 | // | 注释 |
五、不同数据类型示例
1. helloworld.proto:
syntax = "proto3";
package test;
service grpc_test {
rpc Hello_grpc(Hello_grpc_req) returns (Hello_grpc_res){}
rpc helle_test(hello_requests) returns (hello_response){}
}
message Hello_grpc_req {
string name = 1;
int32 age = 2;
}
message Hello_grpc_res{
string result = 1;
}
message hello_requests{
string name = 1;
int64 age = 2;
repeated string data = 3;
map<string,int64> number_1 = 4;
map<string,hello_requests_number_value> number = 5;
}
message hello_requests_number_value{
string name = 1;
int32 age = 2;
bool is_active = 3;
}
message hello_response{
string result = 1;
}
2. server.py:
import time
import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc
from concurrent import futures
class grpc_test(pb2_grpc.grpc_testServicer):
def Hello_grpc(self, request, context):
name = request.name
age = request.age
result = f'my name is {name}, i am {age} years old'
return pb2.Hello_grpc_res(result=result)
def helle_test(self, request, context):
name = request.name
age = request.age
data = request.data
number_1 = request.number_1
number= request.number
result = f"{name}\n{age}\n{data}\n{number_1}\n{number}"
return pb2.hello_response(result=result)
def run():
grpc_server = grpc.server(
futures.ThreadPoolExecutor(max_workers=4)
)
pb2_grpc.add_grpc_testServicer_to_server(grpc_test(), grpc_server)
grpc_server.add_insecure_port('0.0.0.0:50010')
print('server will start at 0.0.0.0:50010')
grpc_server.start()
try:
while 1:
time.sleep(3600)
except Exception as e:
grpc_server.stop(0)
print(e)
if __name__ == '__main__':
run()
3. client.py:
import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc
def run():
conn = grpc.insecure_channel('127.0.0.1:50010')
client = pb2_grpc.grpc_testStub(channel=conn)
response = client.Hello_grpc(pb2.Hello_grpc_req(
name='yanilo',
age=18
))
print(response.result)
print('*****************')
response1 = client.helle_test(pb2.hello_requests(
name='yanilo',
age=100,
data=["qwq"],
number_1={'k': 11,'k2': 22,'k3': 33},
number={'k': pb2.hello_requests_number_value(
age=123,
name='yanilo_value',
is_active=True
)},
))
print(response1.result)
if __name__ == '__main__':
run()
4. 反回结果
服务端:
客户端:
六、grpc basic: 四种通信方式
1. unary 单程
- 两者都是非流形式,客服端一次请求, 服务器一次应答,(上面的不同数据类型示例就是非流形式)
- 客服端一次请求, 服务器一次应答
2. stream
单向流和双向流都是客户端与服务端建立长链接
- 单向流:
-
一种是客户端不停给服务端发送流数据,服务器端只负责接收; 客服端多次请求(流式), 服务器一次应答 -
一种是服务器端不停给客户端发送流数据,客户端只负责接收; 客服端一次请求, 服务器多次应答(流式) - 双向流: 客户端与服务端建立长链接,双方发送和接收流数据;
proto 中想要表示流式传输, 只需要在proto文件中服务(service )的rpc函数里添加 stream 关键字即可 例:
service grpc_test {
rpc
test_client_stream(test_client_stream_requests)
returns(stream test_client_stream_response){}
}
3. 单向流和双向流示例
3.1 单向流1
客户端每一秒给服务端端发送流数据,服务端接收流数据,但不传输流数据。 helloworld.proto :
syntax = "proto3";
package test;
service grpc_test {
rpc
test_client(stream test_client_requests)
returns( test_client_response){}
}
message test_client_requests{
string data = 1;
}
message test_client_response{
string result = 1;
}
执行命令:
make proto
server.py :
import time
import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc
from concurrent import futures
class grpc_test(pb2_grpc.grpc_testServicer):
def test_client(self, request_iterator, context):
index = 0
for request in request_iterator:
data = request.data
index += 1
result = f'receive {data},index:{index}'
print(result)
time.sleep(1)
if index == 10:
break
return pb2.test_client_response(result='ok')
def run():
grpc_server = grpc.server(
futures.ThreadPoolExecutor(max_workers=4)
)
pb2_grpc.add_grpc_testServicer_to_server(grpc_test(), grpc_server)
grpc_server.add_insecure_port('0.0.0.0:50010')
print('server will start at 0.0.0.0:50010')
grpc_server.start()
try:
while 1:
time.sleep(3600)
except Exception as e:
grpc_server.stop(0)
print(e)
if __name__ == '__main__':
run()
client.py :
import random
import time
import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc
def test():
"""
循环发送流数据
每一秒中向服务端发送流数据
:return:
"""
index = 0
while True:
index += 1
time.sleep(1)
str_random = str(random.random())
yield pb2.test_client_requests(
data=str_random
)
if index == 15:
break
def run():
conn = grpc.insecure_channel('127.0.0.1:50010')
client = pb2_grpc.grpc_testStub(channel=conn)
response = client.test_client(test())
print(response)
if __name__ == '__main__':
run()
结果:
3.2 单向流2
服务器每一秒给客户端发送流数据,客户端接收流数据,但不传输流数据。 helloworld.proto :
syntax = "proto3";
package test;
service grpc_test {
rpc
test_client_stream(test_client_stream_requests)
returns(stream test_client_stream_response){}
}
message test_client_stream_requests{
string data = 1;
}
message test_client_stream_response{
string result = 1;
}
执行命令:
make proto
server.py :
import time
import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc
from concurrent import futures
class grpc_test(pb2_grpc.grpc_testServicer):
def test_client_stream(self, request, context):
index = 0
while context.is_active():
data = request.data
index += 1
result = f'send {data},index:{index}'
print(result)
time.sleep(1)
yield pb2.test_client_stream_response(result=result)
def run():
grpc_server = grpc.server(
futures.ThreadPoolExecutor(max_workers=4)
)
pb2_grpc.add_grpc_testServicer_to_server(grpc_test(), grpc_server)
grpc_server.add_insecure_port('0.0.0.0:50010')
print('server will start at 0.0.0.0:50010')
grpc_server.start()
try:
while 1:
time.sleep(3600)
except Exception as e:
grpc_server.stop(0)
print(e)
if __name__ == '__main__':
run()
client.py :
import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc
def run():
conn = grpc.insecure_channel('127.0.0.1:50010')
client = pb2_grpc.grpc_testStub(channel=conn)
response = client.test_client_stream(pb2.test_client_stream_requests(
data='yanilo'
))
for i in response:
print(i.result)
if __name__ == '__main__':
run()
结果:
3.2 双向流
客服端和服务端都以流形式传输数据 helloworld.proto :
syntax = "proto3";
package test;
service grpc_test {
rpc
test_client(stream test_client_requests)
returns( stream test_client_response){}
}
message test_client_requests{
string data = 1;
}
message test_client_response{
string result = 1;
}
执行命令:
make proto
server.py :
import time
import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc
from concurrent import futures
class grpc_test(pb2_grpc.grpc_testServicer):
def test_client(self, request_iterator, context):
index = 0
for request in request_iterator:
data = request.data
index += 1
result = f'receive {data},index:{index}'
print(result)
time.sleep(1)
if index == 10:
break
yield pb2.test_client_response(result=result)
def run():
grpc_server = grpc.server(
futures.ThreadPoolExecutor(max_workers=4)
)
pb2_grpc.add_grpc_testServicer_to_server(grpc_test(), grpc_server)
grpc_server.add_insecure_port('0.0.0.0:50010')
print('server will start at 0.0.0.0:50010')
grpc_server.start()
try:
while 1:
time.sleep(3600)
except Exception as e:
grpc_server.stop(0)
print(e)
if __name__ == '__main__':
run()
client.py :
import random
import time
import grpc
import helloworld_pb2 as pb2
import helloworld_pb2_grpc as pb2_grpc
def test():
"""
循环发送流数据
每一秒中向服务端发送流数据
:return:
"""
index = 0
while True:
index += 1
time.sleep(1)
str_random = str(random.random())
yield pb2.test_client_requests(
data=str_random
)
if index == 15:
break
def run():
conn = grpc.insecure_channel('127.0.0.1:50010')
client = pb2_grpc.grpc_testStub(channel=conn)
response = client.test_client(test(), timeout=10)
for i in response:
print(i)
if __name__ == '__main__':
run()
结果:
七、错误码与服务端客户端发送与接收错误信息
|