IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 系统运维 -> 网络编程学习笔记 -> 正文阅读

[系统运维]网络编程学习笔记

半包粘包

什么是半包与粘包问题?

粘包问题是指当发送两条消息时,比如发送了 ABC 和 DEF,但另一端接收到的却是 ABCD,像这种一次性读取了两条数据的情况就叫做粘包(正常情况应该是一条一条读取)。
半包问题是指,当发送的消息是 ABC 时,另一端却接收到的是 AB 和 C 两条信息,像这种情况就叫做半包。

package socket;

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;

/**
 * 客户端(只负责发送消息)
 */
public class ClientSocket {
    public static void main(String[] args) throws IOException {
        // 创建 Socket 客户端并尝试连接服务器端
        Socket socket = new Socket("127.0.0.1", 9999);
        // 发送的消息内容
        final String message = "Hi,Java.";
        // 使用输出流发送消息
        try (OutputStream outputStream = socket.getOutputStream()) {
            // 给服务器端发送 10 次消息
            for (int i = 0; i < 10; i++) {
                // 发送消息
                outputStream.write(message.getBytes());
            }
        }
    }
}
package socket;

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * 服务器端(只负责接收消息)
 */
public class ServeSocket {
    // 字节数组的长度
    private static final int BYTE_LENGTH = 20;
    public static void main(String[] args) throws IOException {
        // 创建 Socket 服务器
        ServerSocket serverSocket = new ServerSocket(9999);
        // 获取客户端连接
        Socket clientSocket = serverSocket.accept();
        // 得到客户端发送的流对象
        try (InputStream inputStream = clientSocket.getInputStream()) {
            for (int i = 0; i < 10; i++) {
                // 循环获取客户端发送的信息
                byte[] bytes = new byte[BYTE_LENGTH];
                // 读取客户端发送的信息
                int count = inputStream.read(bytes, 0, BYTE_LENGTH);
                if (count > 0) {
                    // 成功接收到有效消息并打印
                    System.out.println("接收到客户端的信息是:" + new String(bytes));
                }
                count = 0;
            }
        }
    }
}

为什么会有粘包和半包问题?

因为 TCP 是面向连接的传输协议,TCP 传输的数据是以流的形式,
而流数据是没有明确的开始结尾边界,所以 TCP 也没办法判断哪一段流属于一个消息。
粘包的主要原因:
发送方每次写入数据 < 套接字(Socket)缓冲区大小;
接收方读取套接字(Socket)缓冲区数据不够及时。
半包的主要原因:
发送方每次写入数据 > 套接字(Socket)缓冲区大小;
发送的数据大于协议MTU (Maximum Transmission Unit最大传输单元),因此必须拆包。

怎么做可以解决粘包和半包问题?

  1. 发送方和接收方规定固定大小的缓冲区,也就是发送和接收都使用固定大小的byte[]数组长度,当字符长度不够时使用空字符弥补;
package socket;

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;

/**
 * 客户端,改进版一(只负责接收消息)
 */
public class ClientSocketV1 {
    private static final int BYTE_LENGTH = 1024;  // 字节长度
    public static void main(String[] args) throws IOException {
        Socket socket = new Socket("127.0.0.1", 9091);
        final String message = "Hi,Java."; // 发送消息
        try (OutputStream outputStream = socket.getOutputStream()) {
            // 将数据组装成定长字节数组
            byte[] bytes = new byte[BYTE_LENGTH];
            int idx = 0;
            for (byte b : message.getBytes()) {
                bytes[idx] = b;
                idx++;
            }
            // 给服务器端发送 10 次消息
            for (int i = 0; i < 10; i++) {
                outputStream.write(bytes, 0, BYTE_LENGTH);
            }
        }
    }
}
package socket;

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class ServeSocketV1 {
    private static final int BYTE_LENGTH = 1024;  // 字节数组长度(收消息用)
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(9091);
        // 获取到连接
        Socket clientSocket = serverSocket.accept();
        try (InputStream inputStream = clientSocket.getInputStream()) {
            for (int i = 0; i < 10; i++) {
                byte[] bytes = new byte[BYTE_LENGTH];
                // 读取客户端发送的信息
                int count = inputStream.read(bytes, 0, BYTE_LENGTH);
                if (count > 0) {
                    // 接收到消息打印
                    System.out.println("接收到客户端的信息是:" + new String(bytes).trim());
                }
                count = 0;
            }
        }
    }
}

  1. 在 TCP 协议的基础上封装一层数据请求协议,既将数据包封装成数据头(存储数据正文大小)+ 数据正文的形式,这样在服务端就可以知道每个数据包的具体长度了,知道了发送数据的具体边界之后,就可以解决半包和粘包的问题了;
# server
import datetime
import json
import socket
import struct
import asyncore
import threading
import ctypes
import time

ESME_ROK = 0x00000000
bind_transceiver = 0x00000009
bind_transceiver_resp = 0x80000009
enquire_link = 0x00000015
enquire_link_resp = 0x80000015
submit_sm = 0x00000004
submit_sm_resp = 0x80000004


class MyEncoder(json.JSONEncoder):  # 用于json读写时的解码
    def default(self, obj):
        if isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        return json.JSONEncoder.default(self, obj)


