C++ select模型详解(多路复用IO)
TIPS:以下内容纯属博主自主查阅资料并结合个人理解得出,各位读者仅做参考即可。
关于其它的IO模型可参考链接(阻塞I/O、非阻塞I/O、多路复用I/O、信号I/O、异步I/O):https://blog.csdn.net/qq135595696/article/details/121549381
引言
??在什么情况下需要使用select模型呢?阻塞I/O问题我们可以使用多线程处理或者将阻塞I/O改为非阻塞I/O,系统调用之后就返回结果,可是完成I/O(即获取数据),我们不得不使用polling(轮询),可每次轮询都是一次系统调用,这将导致一些情况下非阻塞I/O甚至不如阻塞I/O。既然应用进程需要一直polling(轮询),且内核也需要频繁的操作,这时就出现了新的模型——多路复用IO模型。
??让内核代理应用进程去做polling(轮询),然后应用进程只有数据准备之后再发起I/O请求不就可以了吗?的确,多路复用I/O就是这样的原理。由内核负责监控应用进程指定的socket文件描述符,当socket准备好数据(可读或可写或异常)的时候,通知应用进程。==那么有一个问题是内核如何通知应用进程呢?==这个时候我们就可以联想到设计模式中的观察者模式了。当目标发生的状态发生改变之后,通知其观察者(也就是应用进程)。这个过程中的关键逻辑就是内核保存了所有socket文件描述符,内核通过应用进程特定的回调事件通知应用进程(这个过程在C++ select模型已经透明,即默认使用select模型时,应用进程已经在内核创建了相应的回调事件)。
??在多数情况下,select更多配合的都是非阻塞I/O方式使用。如下:
APP proces:用户态(即应用进程)
Kernel:内核(内核态)
polling:非阻塞I/O,在这个过程进行轮询
Recv:系统请求数据调用
data waiting右侧:从硬件(磁盘、网卡)的数据读取数据到内核buffer中
cpu copy:从内核buffer中拷贝数据到app buffer中(这个过程需要使用CPU,即整个进程处于阻塞状态)
handle data:应用进程处理数据
??多路复用I/O的本质就是多路监听+阻塞/非阻塞I/O。一般多路复用I/O配合非阻塞I/O进行使用。因为读写socket的时候,并不确定读到什么时候才能读完数据。在一个循环读的过程中,如果设置为阻塞状态,那么进程就会挂起,这将导致多路复用I/O跟阻塞I/O不存在本质区别,即比较好的做法是设置为非阻塞状态。
??多路复用IO几乎成为了主流server方式。尤其是epoll,成为了nginx、redis,tornado等软件高性能的基石。select、poll、epoll是目前主流的多路复用I/O技术。
select模型的原理
??网络通信过程在Unix系统中通常被抽象为文件的读写过程。select模型中的一个socket文件描述符通常可以看成一个由设备驱动程序管理的一个设备,驱动程序可以知道自身的数据是否可用。同时,该设备支持阻塞操作并实现了一组自身的等待队列,如读/写等待队列用户支持上层(用户层)所需的block(阻塞)和non-block(非阻塞)操作。设备的资源如果可用(可读/可写)则会通知应用进程。反之则会让进程睡眠,等待数据到来的时候,再唤醒应用进程。
??多个这样的设备的文件描述符被放在一个队列中,然后select调用的时候遍历这个队列,如果对应的文件描述符可读/可写则会返回该文件描述符(调用应用进程的回调事件)。当遍历结束之后,如果仍然没有一个可用的文件描述符,select会让用户进程睡眠,直到等待资源可用的时候再唤醒用户进程并返回对应的文件描述符(调用应用进程的回调事件),select每次遍历都是线性的。
select模型的不足
??尽管select模型使用很便利,且具有跨平台的特性。但是select模型还是存在一些问题。select模型需要遍队列中的文件描述符,并且这个队列还有最大限制(64)。随着文件描述数量的增长,用户态和内核的地址空间的复制引发的开销也会线性增长。即使监视的文件描述符长时间不活跃,select模型还是会进行线性扫描它。
??为了解决这些问题,操作系统又提供了poll方案,但是poll模型和select模型大致相当,只是改变了一些限制。目前Linux最先进的方式是epoll模型。
select模型与C/S模型的不同点
在互联网应用中,多数架构是C/S模型,即client发出请求,server接收请求,处理之后返回响应。
- C/S模型中accept()会阻塞一直等待socket连接
- select模型只解决accept()的等待问题,但并未解决recv()、send()执行带来的阻塞问题
C++中select()函数
select(
_In_ int nfds,
_Inout_opt_ fd_set FAR * readfds,
_Inout_opt_ fd_set FAR * writefds,
_Inout_opt_ fd_set FAR * exceptfds,
_In_opt_ const struct timeval FAR * timeout
);
select模型的调用时间复杂度是线性的,即O(n)。
select模型是线程不安全的。
struct timeval {
long tv_sec;
long tv_usec;
};
#define FD_SETSIZE 64
typedef struct fd_set {
u_int fd_count;
SOCKET fd_array[FD_SETSIZE];
} fd_set;
如何突破64或者说1024的限制?答案是使用多线程技术,每个线程单独使用一个select进行检测。这样的话,系统能处理的并发连接数等于线程数*1024。
以下定义的宏能对fd_set进行处理:
| 参数 | 含义 |
---|
1 | FD_ZERO(fd_set *fdset) | 清空文件描述符集合 | 2 | FD_SET(int fd,fd_set *fdset) | 设置需要监听的描述符(把监听的描述符置为1) | 3 | FD_CLR(int fd,fd_set *fdset) | 清除监听的描述符(把监听描述符置为0) | 4 | FD_ISSET(int fd,fd_set *fdset) | 判断描述符是否设置(判断描述符是否设置为1) |
select( )工作流程
- 使用FD_ZERO宏初始化一个fd_set对象(即初始化socket队列)。实际上就是select的第2、3、4的形参。
- 使用FD_SET宏将socket文件描述符键入到fd_set对象中(即加入到socket队列中)。
- 调用select函数,等待函数返回。如果没有套接字返回,那么select函数会把fe_set对象中的socket队列清空。如果有套接字返回,那么将是返回可读/可写/异常的socket集合。其余不可读/不可写/无异常的套接字将进行清除。
- 使用FD_ISSET对返回的套接字集合进行检查,对相应的套接字进行操作。
- 之后反复执行以上几个步骤。
源码实例(简单的C++ select模型服务端server)
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#include <iostream>
#include <WinSock2.h>
#include <vector>
#pragma comment (lib,"ws2_32")
using std::cout;
using std::endl;
using std::cin;
int main()
{
WSADATA wsaData;
WSAStartup(MAKEWORD(2, 2), &wsaData);
SOCKET serverSock = socket(AF_INET, SOCK_STREAM, 0);
if (INVALID_SOCKET == serverSock)
{
cout << "创建服务端SOCKET" << endl;
return 0;
}
SOCKADDR_IN serverAddr;
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(7890);
serverAddr.sin_addr.s_addr = inet_addr("127.0.0.1");
if (SOCKET_ERROR == bind(serverSock, (sockaddr *)&serverAddr, sizeof(SOCKADDR_IN)))
{
cout << "bind 失败" << endl;
return 0;
}
cout << "bind 成功" << endl;
if (INVALID_SOCKET == listen(serverSock, 5))
{
cout << "listen 失败" << endl;
return 0;
}
cout << "listen 成功" << endl;
std::vector<SOCKET> clients;
while (true)
{
fd_set reads;
FD_ZERO(&reads);
FD_SET(serverSock, &reads);
do
{
std::vector<SOCKET>::iterator begin = clients.begin();
auto end = clients.end();
for (; begin != end; begin++)
FD_SET(*begin, &reads);
} while (false);
int nRet = select(0, &reads, nullptr, nullptr, nullptr);
if (0 == nRet)continue;
if (-1 == nRet)break;
if (FD_ISSET(serverSock, &reads))
{
cout << "客户端连接" << endl;
SOCKADDR_IN clientAddr;
int addrLen = sizeof(SOCKADDR_IN);
SOCKET clientSock = accept(serverSock, (sockaddr *)&clientAddr, &addrLen);
if (INVALID_SOCKET == clientSock)
{
std::cout << "客户端连接失败 " << std::endl;
return 0;
}
cout << "客户端连接成功 " << inet_ntoa(clientAddr.sin_addr) << " port " << ntohs(clientAddr.sin_port) << endl;
clients.push_back(clientSock);
}
else
{
do
{
auto begin = clients.begin();
auto end = clients.end();
for (; begin != end; begin++)
{
if (FD_ISSET(*begin, &reads))
{
char recvBuffer[1024]{ 0 };
int nRecv = recv(*begin, recvBuffer, 1024, 0);
if (nRecv < 0)continue;
if (0 == nRecv)
{
cout << "关闭客户端\n";
begin = clients.erase(begin);
break;
}
if(nRecv > 0 )
cout << "recvLen:" << nRecv << " 数据:" << recvBuffer << endl;
}
}
} while (false);
}
}
closesocket(serverSock);
WSACleanup();
return 0;
}
select模型图示(以服务端为例)
参考文章
https://blog.csdn.net/qq_37163944/article/details/81740486
https://blog.csdn.net/qq_40477151/article/details/80314013
https://segmentfault.com/a/1190000019207061
|