IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 系统运维 -> 【网络编程】IO 多路复用 select -> 正文阅读

[系统运维]【网络编程】IO 多路复用 select

总述

????????在这个实验里,尝试对 IO 多路复用 select 接口进行使用。

????????在?【网络编程】TCP socket 单机通信实验?里实现了一个 client 线程和一个 server 线程进行 socket 通信的 demo。对于 server 线程来说,既要监听、接受连接请求,又要收、发、处理与客户端交互的数据。一个 server 线程在同一时间只能应付一个 client,在处理一个 client 的数据的时候就不能接受另一个 client 的连接请求。对于多客户端的场景,这样的 server 基本没有实用价值。

?????????一种解决问题的思路是:由一个线程/进程监听连接请求,由其他多个线程/进程分别对接每一个 client,这样就能让 server 同时处理与多个 client 的数据交互、及时接受新的 client 的连接请求:

????????但是,如果连接请求非常多,server 也负担不起一个 client 一个处理线程带来的资源消耗,线程切换也需要成本。而且实际上,大多数的时候,client 和 server 之间都没有数据交互。针对这一点,可以考虑用 IO 多路复用(select、poll、epoll):IO 多路复用可以实现一个线程监视多个文件句柄;一旦某个文件句柄就绪,就能够通知应用程序进行相应的读写操作;没有文件句柄就绪时会阻塞应用程序,交出 cpu。多路指的是多路网络连接,复用指的是复用同一个线程。本质上, IO 多路复用是把 accept、recv、send 阻塞等待事件就绪的工作集中起来交给内核管理。

主体流程结构

????????在?【网络编程】TCP socket 单机通信实验?的基础上,把客户端线程增加到 3 个,把单个服务端线程拆成一个监听线程和一个工作线程。在监听线程里监听端口、和 client 建立连接,把连接句柄传给工作线程;在工作线程里用 select 监测所有已连接的句柄是否可读,如果可读则进行数据的收发处理。

实验结果

????????select 接口可以监测到 3 个连接句柄上的可读事件,并把事件交给应用层,让应用程序完成后续的数据读写交互:

select 接口

函数原型

int select (int maxfdp, 
            fd_set *readset, 
            fd_set *writeset, 
            fd_set *exceptest, 
            struct timeval *timeout);

参数?maxfdp

????????第一个参数 maxfdp?是所有要监测的文件描述符的最大值加 1,注意:不是要监测的文件描述符的个数加 1文件描述符本质上就是一个整数,值超过 maxfdp 的文件描述符不会被 select 监测。

如果给 maxfdp 传入 3 + 1(要监测的文件描述符个数加 1):

select 不会对 3 个客户端的连接句柄进行监测:

3 个客户端发来的数据累积在服务端的接收窗口中,不会被服务端的应用层读取:

参数 指向 fd_set 的指针

????????第二、三、四个参数都是指向 fd_set 的指针。第二个参数?readset 对应可读事件的描述符集合;第三个参数?writeset 对应可写事件的描述符集合;第四个参数?exceptest 对应异常事件的描述符集合。

typedef struct {
    unsigned long fds_bits[__FDSET_LONGS];
} fd_set;

????????fd_set 实质上是一个无符号长整形的数组。数组的大小 __FDSET_LONGS 是一个宏定义,值是 1024/(8 * sizeof(unsigned long)),数组实际占用空间也就是?1024 / 8 = 128 Byte = 1024 bit。

????????fd_set 以位图的形式表示文件描述符集合:一个比特位对应一个文件描述符,用对应位置的比特位的 0 或 1 来表示是否对文件描述符进行监测或者文件描述符是否就绪。

????????因为这样的数组大小、这样的表示方式,导致 select 最多就只能监测 0~1023 的文件描述符,总计 1024 个 —— 是存放文件描述符的数据结构 fd_set?限制了 select 最多只能监听 1024 个文件描述符

????????select 提供了一些??可以用来操作文件描述符集合,实际上也就是更改对应比特位的 0、1 值:

FD_ZERO(fd_set* fds);

清空文件描述符集合 fds,也就是把所有比特位都置为 0

FD_SET(int fd, fd_set* fds);