class SMPPSocket(asyncore.dispatcher):  # 每生成一个socket就实例化一个对象
    def __init__(self, sock, usr='admin', pwd='abcdef'):
        super(SMPPSocket, self).__init__(sock)
        self.send_buff = b''
        self.read_buff = b''
        self.read_flag = True
        self.usr = usr
        self.pwd = pwd
        self.SMPP_DIC = {}
        # self.setblocking(flag=False)  # asyncore是异步IO模型,必定为非阻塞,故无需设置也不能设置

    def auto_read(self, str_name, str_type, idx):  # 对self.read_buff解码存入self.SMPP_DIC
        if str_type == '':
            temp = b''
            while struct.unpack('!s', self.read_buff[idx:idx + 1])[0] != b'\x00':
                temp += struct.unpack('!s', self.read_buff[idx:idx + 1])[0]
                idx += 1
            self.SMPP_DIC[str_name] = temp.decode()
            return idx + 1
        if str_type == '!B':
            self.SMPP_DIC[str_name] = struct.unpack(str_type, self.read_buff[idx:idx + 1])[0]
            return idx + 1
        if str_type == 'msg':
            self.SMPP_DIC[str_name] = (struct.unpack('!'+str(self.SMPP_DIC['sm_length'])+'s', self.read_buff[idx:idx + self.SMPP_DIC['sm_length']])[0]).decode()
            return idx+1

    def msg_dic(self, type_list, name_list):  # 把self.SMPP_DIC字段转换成数据库字段
        idxs = 16
        for i in range(len(type_list)):
            idxs = self.auto_read(name_list[i], type_list[i], idxs)
        new_dic = {'MSISDN': self.SMPP_DIC['Destination_addr'],
                   'MsgContent': self.SMPP_DIC['short_message'],
                   'Sender': self.SMPP_DIC['source_addr'],
                   'DateCoding': self.SMPP_DIC['data_coding'],
                   'PriorityFlag': self.SMPP_DIC['priority_flag'],
                   'RegisteredDelivery': self.SMPP_DIC['registered_delivery'],
                   'DestaddrNpi': self.SMPP_DIC['dest_addr_npi'],
                   'DestaddrTon': self.SMPP_DIC['dest_addr_ton'],
                   'SourceaddrNpi': self.SMPP_DIC['source_addr_npi'],
                   'SourceaddrTon': self.SMPP_DIC['source_addr_ton'],
                   'ServiceType': self.SMPP_DIC['service_type'],
                   'ScheduleDeliveryTime': self.SMPP_DIC['schedule_delivery_time'],
                   'ValidityPeriod': self.SMPP_DIC['validity_period'],
                   'Time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                   'sar_msg_ref_num': '',
                   'sar_total_segments': '',
                   'sar_segment_seqnum': ''}
        return new_dic

    def readable(self) -> bool:  # 同一时间只有一个handle_read在执行
        return self.read_flag  # 测试证明,readable为False时接收到数据会一直存在读缓冲区

    def handle_read(self):  # readable为true且读缓冲区有数据时才会执行
        # 非阻塞IO很容易报错,recv会把内存中的缓存数据写入self.read_buff缓冲区,这里有两种错:
        # 一、消息体的数据由于网络延迟未接收完整,这个recv是不会报错的,但有些数据未读到故下面解包就会报错,,因此上面用判断叠加buff
        # 二、消息体的数据由于网络延迟完全没接收,这个recv就会报错“无法立即完成一个非阻挡性套接字操作”,因此要用try待下次再次接收
        self.read_flag = False
        try:
            if len(self.read_buff) < 16:  # 克服半包问题
                self.read_buff += self.recv(16 - len(self.read_buff))
                self.read_flag = True
                return
            elif len(self.read_buff) < struct.unpack('!I', self.read_buff[0:4])[0]:
                self.read_buff += self.recv(struct.unpack('!I', self.read_buff[0:4])[0] - len(self.read_buff))
                if len(self.read_buff) < struct.unpack('!I', self.read_buff[0:4])[0]:
                    self.read_flag = True
                    return
        except BlockingIOError:
            self.read_flag = True
            return
        command_length = struct.unpack('!I', self.read_buff[0:4])[0]
        command_id = struct.unpack('!I', self.read_buff[4:8])[0]
        # command_status = struct.unpack('!I', self.read_buff[8:12])[0]  # 三个消息都没有用到这个字段
        sequence_number = struct.unpack('!I', self.read_buff[12:16])[0]
        print('get new message whose command_id and len are ', command_id, command_length)
        if command_id == bind_transceiver:  # 账密验证
            print('bind_transceiver')
            i = 16
            username = b''
            while struct.unpack('!s', self.read_buff[i:i + 1])[0] != b'\x00':
                username += struct.unpack('!s', self.read_buff[i:i + 1])[0]
                i += 1
            i += 1
            password = b''
            while struct.unpack('!s', self.read_buff[i:i + 1])[0] != b'\x00':
                password += struct.unpack('!s', self.read_buff[i:i + 1])[0]
                i += 1
            typist = ['!I', '!I', '!I', '!I', '!' + str(len(username)+1) + 's']
            values = [16 + len(username) + 1, bind_transceiver_resp, 0x00000000, sequence_number, username + b'\x00']
            if username.decode() != self.usr or password.decode() != self.pwd:
                values[2] = 0x0000000E
                print('username and password are wrong')
            else:
                print('username and password are right')
            for i in range(len(typist)):
                self.send_buff += struct.pack(typist[i], values[i])
        elif command_id == enquire_link:  # 心跳,不解析
            print('enquire_link')
            typist = ['!I', '!I', '!I', '!I']
            values = [16, enquire_link_resp, ESME_ROK, sequence_number]
            for i in range(len(typist)):
                self.send_buff += struct.pack(typist[i], values[i])
        elif command_id == submit_sm:  # 短信,解析并返回字典
            print('submit_sm')
            type_list = ['', '!B', '!B', '', '!B', '!B', '', '!B', '!B', '!B', '', '', '!B', '!B', '!B', '!B', '!B', 'msg']
            name_list = ['service_type', 'source_addr_ton', 'source_addr_npi', 'source_addr', 'dest_addr_ton',
                         'dest_addr_npi', 'Destination_addr', 'esm_class', 'protocol_id', 'priority_flag',
                         'schedule_delivery_time', 'validity_period', 'registered_delivery', 'replace_if_present_flag',
                         'data_coding', 'sm_default_msg_id', 'sm_length', 'short_message']
            tmp_dic = self.msg_dic(type_list, name_list)
            print(tmp_dic)
            typist = ['!I', '!I', '!I', '!I', '!2s']
            values = [18, submit_sm_resp, 0x00000000, sequence_number, '7'.encode() + bytes(0)]
            for i in range(len(typist)):
                self.send_buff += struct.pack(typist[i], values[i])
        else:  # 未定义消息,尝试把消息全部接收掉
            try:
                tmp = self.recv(1024)
            except BlockingIOError:
                pass
        self.read_buff = b''
        self.read_flag = True

    def writable(self) -> bool:
        return len(self.send_buff) > 0

    def handle_write(self) -> None:
        sent = self.send(self.send_buff)
        self.send_buff = self.send_buff[sent:]

    def handle_close(self) -> None:
        self.close()


class SMPPReactor(asyncore.dispatcher):
    def __init__(self, host, port, usr='admin', pwd='abcdef'):  # 绑定端口并开始侦听
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(5)
        self.handle_list = []
        self.usr = usr
        self.pwd = pwd

    def handle_accepted(self, sock, adds):  # 每次建立连接生成SMPPSocket实例
        print('Incoming connection from %s' % repr(adds))
        self.handle_list.append(SMPPSocket(sock, self.usr, self.pwd))
        print(self.handle_list[-1])

    def handle_close(self) -> None:  # 自我关闭前先把创建出来的socket逐个关闭掉
        for i in range(len(self.handle_list)):
            self.handle_list[i].close()
        self.close()


class SMPPServer(threading.Thread):

    cnt = 0

    def __init__(self, ip, port, usr='admin', pwd='abcdef'):
        super(SMPPServer, self).__init__()
        self.ip = ip
        self.port = int(port)
        self.smpp = None
        self.thread = None
        self.usr = usr
        self.pwd = pwd

    def start(self):  # 传入ip与port给Reactor进行accept,并开loop子线程
        self.smpp = SMPPReactor(self.ip, self.port, self.usr, self.pwd)
        SMPPServer.cnt += 1
        print('SMPP', SMPPServer.cnt, 'start success')
        if SMPPServer.cnt == 1:
            self.thread = threading.Thread(target=asyncore.loop, kwargs={'use_poll': True})
            self.thread.start()

    def stop(self):  # 关闭Reactor与loop线程
        self.smpp.handle_close()
        print('SMPP', SMPPServer.cnt, 'stop success')
        SMPPServer.cnt -= 1
        # if SMPPServer.cnt == 0:  # asyncore.loop写在子线程里的话,当所有socket关闭就会自动退出,但是自己在主函数写会一直阻塞
        #     ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(self.thread.ident), ctypes.py_object(SystemExit))


smppserver1 = SMPPServer('127.0.0.1', 10703, 'admin', 'abcdef')
smppserver1.start()
time.sleep(10)
smppserver1.stop()
# client
import asyncore
import socket
import struct

ESME_ROK = 0x00000000
bind_transceiver = 0x00000009
bind_transceiver_resp = 0x80000009
enquire_link = 0x00000015
enquire_link_resp = 0x80000015
submit_sm = 0x00000004
submit_sm_resp = 0x80000004


class HTTPClient(asyncore.dispatcher):
    def __init__(self, host):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((host, 10703))
        self.buffer = b''  # 测试发送三种消息

        typist = ['!I', '!I', '!I', '!I', '!5s', '!7s', '!13s', '!B', '!B', '!B', '!2s']
        values = [46, bind_transceiver, 0, 4, b'root' + bytes(0), b'abcdba' + bytes(0), b'1234567890123', 1, 2, 3, b'2' + bytes(0)]
        for i in range(len(typist)):
            self.buffer += struct.pack(typist[i], values[i])

        typist = ['!I', '!I', '!I', '!I']
        values = [16, enquire_link, 0, 4]
        for i in range(len(typist)):
            self.buffer += struct.pack(typist[i], values[i])

        typist = ['!I', '!I', '!I', '!I', '!2s', '!B', '!B', '!2s', '!B', '!B', '!2s', '!B', '!B', '!B', '!s', '!s',
                  '!B', '!B', '!B', '!B', '!B', '!2s']
        values = [38, submit_sm, 0, 4, b'1' + bytes(0), 0, 0, b'1' + bytes(0), 0, 0, b'1' + bytes(0), 0, 0, 0, bytes(0),
                  bytes(0), 0, 0, 0, 0, 0, b'1' + bytes(0)]
        for i in range(len(typist)):
            self.buffer += struct.pack(typist[i], values[i])

    def handle_connect(self):
        print('connect success')

    def handle_close(self):
        self.close()
        print('close success')

    def readable(self) -> bool:
        return True

    def handle_read(self):
        print(self.recv(8192))

    def writable(self):
        return len(self.buffer) > 0

    def handle_write(self):
        import time
        sent = self.send(self.buffer)
        self.buffer = self.buffer[sent:]


client = HTTPClient('127.0.0.1')
asyncore.loop()

  1. 以特殊的字符结尾,比如以“\n”结尾,这样我们就知道结束字符,从而避免了半包和粘包问题(推荐解决方案)。
package socket;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.Socket;

/**
 * 客户端,改进版三(只负责发送消息)
 */
public class ClientSocketV3 {
    public static void main(String[] args) throws IOException {
        // 启动 Socket 并尝试连接服务器
        Socket socket = new Socket("127.0.0.1", 9092);
        final String message = "Hi,Java."; // 发送消息
        try (BufferedWriter bufferedWriter = new BufferedWriter(
                new OutputStreamWriter(socket.getOutputStream()))) {
            // 给服务器端发送 10 次消息
            for (int i = 0; i < 10; i++) {
                // 注意:结尾的 \n 不能省略,它表示按行写入
                bufferedWriter.write(message + "\n");
                // 刷新缓冲区(此步骤不能省略)
                bufferedWriter.flush();
            }
        }
    }
}
package socket;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 服务器端,改进版三(只负责收消息)
 */
