epoll 是
为处理大批量句柄而作了改进的
poll
它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)
它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法
epoll接口
epoll提供了三个接口,把之前select和poll的不足,把用户告诉OS需要检测的文件描述符和
epoll_create()
创建一个epoll模型
#include <sys/epoll.h>
int epoll_create(int size);
参数:
- size:自Linux2.6.8以来,size参数被忽略,但必须大于零;
返回值:
int类型,返回的是一个文件描述符,也就是一个epoll模型的操作句柄
用完之后, 必须调用close()关闭
调用epoll_create(),OS会在内核创建对应的数据结构,epoll模型
epoll_ctl()
epoll的事件注册函数
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数:
epoll_event结构
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events;
epoll_data_t data;
};
events可以设置具体的事件,events|=对应事件的宏值 就能进行操作
- EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭)
- EPOLLOUT : 表示对应的文件描述符可以写
- EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来)
- EPOLLERR : 表示对应的文件描述符发生错误
- EPOLLHUP : 表示对应的文件描述符被挂断
- EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的
- EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里
返回值:
成功返回0
失败返回-1
epoll_wait()
收集在epoll监控的事件中已经就绪的事件
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
参数:
- epfd:epoll_create时返回的epoll模型的操作句柄
- events:在进行epoll_wait轮询收集在epoll监控的事件中已经就绪的事件时,可以创建一个epoll_event数组用来保存从就绪队列上获取到的epoll_event节点,events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存
- maxevents:告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size
- timeout:超时时间 (单位ms,0会立即返回,-1是永久阻塞)
返回值:
- 成功,返回对应I/O上已就绪的文件描述符数目
- 返回0表示已超时
- 返回小于0表示出错
epoll工作原理
- 当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关
struct eventpoll {
....
struct rb_root rbr;
struct list_head rdlist;
....
};
- 每一个
epoll对象 都有一个独立的eventpoll结构体 ,用于存放通过epoll_ctl 方法向epoll对象中添加进来的事件 - 这些事件都会
挂载在红黑树中 ,如此,重复添加的事件就可以通过红黑树而高效的识别出来 - 而所有
添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系 ,也就是说,当响应的事件发生时会调用这个回调方法 - 这个
回调方法在内核中叫ep_poll_callback ,它会将发生的事件添加到rdlist双链表中 - 在
epoll 中,对于每一个事件 ,都会建立一个epitem结构体
struct epitem{
struct rb_node rbn;
struct list_head rdllink;
struct epoll_filefd ffd;
struct eventpoll *ep;
struct epoll_event event;
}
- 当
调用epoll_wait检查是否有事件发生 时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可 - 如果
rdlist不为空 ,则把发生的事件复制到用户态 ,同时将事件数量返回给用户 . 这个操作的时间复杂度是O(1)
在用户调用epoll_create方法时,OS会在内核中创建出三个数据结构
- 红黑树:用于
保存用户epoll_ctl传进来的需要系统监听的文件描述符和对应事件的映射关系 ,每次epoll_ctl ,都是对这个红黑树的节点进行操作,增加,删除,修改 。如此,重复添加的事件就可以通过红黑树而高效的识别出来。 - 注册回调方法:每一个
添加到红黑树中的事件 ,都会与设备(网卡)驱动程序建立回调关系 ,也就是说,当监听的就绪事件发生时会调用这个回调方法 - 就绪队列:当
OS检测到有事件就绪 后,操作系统就会与红黑树中的映射关系进行匹配 ,如果就绪事件是用户要监听的事件 ,OS就会调用注册的回调方法 ,把对应的事件就绪信息和文件描述符信息添加到就绪队列中 ,这样用户就不用自己遍历所有监听的文件描述符和对应的事件信息 ,而是直接从就绪队列中读取节点信息 ,而就绪队列中的节点信息都是已经就绪事件信息 ,这样就会遍历无用的信息,提高了效率
这几个内核中创建数据结构 ,统称为epoll模型
epoll的使用过程
- 调用
epoll_create 创建一个epoll句柄 - 调用
epoll_ctl , 将要监控的文件描述符进行注册 - 调用
epoll_wait , 等待文件描述符就绪
epoll的优点(和 select 的缺点对应)
- 没有数量限制: 文件描述符数量无上限
- 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
- 数据拷贝轻量: 只在合适的时候调用
EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll都是每次循环都要进行拷贝) - 事件回调机制: 避免使用遍历, 而是
使用回调函数 的方式, 将就绪的文件描述符结构加入到就绪队列 中,epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪. 这个操作时间复杂度O(1). 即使文件描述符数目很多, 效率也不会受到影响
epoll工作方式
LT模式
水平触发(Level Triggered )
事件就绪后,只要数据没有被处理 ,或者数据没有被处理完 ,epoll_wait 在下次调用时依然会通知事件就绪
epoll默认状态下就是LT工作模式
ET模式
边缘触发(Edge Triggered )
事件就绪后,事件就要立即被处理,如果数据没有处理,或者数据没有处理完 ,epoll_wait 在下次调用时就不再通知这个事件就绪了,如要处理剩余数据,就得等到事件下次就绪才会通知
- 如果监听一个普通套接字读事件,读时间就绪,
epoll_wait只会通知一次 ,这时读数据,如果没有把缓冲区中的数据读完,那么下次轮询就不会再通知,也就不能再去读了,要等到下次读事件就绪才会读取缓冲区中的剩余内容 - 也就是说, ET模式下,监听的文件描述符上的事件就绪后, 只有一次处理机会
这种方式通知的次数会变少,也就意味着,ET模式下,一次响应就绪过程中就把所有的数据都处理完
这样看起来就比 LT 更高效一些. 但是在 LT 情况下如果也能做到每次就绪的文件描述符都立刻处理 , 不让这个就绪被重复提示 的话, 其实性能也是一样的
ET模式的epoll要使用非阻塞读写
这个不是接口上的要求, 而是 “工程实践” 上的要求
-
原因是ET模式下,由于事件只会通知一次 ,所以在进行读写数据时,要保证一次读或者写完缓冲区中的所有数据,也就是说,读取或者发送数据时 ,需要循环发送或者读取,直到缓冲区中的数据被处理完 -
但是,如何判断缓冲区中的数据被处理完了呢,就要通过读或者写操作的返回值来看,看读或者写到的实际数据的大小是否和期望读到或者写的数据大小是否一致 ,如果实际读到或者写的数据小于期望读或写的数据,则就能证明缓冲区中的数据已经被处理完了 -
假设一个缓冲区中有3000个字节的数据,而一次recv ,期望读到的数据是1000个字节,那么在读完三次后,缓冲区中的数据已经被读完,但是,recv的返回值,也就是实际读到的数据和期望读到的数据完全吻合 ,那么此时的判断结果就是缓冲区中还有数据,那么recv就会再次读取 ,这时recv读取缓冲区,缓冲区中刚好没有了数据,此时recv就会被阻塞,发生问题
所以为了避免这种情况的发生,在ET模式下进行读写操作,要把文件描述符设为非阻塞模式
epoll的使用场景
epoll的高性能, 是有一定的特定场景 的. 如果场景选择的不适宜, epoll的性能可能适得其反
- 对于多连接, 且多连接中只有一部分连接比较活跃时, 比较适合使用epoll
例如, 典型的一个需要处理上万个客户端的服务器 , 例如各种互联网APP的入口服务器 , 这样的服务器就很适合epoll.如果只是系统内部, 服务器和服务器之间进行通信, 只有少数的几个连接, 这种情况下用epoll就并不合适 . 具体要根据需求和场景特点来决定使用哪种IO模型
基于epoll LT模式的TCP read服务器
#pragma once
#include "sock.hpp"
#include <sys/epoll.h>
#define BACKLOG 5
#define SIZE 256
#define TIMEOUT -1
#define MAXNUM 64
namespace ns_epoll {
class EpollServer {
private:
int port;
int epfd;
int listen_sock;
public:
EpollServer(int _port) :port(_port) {}
public:
void InitEpollServer() {
listen_sock = ns_sock::Sock::Socket();
ns_sock::Sock::Bind(listen_sock, port);
ns_sock::Sock::Listen(listen_sock, BACKLOG);
if ((epfd = epoll_create(SIZE)) < 0) {
std::cerr << "epoll_creat error!!" << std::endl;
exit(4);
}
std::cout << "server start" << std::endl;
}
void Run() {
AddEvent(listen_sock, EPOLLIN);
struct epoll_event revs[MAXNUM];
while (true) {
int num = epoll_wait(epfd, revs, MAXNUM, TIMEOUT);
if (num > 0) {
for (int i = 0; i < num; ++i) {
if (revs[i].events & EPOLLIN) {
if (revs[i].data.fd == listen_sock) {
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int sock = accept(listen_sock, (struct sockaddr*)&peer, &len);
if (sock < 0) {
std::cerr << "accept error!!!" << std::endl;
continue;
}
uint16_t peer_port = htons(peer.sin_port);
std::string peer_ip = inet_ntoa(peer.sin_addr);
std::cout << "有一个新连接 " << "ip :[" << peer_ip << "] port: [" << peer_port << "]" << std::endl;
if (!AddEvent(sock, EPOLLIN)) {
std::cerr << "Add error!!!" << std::endl;
close(sock);
}
}
else {
char buffer[1024];
ssize_t size = recv(revs[i].data.fd, buffer, sizeof(buffer) - 1, 0);
if (size > 0) {
buffer[size] = 0;
std::cout << "echo#" << buffer;
}
else if (size == 0) {
std::cout << "client close!!" << std::endl;
if (!DelEvent(revs[i].data.fd)) {
std::cerr << "del event error" << std::endl;
}
close(revs[i].data.fd);
}
}
}
else if (revs[i].events == EPOLLOUT) {
}
else {
}
}
}
else if (num == 0) {
std::cout << "timeout!!!" << std::endl;
}
else {
std::cerr << "epoll error!!!" << std::endl;
}
}
}
bool AddEvent(int sock, int event) {
struct epoll_event ev;
ev.events = 0;
ev.events |= event;
ev.data.fd = sock;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev) < 0) {
std::cerr << "epoll_ctr error fd:" << sock << std::endl;
return false;
}
return true;
}
bool DelEvent(int sock) {
if (epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr) < 0) {
std::cerr << "epoll_ctl error fd:" << sock << std::endl;
return false;
}
return true;
}
~EpollServer() {
if (listen_sock >= 0) close(listen_sock);
if (epfd >= 0) close(epfd);
}
};
}
#include "EpollServer.hpp"
#include <string>
static void Usage(std::string proc){
std::cerr << "Usage :" << "\n\t" << proc <<" port "<<std::endl;
}
int main(int argc, char* argv[]){
if(argc != 2){
Usage(argv[0]);
exit(4);
}
ns_epoll::EpollServer* epollsrv = new ns_epoll::EpollServer(atoi(argv[1]));
epollsrv->InitEpollServer();
epollsrv->Run();
}
|