一、切割I/O
我们前面的回声客户端是基于过程的,一读一写都是按照顺序来的,客户端发送数据后需要等待服务端回应,而不能继续发送数据,需要一直等,但如果切割了进程由父子进程负责读和写的职能,那就不需要阻塞在一个部分中了。
实现呢?主要由父进程代码负责接收数据的代码,子进程负责发送数据的代码即可。这也是我们使用软件聊天的常见景象,我们还在编辑数据或者发送数据,那边就来信息了。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#define BUF 30
void errorHandling(char *message);
void readRoutine(int sock, char *buf);
void writeRoutine(int sock, char *buf);
int main(int argc, char *argv[]) {
pid_t pid;
int sock;
char buf[BUF];
struct sockaddr_in serv_addr;
if (argc != 3) {
printf("Usage: %s <IP> <port>\n", argv[0]);
exit(-1);
}
sock = socket(PF_INET, SOCK_STREAM, 0);
if (sock == -1)
errorHandling("socket() error!");
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
serv_addr.sin_port = htons(atoi(argv[2]));
if (connect(sock, (struct sockaddr*) &serv_addr, sizeof(serv_addr)) == -1)
errorHandling("connect() error!");
pid = fork();
if (pid == 0)
writeRoutine(sock, buf);
else if (pid == -1)
errorHandling("fork() error!");
else
readRoutine(sock, buf);
close(sock);
return 0;
}
void writeRoutine(int sock, char *buf) {
while (1) {
fgets(buf, BUF, stdin);
if (!strcmp(buf, "q\n")||!strcmp(buf, "Q\n")) {
shutdown(sock, SHUT_WR);
return;
}
write(sock, buf, strlen(buf));
}
}
void readRoutine(int sock, char *buf) {
while(1) {
int str_len = read(sock, buf, BUF);
if (str_len == 0)
return;
buf[str_len] = 0;
printf("Mirror: %s", buf);
}
}
void errorHandling(char *message) {
fputs(message, stderr);
fputc('\n', stderr);
exit(-1);
}
由于对分开了输入输出,所以输出有点乱,因为你随时都可以输入,然后父进程随时都可以收到服务端信息,所以干脆干掉了输入提示,至少能看一点。不过如果配合GUI编程有个输入框就会好很多。 另外上面用到了一个shutdown函数来进行输出流的关闭,这个后面再讲,需要知道的是,TCP连接后,会形成两个流:输入流和输出流,这里的shutdown是对于流的关闭而不是socket套接字的close。
这个算是多进程的一个很有意思的实现,展示了多进程的实用功能,算是我的编程路的一个重要点。接下来是并发服务器的另一种实现。
二、基于I/O复用的服务端
紧接着前面进行的linux并发服务器的实现进行优化,前面已经进行了多进程分隔服务器职能,父进程主要负责监听和连接,子进程进行数据传输和断开连接,但进程间共享信息需要用进程通信机制且多进程会显得慢,所以我们找一下其他办法实现并发服务器。
那如何使用一个进程实现对多个客户端的服务?前面有说过网络对于主机来说是另一种IO设备,外界连接请求、用户输入数据都是IO事件,对于IO事件的并发处理,那我们就对IO进行多路复用。
这里可用的一个办法就是使用select函数,该函数要求内核挂起进程,然后监听IO事件,事件发生切换控制给程序。那是什么事件?就是当你的用于读取的文件描述符准备好读取时(这里是socket套接字准备好接收数据),或者用于写操作的文件描述符准备好写时(这里是无需阻塞传输数据的socket套接字准备好传输数据),以及一个等待IO时间过长乃至socket套接字异常都是select的监控范围。
#include <sys/select.h>
#include <sys/time.h>
int select(int maxfd, fd_set *readset, fd_set *writeset, fd_set *exceptset,
const struct timeval *timeout);
select函数最主要的应用,是处理fd_set类型,它常常叫做文件描述符集合。它是long类型的位数组,当元素值为1时,该位对应的文件描述符才算是文件描述符集合中的一个元素。
插入:关于文件描述符,需要知道的是,我们socket函数返回的文件描述符永远是大于2的正整数值,因为系统规定0为标准输入的文件描述符,1代表标准输出的文件描述符,2代表标准错误的文件描述符。
对于fd_set型变量,允许分配、可以同类型赋值、修改和检查只能用以下宏来进行:
FD_SET(int fd, fd_set *fds);
FD_CLR(int fd, fd_set *fds);
FD_ZERO(fd_set *fds);
FD_ISSET(int fd, fd_set *fds);
select函数就是告知内核在timeout时间内监听readset、writeset、exceptset集合中的文件描述符的事件,超时返回0,失败返回-1,正常返回则是发生事件的文件描述符个数,同时将上面三个文件描述符集合中没有发生事件的文件描述符置0。嗯,由于这个修改,经常需要用备份fd_set来操作,每次调用都对这个备份进行更新。
上面其实漏了一个东西,就是select的第一个参数maxfd,这个参数是指上面三个文件描述符集合的前maxfd个描述符就是需要监控的文件描述符,这点很重要,因为我后面的实例也只是涉及单方面读,并不是三个集合都有值,所以不需要进行取舍,当三个集合都需要进行监视,maxfd的值就显得尤其重要,但这里先不表,就提一下。
struct timeval {
long tv_sec;
long tv_usec;
};
虽说select函数可以监视到套接字事件发生,但具体是哪些,却并没有返回,所以我们查看select执行结果,还需要看其他参数。当select调用后,被监视的fd_set变量的监视位会发生变化,如果监视的文件描述符发生事件,则该位不变,没发生事件则置0。
现在来理一理select的使用流程:
三、基于I/O复用的并发服务器的select实现
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/select.h>
#define BUF 100
void errorHandling(char *message);
int main(int argc, char *argv[]) {
int serv_sock, clnt_sock;
socklen_t addr_size;
struct sockaddr_in serv_addr, clnt_addr;
char buf[BUF];
int str_len;
fd_set reads, temps;
struct timeval time;
int fd_max, fd_num, i;
if (argc != 2) {
printf("Usage: %s <port>\n", argv[0]);
exit(-1);
}
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
if (serv_sock == -1)
errorHandling("socket() error!");
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(atoi(argv[1]));
if (bind(serv_sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1)
errorHandling("bind() error!");
if (listen(serv_sock, 5) == -1)
errorHandling("listen() error!");
FD_ZERO(&reads);
FD_SET(serv_sock, &reads);
fd_max = serv_sock;
while(1) {
temps = reads;
time.tv_sec = 5;
time.tv_usec = 5000;
fd_num = select(fd_max + 1, &temps, 0, 0, &time);
if (fd_num == -1)
break;
else if(fd_num == 0)
continue;
for (i = 0; i < fd_max + 1; i++) {
if (FD_ISSET(i, &temps)) {
if (i == serv_sock) {
printf("serv_sock:%d\n", serv_sock);
addr_size = sizeof(clnt_addr);
clnt_sock = accept(serv_sock, (struct sockaddr*) &clnt_addr, &addr_size);
FD_SET(clnt_sock, &reads);
if (fd_max < clnt_sock)
fd_max = clnt_sock;
printf("已连接客户端:%s\n", inet_ntoa(clnt_addr.sin_addr));
} else {
str_len = read(i, buf, BUF);
if (str_len == 0) {
FD_CLR(i, &temps);
close(i);
printf("close client: %d\n", i);
} else {
write(i, buf, str_len);
}
}
}
}
}
close(serv_sock);
return 0;
}
void errorHandling(char *message) {
fputs(message, stderr);
fputc('\n', stderr);
exit(-1);
}
以上就是select的linux系统中的实现,可以通过IO复用实现基本的并发服务器,使用最早的回声客户端来测试即可,开多个客户端的连接同时测试是ok的。然后windows下的select实现如下:
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <winsock2.h>
#define BUF 30
#pragma comment(lib, "ws2_32.lib")
void errorHandling(char* message);
int main(int argc, char* argv[]) {
WSADATA wsa_data;
SOCKET serv_sock, clnt_sock;
SOCKADDR_IN serv_addr, clnt_addr;
TIMEVAL time;
fd_set reads, temps;
char buf[BUF];
int str_len, addr_size, fd_num;
u_int i;
if (argc != 2) {
printf("Usage: %s <port>\n", argv[0]);
exit(-1);
}
if (WSAStartup(MAKEWORD(2, 2), &wsa_data) != 0)
errorHandling("WSAStartup() error!");
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
if (serv_sock == INVALID_SOCKET)
errorHandling("socket() error!");
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(atoi(argv[1]));
if (bind(serv_sock, (SOCKADDR*) &serv_addr, sizeof(serv_addr)) == SOCKET_ERROR)
errorHandling("bind() error!");
if (listen(serv_sock, 5) == SOCKET_ERROR)
errorHandling("listen() error!");
FD_ZERO(&reads);
FD_SET(serv_sock, &reads);
while (1) {
temps = reads;
time.tv_sec = 5;
time.tv_usec = 5000;
fd_num = select(0, &temps, 0, 0, &time);
if (fd_num == SOCKET_ERROR)
break;
else if (fd_num == 0)
continue;
for (i = 0; i < reads.fd_count; i++) {
if (FD_ISSET(reads.fd_array[i], &temps)) {
if (reads.fd_array[i] == serv_sock) {
addr_size = sizeof(clnt_addr);
clnt_sock = accept(serv_sock, (SOCKADDR*) &clnt_addr, &addr_size);
FD_SET(clnt_sock, &reads);
printf("已连接客户端:%d, IP: %s\n", clnt_sock, inet_ntoa(clnt_addr.sin_addr));
}
else {
str_len = recv(reads.fd_array[i], buf, BUF - 1, 0);
if (str_len == 0) {
FD_CLR(reads.fd_array[i], &reads);
closesocket(temps.fd_array[i]);
printf("已关闭客户端%d\n", temps.fd_array[i]);
}
else {
send(reads.fd_array[i], buf, str_len, 0);
}
}
}
}
}
closesocket(serv_sock);
WSACleanup();
system("pause");
return 0;
}
void errorHandling(char* message) {
fputs(message, stderr);
fputc('\n', stderr);
exit(-1);
}
关于windows的回声服务,好像linux下使用回声客户端测试有点问题,使用127.0.0.1回流linux下客户端会报错,但如果是网络IP连接又长时间尝试,我没有等结果就终止掉了。另一方面,使用linux下的初始状态的回声服务端,无论是linux客户端实现还是windows客户端实现都可连接并正常服务,同样的还有基于多进程的回声服务端和上面基于select的IO复用服务端,所以这里的测试干脆使用前面的回声客户端的windows版本算了,这样能成功(所以鸟叔那本书中排查故障的章节还是很有必要啃完的)。
四、基于epoll的IO复用
在上面的例子中,可以看到我们在对fd_set变量类型调用select时,都需要使用备份来进行,也就是每次调用select函数都需要向内核传递一次数据。对于程序来说,向内核传递数据会对程序造成极大的负担,对于网络应用来说这会造成性能上的硬伤。
对于select这种监视套接字变化的函数,是极需要内核帮助的,而另一些函数则不需要内核帮忙,比如一些简单运算。select的问题是过多的数据传递到内核,所以现在需要能够使得对内核的监视对象的传递只进行一次,这样的办法在linux中是epoll,而在windows中是IOCP(需要说明,epoll是对于select的一个补漏,并不代表它在什么情况下都是优于select的选择,很多模型都是针对特定情况的,通用的往往又不够深入,大家要根据情况进行判断)。
实现epoll服务端需要用到三个函数: epoll_create:创建保存epoll对象,返回对象文件描述符; epoll_ctl:操作epoll对象; epoll_wait:等待事件发生并返回事件集合。
epoll三个函数原型
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epofd, int flags, int fd, struct epoll_event *event);
int epoll_wait(int epofd, struct epoll_event *events, int maxevents, int time);
epoll_create函数
epoll函数中的epoll_create函数比较简单,主要就是申请size大小的epoll对象并返回对应文件描述符,但有意思的是,size是作为建议提供给内核参考,并不是作为最后的epoll对象大小,而且linux2.6.8以后的版本都是直接忽略该参数。
epoll_ctl函数细述及使用
epoll_ctl函数操作epofd文件描述符的方式主要通过flags参数决定,头文件中定义了以下常量来规定可行操作:
EPOLL_CTL_ADD:添加事件,将文件描述符fd注册到epofd文件描述符指示epoll对象中
EPOLL_CTL_DEL:从epofd文件描述符指示epoll对象中删除fd文件描述符
EPOLL_CTL_MOD:更改注册的文件描述符fd的关注事件大小
前面fd_set已经把会发生同类型事件的文件描述符统一在一个集合中,这样select函数就可以很清楚的监视文件描述符事件发生情况了,epoll_ctl函数则是明确使用epoll_event结构体指针来指示监视的事件类型。
struct epoll_event {
__uint32_t events;
epoll_data_t data;
};
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
}epoll_data_t;
如上所示,epoll会把发生事件的文件描述符集中在一起,但也可以用来注册需要关注的事件,这些事件主要保存在epoll_event类型变量的events成员中,使用以下常量进行标记:
EPOLLIN:监听fd的读事件;
EPOLLOUT:监听fd的写事件,要求其对应的发送数据缓冲区不能满了
EPOLLRDHUP:断开连接或半关闭情况
EPOLLPRI:收到OOB数据,即紧急数据可读事件
EPOLLERR:发生错误
EPOLLET:边缘触发的方式得到事件通知
EPOLLONESHOT:发生一次事件,相应文件描述符不再收到通知
在定义了epoll_event类型变量后,直接对该变量的events成员赋值为上面相应常量,data成员中的fd成员赋值为对应文件描述符(比如我们需要关注的某个套接字)即可完成初始化,就可以被epoll_ctl调用了。
epoll_wait函数
函数调用后,返回发生了关注事件的文件描述符数量,并在第二个参数中保存发生事件的文件描述符集合,到时候就集中针对这个集合进行处理即可,不必象select那样对所有文件描述符都迭代检查一遍。
需要注意的是,函数中的第二参数events指示的缓冲需要动态分配,就是我们常见的malloc申请动态内存。
基于epoll的回声服务端实现
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#define BUF 100
#define EPOLLSIZE 50
void errorHandling(char *message);
int main(int argc, char *argv[]) {
int serv_sock, clnt_sock;
struct sockaddr_in serv_addr, clnt_addr;
socklen_t addr_size;
int str_len, i;
char buf[BUF];
struct epoll_event *events, event;
int epoll_fd, event_num;
if (argc != 2) {
printf("Usage: %s <port>\n", argv[0]);
exit(-1);
}
serv_sock = socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(atoi(argv[1]));
if (bind(serv_sock, (struct sockaddr*) &serv_addr, sizeof(serv_addr)) == -1)
errorHandling("bind() error!");
if (listen(serv_sock, 5) == -1)
errorHandling("listen() error!");
epoll_fd = epoll_create(EPOLLSIZE);
events = malloc(sizeof(struct epoll_event) *EPOLLSIZE);
event.events = EPOLLIN;
event.data.fd = serv_sock;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, serv_sock, &event);
while(1) {
event_num = epoll_wait(epoll_fd, events, EPOLLSIZE, -1);
if (event_num == -1) {
puts("epoll_wait() error!");
break;
}
for (i =0; i < event_num; i++) {
if (events[i].data.fd == serv_sock) {
addr_size = sizeof(clnt_addr);
clnt_sock = accept(serv_sock, (struct sockaddr*) &clnt_addr, &addr_size);
event.events = EPOLLIN;
event.data.fd = clnt_sock;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clnt_sock, &event);
printf("已连接客户端:%d\n", clnt_sock);
} else {
str_len = read(events[i].data.fd, buf, BUF);
if (str_len == 0) {
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
close(events[i].data.fd);
printf("客户端%d关闭\n", events[i].data.fd);
} else {
write(events[i].data.fd, buf, str_len);
}
}
}
}
close(serv_sock);
close(epoll_fd);
return 0;
}
void errorHandling(char *message) {
fputs(message, stderr);
fputc('\n', stderr);
exit(-1);
}
可以看到,epoll和select的实现走的流程都是一样的,监控IO,发生事件,然后就针对特定发生事件的文件描述符进行处理,如果是连接就进行连接,如果是收到信息就读取信息;不过需要注意的是,select中要得到需要处理的文件描述符,是会一直迭代检测文件描述符集合的,而不是像epoll那样把发生事件的都集合起来进行处理,另外epoll也不像select那样,调用函数通知内核需要监控的文件描述符需要在循环中一直进行,多次传输数据给内核,这也就消除了select中的要命的缺陷。
|