public class ServeSocketV3 {
    public static void main(String[] args) throws IOException {
        // 创建 Socket 服务器端
        ServerSocket serverSocket = new ServerSocket(9092);
        // 获取客户端连接
        Socket clientSocket = serverSocket.accept();
        // 使用线程池处理更多的客户端
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(100, 150,
                100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));
        threadPool.submit(() -> {
            // 消息处理
            processMessage(clientSocket);
        });
    }
    private static void processMessage(Socket clientSocket) {
        // 获取客户端发送的消息流对象
        try (BufferedReader bufferedReader = new BufferedReader(
                new InputStreamReader(clientSocket.getInputStream()))) {
            for (int i = 0; i < 10; i++) {
                // 按行读取客户端发送的消息
                String msg = bufferedReader.readLine();
                if (msg != null) {
                    // 成功接收到客户端的消息并打印
                    System.out.println("接收到客户端的信息:" + msg);
                }
            }
        } catch (IOException ioException) {
            ioException.printStackTrace();
        }
    }
}

阻塞模型

在这里插入图片描述

比喻

A同学用杯子装水,打开水龙头装满水然后离开,因为如果水龙头没有水,他也要等到有水并装满杯子才能离开去做别的事情(反正就是要装满水)。

场景

  1. 应用进程recvfrom执行系统调用,然后阻塞进入等待队列
  2. 内核收到系统调用,此时无数据报准备好,等待数据
  3. 数据报准备好了,内核就将数据从内核复制到用户空间
  4. 内核复制完成后返回成功指示
  5. 应用进程被唤醒,进入就绪队列,等待CPU分片处理数据报

函数使用

  1. 输入操作:read、readv、recv、recvfrom、recvmsg共5个函数,如果设置为阻塞状态,则会经过wait data和copy data两个阶段,如果设置为非阻塞则在wait 不到data时抛出异常
  2. 输出操作:write、writev、send、sendto、sendmsg共5个函数,在发送缓冲区满了会阻塞在原地,如果设置为非阻塞,则会抛出异常
  3. 接收外来链接:accept,与输入操作类似
  4. 发起外出链接:connect,与输出操作类似

read与recv

  • read:数据在不超过指定的长度的时候有多少读多少,没有数据则会一直等待。所以一般情况下:我们读取数据都需要采用循环读的方式读取数据,因为一次read 完毕不能保证读到我们需要长度的数据,read 完一次需要判断读到的数据长度再决定是否还需要再次
  • recv:recv(sockfd, buff, buff_size, MSG_WAITALL)中有一个MSG_WAITALL 的参数,正常情况下recv 是会等待直到读取到buff_size 长度的数据,但是这里的WAITALL 也只是尽量读全,在有中断的情况下recv 还是可能会被打断,造成没有读完指定的buff_size的长度。所以即使是采用recv + WAITALL 参数还是要考虑是否需要循环读取的问题,在实验中对于多数情况下recv (使用了MSG_WAITALL)还是可以读完buff_size,所以相应的性能会比直接read 进行循环读要好一些。

问题与改进

问题一:实际上,除非特别指定,几乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。
改进一:在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。
问题二:开启多进程或都线程的方式,在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。
改进二:很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。
问题三:“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。
改进三:使用非阻塞接口

阻塞代码

# block_server
import socket
server=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.bind(('127.0.0.1',6666))
server.listen()
sock,addr=server.accept()
print('连接成功,recv阻塞等待收到数据')
data=sock.recv(1024).decode()
print('收到数据{},结束程序'.format(data))
sock.send(b'hello'+data.encode())
server.close()
sock.close()

# block_client
import socket
client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
client.connect(('127.0.0.1',6666))
input_str=input()
client.send(input_str.encode())
print(client.recv(1024).decode())
client.close()

多客户端(多线程)代码

在这里插入图片描述

非阻塞模型

在这里插入图片描述

比喻

B同学也用杯子装水,打开水龙头后发现没有水,它离开了,过一会他又拿着杯子来看看。在中间离开的这些时间里,B同学离开了装水现场,可以做他自己的事情,这就是非阻塞IO模型。但是它只有是检查无数据的时候是非阻塞的,在数据到达的时候依然要等待复制数据到用户空间(等着水将水杯装满),因此它还是同步IO。

场景

  1. 应用进程recvfrom执行系统调用(不是默认参数),如果内核无数据报准备好则马上返回EWOULDBLOCK
  2. 应用进程反复调用recvfrom等待返回成功指示(轮询),期间是非阻塞的可以有其他操作
  3. 内核这边就在等待数据准备好(从网卡接收或从磁盘读取到内核空间),准备好了就将数据从内核复制到用户空间
  4. 在复制的期间,由于没有返回EWOULDBLOCK,所以应用进程阻塞
  5. 应用进程收到成功指示,#处理数据报

代码

# server
import time
import socket
server = socket.socket()
server.bind(('127.0.0.1', 8083))
server.listen(5)
server.setblocking(False)  # 设置不阻塞
r_list = []  # 存储当前已连接的套接字,每轮循环都遍历套接字读取内容
w_list = {}  # key是准备好要发送的套接字,val是发送的内容
del_rlist = []  # 存储将要删除的conn连接
del_wlist = []  # 存储将要删除的conn连接
while 1:
    try:
        conn, addr = server.accept()  # 因为现在设置了不阻塞,所以会报错(默认是阻塞在这一直等)
        r_list.append(conn)  # 把连接保存起来,不然下次循环的时候,上一次的连接就没有了
    except BlockingIOError:  # 报错表示当前没有新客户端请求连接
        time.sleep(1)  # 这一秒等同于去做其他事
        print('这一秒在做其他的事情')
        print('rlist: ', len(r_list))  # 看一下有多少个连接等待被遍历读出
        for conn in r_list:  # 遍历读列表,依次取出套接字读取内容
            try:
                data = conn.recv(1024)  # 因为设置了不阻塞,所以如果没有数据读到就会报错
                if not data:  # 当一个客户端暴力关闭的时候,会一直接收b'',别忘了判断一下数据
                    conn.close()
                    del_rlist.append(conn)
                    continue
                w_list[conn] = data.upper()  # 把收到的内容改成大写并与本socket压进待发送的字典中
            except BlockingIOError:  # 没有数据就继续下一个套接字
                continue
            except ConnectionResetError:  # 套接字出异常则关闭,并加入删除列表,等待被清除
                conn.close()
                del_rlist.append(conn)
        for conn in del_rlist:  # 清理无用的套接字,无需再监听它们的IO操作
            r_list.remove(conn)
        del_rlist.clear() # 清空列表中保存的已经删除的内容
        print('wlist: ', len(w_list))  # 看一下有多少个连接等待被遍历发送
        for conn, data in w_list.items():  # 遍历写列表,依次取出套接字发送内容
            try:
                conn.send(data)
                del_wlist.append(conn)  # 发送完了,这个连接压进待删列表
            except BlockingIOError:  # 不能马上发出就报错(可能是因为对面的读缓冲区满了)
                continue
        for conn in del_wlist:
            w_list.pop(conn)
        del_wlist.clear()

# 为什么要设置两个del_xxx的列表呢?
# 因为如果遍历时直接对r_list与w_list的元素做删除操作,会报错RuntimeError: dictionary changed size during iteration

# client
import os
import time
import socket
import threading
client = socket.socket()
client.connect(('127.0.0.1', 8083))
while 1:
    client.send('hello'.encode('utf-8'))
    data = client.recv(1024)
    print(data.decode('utf-8'))

# 多线程的客户端请求版本
# def func():
#     sk = socket.socket()
#     sk.connect(('127.0.0.1',9000))
#     sk.send(b'hello')
#     time.sleep(1)
#     print(sk.recv(1024))
#     sk.close()
# for i in range(20):
#     threading.Thread(target=func).start()

缺点

  1. 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
  2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。

IO多路复用

在这里插入图片描述

比喻

在调用recv前先调用select或者poll,这2个系统调用都可以在内核准备好数据(网络数据到达内核)时告知用户进程,这个时候再调用recv一定是有数据的。因此这一过程中它是阻塞于select或poll,而没有阻塞于recv,而非阻塞IO定义在是读写操作时没有阻塞于系统调用的IO操作(不包括数据从内核复制到用户空间时的阻塞,因为这相对于网络IO来说确实很短暂),如果按这样理解,这种IO模型也能称之为非阻塞IO模型,但是按POSIX来看,它也是同步IO,故称之为同步非阻塞IO。