把文件描述符 fd 加入集合 fds,也就是把对应的比特位置为 1

FD_ISSET(int fd, fd_set* fds); 

判断文件描述符 fd 是否在集合 fds 中,也就是判断对应的比特位是否为 1

FD_CLR(int fd, fd_set* fds);

把文件描述符?fd?从集合?fds?中删除,也就是把对应比特位置为 0

????????在使用方式上,先调用 FD_ZERO 将 fd_set 清零,然后调用 FD_SET 把需要监测的文件描述符加入 fd_set,接着调用函数 select 测试 fd_set 中的所有文件描述符,select 函数返回后,用宏 FD_ISSET 检查某个文件描述符对应的比特位是否仍然为 1,如果为 1 表示有事件就绪,为 0 则反之。

参数 超时时间

????????最后一个参数?timeout 是超时时间,表示等待多长时间之后就放弃等待,传 NULL 表示等待无限长的时间,持续阻塞直到有事件就绪才返回。

????????timeout 指向的?timeval 结构体有秒和微秒两个成员:

struct timeval
{
    __time_t tv_sec;        /* Seconds. */
    __suseconds_t tv_usec;  /* Microseconds. */
};

????????使用 timeval 时,直接对成员进行赋值就好,eg.

struct timeval timeout = {0};
timeout.tv_sec = 1;
timeout.tv_usec = 0;

返回值

????????如果 select 执行失败,返回 -1;如果超时,返回 0;如果有描述符就绪,则返回就绪的描述符的个数。

select 缺点

select 有几个主要的缺点,都跟文件描述符集合 fd_set 有关:

  1. 每次调用 select,都需要把 fd_set 从用户态拷贝到内核态,select 检测到有就绪事件或者超时返回时,又要把 fd_set 再从内核态拷贝回用户态,开销比较大;
  2. select 监测文件描述符用的是轮询的方法,一遍一遍对文件描述符集合进行线性扫描,检查是否有事件就绪,效率比较低;
  3. select 最多只能监测 1024 个文件描述符,数量有限;

完整代码实现

头文件

#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/syscall.h>

宏定义

#define LOCAL_IP_ADDR       "127.0.0.1"
#define SERVER_LISTEN_PORT  5197
#define MAX_LISTEN_EVENTS   16
#define NET_MSG_BUF_LEN     128
#define CLINET_SEND_MSG     "Hello Server~"
#define SERVER_SEND_MSG     "Hello Client~"

功能函数获取线程 ID

pid_t gettid(void) {
    // 需引入头文件 sys/syscall.h
    return syscall(SYS_gettid);
}

客户端线程入口函数

void* client(void* param) {
    int iRes = 0, iConnFd = 0, iNetMsgLen = 0;
    pthread_t thdId = gettid();
    char szNetMsg[NET_MSG_BUF_LEN] = {0};
    struct sockaddr_in stServAddr;

    iConnFd = socket(AF_INET, SOCK_STREAM, 0);
    if (-1 == iConnFd) {
        printf("Client[%u] failed to create socket, err[%s]\n", 
               thdId, strerror(errno));
        return NULL;
    }

    // 填充目标地址结构体,指定协议族、目标端口、目标主机 IP 地址
    stServAddr.sin_family = AF_INET;
    stServAddr.sin_port = htons(SERVER_LISTEN_PORT);
    stServAddr.sin_addr.s_addr = inet_addr(LOCAL_IP_ADDR);
    // 1 参传套接字句柄,2 参传准备连接的目标地址结构体指针,3 参传地址结构体大小
    while (1) {
        iRes = connect(iConnFd, (struct sockaddr *)&stServAddr, 
                       sizeof(stServAddr));
        if (0 != iRes) {
            printf("Client[%u] failed to connect to[%s:%u], err[%s]\n",
                   thdId, LOCAL_IP_ADDR, SERVER_LISTEN_PORT, 
                   strerror(errno));
            sleep(2);
            continue;
        } else {
            printf("Client[%u] succeeded to connect to[%s:%u]\n", 
                   thdId, LOCAL_IP_ADDR, SERVER_LISTEN_PORT);
            break;
        }
    }

    iNetMsgLen = send(iConnFd, CLINET_SEND_MSG, 
                      strlen(CLINET_SEND_MSG), 0);
    if (iNetMsgLen < 0) {
        printf("Client[%u] failed to send msg to server, err[%s]\n", 
               thdId, strerror(errno));
        close(iConnFd);
        return NULL;
    }

    iNetMsgLen = recv(iConnFd, szNetMsg, sizeof(szNetMsg), 0);
    if (iNetMsgLen < 0) {
        printf("Client[%u] failed to read from network, err[%s]\n", 
               thdId, strerror(errno));
    } else {
        printf("Client[%u] recv reply[%s]\n", thdId, szNetMsg);
    }
    
    close(iConnFd);
    return NULL;
}

