实现效果
通过socket实现RPC,能够从客户端调用服务端的函数并返回结果
一、socket基础
在构建自己的RPC之前,我们需要先掌握Socket的一些用法,以及什么是RPC,下面分三块讲:分别是 1、什么是socket 2、socket模块 3、套接字对象内建方法 4、什么是PRC
1.什么是socket Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。在设计模式中,Socket其实就是一个门面模式,它把复杂的TCP/IP协议族隐藏在Socket接口后面,对用户来说,一组简单的接口就是全部,让Socket去组织数据,以符合指定的协议。
所以,我们无需深入理解tcp/udp协议,socket已经为我们封装好了,我们只需要遵循socket的规定去编程,写出的程序自然就是遵循tcp/udp标准的。 2.socket模块 要使用socket.socket()函数来创建套接字。其语法如下:
socket.socket(socket_family,socket_type,protocol=0)
socket_family可以是如下参数:
socket.AF_INET IPv4(默认)
socket.AF_INET6 IPv6
socket.AF_UNIX 只能够用于单一的Unix系统进程间通信
socket_type可以是如下参数:
socket.SOCK_STREAM 流式socket , for TCP (默认)
socket.SOCK_DGRAM 数据报式socket , for UDP
socket.SOCK_RAW 原始套接字,普通的套接字无法处理ICMP、IGMP等网络报文,而SOCK_RAW可以;其次,SOCK_RAW也可以处理特殊的IPv4报文;此外,利用原始套接字,可以通过IP_HDRINCL套接字选项由用户构造IP头。
socket.SOCK_RDM 是一种可靠的UDP形式,即保证交付数据报但不保证顺序。SOCK_RAM用来提供对原始协议的低级访问,在需要执行某些特殊操作时使用,如发送ICMP报文。SOCK_RAM通常仅限于高级用户或管理员运行的程序使用。
socket.SOCK_SEQPACKET 可靠的连续数据包服务
protocol参数:
0 (默认)与特定的地址家族相关的协议,如果是 0 ,则系统就会根据地址格式和套接类别,自动选择一个合适的协议
3.套接字对象内建方法 服务器端套接字函数
s.bind() 绑定地址(ip地址,端口)到套接字,参数必须是元组的格式例如:s.bind((‘127.0.0.1’,8009))
s.listen(5) 开始监听,5为最大挂起的连接数
s.accept() 被动接受客户端连接,阻塞,等待连接
客户端套接字函数
s.connect() 连接服务器端,参数必须是元组格式例如:s.connect((‘127,0.0.1’,8009))
公共用途的套接字函数
s.recv(1024) 接收TCP数据,1024为一次数据接收的大小
s.send(bytes) 发送TCP数据,python3发送数据的格式必须为bytes格式
s.sendall() 完整发送数据,内部循环调用send
s.close() 关闭套接字
4.什么是RPC
远程过程调用协议RPC(Remote Procedure Call Protocol)-----允许像调用本地服务一样调用远程服务。
二、代码实现
import json
import socket
funs = {}
def register_function(func):
"""Server端方法注册,Client端只可调用被注册的方法"""
name = func.__name__
funs[name] = func
class TCPServer(object):
def __init__(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client_socket = None
def bind_listen(self, port):
self.sock.bind(('0.0.0.0', port))
self.sock.listen(5)
def accept_receive_close(self):
"""获取Client端信息"""
if self.client_socket is None:
(self.client_socket, address) = self.sock.accept()
if self.client_socket:
msg = self.client_socket.recv(1024)
data = self.on_msg(msg)
self.client_socket.send(data)
class RPCStub(object):
def __init__(self):
self.data = None
def call_method(self, data):
"""解析数据,调用对应的方法变将该方法执行结果返回"""
if len(data) == 0:
return json.dumps("something wrong").encode('utf-8')
self.data = json.loads(data.decode('utf-8'))
method_name = self.data['method_name']
method_args = self.data['method_args']
method_kwargs = self.data['method_kwargs']
res = funs[method_name](*method_args, **method_kwargs)
data = res
return json.dumps(data).encode('utf-8')
class RPCServer(TCPServer, RPCStub):
def __init__(self):
TCPServer.__init__(self)
RPCStub.__init__(self)
def loop(self, port):
self.bind_listen(port)
print('Server listen 5003 ...')
while True:
try:
self.accept_receive_close()
except Exception:
self.client_socket.close()
self.client_socket = None
continue
def on_msg(self, data):
return self.call_method(data)
@register_function
def add(a, b, c=10):
sum = a + b + c
print(sum)
return sum
@register_function
def setData(data):
print(data)
return data
s = RPCServer()
s.loop(5003)
import json
import socket
class TCPClient(object):
def __init__(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def connect(self, host, port):
"""链接Server端"""
self.sock.connect((host, port))
def send(self, data):
"""将数据发送到Server端"""
self.sock.send(data)
def recv(self, length):
"""接受Server端回传的数据"""
return self.sock.recv(length)
class RPCStub(object):
def __getattr__(self, function):
def _func(*args, **kwargs):
d = {'method_name': function, 'method_args': args, 'method_kwargs': kwargs}
self.send(json.dumps(d).encode('utf-8'))
data = self.recv(1024)
return data.decode('utf-8')
setattr(self, function, _func)
return _func
class RPCClient(TCPClient, RPCStub):
pass
c = RPCClient()
c.connect('127.0.0.1', 5003)
print(c.add(1, 2, 3))
print(c.setData({"sss": "ssss", "list": [1, 2, 3, 4]}))
服务端: 1、服务端对“本机的5003”端口进行监听 2、对服务端的方法进行注册“register_function” 3、对客户端传送过来的数据进行解析,查询对应的函数并将参数传入,返回给客户端 客户端: 1、客户端对服务端进行连接 2、将调用的方法名、参数传入服务端,服务端将数据进行返回 两者之间的数据采用Json文本格式
三、运行结果
服务端结果: 客户端结果: 此时我们如果再运行一次客户端
服务端结果: 客户端结果:
四、注意点
如果连接断开了(这里指客户端断开连接,如果服务端断开了,那只能重启了),需要重新创建套接字socket,重新连接 整个RPC的流程: 创建socket->建立连接->进行消息交互->关闭socket
五、后续扩充
1、在 S-C之间保持连接的时候,为了防止意外断连,可以采用心跳包的形式,每隔一段时间进行一次心跳包同步,如果断连了就重新进行socket连接 2、服务端可以将socket扩充为list,并利用address进行区分,实现多个客户端连接
六、参考文章
本文是参考下面文章整合并对代码加以改进写的 socket基础 RPC原文参考 参考二
|