这种IO模型比较特别,分个段。因为它能同时监听多个文件描述符(fd)。这个时候C同学来装水,发现有一排水龙头,舍管阿姨告诉他这些水龙头都还没有水,等有水了告诉他。于是等啊等(select调用中),过了一会阿姨告诉他有水了,但不知道是哪个水龙头有水,自己看吧。于是C同学一个个打开,往杯子里装水(recv)。这里再顺便说说鼎鼎大名的epoll(高性能的代名词啊),epoll也属于IO复用模型,主要区别在于舍管阿姨会告诉C同学哪几个水龙头有水了,不需要一个个打开看(当然还有其它区别)。

场景

  1. 应用进程select系统调用,等待可能多个套接字中的任一个变为可读(注意这里没有阻塞于系统调用的io操作select,即非阻塞)
  2. 内核等待数据,待数据报准备好后返回可读条件
  3. 应用进程收到可读条件,进行Recvfrom系统调用,并且阻塞
  4. 内核将数据从内核复制到用户空间,返回成功指示
  5. 应用进程收到成功指示,处理数据报

代码

# select_server.py
import socket
import select
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 8093))
server.listen(5)
server.setblocking(False)  # 设置为非阻塞
# 初始化将服务端socket对象加入监听列表,后面还要动态添加一些conn连接对象,当accept的时候sk就有感应,当recv的时候conn就有动静
rlist = [server, ]
rdata = {}  # 存放客户端发送过来的消息
wlist = []  # 等待写对象
wdata = {}  # 存放要返回给客户端的消息
print('预备!监听!!!')
count = 0  # 写着计数用的,为了看实验效果用的,没用
while True:
    # 开始 select 监听,对rlist中的服务端server进行监听,select函数阻塞进程,直到rlist中的套接字被触发
    # (在此例中,套接字接收到客户端发来的握手信号,从而变得可读,满足select函数的“可读”条件),
    # 被触发的(有动静的)套接字(服务器套接字)返回给了rl这个返回值里面;
    rl, wl, xl = select.select(rlist, wlist, [], 0.5)
    print('%s 次数>>' % count, wl)
    count = count + 1
    # 对rl进行循环判断是否有客户端连接进来,当有客户端连接进来时select将触发
    for sock in rl:
        # 判断当前触发的是不是socket对象, 当触发的对象是socket对象时,说明有新客户端accept连接进来了
        if sock == server:
            # 接收客户端的连接, 获取客户端对象和客户端地址信息
            conn, addr = sock.accept()
            # 把新的客户端连接加入到监听列表中,当客户端的连接有接收消息的时候,select将被触发,会知道这个连接有动静,有消息,那么返回给rl这个返回值列表里面。
            rlist.append(conn)
        else:
            # 由于客户端连接进来时socket接收客户端连接请求,将客户端连接加入到了监听列表中(rlist),客户端发送消息的时候这个连接将触发
            # 所以判断是否是客户端连接对象触发
            try:
                data = sock.recv(1024)  # 其实select返回的就是有数据要处理的了
                # 收到b''时,我们将这个连接关闭掉(收到b''表示断连),并从监听列表中移除
                if not data:
                    sock.close()
                    rlist.remove(sock)
                    continue
                print("received {0} from client {1}".format(data.decode(), sock))
                # 将接受到的客户端的消息保存下来
                rdata[sock] = data.decode()
                # 将客户端连接对象和这个对象接收到的消息加工成返回消息,并添加到wdata这个字典里面
                wdata[sock] = data.upper()
                # 需要给这个客户端回复消息的时候,我们将这个连接添加到wlist写监听列表中
                wlist.append(sock)
            # 如果这个连接出错了,客户端暴力断开了(注意,我还没有接收他的消息,或者接收他的消息的过程中出错了)
            except Exception:
                # 关闭这个连接
                sock.close()
                # 在监听列表中将他移除,因为不管什么原因,它毕竟是断开了,没必要再监听它了
                rlist.remove(sock)
    # 如果现在没有客户端请求连接,也没有客户端发送消息时,开始对发送消息列表进行处理,是否需要发送消息
    for sock in wl:
        sock.send(wdata[sock])
        wlist.remove(sock)
        wdata.pop(sock)

    # #将一次select监听列表中有接收数据的conn对象所接收到的消息打印一下
    # for k,v in rdata.items():
    #     print(k,'发来的消息是:',v)
    # #清空接收到的消息
    # rdata.clear()


# select_client.py
import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 8093))
while True:
    msg = input('>>: ').strip()
    if not msg:
        continue
    client.send(msg.encode('utf-8'))
    data = client.recv(1024)
    print(data.decode('utf-8'))
client.close()


# selector_server.py
import socket
import selectors
sel = selectors.DefaultSelector()
def accept(server_fileobj, mask):
    conn, addr = server_fileobj.accept()
    sel.register(conn, selectors.EVENT_READ, read)
def read(conn, mask):
    try:
        data = conn.recv(1024)
        if not data:
            print('closing', conn)
            sel.unregister(conn)
            conn.close()
            return
        conn.send(data.upper())
    except Exception:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()