功能函数处理可读事件

int eventProc(int *piConnFdSet, fd_set *pfsReadSet) {
    int iIndex = 0, iConnFd = 0, iNetMsgLen = 0;
    char szNetMsg[NET_MSG_BUF_LEN] = {0};

    for (iIndex = 0; iIndex < MAX_LISTEN_EVENTS; iIndex++) {
        if (!FD_ISSET(piConnFdSet[iIndex], pfsReadSet)) {
            continue;
        }

        // 用临时变量简化代码
        iConnFd = piConnFdSet[iIndex];
        
        // 接收 client 消息
        iNetMsgLen = recv(iConnFd, szNetMsg, sizeof(szNetMsg), 0);
        if (iNetMsgLen < 0) {
            printf("Server failed to recv from network, err[%s]\n", 
                   strerror(errno));
            close(iConnFd);
            return -1;
        }
        
        printf("Server recv msg[%s]\n", szNetMsg);

        // 答复 client
        iNetMsgLen = send(iConnFd, SERVER_SEND_MSG, 
                          strlen(SERVER_SEND_MSG), 0);
        if (iNetMsgLen < 0) {
            printf("Server failed to reply client, err[%s]\n", 
                   strerror(errno));
            close(iConnFd);
            return -1;
        }

        piConnFdSet[iIndex] = 0;
        close(iConnFd);
    }
    
    return 0;
}

服务端工作线程入口函数

void* serverWork(void* param) {
    int iRes = 0, iIndex = 0, iMaxFd = 0, iEventNum = 0;
    int *piConnFdSet = (int *)param;
    fd_set fsReadSet;
    struct timeval timeout = {0};
    
    // 监听
    while (1) {
        timeout.tv_sec = 1;
        timeout.tv_usec = 0;
		
        FD_ZERO(&fsReadSet);
        #if 1
        iMaxFd = 0;
        for (iIndex = 0; iIndex < MAX_LISTEN_EVENTS; iIndex++) {
            FD_SET(piConnFdSet[iIndex], &fsReadSet);
            iMaxFd = piConnFdSet[iIndex] > iMaxFd ? 
                     piConnFdSet[iIndex] : iMaxFd;
        }
        #else
        iMaxFd = 3;
        #endif

        iEventNum = select(iMaxFd + 1, &fsReadSet, NULL, NULL, &timeout);
        if (-1 == iEventNum) {
            printf("Server failed to select event.\n");
            break;
        }
		
        printf("Server select get [%u] event\n", iEventNum);
        if (0 == iEventNum) {
            continue;
        }

        iRes = eventProc(piConnFdSet, &fsReadSet);
        if (0 != iRes) {
            printf("Server failed to proc event.\n");
            break;
        }        
    }

    return NULL;
}

服务端监听线程入口函数

