TCP服务器epoll 的多种实现
对于网络IO会涉及到两个系统对象
- 用户空间中进程或者线程
- 操作系统内核
比如发生read 操作时就会经历两个阶段
- 等待数据就绪
- 将数据从内核缓冲区拷贝到用户缓冲区
由于各个阶段多有不同的情况,一组合么就产生了多种网络 IO 模型
阻塞IO
在Linux中默认所有socket 都是blocking 的,一个典型的读流程
-
当应用进程调用read 这个系统调用,如果数据没有到达,或者收到的数据包还不完整就会阻塞read 调用,等待足够的数据到达 -
Kernel准备好数据,他就会将数据从Kernel 中拷贝到用户内存,Kernel返回结果,解除block状态,重新运行起来 于是就有了下面这种服务结构
代码实现一个简单的反射服务器:
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#pragma clang diagnostic push
#pragma ide diagnostic ignored "EndlessLoop"
using std::cout;
using std::endl;
int main(int argc,char * argv[])
{
int listenfd = socket(AF_INET,SOCK_STREAM,0);
if(listenfd == -1)
{
cout<<"create listenfd failed"<<endl;
return -1;
}
struct sockaddr_in bindaddr{};
bindaddr.sin_family =AF_INET;
bindaddr.sin_addr.s_addr = htonl(INADDR_ANY);
bindaddr.sin_port= htons(3000);
if (bind(listenfd,(struct sockaddr*) &bindaddr, sizeof(bindaddr)) == -1)
{
cout<<"bind listen socket failed!"<<endl;
return -1;
}
if(listen(listenfd,SOMAXCONN) == -1)
{
cout<<"listen error"<<endl;
return -1;
}
while (true)
{
sockaddr_in clientaddr{};
socklen_t clientaddrlen = sizeof(clientaddr);
int clientfd = accept(listenfd,(struct sockaddr*)&clientaddr,&clientaddrlen);
if (clientfd != -1)
{
char recvBuf[32]={0};
int ret = recv(clientfd,recvBuf,32,0);
if (ret > 0)
{
cout<<"Receive data from the client:"<<recvBuf<<endl;
ret = send(clientfd,recvBuf, strlen(recvBuf),0);
if(ret != strlen(recvBuf))
cout<<"send failed"<<endl;
else
cout<<"send successfully"<<endl;
}
else
{
cout<<"Receive data error"<<endl;
}
close(clientfd);
}
}
close(listenfd);
return 0;
}
#pragma clang diagnostic pop
但这样的架构有巨大的缺陷:
- 因为所有IO都是阻塞的,这就造成
send 过程中线程将被阻塞,会浪费大量的CPU时间,效率极低
非阻塞IO
在Linux下,我们可以主动将socket 设置为非阻塞,这时流程就会编程下面这样
返回值 | 含义 |
---|
大于0 | 接收到的字节数 | 等于0 | 连接正常断开 | 等于-1,error等于EAGAIN | 表示recv操作还没有完成 | 等于-1,error不等于EAGAIN | 表示recv操作遇到系统错误 |
使用如下函数将socket 设置为非阻塞状态
fcntl( fd, F_SETFL, O_NONBLOCK );
于是我们可以实现如下模型
可以看到服务器线程可以通过循环调用 recv()接口,可以在单个线程内实现对所有连接的数据接收工作。但是上述模型绝不被推荐。因为,循环调用 recv()将大幅度推高 CPU 占用率;此外,在这个方案中 recv()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成“作用的接口,例如 select()多路复用模式, 可以一次检测多个连接是否活跃
多路复用IO (IO multiplexing)
采用Linux中的select 或者poll
下面我们以select 举例
select函数用于检测一组socket 中是否有事件就绪.这里的事件为以下三类:
- 读事件就绪
- 在
socket 内核中,接收缓冲区中的字节数大于或者等于低水位标记SO_RCVLOWAT ,此时调用rec 或read 函数可以无阻塞的读取该文件描述符,并且返回值大于零 - TCP连接的对端关闭连接,此时本端调用r
recv 或read 函数对socket 进行读操作,recv 或read 函数返回0 - 在监听的
socket 上有新的连接请求 - 在
socket 尚有未处理的错误 - 写事件就绪
- 在
socket 内核中,发送缓冲区中的可用字节数大于等于低水位标记时,可以无阻塞的写,并且返回值大于0 socket 的写操作被关闭时,对一个写操作被关闭的socket 进行写操作,会触发SIGPIPE信号socket 使用非阻塞connect 连接成功或失败时 - 异常事件就绪
select() 如下:
#include <sys/select.h>
int select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,struct timeval *timeout);
参数说明
nfds: | Linux上的socket 也叫作fd,将这个参数的值设置为所有需要使用select函数检测事件的fd中的最大值加1即nfds=max(fd1,fd2,...,fdn)+1 |
---|
readfds: | 需要监听可读事件的fd集合 | writefds: | 需要监听可写事件fd的集合 | exceptfds: | 需要监听异常事件的fd集合 | timeout: | 超时时间,即在这个参数设定的时间内检测这些fd的事件,超过这个时间后,select 函数立即返回,这是一个timeval 结构体 |
其定义如下:
struct timeval{
long tv_sec;
long tv_usec;
}
参数readfds,writefds,exceptfds 的类型都是fd_set ,这是一个结构体信息
定义如下
#define __NFDBITS (8 * (int) sizeof (__fd_mask))
#define __FD_ELT(d) ((d) / __NFDBITS)
#define __FD_MASK(d) ((__fd_mask) (1UL << ((d) % __NFDBITS)))
typedef struct
{
#ifdef __USE_XOPEN
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#endif
} fd_set;
#define FD_SETSIZE __FD_SETSIZE
假设未定义__USE_XOPEN 整理一年
typedef struct
{
long int fds_bits[__FD_SETSIZE / __NFDBITS];
} fd_set;
将一个fd添加到fd_set 这个集合中时需要使用FD_SET 宏,其定义如下:
void FD_SET(fd, fdsetp)
实现如下:
#define FD_SET(fd, fdsetp) __FD_SET (fd, fdsetp)
__FD_SET (fd, fdsetp) 实现如下:
# define __FD_ZERO(set) \ do { \ unsigned int __i; \ fd_set *__arr = (set); \ for (__i = 0; __i < sizeof (fd_set) / sizeof (__fd_mask); ++__i) \ __FDS_BITS (__arr)[__i] = 0; \ } while (0)#endif #define __FD_SET(d, set) \ ((void) (__FDS_BITS (set)[__FD_ELT (d)] |= __FD_MASK (d)))
举个例子,假设现在fd的值为43,那么在数组下表为0的元素中第43个bit被置为1
再Linux上,向fd_set集合中添加新的fd时,采用位图法确定位置;在windows中添加fd至fd_set的实现规则依次从数组第0个位置开始向后递增
也就是说,FD_SET 宏本质上是在一个有1024个连续bit 的数组的第fd 位置置1 .
同理,FD_CLR 删除一个fd 的原理,也就是将数组的第fd 位置置为0
实例;
#include <sys/types.h>#include <sys/socket.h>#include <arpa/inet.h>#include <unistd.h>#include <iostream>#include <cstring>#include <sys/time.h>#include <vector>#include <cerrno>
使用nc -v 127.0.0.1 3000 来模拟客户端,打开三个终端
关于以上代码,需要注意以下几点:
-
select 函数在调用前后可能会修改readfds,writefds,exceptfds 所以想在下次调用select 函数时服用这些fd_set 变量需要重新清零,添加内容 for (int i = 0; i < clientfdslength; ++i) { if(clientfds[i] != INVALID_FD) { FD_SET(clientfds[i],&readset); if(maxfd<clientfds[i]) maxfd = clientfds[i]; } }
-
select 函数也会修改timeval 结构体的值,如果想复用这些变量,需要重新设置 timeval tm{}; tm.tv_sec = 1; tm.tv_usec =0;
-
如果将select 的timeval 参数设置为NULL ,则select 函数会一直阻塞下去
|