server_fileobj = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_fileobj.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # 地址可重用
# 一般来说,一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用
server_fileobj.bind(('127.0.0.1', 8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False)  # 设置socket的接口为非阻塞
sel.register(server_fileobj, selectors.EVENT_READ,accept)  # select读列表append文件句柄server_fileobj并绑定回调函数accept
while True:
    events = sel.select()  # 检测所有的fileobj,是否有完成wait data的
    for sel_obj, mask in events:
        callback = sel_obj.data  # callback这里就是回调函数(上面的accept或read)
        callback(sel_obj.fileobj, mask)  # accpet(server_fileobj,selectors.EVENT_READ)


# selector_client.py
import socket
c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
c.connect(('127.0.0.1', 8088))
while True:
    msg = input('>>: ')
    if not msg:
        continue
    c.send(msg.encode('utf-8'))
    data = c.recv(1024)
    print(data.decode('utf-8'))


# epoll_demo.py
#!/usr/bin/env python
import select
import socket  # 假设socket中有epoll类
response = b''
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.bind(('0.0.0.0', 8080))
serversocket.listen(1)
serversocket.setblocking(0)  # 因为socket默认是阻塞的,所以需要使用非阻塞(异步)模式。
# 为什么一定是非阻塞?若设为阻塞,你现在要发2000B,发了1000B发现写缓冲区满了(因为对端读缓冲区满了),你就会卡死
epoll = select.epoll()  # 创建一个epoll对象
epoll.register(serversocket.fileno(), select.EPOLLIN)  # 在服务端socket上面注册对读event的关注。一个读event随时会触发服务端socket去接收一个socket连接
try:  # 字典connections映射文件描述符(整数)到其相应的网络连接对象
    connections = {}
    requests = {}
    responses = {}
    while True:  # 查询epoll对象,看是否有任何关注的event被触发。参数“1”表示,我们会等待1秒来看是否有event发生
        # 如果有任何我们感兴趣的event发生在这次查询之前,这个查询就会带着这些event的列表立即返回
        events = epoll.poll(1)  # event作为一个序列(fileno,event code)的元组返回
        for fileno, event in events:  # fileno是文件描述符的代名词,始终是一个整数
            if fileno == serversocket.fileno():  # 如果是服务端产生event,表示有一个新的连接进来
                connection, address = serversocket.accept()
                print('client connected:', address)
                connection.setblocking(0)  # 设置新的socket为非阻塞模式
                epoll.register(connection.fileno(), select.EPOLLIN)  # 为新的socket注册对读(EPOLLIN)event的关注
                connections[connection.fileno()] = connection
                requests[connection.fileno()] = b''  # 初始化接收的数据
            elif event & select.EPOLLIN:  # 如果发生一个读event,就读取从客户端发送过来的新数据
                print("------recvdata---------")
                requests[fileno] += connections[fileno].recv(1024)  # 接收客户端发送过来的数据
                if not requests[fileno]:  # 如果客户端退出,关闭客户端连接,取消所有的读和写监听
                    connections[fileno].close()
                    del connections[fileno]  # 删除connections字典中的监听对象
                    del requests[connections[fileno]]  # 删除接收数据字典对应的句柄对象
                    print(connections, requests)
                    epoll.modify(fileno, 0)
                else:  # 一旦完成请求已收到,就注销对读event的关注,注册对写(EPOLLOUT)event的关注。写event发生的时候,会回复数据给客户端
                    epoll.modify(fileno, select.EPOLLOUT)
                    print('-' * 40 + '\n' + requests[fileno].decode())  # 打印完整的请求,证明虽然与客户端的通信是交错进行的,但数据作为整体来组装处理
            elif event & select.EPOLLOUT:  # 如果一个写event在一个客户端socket上面发生,它会接受新的数据以便发送到客户端
                print("--send data--")  # 每次发送一部分响应数据,直到完整的响应数据都已经发送给操作系统等待传输给客户端
                byteswritten = connections[fileno].send(requests[fileno])
                requests[fileno] = requests[fileno][byteswritten:]
                if len(requests[fileno]) == 0:   # 一旦完整的响应数据发送完成,就不再关注写event
                    epoll.modify(fileno, select.EPOLLIN)
            # HUP(挂起)event表明客户端socket已经断开(即关闭),所以服务端也需要关闭
            # 没有必要注册对HUP event的关注。在socket上面,它们总是会被epoll对象注册
            elif event & select.EPOLLHUP:
                print("end hup------")
                epoll.unregister(fileno)  # 注销对此socket连接的关注
                connections[fileno].close()  # 关闭socket连接
                del connections[fileno]
finally:  # 打开的socket连接不需要关闭,因为Python会在程序结束的时候关闭。这里显式关闭是一个好的代码习惯
    epoll.unregister(serversocket.fileno())
    epoll.close()
    serversocket.close()

中断

可编程中断控制器:

在这里插入图片描述

硬中断:与当前正在运行的程序无必然关系,如打印,鼠标,键盘
软中断:与当前正在运行的程序有必然联系,如除零,系统调用中断(向量值0x80),网卡中断(向量值15)
8259A中断控制器可以统一管理所有外部中断信号并根据优先级过筛选后与CPU通信
中断请求寄存器就是当前哪些引脚有中断信号
优先级解析器就是判断哪些中断是优先的保留其标志
正在服务的寄存器是保存如键盘IRQ1这样的标志,如果是可抢占的话可以会被抢占
INTR引脚传入的就是中断触发信号,CPU会利用其他引脚进行数据传输,识别哪类中断
RAM内存分用户空间(用户空间的每个进程有内核态堆栈,进程标识符中有两个指针分别指向两个堆栈)与内核空间(共同的)
IRQ中断程序入口映射表在内核启动阶段完成加载,是统一不变的,只能在内核态访问

缓冲区:

在这里插入图片描述

socket由三部分组成:读缓冲,写缓冲,等待队列
socket.close()时如果读缓冲区有数据就会丢,写缓冲区有数据则会继续发送完

阻塞型IO写:

系统调用的过程:
在这里插入图片描述

用户进程对fd申请write系统调用(向量值0x80是十六进制就是图中128)发送数据,把用户空间的数据从寄存器拷贝到内核空间中
(切换内核态:保存寄存器到进程标识符,改变寄存器(内核等级标记,内核堆栈地址,内核代码地址))
假设现在TCPIP正在使用写缓冲区(即正在发送数据,会自动加锁)
此时用户进程不能写入socket写缓冲区,会阻塞(该进程加入该socket等待队列)
TCPIP使用完后,唤醒等待队列中的第一个进程
进程看写缓冲区大小是否小于要发送的数据大小,假设现在yes
设要发送10B,但写缓冲区仅5B,则先把从用户空间中传过来的数据(当前在内核空间)前5B写入socket写缓冲区,等待网卡发送
等待期间同上阻塞,网卡发送完5B数据后唤醒,再写入后5B,切换回用户态(用户堆栈与等级标记,从进程标记符读寄存器值)

非阻塞IO写:

用户进程对fd申请write系统调用,把用户空间的数据从寄存器拷贝到内核空间中
假设现在TCPIP正在使用写缓冲区(即正在发送数据,会自动加锁)或者写缓冲区已满(可能对端读缓冲区也满了但一直未读)
此时用户进程不能写入socket写缓冲区,直接报错BlockingError不能立即完成一个非阻塞性套接字操作并返回-1,代码实现时要try掉
进程看写缓冲区大小是否小于要发送的数据大小,假设现在yes
设要发送10B,但写缓冲区仅5B,则先把从用户空间中传过来的数据前5B写入socket写缓冲区,并立即返回5,完成
至于后面用户是否还要发后5B,或者说何时发,都由用户决定,反正这里不再阻塞
(同步异步是说用户态与内核态传输是否等待,等就同,不等就异,这里把用户空间写入socket写缓冲区是等的,所以是同步)
(阻塞不阻塞是说网卡收发时是否等待,等就阻,不等就不阻,这里把把用户空间写入socket写缓冲区之后是不再等的,所以非阻)

阻塞型IO读:

bio通信底层原理:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
假设已建立socket连接,并获得socket在内核空间中的fd
用户进程对fd进行系统调用recv接收数据,请求写入自定义缓冲区
用户态转内核态,假设现在内核发现对应socket下读缓冲区为空
把当前进程写入socket的等待队列中,挂起当前进程
网卡收到的报文通过内嵌的DMA把数据写入内存中的网卡缓冲区
DMA写入后,网卡向CPU发起IRQ中断
CPU把寄存器存在当前进程的文件标识符中,用户态转内核态等级标记,用户堆栈转内核堆栈
CPU从中断控制器读到中断向量,查IRQ中断程序入口映射表得到中断程序代码段地址
CPU进入中断开始处理,把网卡缓冲区收到的TCP/IP报文写入对应socket的读缓冲中
socket的读缓冲写入数据后,会激活等待队列中的进程
进程回到工作/就绪队列,等待CPU分配
进程获得CPU,此时是内核态
内核发现对应socket下读缓冲区有数据,把数据拷贝到内核堆栈
内核态转换为用户态,并通过寄存器返回传值给用户态中自定义的缓冲区
(这里 是传值,用户态不能读内核态数据)
(如果自定义的缓冲区大小较小,则需要循环拷贝完,拷贝时是阻塞)
(如果自定义的缓冲区大小较大,则read是有多少读多少,拷贝时是阻塞)

非阻塞IO读:

假设已建立socket连接,并获得socket在内核空间中的fd
用户进程对fd进行系统调用recv接收数据,请求写入自定义缓冲区
用户态转内核态,假设现在内核发现对应socket下读缓冲区为空
接报错BlockingError不能立即完成一个非阻塞性套接字操作并返回-1,代码实现时要try掉
内核发现对应socket下读缓冲区有数据,把数据拷贝到内核堆栈
内核态转换为用户态,并通过寄存器返回传值给用户态中自定义的缓冲区
(如果自定义的缓冲区大小较小,则需要循环拷贝完,拷贝时是阻塞)
(如果自定义的缓冲区大小较大,则read是有多少读多少,拷贝时是阻塞)

select应用

linux select函数API:https://www.processon.com/view/link/5f601ed86376894e326d9730
在这里插入图片描述

select原理

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  • select介绍:
  1. 当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
  2. select对比blocking IO其实并没有太大的不同,事实上还更差一些。因为它不仅阻塞了还多需要使用两个系统调用(select和recvfrom),而blocking IO只调用了一个系统调用(recvfrom),当只有一个连接请求的时候,这个模型还不如阻塞IO效率高。
  3. 用select的优势在于它可以同时处理多个connection,而阻塞IO那里不能,我不管阻塞不阻塞,你所有的连接包括recv等操作,我都帮你监听着(select/poll/epoll三种形式),其中任何一个有变动(有链接,有数据),我就告诉你用户,那么你就可以去调用这个数据了,这就是他的NB之处。
  4. 这个IO多路复用模型机制是操作系统帮我们提供的,在windows上有这么个机制叫做select,那么如果我们想通过自己写代码来控制这个机制或者自己写这么个机制,我们可以使用python中的select模块来完成上面这一系列代理的行为。
  5. 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。
  6. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是整个用户的process其实是一直被block的(select的timeout置-1时可以这么说)。只不过process是被select这个函数block(常规写法是while死循环下select指定timeout时间后for遍历fd表,所以可视为一直select),而不是被IO(recv)给block。
  7. 优点:相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。
  8. select做得事情和第二阶段的阻塞(就是从内核态将数据拷贝到用户态的阻塞)没有关系,它始终帮你做得监听的工作,帮你节省了一些第一阶段阻塞的时间。
  9. 缺点:首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。
  10. 缺点:该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。
  11. 缺点:每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
  12. 缺点:同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
  13. 缺点:select支持的文件描述符数量太小了,默认是1024
  • select的调用步骤如下:
  1. 使用copy_from_user从用户空间拷贝fdset到内核空间
  2. 注册回调函数__pollwait
  3. 遍历所有fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据情况会调用到tcp_poll,udp_poll或者datagram_poll)
  4. 以tcp_poll为例,其核心实现就是__pollwait,也就是上面注册的回调函数。
  5. __pollwait的主要工作就是把current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于tcp_poll 来说,其等待队列是sk->sk_sleep(注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数 据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒了。
  6. poll方法返回时会返回一个描述读写操作是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。
  7. 如果遍历完所有的fd,还没有返回一个可读写的mask掩码,则会调用schedule_timeout是调用select的进程(也就是 current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout 指定),还是没人唤醒,则调用select的进程会重新被唤醒获得CPU,进而重新遍历fd,判断有没有就绪的fd。
  8. 把fd_set从内核空间拷贝到用户空间。
  • select监听fd变化的过程分析:
  1. 用户进程创建socket对象,拷贝监听的fd(bitmap)到内核空间,每一个bit会对应一张系统文件表,内核空间的fd响应到数据后,就会发送信号给用户进程数据已到;(具体来说是socket读缓冲有数据把等待队列的进程唤醒到就绪队列,再争夺CPU进行操作)
  2. 用户进程再发送系统调用,比如(accept)将内核空间的数据copy到用户空间,同时作为接受数据端内核空间的数据清除,这样重新监听时fd再有新的数据又可以响应到了(发送端因为基于TCP协议所以需要收到应答后才会清除)。
  • IO多路复用的机制:
  1. select机制: Windows、Linux
  2. poll机制 : Linux和select监听机制一样,但是对监听列表里面的数量没有限制,select默认限制是1024个,但是他们两个都是操作系统轮询每一个被监听的文件描述符(如果数量很大,其实效率不太好),看是否有可读操作。
  3. epoll机制 : Linux它的监听机制和上面两个不同,他给每一个监听的对象绑定了一个回调函数,你这个对象有消息,那么触发回调函数给用户,用户就进行系统调用来拷贝数据,并不是轮询监听所有的被监听对象,这样的效率高很多。
  • 理解完IO复用后,我们在来看下实现IO复用中的三个API(select、poll和epoll)的区别和联系:select,poll,epoll都是IO多路复用的机制,I/O多路复用就是通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知应用程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。三者的原型如下所示:
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
// select的第一个参数nfds为fdset集合中最大描述符值加1,fdset是一个位数组,其大小限制为__FD_SETSIZE(1024),
// 位数组的每一位代表其对应的描述符是否需要被检查。第二三四参数表示需要关注读、写、错误事件的文件描述符位数组,
// 这些参数既是输入参数也是输出参数,可能会被内核修改用于标示哪些描述符上发生了关注的事件,所以每次调用select前都需要重新初始化fdset。
// timeout参数为超时时间,该结构会被内核修改,其值为超时剩余的时间。
python中的select模块:
import select
fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout])
参数: 可接受四个参数(前三个必须)
    rlist: wait until ready for reading  #等待读的对象,你需要监听的需要获取数据的对象列表
    wlist: wait until ready for writing  #等待写的对象,你需要写一些内容的时候,input等等,也就是说我会循环他看看是否有需要发送的消息,如果有我取出这个对象的消息并发送出去,一般用不到,这里我们也给一个[]。
    xlist: wait for an “exceptional condition”  #等待异常的对象,一些额外的情况,一般用不到,但是必须传,那么我们就给他一个[]。
    timeout: 超时时间
    当超时时间 = n(正整数)时,那么如果监听的句柄均无任何变化,则select会阻塞n秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。