void* serverLsn(void* param) {
    int iRes = 0, iIndex = 0;
    int iLsnFd = 0, iConnFd = 0, iReusePort = 0, iSockAddrLen = 0;
    int *paiConnFdSet = (int *)param;
    struct sockaddr_in stLsnAddr;
    struct sockaddr_in stCliAddr;

    // 创建 socket
    iLsnFd = socket(AF_INET, SOCK_STREAM, 0);                       
    if (-1 == iLsnFd) {
        printf("Server failed to create socket, err[%s]\n", 
               strerror(errno));
        return NULL;
    }

    // 设置端口复用
    iReusePort = 1;
    iRes = setsockopt(iLsnFd, SOL_SOCKET, SO_REUSEPORT, &iReusePort, 
                      sizeof (iReusePort));
    if (-1 == iRes) {
        printf("Server failed set reuse attr, err[%s]\n", 
               strerror(errno));
        close(iLsnFd);
        return NULL;
    }

    stLsnAddr.sin_family = AF_INET;
    stLsnAddr.sin_port = htons(SERVER_LISTEN_PORT);
    stLsnAddr.sin_addr.s_addr = INADDR_ANY;    
    // 绑定端口
    iRes = bind(iLsnFd, (struct sockaddr*)&stLsnAddr, 
                sizeof(stLsnAddr));   
    if (-1 == iRes) {
        printf("Server failed to bind port[%u], err[%s]\n", 
               SERVER_LISTEN_PORT, strerror(errno));
        close(iLsnFd);
        return NULL;
    } else {
        printf("Server succeeded to bind port[%u], start listen.\n", 
               SERVER_LISTEN_PORT);
    }


    // 监听
    iRes = listen(iLsnFd, MAX_LISTEN_EVENTS);
    if (-1 == iRes) {
        printf("Server failed to listen port[%u], err[%s]\n", 
               SERVER_LISTEN_PORT, strerror(errno));
        close(iLsnFd);
        return NULL;
    }
    
    while (iIndex < MAX_LISTEN_EVENTS) {
        iSockAddrLen = sizeof(stCliAddr);
        // 1 参传入监听句柄,2 传入地址结构体指针接收客户端地
        // 3 参传入地址结构体大小
        iConnFd = accept(iLsnFd, (struct sockaddr*)&stCliAddr, 
                         &iSockAddrLen);
        if (-1 == iConnFd) {
            printf("Server failed to accept connect request, err[%s]\n",
                   strerror(errno));
            break;
        } else {
            printf("Server accept connect request from[%s:%u]\n", 
                   inet_ntoa(stCliAddr.sin_addr), 
                   ntohs(stCliAddr.sin_port));
            paiConnFdSet[iIndex] = iConnFd;
            iIndex++;
        }
    }
    
    close(iLsnFd);
    return NULL;
}

主函数

int main() {
    // 线程 ID,实质是 unsigned long 类型整数
    pthread_t thdServerWork = 101;
    pthread_t thdServerLsn = 102;
    pthread_t thdClient1 = 1;
    pthread_t thdClient2 = 2;
    pthread_t thdClient3 = 3;
    
    // 监测是否可读的 socket 文件描述符集合 
    int aiConnFdSet[MAX_LISTEN_EVENTS] = {0};
    
    // 1 参传线程 ID,2 参传线程属性,
    // 3 参指定线程入口函数,4 参指定传给入口函数的参数
    pthread_create(&thdServerWork, NULL, serverWork, &aiConnFdSet[0]);
    pthread_create(&thdServerLsn, NULL, serverLsn, &aiConnFdSet[0]);
    pthread_create(&thdClient1, NULL, client, NULL);
    pthread_create(&thdClient2, NULL, client, NULL);
    pthread_create(&thdClient3, NULL, client, NULL);


    // 1 参传入线程 ID,2 参用于接收线程入口函数的返回值,不需要返回值则置 NULL
    pthread_join(thdServerWork, NULL);
    pthread_join(thdServerLsn, NULL);
    pthread_join(thdClient1, NULL);
    pthread_join(thdClient2, NULL);
    pthread_join(thdClient3, NULL);
    
    return 0;
}
  系统运维 最新文章
配置小型公司网络WLAN基本业务(AC通过三层
如何在交付运维过程中建立风险底线意识,提
快速传输大文件,怎么通过网络传大文件给对
从游戏服务端角度分析移动同步(状态同步)
MySQL使用MyCat实现分库分表
如何用DWDM射频光纤技术实现200公里外的站点
国内顺畅下载k8s.gcr.io的镜像
自动化测试appium
ctfshow ssrf
Linux操作系统学习之实用指令(Centos7/8均
上一篇文章      下一篇文章      查看所有文章
加:2021-10-12 23:53:40  更:2021-10-12 23:54:42 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/15 18:28:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码