一、五种IO模型
内存和外设进行沟通的动作叫做IO。在网络层面,数据往网络里写的本质是将数据从内存写到网卡设备上,从网络里读的本质是将数据从网卡设备读到内存中。
任何IO过程,都要包含两个步骤,第一是等待, 第二是拷贝。而且在实际的应用场景中, 等待消耗的时间往往都远远高于拷贝的时间。所以高效IO的本质是, 尽可能地减少等待的比重。
- 读 recv/recvfrom: 等待内核的接收缓冲区当中有数据来(等待IO过程)、接收缓冲区之中有了数据(等待IO就绪)。 recv/recvfrom(sockfd,buff,size,0): 将接收缓冲区的数据拷贝到用户层空间(buff)之中(拷贝)。
- 写 send/sendto:等待内核的发送缓冲区当中有空间(等待IO过程)、发送缓冲区之中有了空间(等待IO就绪)。将应用层数据,拷贝到发送缓冲区之中(拷贝)。
低阶IO:是指类似于将用户输入的内容读取到某个变量中,将变量中的值打印在屏幕上等,简单来说就是对C库自己所维护的缓冲区进行I/O操作。
高阶IO:通常应用于网络Socket编程,对UDP(TCP)所维护的发送缓冲区和接收缓冲区进行I/O操作。并且高阶IO分为同步IO和异步IO,同步IO又分为阻塞IO、非阻塞IO、信号驱动IO和多路转接IO。
1.1. 阻塞IO
资源不可用的情况下(比如缓冲区没数据或者缓冲区满了),IO请求一直被阻塞,直到资源可用,就叫做阻塞IO,所有的套接字, 默认都是阻塞方式,阻塞IO是最常见的IO模型。
阻塞IO的特点:
- 一旦调用阻塞IO,阻塞等待的时长取决于内核。
- 在等待过程当中,对于等待的执行流而言,CPU的利用率是极低的。
- 在IO就绪到拷贝数据之间,实时性是非常高的(有鱼咬立马起杠提钩)。
- 阻塞IO的代码比较简单,容易编写。
1.2. 非阻塞IO
资源不可用的时候,IO请求不会阻塞,而是直接返回,返回当前资源不可用,并且返回EWOULDBLOCK错误码。
如果当前资源不可用,IO请求返回之后,表示本次IO请求没有真正完成,所以想要完成IO请求,非阻塞需要搭配循环使用,直至完成IO请求。
非阻塞IO往往需要程序员循环的方式反复尝试读写文件描述符, 这个过程称为轮询。这对CPU来说是较大的浪费, 一般只有特定场景下才使用
非阻塞IO特点:
-
非阻塞IO对CPU的利用率比阻塞IO高。 -
代码复杂,流程控制复杂,因为需要循环的缘故。 -
需要搭配循环调用,直至IO请求完成。 -
IO准备就绪到拷贝数据之间不够实时。 -
区分阻塞IO和非阻塞IO:只需要关心IO调用是否立即返回即可,没有立即返回说明是阻塞的,直接返回说明是非阻塞的。
while{
非阻塞IO调用
}
非阻塞的函数接口
文件描述符, 默认都是阻塞IO,如果想要修改为非阻塞,则需要fcntl来设置其属性
#include <unistd.h>
#include <fcntl.h>
int fcntl(int fd, int cmd, ... )
cmd参数:
复制一个现有的描述符(cmd=F_DUPFD).
获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD).
获得/设置文件状态标记(cmd=F_GETFL或F_SETFL).
获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN).
获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW)
通过fcntl,实现一个非阻塞的文件描述符:
void SetNoBlock(int fd)
{
int fl=fcntl(fd,F_GETFL);
if(fl < 0)
{
cerr<<"fctnl error"<<endl;
exit(0);
}
fcntl (fd,F_SETFL,fl|O_NONBLOCK);
}
- 使用F_GETFL将当前的文件描述符的属性取出来(这是一个位图)。
- 然后再使用F_SETFL将文件描述符设置回去。设置回去的同时, 加上一个O_NONBLOCK参数。
比如现在想让键盘输入变为非阻塞:
#include<iostream>
#include <unistd.h>
#include <fcntl.h>
#include<stdlib.h>
#include<errno.h>
using namespace std;
void SetNoBlock(int fd)
{
int fl=fcntl(fd,F_GETFL);
if(fl < 0)
{
cerr<<"fctnl error"<<endl;
exit(0);
}
fcntl (fd,F_SETFL,fl|O_NONBLOCK);
}
int main()
{
SetNoBlock(0);
while (1)
{
sleep(1);
char buf[1024] = {0};
ssize_t read_size = read(0, buf, sizeof(buf) - 1);
if(read_size>0)
{
cout<<"echo:"<<buf<<endl;
}
else if (read_size < 0&&errno==EAGAIN)
{
cout<<"read cond not ok!"<<endl;
}
else
{
cout<<"read error"<<endl;
}
}
return 0;
}
1.3. 信号驱动IO
信号驱动IO,预先在内核中设置一个回调函数,当某个事件发生时,内核使用信号(SIGIO)通知进程来处理(运行回调函数)。 它也可以看成是一种异步IO,因为检测fd是否有数据和是否可读写是在两个流程中做的。
信号驱动IO特点:
-
IO准备就绪,到拷贝数据之间,实时性增强了。 -
代码更加复杂,流程控制更加困难,因为引入了信号。 -
它的优势是,进程没有收到SIGIO信号之前,不被阻塞,可以做其他事情。 -
它的劣势是,当数据量变大时,信号产生太频繁,性能会非常低。内核需要不断的把数据复制到用户态。
一般信号驱动IO用于UDP,不过这里为了简化说明,还是采用标准输入的形式:
#include<iostream>
#include <fcntl.h>
#include <signal.h>
#include <unistd.h>
#include<stdlib.h>
using namespace std;
void EchoSometime(int signal)
{
char buf[256] = {0};
ssize_t read_size = read(0, buf, sizeof(buf)-1);
if(read_size>0)
{
cout<<"echo:"<<buf<<endl;
}
}
void SetNoBlock(int fd)
{
int fl=fcntl(fd,F_GETFL);
if(fl < 0)
{
cerr<<"fctnl error"<<endl;
exit(0);
}
fcntl (fd,F_SETFL,fl|O_NONBLOCK|O_ASYNC);
}
int main()
{
struct sigaction act;
act.sa_flags = 0;
act.sa_handler = EchoSometime;
sigaction(SIGIO, &act, NULL);
fcntl(0, F_SETOWN, getpid());
SetNoBlock(0);
while (1);
return 0;
}
UDP的代码:
#include <fcntl.h>
#include <netinet/in.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
int socket_fd = 0;
void do_sometime(int signal) {
struct sockaddr_in cli_addr;
socklen_t clilen = sizeof(cli_addr);
char buffer[256] = {0};
int len = recvfrom(socket_fd, buffer, 256, 0, (struct sockaddr *)&cli_addr,&clilen);
printf("Mes:%s\n", buffer);
sendto(socket_fd, buffer, len, 0, (struct sockaddr *)&cli_addr, clilen);
}
int main(int argc, char const *argv[]) {
socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
struct sigaction act;
act.sa_flags = 0;
act.sa_handler = do_sometime;
sigaction(SIGIO, &act, NULL);
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(8080);
servaddr.sin_addr.s_addr = INADDR_ANY;
fcntl(socket_fd, F_SETOWN, getpid());
int flags = fcntl(socket_fd, F_GETFL, 0);
flags |= O_NONBLOCK;
flags |= O_ASYNC;
fcntl(socket_fd, F_SETFL, flags);
bind(socket_fd, (struct sockaddr *)&servaddr, sizeof(servaddr));
while (1);
close(socket_fd);
return 0;
}
1.4. 多路转接IO
虽然从流程图上看起来和阻塞IO类似。实际上最核心在于IO多路转接能够同时等待多个文件描述符的就绪状态。 内核帮我们监控了多个文件描述符,当某一个或者若干个文件描述符就绪的时候,就会通知调用者,调用者调用系统调用函数针对就绪的文件描述符进行操作。
1.5. 异步IO
注:为了性能和效率的优先,C++默认采用的是异步IO的方式。 由内核在数据拷贝完成时, 通知应用程序(信号驱动IO是告诉应用程序何时可以开始拷贝数据,异步IO的拷贝工作由操作系统内核完成)。
实现流程:
-
自定义信号处理函数,通知数据拷贝完成。 -
发起一个异步IO调用,并且异步IO调用直接返回。 -
异步IO调用返回之后,执行流可以执行用户代码,由操作系统内核等待IO就绪和数据拷贝。 -
数据拷贝完成之后,内核通过信号通知调用者。
二、高级IO重要概念
2.1. 同步通信 vs 异步通信
同步与异步讨论的是调用者是否会主动等待调用结果
- 同步:调用者发出调用时,没有得到结果不会返回,阻塞等待,调用者主动等待该调用结果。
- 异步:与同步相反,发出调用后立即返回,调用内的工作由别人完成,自己并不参与,等待被调用者的通知,直接使用。
这里的同步通信和进程之间的同步是完全不想干的概念。
进程/线程同步也是进程/线程之间直接的制约关系。是为完成某种任务而建立的两个或多个线程,这个线程需要在某些位置上协调他们的工作次序而等待、传递信息所产生的制约关系.尤其是在访问临界资源的时候 。 以后在看到 “同步” 这个词, 一定要先搞清楚大背景是什么. 这个同步, 是同步通信异步通信的同步, 还是同步与互斥的同步.
2.2. 阻塞与非阻塞
阻塞与非阻塞讨论的是在等待调用结果时的状态。
- 阻塞调用是指在等待时,当前线程会被挂起. 调用线程只有在得到结果之后才会返回。
- 非阻塞调用指在等待时,该线程可以执行其他任务,不被操作系统挂起。
三、多路转接IO模型
作用:IO多路转接可以完成大量文件描述符的监控,监控的时间包括:可读事件、可写事件、异常事件。 监控文件描述符:那么个文件描述符就绪,就处理哪一个文件描述符。
多路转接适用于长链接的情况。 因为长链接连接时间比短链接更长,更多的时间是在等待。
3.1. select模型
select系统调用是用来让我们的程序监视多个文件描述符的状态变化的,程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变。
select、poll、epoll都只负责一件事情–等待,由于等待的是多个文件描述符,而IO主要是两个过程,等待和拷贝。多路转接,一次性等待多个文件描述符,等待的效率提高了,所以IO效率提高了。
实现流程:
-
将用户关心的文件描述符拷贝到内核之中,由内核进行监控 -
如果内核监控到某个文件描述符就绪,则返回该描述符 -
用户针对返回的描述符进行操作
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
nfds:最大文件描述符值+1(select遍历文件描述符数组),
比如要等的文件描述符是1和7,这个值填8
fd_set:文件描述符的位图,既做输入参数,又做输出型参数,
输入代表需要OS检测的文件描述符,输出为就绪的文件描述符,只能最多同时监控1024个
readfds:读事件位图
writefds:写事件位图
exceptfds:异常位图
timeout:
如果为NULL,则一直阻塞式等待,
0:仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生。
特定的时间值:如果在指定的时间段里没有事件发生,select将超时返回
这个参数是一个结构体指针,其对应的结构体为:
struct timeval
{
long tv_sec:秒级
long tv_usec:微秒
}
返回值:
执行成功则返回所有就绪文件描述符的个数
0表示时间超过timeout
-1代表等待出错
系统提供了四个接口来操作fd_set位图
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
fd_set作为输入输出型参数,select每次调用后,原来的参数数据就被覆盖了,所以需要通过数组保存原来的数据,每次都需要对传入的参数如readfds进行重新设置。
select执行过程: 理解select模型的关键在于理解fd_set,为说明方便,取fd_set长度为1字节,fd_set中的每一bit可以对应一个文件描述符fd。则1字节长的fd_set最大可以对应8个fd。
- 执行fd_set set; FD_ZERO(&set);则set用位表示是0000,0000。
- 若fd=5,执行FD_SET(fd,&set);后set变为0001,0000(第5位置为1)
- 若再加入fd=2,fd=1,则set变为0001,0011。
- 执行select(6,&set,0,0,0)阻塞等待 。
- 若fd=1,fd=2上都发生可读事件,则select返回,此时set变为0000,0011。注意:没有事件发生的fd=5被清空。如果fd=3上发生可读事件,由于fd=3并没有被设置,所以对应的位图不会由0变1。
socket就绪条件 读就绪:
- socket内核中, 接收缓冲区中的字节数, 大于等于低水位标记SO_RCVLOWAT。此时可以无阻塞的读该文件描述符, 并且返回值大于0;
- socket TCP通信中, 对端关闭连接, 此时对该socket读, 则返回0;
- 监听的socket上有新的连接请求,也会被当做读事件处理。
- socket上有未处理的错误;
写就绪:
- socket内核中, 发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小), 大于等于低水位标记SO_SNDLOWAT, 此时可以无阻塞的写, 并且返回值大于0;
- socket的写操作被关闭(close或者shutdown)。对一个写操作被关闭的socket进行写操作, 会触发SIGPIPE信号;
- socket使用非阻塞connect连接成功或失败之后,也会被当做写事件处理。
- socket上有未读取的错误;
代码演示:
#include<iostream>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include<stdlib.h>
using namespace std;
int main()
{
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(0,&readfds);
while(1)
{
struct timeval tv;
tv.tv_sec=3;
tv.tv_usec=0;
int ret=select(1,&readfds,NULL,NULL,&tv);
if(ret < 0)
{
cerr<<"select error"<<endl;
exit(1);
}
if(ret==0)
{
cout<<"time out"<<endl;
if(FD_ISSET(0,&readfds)==0)
{
cout<<" 0 fd is not in readfds"<<endl;
}
FD_SET(0,&readfds);
continue;
}
if(FD_ISSET(0,&readfds)!=0)
{
char buff[100]={0};
read(0,buff,sizeof(buff)-1);
cout<<"echo:"<<buff<<endl;
}
}
return 0;
}
select优缺点
优点:
- 遵循的是posix标准,可移植性强,即可以跨平台使用。
- select超时时间可以精确到微秒。
- 一次可以等待多个文件描述符,提高了等待的效率,即提高了IO的效率。
缺点:
- select的fd事件集合(位图)参数为输入输出型一体,因此每次调用select都需要再次设置fd事件集合,影响程序运行效率,比较麻烦。
- 每次调用select都需要将fd拷贝至内核之中,就绪的文件描述符也需要从内核之中拷贝至用户,当select面临的链接很多时,会因为拷贝数据而降低效率。
- 每次调用select都需要在内核遍历传递进来的所有fd事件集合,如果需要遍历的文件描述符过多,遍历的开销会很大。
- select的文件描述符个数有上限,在centos 7 之中为1024,无法进行大量的监控。
简易的select单进程服务器
tcp服务器大致有以下工作:1.创建套接字⒉.绑定IP和端口3.创建监听套接字4.accept获取连接然后开始提供服务。
select单进程服务器采用下面的实现方式:
- 创建一个辅助数组fd_array,用来存储已经打开的套接字,监听套接字放在下标为0的位置。
- 在每次select前,都要将fd_array中的套接字放到读事件位图中,并记录最大的套接字的值。因为每次select都会改变这个位图,所以需要重新设置。
- 当select返回值大于0时,表示有可读文件描述符就绪了。
- 如果就绪的文件描述符为监听套接字,说明有新的connect请求,此时要accept建立连接,然后将新的套接字放入辅助数组fd_array中。
- 如果就绪的文件描述符为连接的套接字,则用该文件描述符进行通信,通信完毕之后,将该文件描述符从辅助数组中去除并close关闭文件描述符。(如果close关闭掉文件描述符而不从辅助数组中移除,就会导致select出错,因为select在监听一个不存在的文件)。
相对于传统方式的优势: 传统方式中,由于accept和send、recv都是阻塞方式进行,因此单执行流一次只能有一个连接。如果想要多个链接同时运行,就需要创建多进程或者多线程的方式进行。 而采用select监控套接字的方式,可以一次性监控多个(1024个)套接字。一次性可以对多个套接字进行操作,用单执行流完成多执行流的事情。
Sock.h:
#pragma once
#include<iostream>
#include<string>
#include<sys/socket.h>
#include<netinet/in.h>
#include<sys/types.h>
#include<unistd.h>
#include<stdlib.h>
#include<arpa/inet.h>
#include<string.h>
#include<sys/select.h>
#include<stdio.h>
using namespace std;
class Sock{
public:
static int Socket()
{
int sock=socket(AF_INET,SOCK_STREAM,0);
if(sock<0)
{
cerr<<"socket error"<<endl;
exit(2);
}
return sock;
}
static void Bind(int sock,int port)
{
struct sockaddr_in local;
bzero(&local,sizeof(local));
local.sin_port=htons(port);
local.sin_family=AF_INET;
local.sin_addr.s_addr=htonl(INADDR_ANY);
if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
{
cerr<<"bind error"<<endl;
}
}
static void Listen(int sock)
{
if(listen(sock,5)<0)
{
cerr<<"listen error"<<endl;
exit(4);
}
}
static int Accept(int sock)
{
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
int fd=accept(sock,(struct sockaddr*)&peer,&len);
if(fd<0)
{
cerr<<"accept error"<<endl;
}
return fd;
}
static void Setsockopt(int listen_sock)
{
int opt=1;
setsockopt(listen_sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
}
};
SelectServer.h:
#pragma once
#include"Sock.h"
#define NUM (sizeof(fd_set)*8)
#define DFL_FD -1
class SelectServer{
private:
int listen_sock;
int port;
int fd_array[NUM];
public:
SelectServer(int _p=8080):port(_p)
{}
void InitServer()
{
for(int i=0;i<NUM;i++)
{
fd_array[i]=DFL_FD;
}
listen_sock=Sock::Socket();
Sock::Setsockopt(listen_sock);
Sock::Bind(listen_sock,port);
Sock::Listen(listen_sock);
fd_array[0]=listen_sock;
}
void AddFdToArray(int sock)
{
int i=0;
for(;i<NUM;i++)
{
if(fd_array[i]==DFL_FD)
{
break;
}
}
if(i>=NUM)
{
close(sock);
cerr<<"fd_array is full,close sock"<<endl;
}
else
{
fd_array[i]=sock;
cout<<"fd:"<<sock<<" add to select..."<<endl;
}
}
void service(int sock,int i)
{
char buff[655370];
ssize_t size=recv (sock,buff,sizeof(buff)-1,0);
if(size>0)
{
buff[size]='\0';
cout<<"client:"<<buff<<endl;
send(sock,buff,strlen(buff),0);
}
else if(size==0)
{
cout<<"client quit..."<<endl;
close(sock);
fd_array[i]=DFL_FD;
}
else
{
cout<<"client error..."<<endl;
close(sock);
fd_array[i]=DFL_FD;
}
}
void HandlerEvents(fd_set* read_fds)
{
for(int i=0;i<NUM;i++)
{
if(fd_array[i]==DFL_FD)
{
continue;
}
if(FD_ISSET(fd_array[i],read_fds))
{
if(fd_array[i]==listen_sock)
{
int sock=Sock::Accept(listen_sock);
if(sock>=0)
{
cout<<"get a new link..."<<endl;
AddFdToArray(sock);
}
}
else
{
service(fd_array[i],i);
}
}
}
}
void Start()
{
int max_fd=DFL_FD;
while (1)
{
fd_set read_fds;
FD_ZERO(&read_fds);
for(int i=0;i<NUM;i++)
{
if(fd_array[i]!=DFL_FD)
{
cout<<fd_array[i]<<" ";
FD_SET(fd_array[i],&read_fds);
if(max_fd<fd_array[i])
{
max_fd=fd_array[i];
}
}
}
cout<<endl;
struct timeval time_out={0,0};
switch(select(max_fd+1,&read_fds,NULL,NULL,NULL))
{
case 0:
cout<<"time_out..."<<endl;
break;
case -1:
perror("select error!\n");
break;
default:
HandlerEvents(&read_fds);
break;
}
}
}
};
Server.cpp:
#include"SelectServer.h"
void Usage(string proc)
{
cout<<"Usage:\n\t"<<proc<<" port"<<endl;
}
int main(int argc,char* argv[])
{
if(argc!=2)
{
Usage(argv[0]);
exit(1);
}
SelectServer* ssvr=new SelectServer(atoi(argv[1]));
ssvr->InitServer();
ssvr->Start();
return 0;
}
3.2. poll模型
poll解决了select的两个问题:
- 解决了select检测文件上限的问题。
- 将用户传给OS的需要检测的文件描述符与OS传给用户的就绪文件描述符的两个行为进行分离,所以不需要每次调用前重复添加检测的文件描述符。
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
fds:需要检测的文件描述符
nfds:结构体数组长度
timeout:轮询方式,与select一致
timeout =O非阻塞监控
timeout =-1阻塞监控
timeout>0超时时间为多少,单位是毫秒(ms)
返回值
小于0, 表示出错;
等于0, 表示poll函数等待超时;
大于0, 表示poll由于监听的文件描述符就绪而返回.
struct pollfd {
int fd;
short events;
short revents;
};
events和revents的取值:
POLLIN:数据可读
POLLOUT:数据可写
#include<iostream>
#include<poll.h>
#include<stdlib.h>
#include<unistd.h>
using namespace std;
int main()
{
struct pollfd arr[10];
arr[0].fd=0;
arr[0].events=POLLIN;
while(1)
{
int ret=poll(arr,1,2000);
if(ret==0)
{
cout<<"time out"<<endl;
continue;
}
else if(ret < 0)
{
cerr<<"poll error"<<endl;
exit(0);
}
else
{
char buff[100];
for(int i=0;i<ret;i++)
{
if(arr[i].events&POLLIN)
{
int size=read(arr[i].fd,buff,sizeof(buff)-1);
buff[size-1]=0;
cout<<"echo:"<<buff<<endl;
}
}
}
}
return 0;
}
poll优缺点
特点:
-
poll和select相比,跨平台移植性不如select,与epoll相比,监控效率不如epoll。 -
相较于select改进的点: a.不限制文件描述符的个数了,由用户自己定义结构体数组的数量。 b.相较于select之前的事件集合的方式,改进成为事件结构。事件结构告诉我们,关心的文件描述符是什么,关心的文件描述符发生事件是什么。
优点:
- pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式. 接口使用比select更方便,简化了代码的编写。
- 不限制文件描述符的个数(但是数量过大后性能也是会下降)。
- 不需要在二次监控的时候重新添加文件描述符,因为输入型参数和输出型参数分开了。
缺点:
- 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符,随着文件描述符增多,性能下降。
- 不支持跨平台。
- 每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中,就绪的文件描述符也需要从内核之中拷贝至用户,当poll面临的链接很多时,会因为拷贝数据而降低效率。
- 同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降。
简易的poll单进程服务器
poll单进程服务器采用下面的实现方式:
- 创建一个struct pollfd数组,用来存储已经打开的套接字,监听套接字放在下标为0的位置。
- 当poll返回值大于0时,表示有可读文件描述符就绪了。
- 如果就绪的文件描述符为监听套接字,说明有新的connect请求,此时要accept建立连接,然后将新的套接字放入struct pollfd数组中。
- 如果就绪的文件描述符为连接的套接字,则用该文件描述符进行通信,通信完毕之后,将该文件描述符从struct pollfd数组中去除并close关闭文件描述符。(如果close关闭掉文件描述符而不从struct pollfd数组中移除,就会导致poll出错,因为poll在监听一个不存在的文件)。
PollServer.h:
#include"Sock.h"
#include<poll.h>
#define NUM 64
struct pollfd read_fds[NUM];
class PollServer{
private:
int listen_sock;
int port;
public:
PollServer(int _p=8080):port(_p)
{}
void InitServer()
{
listen_sock=Sock::Socket();
Sock::Setsockopt(listen_sock);
Sock::Bind(listen_sock,port);
Sock::Listen(listen_sock);
}
void AddFdToArray(int sock)
{
int i=0;
for(;i<NUM;i++)
{
if(read_fds[i].fd==-1)
{
break;
}
}
if(i>=NUM)
{
close(sock);
cerr<<"fd_array is full,close sock"<<endl;
}
else
{
read_fds[i].fd=sock;
read_fds[i].events|=POLLIN;
read_fds[i].revents=0;
cout<<"fd:"<<sock<<" add to poll..."<<endl;
}
}
void service(int sock,int i)
{
char buff[655370];
ssize_t size=recv (sock,buff,sizeof(buff)-1,0);
if(size>0)
{
buff[size]='\0';
cout<<"client:"<<buff<<endl;
send(sock,buff,strlen(buff),0);
}
else if(size==0)
{
cout<<"client quit..."<<endl;
close(sock);
read_fds[i].fd=-1;
read_fds[i].events=0;
read_fds[i].revents=0;
}
else
{
cout<<"client error..."<<endl;
close(sock);
read_fds[i].fd=-1;
read_fds[i].events=0;
read_fds[i].revents=0;
}
}
void HandlerEvents()
{
for(int i=0;i<NUM;i++)
{
if(read_fds[i].fd==-1)
{
continue;
}
if(read_fds[i].revents&POLLIN)
{
if(read_fds[i].fd==listen_sock)
{
int sock=Sock::Accept(listen_sock);
if(sock>=0)
{
cout<<"get a new link..."<<endl;
AddFdToArray(sock);
cout<<sock<<" "<<endl;
}
}
else
{
service(read_fds[i].fd,i);
}
}
}
}
void Start()
{
for(int i=0;i<NUM;i++)
{
read_fds[i].fd=-1;
read_fds[i].events=0;
read_fds[i].revents=0;
}
read_fds[0].fd=listen_sock;
read_fds[0].events|=POLLIN;
read_fds[0].revents=0;
while (1)
{
switch(poll(read_fds,NUM,-1))
{
case 0:
cout<<"time_out..."<<endl;
break;
case -1:
perror("poll error!\n");
break;
default:
HandlerEvents();
break;
}
}
}
};
3.4. epoll模型
epoll是目前公认的在linux操作系统下,监控性能最高的,是为了处理大批量句柄而作了改进的poll。 它不同于select()是在监听事件时告诉内核要监听什么类型的事件, 而是先注册要监听的事件类型。
epoll的使用过程如下:
- 调用epoll_create创建一个epoll句柄。
- 调用epoll_ctl, 将要监控的文件描述符进行注册。
- 调用epoll_wait, 等待文件描述符就绪。
#include <sys/epoll.h>
int epoll_create(int size)
size:
本来含义是定义epoll最大能监控的文件描述符的个数。
内核2.6.8之后,弃用了。现在采用动态内存开辟的方式来进行扩容。size的值大于0。
返回值:
返回epoll操作句柄,本质是文件描述符
内核观点:在内核当中就是创建了一个struct eventpoll结构体,
在这个结构体之中,有两个重要的变量(红黑树、双向链表),
epoll的操作句柄其实就是用来找到struct eventpoll结构体,从而对结构体当中的变量进行操控。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
epfd:epoll操作句柄
op:表示动作,用三个宏来表示:
EPOLL_CTL_ADD :注册新的fd到epfd中
EPOLL_CTL_MOD :修改已经注册的fd的监听事件
EPOLL_CTL_DEL :从epfd中删除一个fd
fd:需要监听的fd
event:告诉内核需要监听什么事
struct epoll_event {
用户对文件描述符所关心的事件
EPOLLIN:可读事件
EPOLLOUT:可写事件
uint32_t events;
epoll_data_t data;
};
typedef union epoll_data
{
void*ptr;
int fd;
uint32_t u32;
uint64_t u64;
}epoll_data_t;
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)
epfd: epoll操作句柄
events:事件结构数组,输出型参数,内核返回就绪的事件结构,一个文件描述符对应一个事件结构
maxevents:告之内核这个events有多大,
这个 maxevents的值不能大于创建epoll_create()时的size
timeout:
大于0︰超时时间
等于0:非阻塞
小于0:阻塞
返回值:
大于0:返回就绪的文件描述符个数
等于0:等待超时
小于0:监控出错
epoll的工作原理
- 当调用epoll_create函数时,会在内核创建一个eventpoll结构体,在该结构体中有一个rdlist成员和rbr成员,它两分别是一个双向链表和红黑树。
- 对于每一个事件,epoll都会建立一个epitem结构体。
- 而调用epoll_ctl函数添加、修改、删除文件描述符对应的事件集合其实是对红黑树中的节点进行相应的添加、修改、删除操作。
- 所有添加到epoll的事件都会与设备(网卡)驱动程序建立回调关系。这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。
- 当文件描述符准备就绪后,内核通过回调函数ep_poll_callback,将准备就绪的事件集合添加到rdlist双向链表中。
- 当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可。
- 如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户。这个操作的时间复杂度是O(1)。
epoll的优点
和 select 的缺点对应,这也是epoll高效的原因。
- 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效。不需要每次2循环都设置关注的文件描述符, 也做到了输入输出参数分离开。
- 数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll都是每次循环都要进行拷贝)。
- 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪。这个操作时间复杂度O(1)。即使文件描述符数目很多, 效率也不会受到影响。
- 没有数量限制: 文件描述符数目无上限。
epoll单进程服务器
构建流程:
- epoll服务器初始化:创建epoll模型。
- epoll服务器开始运行:将监听套接字添加进去。
- 进行等待,如果监听套接字准备就绪,就获取链接套接字,并且以读的方式添加至epoll模型之中,事件中给每个文件描述符开辟一块指向的空间,用来接收多次请求。
- 如果链接套接字准备就绪,判断事件,再进行操作。
- 如果链接套接字为可读事件,则进行读取,开辟的空间读满后,修改对应的事件为可写事件。
- 写完之后,断开连接。
#include"Sock.h"
#include <sys/epoll.h>
#define SIZE 64
struct bucket{
char buffer[25];
int fd;
int pos;
bucket(int sock):fd(sock),pos(0)
{
memset(buffer,0,sizeof(buffer));
}
};
class EpollServer{
private:
int listen_sock;
int port;
int e_pfd;
public:
EpollServer(int _p=8080):port(_p)
{}
void InitServer()
{
listen_sock=Sock::Socket();
Sock::Setsockopt(listen_sock);
Sock::Bind(listen_sock,port);
Sock::Listen(listen_sock);
e_pfd=epoll_create(256);
if(e_pfd<0)
{
cerr<<"epoll_create error"<<endl;
exit(5);
}
}
void service(bucket* bk)
{
ssize_t s=recv(bk->fd,bk->buffer+bk->pos,sizeof(bk->buffer),0);
if(s>0)
{
bk->pos+=s;
cout<<"client# "<<bk->buffer<<endl;
if(bk->pos>=sizeof(bk->buffer))
{
bk->pos=0;
struct epoll_event temp;
temp.events=EPOLLOUT;
temp.data.ptr=bk;
epoll_ctl(e_pfd,EPOLL_CTL_MOD,bk->fd,&temp);
}
}
else if(s==0)
{
cout<<"client quit..."<<endl;
close(bk->fd);
epoll_ctl(e_pfd,EPOLL_CTL_DEL,bk->fd,NULL);
delete bk;
}
else
{
cout<<"client error..."<<endl;
close(bk->fd);
epoll_ctl(e_pfd,EPOLL_CTL_DEL,bk->fd,NULL);
delete bk;
}
}
void HandlerEvents(struct epoll_event revs[],int num)
{
for(int i=0;i<num;i++)
{
uint32_t ev=revs[i].events;
if(ev&EPOLLIN)
{
if(revs[i].data.ptr!=NULL)
{
bucket* bk=(bucket*)revs[i].data.ptr;
service(bk);
}
else
{
cout<<"get a new link..."<<endl;
int sock=Sock::Accept(listen_sock);
if(sock>0)
{
AddEventToEpoll(sock,EPOLLIN);
}
}
}
else if(ev&EPOLLOUT)
{
bucket* bk=(bucket*)revs[i].data.ptr;
size_t size=send(bk->fd,bk->buffer+bk->pos,sizeof(bk->buffer)-bk->pos,0);
bk->pos+=size;
if(bk->pos >= sizeof(bk->buffer))
{
cout<<"发送成功"<<endl;
close(bk->fd);
epoll_ctl(e_pfd,EPOLL_CTL_DEL,bk->fd,NULL);
delete bk;
}
}
}
}
void AddEventToEpoll(int sock,uint32_t event)
{
struct epoll_event ev;
ev.events=event;
if(sock==listen_sock)
{
ev.data.ptr=NULL;
}
else
{
ev.data.ptr=new bucket(sock);
}
epoll_ctl(e_pfd,EPOLL_CTL_ADD,sock,&ev);
}
void Start()
{
AddEventToEpoll(listen_sock,EPOLLIN);
int time_out=-1;
struct epoll_event revs[SIZE];
while (1)
{
int num=epoll_wait(e_pfd,revs,SIZE,time_out);
switch (num)
{
case 0:
cout<<"time out"<<endl;
break;
case -1:
cerr<<"epoll wait error"<<endl;
break;
default:
HandlerEvents(revs,num);
break;
}
}
}
~EpollServer()
{
close(listen_sock);
close(e_pfd);
}
};
poll对文件描述符就绪事件的触发方式
epoll有2种工作方式-水平触发(LT)和边缘触发(ET),默认是LT的。
LT与ET的差别在于就绪事件通知机制:
- LT:只要底层有数据就会一直通知上层读取数据。select、poll、epoll默认都是LT。
- ET:当底层的数据从无到有,从有到多变化时才会通知上层一次。
水平触发EPOLLLT(LT模式) 可读事件: 只要接收缓冲区当中的数据大于低水位标记(1字节),就会一直触发可读事件,直到接收缓冲区当中没有数据可读(接收缓冲区当中的数据低于低水位标记)。
可写事件: 只要发送缓冲区当中的空间大于低水位标记(1字节),就会一直触发可写事件,直到发送缓冲区当中没有空间可写(发送缓冲区当中的空间低于低水位标记)。
边缘触发EPOLLET(ET模式) epoll可以通过设置还实现ET模式。
设置文件描述符对应的事件结构的时候,只需要在事件结构当中的事件变量中按位或上EPOLLET即可:
struct epoll_event et;
ev.events = EPOLLIN|EPOLLET;
可读事件: 只有当新的数据到来的时候,才会触发可读,否则通知一次之后,就不再通知了,每次到来一个新的数据,只会通知一次,如果应用程序没有将接收缓冲区的数据读完(没有读完的数据留在缓冲区之中,下次触发就从这里开始),也不会再次通知,直到新的数据到来,才会触发可读事件,因此需要尽量将数据读完。
可写事件: 只有当发送缓冲区之中剩余空间从不可写变成可写的时候,才会触发一次可写事件就绪。
对于ET模式而言,如果就绪事件产生,一定要把握好机会,对于可读事件,将数据读完,对于可写事件,将数据写完。
ET模式结合了循环将数据进行读取和发送,不是频繁的进行通知,因此效率就比较高。
构建ET细节注意点:
-
如何判断数据读完了 : 设置size为期望读取的字节、ret为实际读取的字节,ret<size表示缓冲区之中一定没有数据了,此时说明数据已经都被读完了。ret==size,此时有可能有数据,也有可能没有数据,都需要再次进行读取,再次读取有可能会进入阻塞,因此需要将fd更改为非阻塞状态。 -
将数据发送出去: 同样需要构建循环进行发送,当缓冲区没有容量的时候,就循环发送,直至缓冲区有容量。 -
将描述符设置为非阻塞接口介绍:
#include <unistd.h>
#include <fcntl.h>
int fcntl(int fd, int cmd, … );
fd:要设置的文件描述符
cmd:操作方式:
F_GETFL获取当前文件描述符的属性
F_SETFL将非阻塞属性设置到文件描述符的属性当中(O_NONBLOCK)
- 代码
1). 对监听套接字设置为非阻塞,循环accept,直到accept失败,这样可以处理一批链接。 2). 对链接的套接字进行监控时,采用ET模式,ev.events = EPOLLIN|EPOLLET; 。 3). 由于是ET模式,应该通知的时候就需要将所有的数据读取或者发送完毕,因此需要采用循环来处理。 4). recv和send循环处理时,如果是阻塞的发送,那么最后一次处理会陷入阻塞的状态,因此需要将fd改为非阻塞的状态。 - epoll工作方式的对比
1).LT是 epoll 的默认行为。 使用 ET 能够减少 epoll 触发的次数.。但是代价就是一次处理完数据。 2).相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些.但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话, 其实性能也是一样的。 3).同时ET 的代码复杂程度也更高。
|