返回值:三个列表与上面的三个参数列表是对应的
- select方法用来监视文件描述符(当文件描述符条件不满足时,select会阻塞),当某个文件描述符状态改变后,会返回三个列表
    1、当参数1 序列中的fd满足“可读”条件时,则获取发生变化的fd并添加到fd_r_list中
    2、当参数2 序列中含有fd时,则将该序列中所有的fd添加到 fd_w_list中
    3、当参数3 序列中的fd发生错误时,则将该发生错误的fd添加到 fd_e_list中
    4、当超时时间为空,则select会一直阻塞,直到监听的句柄发生变化

poll介绍

  • poll与select不同,通过一个pollfd数组向内核传递需要关注的事件,故没有描述符个数的限制,pollfd中的events字段和revents分别用于标示关注的事件和发生的事件,故pollfd数组只需要被初始化一次。
  • poll的实现机制与select类似,其对应内核中的sys_poll,只不过poll向内核传递pollfd数组,然后对pollfd中的每个描述符进行poll,相比处理fdset来说,poll效率更高。poll返回后,需要对pollfd中的每个元素检查其revents值,来得指事件是否发生。

epoll应用

https://www.processon.com/view/link/5f6034210791295dccbc1426
**加粗样式**

epoll原理

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  • epoll文字描述:
  1. 假设现在创建了一个epoll(eventpoll),epfd=6,监听事件列表有三个socket,fd为3,4,5,用红黑树管理,就绪队列(列表)与等待队列为空
    用户进程执行epoll_wait,转内核态,发现绪队列为空,用户进程进入epoll等待队列,epoll进入三个socket的等待队列
  2. 某个socket的数据到了,epoll从三个socket的等待队列中出队,就绪socket对应epoll_event进入epoll就绪队列,用户进程从epoll等待队列中出队
  3. 用户进程获得CPU,内核态中的就绪列表拷贝给用户态中传入的就绪列表数组(个人感觉叫就绪队列比较好,因为有个参数可以设置取出前面指定数量个epoll_event的)
  • epoll有自己的fd(万物皆文件),由事件列表/就绪列表/等待队列三个部分组成,有create/ctl/wait三个函数
  1. 事件列表是存储要监听的epoll_event

  2. 就绪列表是存储已触发的epoll_event(图6-4,是从epoll_event里面的fd取出socket的,图6-3,eventpoll离开等待队列后,有数据的socket会进入eventpoll的就绪队列,然后epoll会把自己的等待队列中的进程唤醒变为就绪态争夺CPU,并把就绪队列拷贝到用户调用epoll_wait时传入的指针所指数组中,此时读写绝对是有数据的不会阻塞)

  3. 等待队列是当前epoll监听的所有epoll_event皆没有触发,就会把当前进程压进epoll的等待队列中(图4,注意epoll自已会被压入所有监听的socket等待队列)

  4. epoll_create传入的参数决定能装多少个socket,或者说事件列表能装多少个epoll_event(但1个socket读与写是两个epoll_event),返回文件标识符

  5. epoll_ctl是对标记epfd的epoll中的标记fd的socket进行增/删/改操作,对增与改还要配置新的epoll_event,返回值无用
    其中,epoll_event包括感兴趣事件(读/写/挂断/边缘触发/水平触发等的事件组合)和此epoll_event的data(如socket的fd)

  6. epoll_wait是访问标记epfd的epoll中的最多maxevents个epoll_event(数组指针传入),等待时时timeout,返回值是就绪列表的长度,就绪列表是调用epoll_wait时用户传入的

  7. epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。

  8. epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。

  9. 另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描。而epoll提供了三个函 数,epoll_create,epoll_ctl和epoll_wait,epoll_create是创建一个epoll句柄;epoll_ctl是注册要监听的事件类型;epoll_wait则是等待事件的产生。epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

  • epoll既然是对select和poll的改进,就应该能避免上述的三个缺点。那epoll都是怎么解决的呢?
  1. 对于第一个缺点,epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定 EPOLL_CTL_ADD),会把所有的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝 一次。

  2. 对于第二个缺点,epoll的解决方案不像select或poll一样每次都把current轮流加入fd对应的设备等待队列中,而只在 epoll_ctl时把current挂一遍(这一遍必不可少)并为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调 函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd(利用 schedule_timeout()实现睡一会,判断一会的效果,和select实现中的第7步是类似的)。

  3. 对于第三个缺点,epoll没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子, 在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。

  • 特别注意:
  1. 收发数据的在用户态的内核态之间传递,select的bitmap在用户态的内核态之间传递,它们都是是指针传递,然后内核都会把数据拷贝一份在内核空间,只不过bitmap最后的结果返回是在原bitmap上体现的
  2. 显然这个bitmap,每次select都要拷贝两次且上一次select的readset无效了需要重新赋值,而且select完后还要遍历一次1024个bit,有1又要拷一次fd
  3. 话说linuxC中select返回的那个int没什么意思,它是readset/writeset/exceptset三个set中做好准备的文件描述符个数(不排除一个fd三个set都有1)

总结:

  1. select,poll实现需要自己不断轮询所有fd集合,直到设备就绪,期间可能要睡眠和唤醒多次交替。而epoll其实也需要调用 epoll_wait不断轮询就绪链表,期间也可能多次睡眠和唤醒交替,但是它是设备就绪时,调用回调函数,把就绪fd放入就绪链表中,并唤醒在 epoll_wait中进入睡眠的进程。虽然都要睡眠和交替,但是select和poll在“醒着”的时候要遍历整个fd集合,而epoll在“醒着”的 时候只要判断一下就绪链表是否为空就行了,这节省了大量的CPU时间,这就是回调机制带来的性能提升。

  2. select,poll每次调用都要把fd集合从用户态往内核态拷贝一次,并且要把current往设备等待队列中挂一次,而epoll只要 一次拷贝,而且把current往等待队列上挂也只挂一次(在epoll_wait的开始,注意这里的等待队列并不是设备等待队列,只是一个epoll内部定义的等待队列),这也能节省不少的开销。

  3. 这三种IO多路复用模型在不同的平台有着不同的支持,而epoll在windows下就不支持,好在我们有selectors模块,帮我们默认选择当前平台下最合适的,我们只需要写监听谁,然后怎么发送消息接收消息,但是具体怎么监听的,选择的是select还是poll还是epoll,这是selector帮我们自动选择的。

