I/O 复用使得程序能同时监听多个文件描述符,这对于提高程序的性能至关重要。通常, 网络程序在下列情况下需要使用 I/O 复用技术:
? TCP 服务器同时要处理监听套接字和连接套接字。
? 服务器要同时处理 TCP 请求和 UDP 请求。
? 程序要同时处理多个套接字。
? 客户端程序要同时处理用户输入和网络连接。
? 服务器要同时监听多个端口。
需要指出的是,I/O 复用虽然能同时监听多个文件描述符,但它本身是阻塞的。并且当多个文件描述符同时就绪时,如果不采取额外的措施,程序就只能按顺序依处理其中的每一个文件描述符,这使得服务器看起来好像是串行工作的。如果要提高并发处理的能力,可以配合使用多线程或多进程等编程方法。
select
1.1 select 的接口介绍
select 系统调用的用途是:在一段指定时间内,监听用户感兴趣的文件描述符的可读、可写和异常等事件。
#include <sys/select.h>
int select(int maxfd, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct ti
meval *timeout);
socket可读的情况:
?socket内核接受缓存区中的字节数大于或等于其低水位标记SO_RCVLOWAT。此时我们可以无阻塞的读取该socket,并且读操作返回的字节数大于0。
?socket通信的对方关闭链接。此时对该socket的读操作将返回0。
?监听socket上有新的连接请求。
?socket上有未处理的错误。此时我们可以使用getsockopt来读取和清除该错误。
socket可写的情况:
?socket内核发送缓存区中的可用字节数大于或等于其低水位标记SO_RCVLOWAT。此时我们可以无阻塞的写该socket,并且写操作返回的字节数大于0。
?socket的写操作被关闭。对写操作被关闭的socket执行写操作将触发一个SIGPIPE信号。
?socket使用非阻塞connect连接成功或失败(超时)之后。
?socket上有未处理的错误。此时我们可以使用getsockopt来读取和清除该错误。
编码示例
服务端:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/select.h>
#define MAX_FD 128
#define DATALEN 1024
int InitSocket()
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd == -1) return -1;
struct sockaddr_in saddr;
memset(&saddr, 0, sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_port = htons(6000);
saddr.sin_addr.s_addr = inet_addr("127.0.0.1");
int res = bind(sockfd, (struct sockaddr*)&saddr, sizeof(saddr));
if(res == -1) return -1;
res = listen(sockfd, 5);
if(res == -1) return -1;
return sockfd;
}
void InitFds(int fds[], int n)
{
int i = 0;
for (; i < n; ++i)
{
fds[i] = -1;
}
}
void AddFdToFds(int fds[], int fd, int n)
{
int i = 0;
for (; i < n; ++i)
{
if (fds[i] == -1)
{
fds[i] = fd;
break;
}
}
}
void DelFdFromFds(int fds[], int fd, int n)
{
int i = 0;
for (; i < n; ++i)
{
if (fds[i] == fd)
{
fds[i] = -1;
break;
}
}
}
描述符值
int SetFdToFdset(fd_set *fdset, int fds[], int n)
{
FD_ZERO(fdset);
int i = 0, maxfd = fds[0];
for (; i < n; ++i)
{
if (fds[i] != -1)
{
FD_SET(fds[i], fdset);
if (fds[i] > maxfd)
{
maxfd = fds[i];
}
}
}
return maxfd;
}
void GetClientLink(int sockfd, int fds[], int n)
{
struct sockaddr_in caddr;
memset(&caddr, 0, sizeof(caddr));
socklen_t len = sizeof(caddr);
int c = accept(sockfd, (struct sockaddr*)&caddr, &len);
if (c < 0)
{
return;
}
printf("A client connection was successful\n");
AddFdToFds(fds, c, n);
}
void DealClientData(int fds[], int n, int clifd)
{
char data[DATALEN] = { 0 };
int num = recv(clifd, data, DATALEN - 1, 0);
if (num <= 0)
{
DelFdFromFds(fds, clifd, n);
close(clifd);
printf("A client disconnected\n");
}
else
{
printf("%d: %s\n", clifd, data);
send(clifd, "OK", 2, 0);
}
}
void DealReadyEvent(int fds[], int n, fd_set *fdset, int sockfd)
{
int i = 0;
for (; i < n; ++i)
{
if (fds[i] != -1 && FD_ISSET(fds[i], fdset))
{
if (fds[i] == sockfd)
{
GetClientLink(sockfd, fds, n);
}
else
{
DealClientData(fds, n, fds[i]);
}
}
}
}
int main()
{
int sockfd = InitSocket();
assert(sockfd != -1);
fd_set readfds;
int fds[MAX_FD];
InitFds(fds, MAX_FD);
AddFdToFds(fds, sockfd, MAX_FD);
while ( 1 )
{
int maxfd = SetFdToFdset(&readfds, fds, MAX_FD);
struct timeval timeout;
timeout.tv_sec = 2;
timeout.tv_usec = 0;
int n = select(maxfd + 1, &readfds, NULL, NULL, &timeout);
if (n < 0)
{
printf("select error\n");
break;
}
else if (n == 0)
{
printf("time out\n");
continue;
}
DealReadyEvent(fds,MAX_FD,&readfds,sockfd);
}
exit(0);
}
客户端:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/select.h>
#define MAX_FD 128
#define DATALEN 1024
int InitSocket()
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd == -1) return -1;
struct sockaddr_in saddr;
memset(&saddr, 0, sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_port = htons(6000);
saddr.sin_addr.s_addr = inet_addr("127.0.0.1");
int res = connect(sockfd, (struct sockaddr*)&saddr, sizeof(saddr));
if(res == -1) return -1;
return sockfd;
}
int main()
{
int sockfd = InitSocket();
assert(sockfd != -1);
while ( 1 )
{
printf("please input: ");
char buff[128] = { 0 };
fgets(buff, 127, stdin);
if (strncmp(buff, "bye", 3) == 0)
{
break;
}
int n = send(sockfd, buff, strlen(buff) - 1, 0);
if (n <= 0)
{
printf("send error\n");
break;
}
memset(buff, 0, 128);
n = recv(sockfd, buff, 127, 0);
if (n <= 0)
{
printf("recv error\n");
break;
}
printf("%s\n", buff);
}
close(sockfd);
exit(0);
}
poll
poll 系统调用和 select 类似,也是在指定时间内轮询一定数量的文件描述符,以测试其中是否有就绪者。
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
事件 | 描述 | 是否可作****为输入 | 是否可作****为输出 |
---|
POLLIN | 数据(包括普通数据和优先数据)可读 | 是 | 是 | POLLRDNORM | 普通数据可读 | 是 | 是 | POLLRDBAND | 优先级带数据可读 | 是 | 是 | POLLPRI | 高优先级数据可读,比如 TCP 带外数据 | 是 | 是 | POLLOUT | 数据(包括普通数据和优先数据)可写 | 是 | 是 | POLLWRNORM | 普通数据可写 | 是 | 是 | POLLWRBAND | 优先级带数据可写 | 是 | 是 | POLLRDHUP | TCP 连接被对方关闭,或者对方关闭了写操****作,它由 GNU 引入,使用时,需要在代码开****始处定义_GNU_SOURCE | 是 | 是 | EPOLLERR | 错误 | 否 | 是 | POLLHUP | 挂起。比如管道的写端关闭后,读端描述符上****将收到POLLHUP 事件 | 否 | 是 | POLLNVAL | 文件描述符没有打开 | 否 | 是 |
服务器端:
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <poll.h>
#define DATALEN 1024
#define MAX_FD 128
int InitSocket()
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd == -1) return -1;
struct sockaddr_in saddr;
memset(&saddr, 0, sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_port = htons(6000);
saddr.sin_addr.s_addr = inet_addr("127.0.0.1");
int res = bind(sockfd, (struct sockaddr*)&saddr, sizeof(saddr));
if(res == -1) return -1;
res = listen(sockfd, 5);
if(res == -1) return -1;
return sockfd;
}
void InitPollFds(struct pollfd fds[])
{
int i = 0;
for (; i < MAX_FD; ++i)
{
fds[i].fd = -1;
}
}
void InsertFdToPollfds(struct pollfd fds[], int fd, short events)
{
int i = 0;
for (; i < MAX_FD; ++i)
{
if (fds[i].fd == -1)
{
fds[i].fd = fd;
fds[i].events = events;
break;
}
}
}
void GetClinetLink(struct pollfd fds[], int sockfd)
{
struct sockaddr_in caddr;
memset(&caddr, 0, sizeof(caddr));
socklen_t len = sizeof(caddr);
int c = accept(sockfd, (struct sockaddr*)&caddr, &len);
if (c < 0)
{
return;
}
printf("A client connection was successful\n");
InsertFdToPollfds(fds, c, POLLIN | POLLRDHUP);
}
int DealClientData(int clifd)
{
char data[DATALEN] = { 0 };
int n = recv(clifd, data, DATALEN - 1, 0);
if(n <= 0)
{
return -1;
}
printf("%d: %s\n", clifd, data);
send(clifd, "OK", 2, 0);
return 0;
}
void DealReadyEvent(struct pollfd fds[], int sockfd)
{
int i = 0;
for (; i < MAX_FD; ++i)
{
if (fds[i].fd == -1)
{
continue;
}
else if (fds[i].revents & POLLIN)
{
if (fds[i].fd == sockfd)
{
GetClinetLink(fds, sockfd);
}
else
{
if(-1 == DealClientData(fds[i].fd))
{
close(fds[i].fd);
fds[i].fd = -1;
printf("A client disconnected\n");
}
}
}
else
{
printf("error\n");
}
}
}
int main()
{
int sockfd = InitSocket();
assert(sockfd != -1);
struct pollfd fds[MAX_FD];
InitPollFds(fds);
InsertFdToPollfds(fds, sockfd, POLLIN);
while (1)
{
int n = poll(fds, MAX_FD, 2000);
if (n < 0)
{
printf("poll error\n");
continue;
}
else if (n == 0)
{
printf("timeout\n");
continue;
}
else
{
DealReadyEvent(fds, sockfd);
}
}
exit(0);
}
epoll
epoll 是 Linux 特有的 I/O 复用函数。它在实现和使用上与 select、poll 有很大差异。首先,epoll 使用一组函数来完成任务,而不是单个函数。其次,epoll 把用户关心的文件描述符上的事件放在内核里的一个事件表中。从而无需像 select 和 poll 那样每次调用都要重复传入文件描述符或事件集。但 epoll 需要使用一个额外的文件描述符,来唯一标识内核中的这个事件表。
? epoll_create()用于创建内核事件表
? epoll_ctl()用于操作内核事件表
?epoll_wait()用于在一段超时时间内等待一组文件描述符上的事件
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
LT&ET
epoll 对文件描述符有两种操作模式:LT(Level Trigger,电平触发)模式和 ET(Edge Trigger,边沿触发)模式。LT 模式是默认的工作模式。当往epoll 内核事件表中注册一个文件描述符上的EPOLLET 事件时,epoll 将以高效的ET 模式来操作该文件描述符。
对于 LT 模式操作的文件描述符,当 epoll_wait 检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下一次调用 epoll_wait 时,还会再次向应用程序通告此事件,直到该事件被处理。
对于 ET 模式操作的文件描述符,当 epoll_wait 检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的 epoll_wait 调用将不再向应用程序通知这一事件。所以 ET 模式在很大程度上降低了同一个 epoll 事件被重复触发的次数,因此效率比 LT 模式高。
服务端:
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#define MAX_FD 128
#define DATALEN 1024
#define EPOLLSIZE 5
#define LT 0
#define ET 1
int InitSocket()
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd == -1) return -1;
struct sockaddr_in saddr;
memset(&saddr, 0, sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_port = htons(6000);
saddr.sin_addr.s_addr = inet_addr("127.0.0.1");
int res = bind(sockfd, (struct sockaddr*)&saddr, sizeof(saddr));
if(res == -1) return -1;
res = listen(sockfd, 5);
if(res == -1) return -1;
return sockfd;
}
void SetNoWait(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
}
void CloseClient(int epfd, int fd)
{
close(fd);
printf("A Client disconnected\n");
if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL) == -1)
{
printf("epoll_ctl del error\n");
}
}
void GetClientLink(int sockfd, int epfd, int flag)
{
struct sockaddr_in caddr;
socklen_t len = sizeof(caddr);
int c = accept(sockfd, (struct sockaddr*)&caddr, &len);
if (c < 0)
{
printf("Client Link error\n");
return;
}
struct epoll_event ev;
ev.data.fd = c;
if (flag)
{
ev.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
SetNoWait(c);
}
else
{
ev.events = EPOLLIN | EPOLLRDHUP;
}
if (epoll_ctl(epfd, EPOLL_CTL_ADD, c, &ev) == -1)
{
printf("epoll_ctl add error\n");
}
}
void LTDealClientData(int epfd, int fd)
{
char buff[DATALEN] = { 0 };
int n = recv(fd, buff, DATALEN - 1, 0);
if (n <= 0)
{
CloseClient(epfd, fd);
return;
}
printf("%d: %s\n", fd, buff);
send(fd, "OK", 2, 0);
}
void ETDealClientData(int epfd, int fd)
{
while (1)
{
char buff[DATALEN] = { 0 };
int n = recv(fd, buff, DATALEN - 1, 0);
if (n == -1)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
printf("read later\n");
break;
}
else
{
CloseClient(epfd, fd);
break;
}
}
else if (n == 0)
{
CloseClient(epfd, fd);
break;
}
else
{
printf("%d: %s\n", fd, buff);
send(fd, "OK", 2, 0);
}
}
}
void DealReadyEvent(struct epoll_event *events,int n,int sockfd,int epfd)
{
int i = 0;
for (; i < n; ++i)
{
int fd = events[i].data.fd;
if (fd == sockfd)
{
GetClientLink(sockfd, epfd, LT);
}
else if (events[i].events & EPOLLRDHUP)
{
CloseClient(epfd, fd);
}
else if (events[i].events & EPOLLIN)
{
LTDealClientData(epfd, fd);
}
else
{
printf("error\n");
}
}
}
int main()
{
int sockfd = InitSocket();
assert(sockfd != -1);
int epfd = epoll_create(EPOLLSIZE);
assert(epfd != -1);
struct epoll_event ev;
ev.data.fd = sockfd;
ev.events = EPOLLIN;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev) == -1)
{
printf("epoll_ctl add error\n");
exit(0);
}
while (1)
{
struct epoll_event events[MAX_FD];
int n = epoll_wait(epfd, events, MAX_FD, 2000);
if (n < 0)
{
printf("epoll_wait error\n");
continue;
}
else if (n == 0)
{
printf("timeout\n");
continue;
}
else
{
DealReadyEvent(events, n, sockfd, epfd);
}
}
}
|