半包粘包
什么是半包与粘包问题?
粘包问题是指当发送两条消息时,比如发送了 ABC 和 DEF,但另一端接收到的却是 ABCD,像这种一次性读取了两条数据的情况就叫做粘包(正常情况应该是一条一条读取)。 半包问题是指,当发送的消息是 ABC 时,另一端却接收到的是 AB 和 C 两条信息,像这种情况就叫做半包。
package socket;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
public class ClientSocket {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1", 9999);
final String message = "Hi,Java.";
try (OutputStream outputStream = socket.getOutputStream()) {
for (int i = 0; i < 10; i++) {
outputStream.write(message.getBytes());
}
}
}
}
package socket;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
public class ServeSocket {
private static final int BYTE_LENGTH = 20;
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(9999);
Socket clientSocket = serverSocket.accept();
try (InputStream inputStream = clientSocket.getInputStream()) {
for (int i = 0; i < 10; i++) {
byte[] bytes = new byte[BYTE_LENGTH];
int count = inputStream.read(bytes, 0, BYTE_LENGTH);
if (count > 0) {
System.out.println("接收到客户端的信息是:" + new String(bytes));
}
count = 0;
}
}
}
}
为什么会有粘包和半包问题?
因为 TCP 是面向连接的传输协议,TCP 传输的数据是以流的形式, 而流数据是没有明确的开始结尾边界,所以 TCP 也没办法判断哪一段流属于一个消息。 粘包的主要原因: 发送方每次写入数据 < 套接字(Socket)缓冲区大小; 接收方读取套接字(Socket)缓冲区数据不够及时。 半包的主要原因: 发送方每次写入数据 > 套接字(Socket)缓冲区大小; 发送的数据大于协议MTU (Maximum Transmission Unit最大传输单元),因此必须拆包。
怎么做可以解决粘包和半包问题?
- 发送方和接收方规定固定大小的缓冲区,也就是发送和接收都使用固定大小的byte[]数组长度,当字符长度不够时使用空字符弥补;
package socket;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
public class ClientSocketV1 {
private static final int BYTE_LENGTH = 1024;
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1", 9091);
final String message = "Hi,Java.";
try (OutputStream outputStream = socket.getOutputStream()) {
byte[] bytes = new byte[BYTE_LENGTH];
int idx = 0;
for (byte b : message.getBytes()) {
bytes[idx] = b;
idx++;
}
for (int i = 0; i < 10; i++) {
outputStream.write(bytes, 0, BYTE_LENGTH);
}
}
}
}
package socket;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
public class ServeSocketV1 {
private static final int BYTE_LENGTH = 1024;
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(9091);
Socket clientSocket = serverSocket.accept();
try (InputStream inputStream = clientSocket.getInputStream()) {
for (int i = 0; i < 10; i++) {
byte[] bytes = new byte[BYTE_LENGTH];
int count = inputStream.read(bytes, 0, BYTE_LENGTH);
if (count > 0) {
System.out.println("接收到客户端的信息是:" + new String(bytes).trim());
}
count = 0;
}
}
}
}
- 在 TCP 协议的基础上封装一层数据请求协议,既将数据包封装成数据头(存储数据正文大小)+ 数据正文的形式,这样在服务端就可以知道每个数据包的具体长度了,知道了发送数据的具体边界之后,就可以解决半包和粘包的问题了;
import datetime
import json
import socket
import struct
import asyncore
import threading
import ctypes
import time
ESME_ROK = 0x00000000
bind_transceiver = 0x00000009
bind_transceiver_resp = 0x80000009
enquire_link = 0x00000015
enquire_link_resp = 0x80000015
submit_sm = 0x00000004
submit_sm_resp = 0x80000004
class MyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, bytes):
return str(obj, encoding='utf-8')
return json.JSONEncoder.default(self, obj)
class SMPPSocket(asyncore.dispatcher):
def __init__(self, sock, usr='admin', pwd='abcdef'):
super(SMPPSocket, self).__init__(sock)
self.send_buff = b''
self.read_buff = b''
self.read_flag = True
self.usr = usr
self.pwd = pwd
self.SMPP_DIC = {}
def auto_read(self, str_name, str_type, idx):
if str_type == '':
temp = b''
while struct.unpack('!s', self.read_buff[idx:idx + 1])[0] != b'\x00':
temp += struct.unpack('!s', self.read_buff[idx:idx + 1])[0]
idx += 1
self.SMPP_DIC[str_name] = temp.decode()
return idx + 1
if str_type == '!B':
self.SMPP_DIC[str_name] = struct.unpack(str_type, self.read_buff[idx:idx + 1])[0]
return idx + 1
if str_type == 'msg':
self.SMPP_DIC[str_name] = (struct.unpack('!'+str(self.SMPP_DIC['sm_length'])+'s', self.read_buff[idx:idx + self.SMPP_DIC['sm_length']])[0]).decode()
return idx+1
def msg_dic(self, type_list, name_list):
idxs = 16
for i in range(len(type_list)):
idxs = self.auto_read(name_list[i], type_list[i], idxs)
new_dic = {'MSISDN': self.SMPP_DIC['Destination_addr'],
'MsgContent': self.SMPP_DIC['short_message'],
'Sender': self.SMPP_DIC['source_addr'],
'DateCoding': self.SMPP_DIC['data_coding'],
'PriorityFlag': self.SMPP_DIC['priority_flag'],
'RegisteredDelivery': self.SMPP_DIC['registered_delivery'],
'DestaddrNpi': self.SMPP_DIC['dest_addr_npi'],
'DestaddrTon': self.SMPP_DIC['dest_addr_ton'],
'SourceaddrNpi': self.SMPP_DIC['source_addr_npi'],
'SourceaddrTon': self.SMPP_DIC['source_addr_ton'],
'ServiceType': self.SMPP_DIC['service_type'],
'ScheduleDeliveryTime': self.SMPP_DIC['schedule_delivery_time'],
'ValidityPeriod': self.SMPP_DIC['validity_period'],
'Time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'sar_msg_ref_num': '',
'sar_total_segments': '',
'sar_segment_seqnum': ''}
return new_dic
def readable(self) -> bool:
return self.read_flag
def handle_read(self):
self.read_flag = False
try:
if len(self.read_buff) < 16:
self.read_buff += self.recv(16 - len(self.read_buff))
self.read_flag = True
return
elif len(self.read_buff) < struct.unpack('!I', self.read_buff[0:4])[0]:
self.read_buff += self.recv(struct.unpack('!I', self.read_buff[0:4])[0] - len(self.read_buff))
if len(self.read_buff) < struct.unpack('!I', self.read_buff[0:4])[0]:
self.read_flag = True
return
except BlockingIOError:
self.read_flag = True
return
command_length = struct.unpack('!I', self.read_buff[0:4])[0]
command_id = struct.unpack('!I', self.read_buff[4:8])[0]
sequence_number = struct.unpack('!I', self.read_buff[12:16])[0]
print('get new message whose command_id and len are ', command_id, command_length)
if command_id == bind_transceiver:
print('bind_transceiver')
i = 16
username = b''
while struct.unpack('!s', self.read_buff[i:i + 1])[0] != b'\x00':
username += struct.unpack('!s', self.read_buff[i:i + 1])[0]
i += 1
i += 1
password = b''
while struct.unpack('!s', self.read_buff[i:i + 1])[0] != b'\x00':
password += struct.unpack('!s', self.read_buff[i:i + 1])[0]
i += 1
typist = ['!I', '!I', '!I', '!I', '!' + str(len(username)+1) + 's']
values = [16 + len(username) + 1, bind_transceiver_resp, 0x00000000, sequence_number, username + b'\x00']
if username.decode() != self.usr or password.decode() != self.pwd:
values[2] = 0x0000000E
print('username and password are wrong')
else:
print('username and password are right')
for i in range(len(typist)):
self.send_buff += struct.pack(typist[i], values[i])
elif command_id == enquire_link:
print('enquire_link')
typist = ['!I', '!I', '!I', '!I']
values = [16, enquire_link_resp, ESME_ROK, sequence_number]
for i in range(len(typist)):
self.send_buff += struct.pack(typist[i], values[i])
elif command_id == submit_sm:
print('submit_sm')
type_list = ['', '!B', '!B', '', '!B', '!B', '', '!B', '!B', '!B', '', '', '!B', '!B', '!B', '!B', '!B', 'msg']
name_list = ['service_type', 'source_addr_ton', 'source_addr_npi', 'source_addr', 'dest_addr_ton',
'dest_addr_npi', 'Destination_addr', 'esm_class', 'protocol_id', 'priority_flag',
'schedule_delivery_time', 'validity_period', 'registered_delivery', 'replace_if_present_flag',
'data_coding', 'sm_default_msg_id', 'sm_length', 'short_message']
tmp_dic = self.msg_dic(type_list, name_list)
print(tmp_dic)
typist = ['!I', '!I', '!I', '!I', '!2s']
values = [18, submit_sm_resp, 0x00000000, sequence_number, '7'.encode() + bytes(0)]
for i in range(len(typist)):
self.send_buff += struct.pack(typist[i], values[i])
else:
try:
tmp = self.recv(1024)
except BlockingIOError:
pass
self.read_buff = b''
self.read_flag = True
def writable(self) -> bool:
return len(self.send_buff) > 0
def handle_write(self) -> None:
sent = self.send(self.send_buff)
self.send_buff = self.send_buff[sent:]
def handle_close(self) -> None:
self.close()
class SMPPReactor(asyncore.dispatcher):
def __init__(self, host, port, usr='admin', pwd='abcdef'):
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, port))
self.listen(5)
self.handle_list = []
self.usr = usr
self.pwd = pwd
def handle_accepted(self, sock, adds):
print('Incoming connection from %s' % repr(adds))
self.handle_list.append(SMPPSocket(sock, self.usr, self.pwd))
print(self.handle_list[-1])
def handle_close(self) -> None:
for i in range(len(self.handle_list)):
self.handle_list[i].close()
self.close()
class SMPPServer(threading.Thread):
cnt = 0
def __init__(self, ip, port, usr='admin', pwd='abcdef'):
super(SMPPServer, self).__init__()
self.ip = ip
self.port = int(port)
self.smpp = None
self.thread = None
self.usr = usr
self.pwd = pwd
def start(self):
self.smpp = SMPPReactor(self.ip, self.port, self.usr, self.pwd)
SMPPServer.cnt += 1
print('SMPP', SMPPServer.cnt, 'start success')
if SMPPServer.cnt == 1:
self.thread = threading.Thread(target=asyncore.loop, kwargs={'use_poll': True})
self.thread.start()
def stop(self):
self.smpp.handle_close()
print('SMPP', SMPPServer.cnt, 'stop success')
SMPPServer.cnt -= 1
smppserver1 = SMPPServer('127.0.0.1', 10703, 'admin', 'abcdef')
smppserver1.start()
time.sleep(10)
smppserver1.stop()
import asyncore
import socket
import struct
ESME_ROK = 0x00000000
bind_transceiver = 0x00000009
bind_transceiver_resp = 0x80000009
enquire_link = 0x00000015
enquire_link_resp = 0x80000015
submit_sm = 0x00000004
submit_sm_resp = 0x80000004
class HTTPClient(asyncore.dispatcher):
def __init__(self, host):
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect((host, 10703))
self.buffer = b''
typist = ['!I', '!I', '!I', '!I', '!5s', '!7s', '!13s', '!B', '!B', '!B', '!2s']
values = [46, bind_transceiver, 0, 4, b'root' + bytes(0), b'abcdba' + bytes(0), b'1234567890123', 1, 2, 3, b'2' + bytes(0)]
for i in range(len(typist)):
self.buffer += struct.pack(typist[i], values[i])
typist = ['!I', '!I', '!I', '!I']
values = [16, enquire_link, 0, 4]
for i in range(len(typist)):
self.buffer += struct.pack(typist[i], values[i])
typist = ['!I', '!I', '!I', '!I', '!2s', '!B', '!B', '!2s', '!B', '!B', '!2s', '!B', '!B', '!B', '!s', '!s',
'!B', '!B', '!B', '!B', '!B', '!2s']
values = [38, submit_sm, 0, 4, b'1' + bytes(0), 0, 0, b'1' + bytes(0), 0, 0, b'1' + bytes(0), 0, 0, 0, bytes(0),
bytes(0), 0, 0, 0, 0, 0, b'1' + bytes(0)]
for i in range(len(typist)):
self.buffer += struct.pack(typist[i], values[i])
def handle_connect(self):
print('connect success')
def handle_close(self):
self.close()
print('close success')
def readable(self) -> bool:
return True
def handle_read(self):
print(self.recv(8192))
def writable(self):
return len(self.buffer) > 0
def handle_write(self):
import time
sent = self.send(self.buffer)
self.buffer = self.buffer[sent:]
client = HTTPClient('127.0.0.1')
asyncore.loop()
- 以特殊的字符结尾,比如以“\n”结尾,这样我们就知道结束字符,从而避免了半包和粘包问题(推荐解决方案)。
package socket;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.Socket;
public class ClientSocketV3 {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1", 9092);
final String message = "Hi,Java.";
try (BufferedWriter bufferedWriter = new BufferedWriter(
new OutputStreamWriter(socket.getOutputStream()))) {
for (int i = 0; i < 10; i++) {
bufferedWriter.write(message + "\n");
bufferedWriter.flush();
}
}
}
}
package socket;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ServeSocketV3 {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(9092);
Socket clientSocket = serverSocket.accept();
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(100, 150,
100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));
threadPool.submit(() -> {
processMessage(clientSocket);
});
}
private static void processMessage(Socket clientSocket) {
try (BufferedReader bufferedReader = new BufferedReader(
new InputStreamReader(clientSocket.getInputStream()))) {
for (int i = 0; i < 10; i++) {
String msg = bufferedReader.readLine();
if (msg != null) {
System.out.println("接收到客户端的信息:" + msg);
}
}
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
阻塞模型
比喻
A同学用杯子装水,打开水龙头装满水然后离开,因为如果水龙头没有水,他也要等到有水并装满杯子才能离开去做别的事情(反正就是要装满水)。
场景
- 应用进程recvfrom执行系统调用,然后阻塞进入等待队列
- 内核收到系统调用,此时无数据报准备好,等待数据
- 数据报准备好了,内核就将数据从内核复制到用户空间
- 内核复制完成后返回成功指示
- 应用进程被唤醒,进入就绪队列,等待CPU分片处理数据报
函数使用
- 输入操作:read、readv、recv、recvfrom、recvmsg共5个函数,如果设置为阻塞状态,则会经过wait data和copy data两个阶段,如果设置为非阻塞则在wait 不到data时抛出异常
- 输出操作:write、writev、send、sendto、sendmsg共5个函数,在发送缓冲区满了会阻塞在原地,如果设置为非阻塞,则会抛出异常
- 接收外来链接:accept,与输入操作类似
- 发起外出链接:connect,与输出操作类似
read与recv
- read:数据在不超过指定的长度的时候有多少读多少,没有数据则会一直等待。所以一般情况下:我们读取数据都需要采用循环读的方式读取数据,因为一次read 完毕不能保证读到我们需要长度的数据,read 完一次需要判断读到的数据长度再决定是否还需要再次
- recv:recv(sockfd, buff, buff_size, MSG_WAITALL)中有一个MSG_WAITALL 的参数,正常情况下recv 是会等待直到读取到buff_size 长度的数据,但是这里的WAITALL 也只是尽量读全,在有中断的情况下recv 还是可能会被打断,造成没有读完指定的buff_size的长度。所以即使是采用recv + WAITALL 参数还是要考虑是否需要循环读取的问题,在实验中对于多数情况下recv (使用了MSG_WAITALL)还是可以读完buff_size,所以相应的性能会比直接read 进行循环读要好一些。
问题与改进
问题一:实际上,除非特别指定,几乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。 改进一:在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。 问题二:开启多进程或都线程的方式,在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。 改进二:很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。 问题三:“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。 改进三:使用非阻塞接口
阻塞代码
import socket
server=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.bind(('127.0.0.1',6666))
server.listen()
sock,addr=server.accept()
print('连接成功,recv阻塞等待收到数据')
data=sock.recv(1024).decode()
print('收到数据{},结束程序'.format(data))
sock.send(b'hello'+data.encode())
server.close()
sock.close()
import socket
client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
client.connect(('127.0.0.1',6666))
input_str=input()
client.send(input_str.encode())
print(client.recv(1024).decode())
client.close()
多客户端(多线程)代码
非阻塞模型
比喻
B同学也用杯子装水,打开水龙头后发现没有水,它离开了,过一会他又拿着杯子来看看。在中间离开的这些时间里,B同学离开了装水现场,可以做他自己的事情,这就是非阻塞IO模型。但是它只有是检查无数据的时候是非阻塞的,在数据到达的时候依然要等待复制数据到用户空间(等着水将水杯装满),因此它还是同步IO。
场景
- 应用进程recvfrom执行系统调用(不是默认参数),如果内核无数据报准备好则马上返回EWOULDBLOCK
- 应用进程反复调用recvfrom等待返回成功指示(轮询),期间是非阻塞的可以有其他操作
- 内核这边就在等待数据准备好(从网卡接收或从磁盘读取到内核空间),准备好了就将数据从内核复制到用户空间
- 在复制的期间,由于没有返回EWOULDBLOCK,所以应用进程阻塞
- 应用进程收到成功指示,#处理数据报
代码
import time
import socket
server = socket.socket()
server.bind(('127.0.0.1', 8083))
server.listen(5)
server.setblocking(False)
r_list = []
w_list = {}
del_rlist = []
del_wlist = []
while 1:
try:
conn, addr = server.accept()
r_list.append(conn)
except BlockingIOError:
time.sleep(1)
print('这一秒在做其他的事情')
print('rlist: ', len(r_list))
for conn in r_list:
try:
data = conn.recv(1024)
if not data:
conn.close()
del_rlist.append(conn)
continue
w_list[conn] = data.upper()
except BlockingIOError:
continue
except ConnectionResetError:
conn.close()
del_rlist.append(conn)
for conn in del_rlist:
r_list.remove(conn)
del_rlist.clear()
print('wlist: ', len(w_list))
for conn, data in w_list.items():
try:
conn.send(data)
del_wlist.append(conn)
except BlockingIOError:
continue
for conn in del_wlist:
w_list.pop(conn)
del_wlist.clear()
import os
import time
import socket
import threading
client = socket.socket()
client.connect(('127.0.0.1', 8083))
while 1:
client.send('hello'.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
缺点
- 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
- 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。
IO多路复用
比喻
在调用recv前先调用select或者poll,这2个系统调用都可以在内核准备好数据(网络数据到达内核)时告知用户进程,这个时候再调用recv一定是有数据的。因此这一过程中它是阻塞于select或poll,而没有阻塞于recv,而非阻塞IO定义在是读写操作时没有阻塞于系统调用的IO操作(不包括数据从内核复制到用户空间时的阻塞,因为这相对于网络IO来说确实很短暂),如果按这样理解,这种IO模型也能称之为非阻塞IO模型,但是按POSIX来看,它也是同步IO,故称之为同步非阻塞IO。
这种IO模型比较特别,分个段。因为它能同时监听多个文件描述符(fd)。这个时候C同学来装水,发现有一排水龙头,舍管阿姨告诉他这些水龙头都还没有水,等有水了告诉他。于是等啊等(select调用中),过了一会阿姨告诉他有水了,但不知道是哪个水龙头有水,自己看吧。于是C同学一个个打开,往杯子里装水(recv)。这里再顺便说说鼎鼎大名的epoll(高性能的代名词啊),epoll也属于IO复用模型,主要区别在于舍管阿姨会告诉C同学哪几个水龙头有水了,不需要一个个打开看(当然还有其它区别)。
场景
- 应用进程select系统调用,等待可能多个套接字中的任一个变为可读(注意这里没有阻塞于系统调用的io操作select,即非阻塞)
- 内核等待数据,待数据报准备好后返回可读条件
- 应用进程收到可读条件,进行Recvfrom系统调用,并且阻塞
- 内核将数据从内核复制到用户空间,返回成功指示
- 应用进程收到成功指示,处理数据报
代码
import socket
import select
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 8093))
server.listen(5)
server.setblocking(False)
rlist = [server, ]
rdata = {}
wlist = []
wdata = {}
print('预备!监听!!!')
count = 0
while True:
rl, wl, xl = select.select(rlist, wlist, [], 0.5)
print('%s 次数>>' % count, wl)
count = count + 1
for sock in rl:
if sock == server:
conn, addr = sock.accept()
rlist.append(conn)
else:
try:
data = sock.recv(1024)
if not data:
sock.close()
rlist.remove(sock)
continue
print("received {0} from client {1}".format(data.decode(), sock))
rdata[sock] = data.decode()
wdata[sock] = data.upper()
wlist.append(sock)
except Exception:
sock.close()
rlist.remove(sock)
for sock in wl:
sock.send(wdata[sock])
wlist.remove(sock)
wdata.pop(sock)
import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 8093))
while True:
msg = input('>>: ').strip()
if not msg:
continue
client.send(msg.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
client.close()
import socket
import selectors
sel = selectors.DefaultSelector()
def accept(server_fileobj, mask):
conn, addr = server_fileobj.accept()
sel.register(conn, selectors.EVENT_READ, read)
def read(conn, mask):
try:
data = conn.recv(1024)
if not data:
print('closing', conn)
sel.unregister(conn)
conn.close()
return
conn.send(data.upper())
except Exception:
print('closing', conn)
sel.unregister(conn)
conn.close()
server_fileobj = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_fileobj.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_fileobj.bind(('127.0.0.1', 8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False)
sel.register(server_fileobj, selectors.EVENT_READ,accept)
while True:
events = sel.select()
for sel_obj, mask in events:
callback = sel_obj.data
callback(sel_obj.fileobj, mask)
import socket
c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
c.connect(('127.0.0.1', 8088))
while True:
msg = input('>>: ')
if not msg:
continue
c.send(msg.encode('utf-8'))
data = c.recv(1024)
print(data.decode('utf-8'))
import select
import socket
response = b''
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.bind(('0.0.0.0', 8080))
serversocket.listen(1)
serversocket.setblocking(0)
epoll = select.epoll()
epoll.register(serversocket.fileno(), select.EPOLLIN)
try:
connections = {}
requests = {}
responses = {}
while True:
events = epoll.poll(1)
for fileno, event in events:
if fileno == serversocket.fileno():
connection, address = serversocket.accept()
print('client connected:', address)
connection.setblocking(0)
epoll.register(connection.fileno(), select.EPOLLIN)
connections[connection.fileno()] = connection
requests[connection.fileno()] = b''
elif event & select.EPOLLIN:
print("------recvdata---------")
requests[fileno] += connections[fileno].recv(1024)
if not requests[fileno]:
connections[fileno].close()
del connections[fileno]
del requests[connections[fileno]]
print(connections, requests)
epoll.modify(fileno, 0)
else:
epoll.modify(fileno, select.EPOLLOUT)
print('-' * 40 + '\n' + requests[fileno].decode())
elif event & select.EPOLLOUT:
print("--send data--")
byteswritten = connections[fileno].send(requests[fileno])
requests[fileno] = requests[fileno][byteswritten:]
if len(requests[fileno]) == 0:
epoll.modify(fileno, select.EPOLLIN)
elif event & select.EPOLLHUP:
print("end hup------")
epoll.unregister(fileno)
connections[fileno].close()
del connections[fileno]
finally:
epoll.unregister(serversocket.fileno())
epoll.close()
serversocket.close()
中断
可编程中断控制器:
硬中断:与当前正在运行的程序无必然关系,如打印,鼠标,键盘 软中断:与当前正在运行的程序有必然联系,如除零,系统调用中断(向量值0x80),网卡中断(向量值15) 8259A中断控制器可以统一管理所有外部中断信号并根据优先级过筛选后与CPU通信 中断请求寄存器就是当前哪些引脚有中断信号 优先级解析器就是判断哪些中断是优先的保留其标志 正在服务的寄存器是保存如键盘IRQ1这样的标志,如果是可抢占的话可以会被抢占 INTR引脚传入的就是中断触发信号,CPU会利用其他引脚进行数据传输,识别哪类中断 RAM内存分用户空间(用户空间的每个进程有内核态堆栈,进程标识符中有两个指针分别指向两个堆栈)与内核空间(共同的) IRQ中断程序入口映射表在内核启动阶段完成加载,是统一不变的,只能在内核态访问
缓冲区:
socket由三部分组成:读缓冲,写缓冲,等待队列 socket.close()时如果读缓冲区有数据就会丢,写缓冲区有数据则会继续发送完
阻塞型IO写:
系统调用的过程:
用户进程对fd申请write系统调用(向量值0x80是十六进制就是图中128)发送数据,把用户空间的数据从寄存器拷贝到内核空间中 (切换内核态:保存寄存器到进程标识符,改变寄存器(内核等级标记,内核堆栈地址,内核代码地址)) 假设现在TCPIP正在使用写缓冲区(即正在发送数据,会自动加锁) 此时用户进程不能写入socket写缓冲区,会阻塞(该进程加入该socket等待队列) TCPIP使用完后,唤醒等待队列中的第一个进程 进程看写缓冲区大小是否小于要发送的数据大小,假设现在yes 设要发送10B,但写缓冲区仅5B,则先把从用户空间中传过来的数据(当前在内核空间)前5B写入socket写缓冲区,等待网卡发送 等待期间同上阻塞,网卡发送完5B数据后唤醒,再写入后5B,切换回用户态(用户堆栈与等级标记,从进程标记符读寄存器值)
非阻塞IO写:
用户进程对fd申请write系统调用,把用户空间的数据从寄存器拷贝到内核空间中 假设现在TCPIP正在使用写缓冲区(即正在发送数据,会自动加锁)或者写缓冲区已满(可能对端读缓冲区也满了但一直未读) 此时用户进程不能写入socket写缓冲区,直接报错BlockingError不能立即完成一个非阻塞性套接字操作并返回-1,代码实现时要try掉 进程看写缓冲区大小是否小于要发送的数据大小,假设现在yes 设要发送10B,但写缓冲区仅5B,则先把从用户空间中传过来的数据前5B写入socket写缓冲区,并立即返回5,完成 至于后面用户是否还要发后5B,或者说何时发,都由用户决定,反正这里不再阻塞 (同步异步是说用户态与内核态传输是否等待,等就同,不等就异,这里把用户空间写入socket写缓冲区是等的,所以是同步) (阻塞不阻塞是说网卡收发时是否等待,等就阻,不等就不阻,这里把把用户空间写入socket写缓冲区之后是不再等的,所以非阻)
阻塞型IO读:
bio通信底层原理: 假设已建立socket连接,并获得socket在内核空间中的fd 用户进程对fd进行系统调用recv接收数据,请求写入自定义缓冲区 用户态转内核态,假设现在内核发现对应socket下读缓冲区为空 把当前进程写入socket的等待队列中,挂起当前进程 网卡收到的报文通过内嵌的DMA把数据写入内存中的网卡缓冲区 DMA写入后,网卡向CPU发起IRQ中断 CPU把寄存器存在当前进程的文件标识符中,用户态转内核态等级标记,用户堆栈转内核堆栈 CPU从中断控制器读到中断向量,查IRQ中断程序入口映射表得到中断程序代码段地址 CPU进入中断开始处理,把网卡缓冲区收到的TCP/IP报文写入对应socket的读缓冲中 socket的读缓冲写入数据后,会激活等待队列中的进程 进程回到工作/就绪队列,等待CPU分配 进程获得CPU,此时是内核态 内核发现对应socket下读缓冲区有数据,把数据拷贝到内核堆栈 内核态转换为用户态,并通过寄存器返回传值给用户态中自定义的缓冲区 (这里 是传值,用户态不能读内核态数据) (如果自定义的缓冲区大小较小,则需要循环拷贝完,拷贝时是阻塞) (如果自定义的缓冲区大小较大,则read是有多少读多少,拷贝时是阻塞)
非阻塞IO读:
假设已建立socket连接,并获得socket在内核空间中的fd 用户进程对fd进行系统调用recv接收数据,请求写入自定义缓冲区 用户态转内核态,假设现在内核发现对应socket下读缓冲区为空 接报错BlockingError不能立即完成一个非阻塞性套接字操作并返回-1,代码实现时要try掉 内核发现对应socket下读缓冲区有数据,把数据拷贝到内核堆栈 内核态转换为用户态,并通过寄存器返回传值给用户态中自定义的缓冲区 (如果自定义的缓冲区大小较小,则需要循环拷贝完,拷贝时是阻塞) (如果自定义的缓冲区大小较大,则read是有多少读多少,拷贝时是阻塞)
select应用
select原理
- 当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
- select对比blocking IO其实并没有太大的不同,事实上还更差一些。因为它不仅阻塞了还多需要使用两个系统调用(select和recvfrom),而blocking IO只调用了一个系统调用(recvfrom),当只有一个连接请求的时候,这个模型还不如阻塞IO效率高。
- 用select的优势在于它可以同时处理多个connection,而阻塞IO那里不能,我不管阻塞不阻塞,你所有的连接包括recv等操作,我都帮你监听着(select/poll/epoll三种形式),其中任何一个有变动(有链接,有数据),我就告诉你用户,那么你就可以去调用这个数据了,这就是他的NB之处。
- 这个IO多路复用模型机制是操作系统帮我们提供的,在windows上有这么个机制叫做select,那么如果我们想通过自己写代码来控制这个机制或者自己写这么个机制,我们可以使用python中的select模块来完成上面这一系列代理的行为。
- 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。
- 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是整个用户的process其实是一直被block的(select的timeout置-1时可以这么说)。只不过process是被select这个函数block(常规写法是while死循环下select指定timeout时间后for遍历fd表,所以可视为一直select),而不是被IO(recv)给block。
- 优点:相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。
- select做得事情和第二阶段的阻塞(就是从内核态将数据拷贝到用户态的阻塞)没有关系,它始终帮你做得监听的工作,帮你节省了一些第一阶段阻塞的时间。
- 缺点:首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。
- 缺点:该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。
- 缺点:每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
- 缺点:同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
- 缺点:select支持的文件描述符数量太小了,默认是1024
- 使用copy_from_user从用户空间拷贝fdset到内核空间
- 注册回调函数__pollwait
- 遍历所有fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据情况会调用到tcp_poll,udp_poll或者datagram_poll)
- 以tcp_poll为例,其核心实现就是__pollwait,也就是上面注册的回调函数。
- __pollwait的主要工作就是把current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于tcp_poll 来说,其等待队列是sk->sk_sleep(注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数 据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒了。
- poll方法返回时会返回一个描述读写操作是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。
- 如果遍历完所有的fd,还没有返回一个可读写的mask掩码,则会调用schedule_timeout是调用select的进程(也就是 current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout 指定),还是没人唤醒,则调用select的进程会重新被唤醒获得CPU,进而重新遍历fd,判断有没有就绪的fd。
- 把fd_set从内核空间拷贝到用户空间。
- 用户进程创建socket对象,拷贝监听的fd(bitmap)到内核空间,每一个bit会对应一张系统文件表,内核空间的fd响应到数据后,就会发送信号给用户进程数据已到;(具体来说是socket读缓冲有数据把等待队列的进程唤醒到就绪队列,再争夺CPU进行操作)
- 用户进程再发送系统调用,比如(accept)将内核空间的数据copy到用户空间,同时作为接受数据端内核空间的数据清除,这样重新监听时fd再有新的数据又可以响应到了(发送端因为基于TCP协议所以需要收到应答后才会清除)。
- select机制: Windows、Linux
- poll机制 : Linux和select监听机制一样,但是对监听列表里面的数量没有限制,select默认限制是1024个,但是他们两个都是操作系统轮询每一个被监听的文件描述符(如果数量很大,其实效率不太好),看是否有可读操作。
- epoll机制 : Linux它的监听机制和上面两个不同,他给每一个监听的对象绑定了一个回调函数,你这个对象有消息,那么触发回调函数给用户,用户就进行系统调用来拷贝数据,并不是轮询监听所有的被监听对象,这样的效率高很多。
- 理解完IO复用后,我们在来看下实现IO复用中的三个API(select、poll和epoll)的区别和联系:select,poll,epoll都是IO多路复用的机制,I/O多路复用就是通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知应用程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。三者的原型如下所示:
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
python中的select模块:
import select
fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout])
参数: 可接受四个参数(前三个必须)
rlist: wait until ready for reading
wlist: wait until ready for writing
xlist: wait for an “exceptional condition”
timeout: 超时时间
当超时时间 = n(正整数)时,那么如果监听的句柄均无任何变化,则select会阻塞n秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。
返回值:三个列表与上面的三个参数列表是对应的
- select方法用来监视文件描述符(当文件描述符条件不满足时,select会阻塞),当某个文件描述符状态改变后,会返回三个列表
1、当参数1 序列中的fd满足“可读”条件时,则获取发生变化的fd并添加到fd_r_list中
2、当参数2 序列中含有fd时,则将该序列中所有的fd添加到 fd_w_list中
3、当参数3 序列中的fd发生错误时,则将该发生错误的fd添加到 fd_e_list中
4、当超时时间为空,则select会一直阻塞,直到监听的句柄发生变化
poll介绍
- poll与select不同,通过一个pollfd数组向内核传递需要关注的事件,故没有描述符个数的限制,pollfd中的events字段和revents分别用于标示关注的事件和发生的事件,故pollfd数组只需要被初始化一次。
- poll的实现机制与select类似,其对应内核中的sys_poll,只不过poll向内核传递pollfd数组,然后对pollfd中的每个描述符进行poll,相比处理fdset来说,poll效率更高。poll返回后,需要对pollfd中的每个元素检查其revents值,来得指事件是否发生。
epoll应用
epoll原理
- 假设现在创建了一个epoll(eventpoll),epfd=6,监听事件列表有三个socket,fd为3,4,5,用红黑树管理,就绪队列(列表)与等待队列为空
用户进程执行epoll_wait,转内核态,发现绪队列为空,用户进程进入epoll等待队列,epoll进入三个socket的等待队列 - 某个socket的数据到了,epoll从三个socket的等待队列中出队,就绪socket对应epoll_event进入epoll就绪队列,用户进程从epoll等待队列中出队
- 用户进程获得CPU,内核态中的就绪列表拷贝给用户态中传入的就绪列表数组(个人感觉叫就绪队列比较好,因为有个参数可以设置取出前面指定数量个epoll_event的)
- epoll有自己的fd(万物皆文件),由事件列表/就绪列表/等待队列三个部分组成,有create/ctl/wait三个函数
-
事件列表是存储要监听的epoll_event -
就绪列表是存储已触发的epoll_event(图6-4,是从epoll_event里面的fd取出socket的,图6-3,eventpoll离开等待队列后,有数据的socket会进入eventpoll的就绪队列,然后epoll会把自己的等待队列中的进程唤醒变为就绪态争夺CPU,并把就绪队列拷贝到用户调用epoll_wait时传入的指针所指数组中,此时读写绝对是有数据的不会阻塞) -
等待队列是当前epoll监听的所有epoll_event皆没有触发,就会把当前进程压进epoll的等待队列中(图4,注意epoll自已会被压入所有监听的socket等待队列) -
epoll_create传入的参数决定能装多少个socket,或者说事件列表能装多少个epoll_event(但1个socket读与写是两个epoll_event),返回文件标识符 -
epoll_ctl是对标记epfd的epoll中的标记fd的socket进行增/删/改操作,对增与改还要配置新的epoll_event,返回值无用 其中,epoll_event包括感兴趣事件(读/写/挂断/边缘触发/水平触发等的事件组合)和此epoll_event的data(如socket的fd) -
epoll_wait是访问标记epfd的epoll中的最多maxevents个epoll_event(数组指针传入),等待时时timeout,返回值是就绪列表的长度,就绪列表是调用epoll_wait时用户传入的 -
epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。 -
epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。 -
另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描。而epoll提供了三个函 数,epoll_create,epoll_ctl和epoll_wait,epoll_create是创建一个epoll句柄;epoll_ctl是注册要监听的事件类型;epoll_wait则是等待事件的产生。epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。
- epoll既然是对select和poll的改进,就应该能避免上述的三个缺点。那epoll都是怎么解决的呢?
-
对于第一个缺点,epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定 EPOLL_CTL_ADD),会把所有的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝 一次。 -
对于第二个缺点,epoll的解决方案不像select或poll一样每次都把current轮流加入fd对应的设备等待队列中,而只在 epoll_ctl时把current挂一遍(这一遍必不可少)并为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调 函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd(利用 schedule_timeout()实现睡一会,判断一会的效果,和select实现中的第7步是类似的)。 -
对于第三个缺点,epoll没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子, 在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。
- 收发数据的在用户态的内核态之间传递,select的bitmap在用户态的内核态之间传递,它们都是是指针传递,然后内核都会把数据拷贝一份在内核空间,只不过bitmap最后的结果返回是在原bitmap上体现的
- 显然这个bitmap,每次select都要拷贝两次且上一次select的readset无效了需要重新赋值,而且select完后还要遍历一次1024个bit,有1又要拷一次fd
- 话说linuxC中select返回的那个int没什么意思,它是readset/writeset/exceptset三个set中做好准备的文件描述符个数(不排除一个fd三个set都有1)
总结:
-
select,poll实现需要自己不断轮询所有fd集合,直到设备就绪,期间可能要睡眠和唤醒多次交替。而epoll其实也需要调用 epoll_wait不断轮询就绪链表,期间也可能多次睡眠和唤醒交替,但是它是设备就绪时,调用回调函数,把就绪fd放入就绪链表中,并唤醒在 epoll_wait中进入睡眠的进程。虽然都要睡眠和交替,但是select和poll在“醒着”的时候要遍历整个fd集合,而epoll在“醒着”的 时候只要判断一下就绪链表是否为空就行了,这节省了大量的CPU时间,这就是回调机制带来的性能提升。 -
select,poll每次调用都要把fd集合从用户态往内核态拷贝一次,并且要把current往设备等待队列中挂一次,而epoll只要 一次拷贝,而且把current往等待队列上挂也只挂一次(在epoll_wait的开始,注意这里的等待队列并不是设备等待队列,只是一个epoll内部定义的等待队列),这也能节省不少的开销。 -
这三种IO多路复用模型在不同的平台有着不同的支持,而epoll在windows下就不支持,好在我们有selectors模块,帮我们默认选择当前平台下最合适的,我们只需要写监听谁,然后怎么发送消息接收消息,但是具体怎么监听的,选择的是select还是poll还是epoll,这是selector帮我们自动选择的。
信号驱动模型
比喻
D同学让舍管阿姨等有水的时候通知他(注册信号函数),没多久D同学得知有水了,跑去装水。它还是同步IO(省不了装水的时间)。
场景
- 应用进程建立SIGIO信号处理程序,进行sigaction系统调用并马上返回,继续执行进程其他操作
- 内核等待数据准备好了,就递交SIGIO给信号处理程序
- 应用进程收到信号后,进行recvfrom系统调用,然后阻塞(数据从内核复制到应用缓冲区期间进程阻塞)
- 内核执行系统调用,将数据从内核复制到用户空间,完成后返回成功指示
- 应用进程收到成功指示后,处理数据报
应用
对于 socket 产生 SIGIO的条件:
- TCP套接字:
1.监听套接字上有新连接请求完成 2.某个断连请求发起 3.某个断连请求完成 4.数据到达套接字 5.数据已从套接字发送走(输出缓冲区有空闲空间) 6.发生某个异步错误 - UDP套接字:
1.数据报到达套接字 2.套接字上发生异步错误
代码
#ifndef _NET_INFO_H
#define _NET_INFO_H
#define RCV_LEN 1024
#define LISTEN_PORT 9999
#endif
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <time.h>
#include <string.h>
#include <sys/select.h>
#include <sys/time.h>
#include <errno.h>
#include <signal.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <net/if.h>
#include <net/if_arp.h>
#include <sys/ioctl.h>
#include <sys/ioctl.h>
#include "net_info.h"
static void sigio_handler(int signo);
void sigio_socket_init(int skfd);
char recvBuf[RCV_LEN];
int listenfd;
int create_socket(){
int skfd = socket(AF_INET, SOCK_DGRAM, 0);
if (skfd < 0)return -1;
struct sockaddr_in saddr;
bzero(&saddr, sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_port = htons(LISTEN_PORT);
saddr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(skfd, (struct sockaddr*)&saddr, sizeof(saddr)) < 0)return -1;
return skfd;
}
int main(){
int run_cnt = 0;
listenfd = create_socket();
if (listenfd < 0){
perror("prepare error");
exit(-1);
}
sigio_socket_init(listenfd);
while(1){
printf("run_cnt=%d\n",run_cnt++);
sleep(1);
}
return 0;
}
void sigio_socket_init(int skfd){
int ret = 0;
signal(SIGIO, sigio_handler);
ret = fcntl(skfd, F_SETOWN, getpid());
if (ret < 0){
perror("fcntl error");
exit(-1);
}
int on = 1;
ret = ioctl(skfd, FIOASYNC, &on);
if (ret < 0){
perror("ioctl error");
exit(-1);
}
}
static void sigio_handler(int signo){
if (signo != SIGIO)return;
printf("recv SIGIO(%d) from kernel\n",signo);
int i = 0;
for(i = 0; i < 3; i++){
int ret = recvfrom(listenfd, recvBuf, sizeof(recvBuf),0, NULL,NULL);
if (ret < 0){
if (errno == EWOULDBLOCK)
return ;
else{
perror("recvfrom error");
exit(-1);
}
}
printf("recv = %s\n",recvBuf);
}
}
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <time.h>
#include <string.h>
#include <sys/select.h>
#include <sys/time.h>
#include <errno.h>
#include <signal.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <net/if.h>
#include <net/if_arp.h>
#include <sys/ioctl.h>
#include <sys/ioctl.h>
#include "net_info.h"
int main(){
int skfd;
skfd = socket(AF_INET, SOCK_DGRAM, 0);
if (skfd < 0){
perror("socket error");
exit(-1);
}
int ret;
time_t tm;
struct sockaddr_in desAddr;
bzero(&desAddr, sizeof(desAddr));
desAddr.sin_family = AF_INET;
desAddr.sin_port = htons(LISTEN_PORT);
desAddr.sin_addr.s_addr = inet_addr("127.0.0.1");
while (1){
time(&tm);
ret = sendto(skfd, ctime(&tm), strlen(ctime(&tm)), 0,
(struct sockaddr*)&desAddr, sizeof(desAddr));
if (ret < 0){
perror("");
exit(-1);
}
sleep(2);
}
return 0;
}
异步IO模型
前面四种模型皆为同步,因数据从内核到用户空间的复制时间不可省
比喻
E同学让舍管阿姨将杯子装满水后通知他,整个过程E同学都可以做别的事情(没有recv),这是真正的异步IO。
场景
- 应用进程进行aio_read系统调用并马上返回,应用进程可以执行其他操作
- 内核等待数据准备好,然后复制数据到用户空间,最后递交在aio_rad中指定的信号
- 应用进程收到信号后,处理数据报
应用
import asyncore,sys
class asyncoreClient(asyncore.dispatcher):
def __init__(self,host):
asyncore.dispatcher.__init__(self)
self.create_socket()
self.connect((host,6666))
self.buffer=b'123abc'
def handle_connect(self):
print("connect server succeed")
def handle_close(self):
print("connect close")
self.close()
def readable(self):
return True
def handle_read(self):
read_data=self.recv(1024).decode()
print(read_data)
self.buffer+=read_data[0:1].encode()
def writable(self):
return (len(self.buffer)>0)
def handle_write(self):
sent=self.send(self.buffer)
self.buffer=self.buffer[sent:]
if __name__=='__main__':
asyncoreClient('127.0.0.1')
asyncore.loop()
import asyncore,sys
class asyncoreClient(asyncore.dispatcher):
def __init__(self,host):
asyncore.dispatcher.__init__(self)
self.create_socket()
self.connect((host,6666))
self.buffer=b'123abc'
def handle_connect(self):
print("connect server succeed")
def handle_close(self):
print("connect close")
self.close()
def readable(self):
return True
def handle_read(self):
read_data=self.recv(1024).decode()
print(read_data)
self.buffer+=read_data[0:1].encode()
def writable(self):
return (len(self.buffer)>0)
def handle_write(self):
sent=self.send(self.buffer)
self.buffer=self.buffer[sent:]
if __name__=='__main__':
asyncoreClient('127.0.0.1')
asyncore.loop()
import asyncore,asynchat
class ChatServer(asyncore.dispatcher):
def __init__(self,port):
asyncore.dispatcher.__init__(self)
self.create_socket()
self.set_reuse_addr()
self.bind(('127.0.0.1',port))
self.listen(5)
self.users={}
self.main_room=ChatRoom(self)
def handle_accepted(self,sock,addr):
print('socket:',sock,'addr:',addr)
ChatSession(self,sock)
class ChatSession(asynchat.async_chat):
def __init__(self,server,sock):
asynchat.async_chat.__init__(self,sock)
self.server=server
self.set_terminator(b'\n')
self.data=[]
self.name=None
self.enter(LoginRoom(self.server))
def enter(self,room):
try:
cur=self.room
except AttributeError:
pass
else:
cur.remove(self)
self.room=room
room.add(self)
def collect_incoming_data(self, data):
self.data.append(data.decode())
def found_terminator(self):
line=''.join(self.data)
self.data=[]
self.room.handle(self,line.encode())
def handle_closed(self):
asynchat.async_chat.handle_closed(self)
class Room:
def handle(self,session,line):
line=line.decode()
if not line.strip():
return
parts=line.split(' ',1)
cmd=parts[0]
try:
line=parts[1].strip()
except IndexError:
line=''
method=getattr(self,'do_'+cmd,None)
method(session,line)
def __init__(self,server):
self.server=server
self.sessions=[]
def add(self,session):
self.sessions.append(session)
def remove(self,session):
self.sessions.remove(session)
def broadcast(self,line):
for session in self.sessions:
session.push(line)
def do_logout(self,session,line):
self.remove(session)
del self.server.users[session.name]
class ChatRoom(Room):
def add(self,session):
session.push(b'login success')
self.broadcast((session.name+' has entered the room. \n').encode())
self.server.users[session.name]=session
Room.add(self,session)
def remove(self,session):
Room.remove(self,session)
self.broadcast((session.name+' has left the room.\n').encode())
def do_say(self,session,line):
self.broadcast((session.name+':'+line+'\n').encode())
def do_look(self,session,line):
session.push(b'Online Users:\n')
for user in self.sessions:
session.push((user.name+'\n').encode())
class LoginRoom(Room):
def add(self,session):
Room.add(self,session)
session.push(b'Connect Success')
def do_login(self,session,line):
name=line.strip()
if not name:
session.push(b'Username empty')
elif name in self.server.users:
session.push(b'Username exists')
else:
session.name=name
session.enter(self.server.main_room)
if __name__=='__main__':
print('server will run on 127.0.0.1:6666')
ChatServer(6666)
asyncore.loop()
import socket
import threading
client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
client.connect(('127.0.0.1',6666))
def receive(client):
while True:
data=client.recv(1024)
if data:
print(data.decode())
def send(client):
while True:
to_data=input()
client.send(to_data.encode())
threading.Thread(target=receive,args=(client,)).start()
threading.Thread(target=send,args=(client,)).start()
import asyncore
class EchoHanlder(asyncore.dispatcher):
def handle_read(self):
data=self.recv(1024)
print('get data:'+data.decode())
if data:
self.send(data)
class EchoServer(asyncore.dispatcher):
def __init__(self,host,port):
asyncore.dispatcher.__init__(self)
self.create_socket()
self.set_reuse_addr()
self.bind((host,port))
self.listen(5)
def handle_accepted(self,sock,addr):
EchoHanlder(sock)
if __name__=='__main__':
EchoServer('localhost',6666)
asyncore.loop()
asyncore框架概念
asyncore模块是Python自带的一个原生模块,
提供简单的API以实现异步socket通信,
并且为我们提供了异步socket服务器端和客户端的基础架构
在使用asyncore框架时,我们需要注意两个东西:
一、全局函数loop:创建asyncore事件循环,
在事件循环中调用底层的select方法来检测特定网络信道,
如果信道对应的socket对象状态发生改变,则自动产生一个高层次的事件信息,
然后针对该信息调用相应的回调方法进行处理。
二、基类dispatcher:一个底层的socket类的封装对象,
编程中继承于dispatchero在或其子类,
dispatchero在里面已经定义好了socket通信中的各种事件,
我们只需要重写特定的事件即可在该事件发生时实现自动回调处理。
为什么作为socket server的时候,服务器端的连接会自动断开?
因为socket连接是用完即断,
所以为了保持通信服务端必须在socket连接建立好后,
立即创建一个新的dispatcher实例来保存这个socket连接对象,
否则socket连接会被服务器端自动断开
补充
' '与str与b' '.decode(),代表字符串对象,用于python编程
' '.encode()与str.encode()与b' ',代表bytes对象,用于网络传输
asyncore聊天室实战
主要知识点:
asyncore作为服务器端的主要用法
async_chat模块的使用
pySimpleGUI界面框架
telnetlib作为客户端socket模块
如何设计一个聊天室的应用?
必要条件:服务器端,多客户端
必要约束:数据传输协议——以换行符为消息分隔符
原理:服务器监听消息来源,客户端连接服务器并发送消息到服务器
async_chat模块介绍:是dispatcher这个类的抽象子类,
定义了一些专门用于数据传输方面的方法,非常有利于实现数据传输
主要方法:
collect_incoming_data:接收数据
found_terminator:当输入数据流符合由set_terminator设置的终止条件时被调用
set_terminator:设置终止条件
push:向数据传输队列压入要传输的数据
|