信号驱动模型

在这里插入图片描述

比喻

D同学让舍管阿姨等有水的时候通知他(注册信号函数),没多久D同学得知有水了,跑去装水。它还是同步IO(省不了装水的时间)。

场景

  1. 应用进程建立SIGIO信号处理程序,进行sigaction系统调用并马上返回,继续执行进程其他操作
  2. 内核等待数据准备好了,就递交SIGIO给信号处理程序
  3. 应用进程收到信号后,进行recvfrom系统调用,然后阻塞(数据从内核复制到应用缓冲区期间进程阻塞)
  4. 内核执行系统调用,将数据从内核复制到用户空间,完成后返回成功指示
  5. 应用进程收到成功指示后,处理数据报

应用

对于 socket 产生 SIGIO的条件:

  • TCP套接字:
    1.监听套接字上有新连接请求完成
    2.某个断连请求发起
    3.某个断连请求完成
    4.数据到达套接字
    5.数据已从套接字发送走(输出缓冲区有空闲空间)
    6.发生某个异步错误
  • UDP套接字:
    1.数据报到达套接字
    2.套接字上发生异步错误

代码

#ifndef _NET_INFO_H  
#define _NET_INFO_H
#define RCV_LEN 1024
#define LISTEN_PORT 9999
#endif

#include <sys/types.h>        
#include <sys/socket.h>  
#include <stdio.h>  
#include <stdlib.h>  
#include <arpa/inet.h>  
#include <unistd.h>  
#include <time.h>  
#include <string.h>  
#include <sys/select.h>  
#include <sys/time.h>  
#include <errno.h>
#include <signal.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <net/if.h>
#include <net/if_arp.h>
#include <sys/ioctl.h>
#include <sys/ioctl.h>
#include "net_info.h"
static void sigio_handler(int signo);
void sigio_socket_init(int skfd);
char recvBuf[RCV_LEN];
int listenfd;
int create_socket(){
	int skfd = socket(AF_INET, SOCK_DGRAM, 0);
	if (skfd < 0)return -1;
	struct sockaddr_in saddr;
	bzero(&saddr, sizeof(saddr));
	saddr.sin_family = AF_INET;
	saddr.sin_port = htons(LISTEN_PORT);
	saddr.sin_addr.s_addr = htonl(INADDR_ANY);
	if (bind(skfd, (struct sockaddr*)&saddr, sizeof(saddr)) < 0)return -1;
	return skfd;
}
int main(){
    int run_cnt = 0;
	listenfd = create_socket();
	if (listenfd < 0){
		perror("prepare error");
		exit(-1);
	}
	sigio_socket_init(listenfd);
	while(1){
        printf("run_cnt=%d\n",run_cnt++);
        sleep(1);
    }
	return 0;
}
void sigio_socket_init(int skfd){
	int ret = 0;
	signal(SIGIO, sigio_handler); //1.建立SIGIO信号的处理函数
	ret = fcntl(skfd, F_SETOWN, getpid()); //2.设置该套接字的属主
	if (ret < 0){
		perror("fcntl error");
		exit(-1);
	}
	int on = 1; //3.开启该套接字的信号驱动式I/O
	ret = ioctl(skfd, FIOASYNC, &on);
	if (ret < 0){
		perror("ioctl error");
		exit(-1);
	}
}
static void sigio_handler(int signo){
	if (signo != SIGIO)return;
	printf("recv SIGIO(%d) from kernel\n",signo);
    int i = 0;
    for(i = 0; i < 3; i++){
    	int ret = recvfrom(listenfd, recvBuf, sizeof(recvBuf),0, NULL,NULL);//当内核数据没有从内核拷贝下来完成,进程继续阻塞
    	if (ret < 0){
    		if (errno == EWOULDBLOCK)
    			return ;
            else{
    		    perror("recvfrom error");
    		    exit(-1);
            }
    	}			
    	printf("recv = %s\n",recvBuf);
    }
}

#include <sys/types.h>        
#include <sys/socket.h>  
#include <stdio.h>  
#include <stdlib.h>  
#include <arpa/inet.h>  
#include <unistd.h>  
#include <time.h>  
#include <string.h>  
#include <sys/select.h>  
#include <sys/time.h>  
#include <errno.h>
#include <signal.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <net/if.h>
#include <net/if_arp.h>
#include <sys/ioctl.h>
#include <sys/ioctl.h>
#include "net_info.h"
int main(){
	int skfd;
	skfd = socket(AF_INET, SOCK_DGRAM, 0);
	if (skfd < 0){
		perror("socket error");
		exit(-1);
	}
	int ret;
	time_t tm;
	struct sockaddr_in desAddr;
	bzero(&desAddr, sizeof(desAddr));
	desAddr.sin_family = AF_INET;
	desAddr.sin_port = htons(LISTEN_PORT);
	desAddr.sin_addr.s_addr = inet_addr("127.0.0.1");
	while (1){	
		time(&tm);
		ret = sendto(skfd, ctime(&tm), strlen(ctime(&tm)), 0,
					 (struct sockaddr*)&desAddr, sizeof(desAddr));
		if (ret < 0){
			perror("");
			exit(-1);
		}
		sleep(2);
	}
	return 0;
}

异步IO模型

前面四种模型皆为同步,因数据从内核到用户空间的复制时间不可省

在这里插入图片描述

比喻

E同学让舍管阿姨将杯子装满水后通知他,整个过程E同学都可以做别的事情(没有recv),这是真正的异步IO。

场景

  1. 应用进程进行aio_read系统调用并马上返回,应用进程可以执行其他操作
  2. 内核等待数据准备好,然后复制数据到用户空间,最后递交在aio_rad中指定的信号
  3. 应用进程收到信号后,处理数据报

应用

  • 局域网聊天室
# asyncoreClientDemo.py
import asyncore,sys
class asyncoreClient(asyncore.dispatcher):
    def __init__(self,host):
        asyncore.dispatcher.__init__(self)
        self.create_socket()
        self.connect((host,6666))
        self.buffer=b'123abc'
    def handle_connect(self):
        print("connect server succeed")
    def handle_close(self):
        print("connect close")
        self.close()
    def readable(self):
        return True
    def handle_read(self):
        read_data=self.recv(1024).decode()
        print(read_data)
        self.buffer+=read_data[0:1].encode()
    def writable(self):
        return (len(self.buffer)>0)
    def handle_write(self):
        sent=self.send(self.buffer)
        self.buffer=self.buffer[sent:]
if __name__=='__main__':
    asyncoreClient('127.0.0.1')
    asyncore.loop()
# ChatClient.py
import asyncore,sys
class asyncoreClient(asyncore.dispatcher):
    def __init__(self,host):
        asyncore.dispatcher.__init__(self)
        self.create_socket()
        self.connect((host,6666))
        self.buffer=b'123abc'
    def handle_connect(self):
        print("connect server succeed")
    def handle_close(self):
        print("connect close")
        self.close()
    def readable(self):
        return True
    def handle_read(self):
        read_data=self.recv(1024).decode()
        print(read_data)
        self.buffer+=read_data[0:1].encode()
    def writable(self):
        return (len(self.buffer)>0)
    def handle_write(self):
        sent=self.send(self.buffer)
        self.buffer=self.buffer[sent:]
if __name__=='__main__':
    asyncoreClient('127.0.0.1')
    asyncore.loop()
# ChatServer.py
import asyncore,asynchat                    # 导入两包
class ChatServer(asyncore.dispatcher):      # 服务器类
    def __init__(self,port):                # 构造函数
        asyncore.dispatcher.__init__(self)  # 父类初始化(不写会默认调用,但重写了就要显式用父类才调用)
        self.create_socket()                # 创建socket
        self.set_reuse_addr()               # 地址重用
        self.bind(('127.0.0.1',port))       # 绑定
        self.listen(5)                      # 侦听,最大有五个连接等待被接入
        self.users={}                       # 创建用户列表
        self.main_room=ChatRoom(self)       # 主房间是聊天房间(为何放在server类这?因仅1服务器也仅1房间)
    def handle_accepted(self,sock,addr):    # 处理用户的接入
        print('socket:',sock,'addr:',addr)  # 打印socket与addr
        ChatSession(self,sock)              # 创建一个聊天会话类,把self(下面的server)与socket传过去
class ChatSession(asynchat.async_chat):         # 聊天会话类,负责与客户端通讯
    def __init__(self,server,sock):             # 构造函数
        asynchat.async_chat.__init__(self,sock) # 父类的构造函数就是显式调用
        self.server=server                      # 复合类(把上面的类传下来了)
        self.set_terminator(b'\n')              # 数据结束标志
        self.data=[]                            # 接受客户端的接口数据
        self.name=None                          # 保存用户名字
        self.enter(LoginRoom(self.server))      # 变登陆状态
    def enter(self,room):                       # 从当前房间移除自身,然后添加到指定房间
        try:                                    # 尝试
            cur=self.room                       # 取出当前状态
        except AttributeError:                  # 没有的话
            pass                                # 跳过
        else:                                   # 否则,就是有当前状态room
            cur.remove(self)                    # 就移除
        self.room=room                          # 把新状态更新为传入的状态(上面刚创建LoginRoom就存到这)
        room.add(self)                          # 新状态增加我
    def collect_incoming_data(self, data):      # 收集接口出现数据
        self.data.append(data.decode())         # 数据添加
    def found_terminator(self):                 # 找到了结束符
        line=''.join(self.data)                 # 将消息拼接成字符串
        self.data=[]                            # 把接口数据清空
        self.room.handle(self,line.encode())    # 处理接收的指令
    def handle_closed(self):                    # 关闭话柄
        asynchat.async_chat.handle_closed(self) # 父类结束方法
class Room:                                     # 状态类
    def handle(self,session,line):              # 处理
        line=line.decode()                      # 解码
        if not line.strip():                    # 空命令
            return                              # 返回
        parts=line.split(' ',1)                 # 切2份如login abc, say hello, 或无参的logout, look
        cmd=parts[0]                            # 读出下标为0的字段
        try:                                    # 尝试
            line=parts[1].strip()               # 获取下标为1的字段
        except IndexError:                      # 下标错误
            line=''                             # 指令为空
        method=getattr(self,'do_'+cmd,None)     # 获取'do_'+cmd的方法
        method(session,line)                    # 调用这个方法
    def __init__(self,server):                      # 构造函数
        self.server=server                          # 服务器成员属性
        self.sessions=[]                            # 会话集合
    def add(self,session):                          # 添加会话(可看到基类方法的作用是子类重写后可调用,避免重复写)
        self.sessions.append(session)               # 往会话群里加会话
    def remove(self,session):                       # 移除会话(五)
        self.sessions.remove(session)               # 从会话群里减会话(六)
    def broadcast(self,line):                       # 广播
        for session in self.sessions:               # 逐个会话读出来
            session.push(line)                      # 发送内容
    def do_logout(self,session,line):               # 登出(一)
        self.remove(session)                        # 从当前状态移出会话(二)
        del self.server.users[session.name]         # 服务器的用户列表移除用户名字(八)
class ChatRoom(Room):                               # 正在聊天的客户(成员有服务器与会话集合)
    def add(self,session):                          # 当前状态添加会话
        session.push(b'login success')              # 发送login success
        self.broadcast((session.name+' has entered the room. \n').encode())  # 广播
        self.server.users[session.name]=session     # 用户字典添加用户名-会话键值对
        Room.add(self,session)                      # 状态添加会话
    def remove(self,session):                       # 移除会话(三)
        Room.remove(self,session)                   # 状态移除(四)
        self.broadcast((session.name+' has left the room.\n').encode())  # 广播 (七)
    def do_say(self,session,line):                  # 发送信息
        self.broadcast((session.name+':'+line+'\n').encode()) # 广播
    def do_look(self,session,line):                 # 查在线用户
        session.push(b'Online Users:\n')            # 发送Online Users:\n
        for user in self.sessions:                  # 遍历会话列表
            session.push((user.name+'\n').encode()) # 发送会话中的用户名
class LoginRoom(Room):                              # 正在登陆的用户
    def add(self,session):                          # 增加
        Room.add(self,session)                      # 调用父类的添加(下面do_login末句enter会把self.room指向的对象改掉,则会话级别的LoginRoom由于不再有引用(或者说指针)指向他而被释放,服务器级别的ChatRoom是永远只有一个且不被释放,这就是为什么要在ChatServer中初始化ChatRoom为main_room然后在LoginRoom中传入self.server.main_room了)
        session.push(b'Connect Success')            # 发送信息
    def do_login(self,session,line):                # 登陆功能
        name=line.strip()                           # 取出指令
        if not name:                                # 如果用户名为空
            session.push(b'Username empty')         # 发送Username empty
        elif name in self.server.users:             # 如果名字已在服务器用户名列表
            session.push(b'Username exists')        # 发送Username exists
        else:                                       # 否则
            session.name=name                       # 赋值
            session.enter(self.server.main_room)    # 进入新的状态(就是聊天状态)
if __name__=='__main__':                            # 主函数
    print('server will run on 127.0.0.1:6666')      # 提示
    ChatServer(6666)                                # 聊天服务器
    asyncore.loop()                                 # 循环检查

# EchoClient.py
import socket
import threading
client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
client.connect(('127.0.0.1',6666))
def receive(client):
    while True:
        data=client.recv(1024)
        if data:
            print(data.decode())
def send(client):
    while True:
        to_data=input()
        client.send(to_data.encode())
threading.Thread(target=receive,args=(client,)).start()
threading.Thread(target=send,args=(client,)).start()

# EchoServer.py
import asyncore
class EchoHanlder(asyncore.dispatcher):
    def handle_read(self):
        data=self.recv(1024)
        print('get data:'+data.decode())
        if data:
            self.send(data)

class EchoServer(asyncore.dispatcher):
    def __init__(self,host,port):
        asyncore.dispatcher.__init__(self)
        self.create_socket()
        self.set_reuse_addr()  # 服务器监听地址可重用
        self.bind((host,port))
        self.listen(5)  # 监听队列最大长度
    def handle_accepted(self,sock,addr):
        EchoHanlder(sock)
    
if __name__=='__main__':
    EchoServer('localhost',6666)
    asyncore.loop()

asyncore框架概念
asyncore模块是Python自带的一个原生模块,
    提供简单的API以实现异步socket通信,
    并且为我们提供了异步socket服务器端和客户端的基础架构
    在使用asyncore框架时,我们需要注意两个东西:
一、全局函数loop:创建asyncore事件循环,
    在事件循环中调用底层的select方法来检测特定网络信道,
    如果信道对应的socket对象状态发生改变,则自动产生一个高层次的事件信息,
    然后针对该信息调用相应的回调方法进行处理。
二、基类dispatcher:一个底层的socket类的封装对象,
    编程中继承于dispatchero在或其子类,
    dispatchero在里面已经定义好了socket通信中的各种事件,
    我们只需要重写特定的事件即可在该事件发生时实现自动回调处理。

为什么作为socket server的时候,服务器端的连接会自动断开?
	因为socket连接是用完即断,
	所以为了保持通信服务端必须在socket连接建立好后,
	立即创建一个新的dispatcher实例来保存这个socket连接对象,
	否则socket连接会被服务器端自动断开

补充
' 'strb' '.decode(),代表字符串对象,用于python编程
' '.encode()str.encode()b' ',代表bytes对象,用于网络传输

asyncore聊天室实战
主要知识点:
asyncore作为服务器端的主要用法
async_chat模块的使用
pySimpleGUI界面框架
telnetlib作为客户端socket模块

如何设计一个聊天室的应用?
必要条件:服务器端,多客户端
必要约束:数据传输协议——以换行符为消息分隔符
原理:服务器监听消息来源,客户端连接服务器并发送消息到服务器

async_chat模块介绍:是dispatcher这个类的抽象子类,
定义了一些专门用于数据传输方面的方法,非常有利于实现数据传输
主要方法:
collect_incoming_data:接收数据
found_terminator:当输入数据流符合由set_terminator设置的终止条件时被调用
set_terminator:设置终止条件
push:向数据传输队列压入要传输的数据


  系统运维 最新文章
配置小型公司网络WLAN基本业务(AC通过三层
如何在交付运维过程中建立风险底线意识,提
快速传输大文件,怎么通过网络传大文件给对
从游戏服务端角度分析移动同步(状态同步)
MySQL使用MyCat实现分库分表
如何用DWDM射频光纤技术实现200公里外的站点
国内顺畅下载k8s.gcr.io的镜像
自动化测试appium
ctfshow ssrf
Linux操作系统学习之实用指令(Centos7/8均
上一篇文章      下一篇文章      查看所有文章
加:2021-09-19 08:21:07  更:2021-09-19 08:21:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/2 0:08:24-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码