实现TCP并发服务器—IO多路复用
1.服务器模型
1.1 概念
服务器模型主要分为两种,循环服务器,并发服务器
循环服务器:服务器在同一时间只能处理一个客户端的请求。
并发服务器:服务器在同一时间内能同时处理多个客户端的请求。
TCP的服务器默认的就是一个循环服务器 有两个阻塞函数( accept recv) 会相互影响
UDP的服务器默认的就是一个并发服务器,因为只有一个阻塞函数( recvfrom)
1.2 TCP并发服务器
有些场景下,我们既要保证数据可靠,又要支持并发,这就需要用到TCP并发服务器
如何实现TCP并发服务器?
? (1)使用多进程实现TCP并发服务器
? (2)使用多线程实现TCP并发服务器
? (3)使用多路IO复用实现TCP并发服务器
对于实际开发过程中,使用多进程实现TCP并发服务器,并发量大的时候,对系统的资源占用量也会很大
如果使用多线程,业务逻辑复杂的时候,又涉及到临近资源访问的问题
比较好的方式是使用多路IO复用实现TCP并发服务器
2.select实现TCP并发
2.1select函数说明
功能:
实现多路IO复用
头文件:
#include <sys/select.h>
函数原型:
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
参数:
nfds: 监视的最大的文件描述符 +1
readfds: 要监视的读文件描述符集合,如果不关心,可以传NULL
writefds: 要监视的写文件描述符集合,如果不关心,可以传NULL
exceptfds:要监视的异常的文件描述符集合,如果不关心,可以传NULL
(一般我们只关心readfds)
timeout:超时时间
0 非阻塞
NULL 永久阻塞
结构体 阻塞一定的时间
--暂时先不用管,后面网络超时检测再讲
返回值:
成功 就绪的文件描述符的个数
失败 -1
超时 0
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
注意:
1.select只能监视小于 FD_SETSIZE(1024) 的文件描述符
本质是用数组(128字节)保存 每一个bit位监视一个fd
2.select函数在返回时会将没有就绪的文件描述符在表中擦除
所以,在循环中调用select时,每次需要重新填充集合
2.2select代码说明
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#define ERRLOG(msg) \
do { \
printf("%s %s %d\n", __FILE__, __func__, __LINE__); \
perror(msg); \
exit(-1); \
} while (0)
int main(int argc, const char* argv[])
{
if (3 != argc) {
printf("Usage: %s <Ip> <Port>", argv[0]);
return -1;
}
int sockfd = 0;
if (-1 == (sockfd = socket(AF_INET, SOCK_STREAM, 0))) {
ERRLOG("socket error");
}
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = inet_addr(argv[1]);
serveraddr.sin_port = ntohs(atoi(argv[2]));
socklen_t serveraddr_len = sizeof(serveraddr);
if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, serveraddr_len)) {
ERRLOG("bind error");
}
if (-1 == listen(sockfd, 5)) {
ERRLOG("listen error");
}
struct sockaddr_in clientaddr;
socklen_t clientaddr_len = sizeof(clientaddr);
fd_set readfds;
FD_ZERO(&readfds);
fd_set readfds_temp;
FD_ZERO(&readfds_temp);
int max_fd = 0;
FD_SET(sockfd, &readfds);
max_fd = max_fd > sockfd ? max_fd : sockfd;
int ret = 0;
int i = 0;
char buf[128] = { 0 };
int nbytes = 0;
while (1) {
readfds_temp = readfds;
int acceptfd = 0;
if (-1 == (ret = select(max_fd + 1, &readfds_temp, NULL, NULL, NULL))) {
ERRLOG("select error");
} else {
for (i = 3; i < max_fd + 1; i++) {
if (FD_ISSET(i, &readfds_temp)) {
ret--;
if (i == sockfd) {
if (-1 == (acceptfd = accept(sockfd, (struct sockaddr*)&clientaddr, &clientaddr_len))) {
ERRLOG("accept error");
}
printf("客户端[%d]连接到服务器...\n", acceptfd);
FD_SET(acceptfd, &readfds);
max_fd > acceptfd ? max_fd : acceptfd;
} else {
memset(buf, 0, sizeof(buf));
if (-1 == (nbytes = recv(i, buf, sizeof(buf), 0))) {
ERRLOG("recv error");
} else if (0 == nbytes) {
printf("客户端[%d]断开了连接...\n", i);
close(i);
FD_CLR(i, &readfds);
continue;
}
if (!strcmp(buf, "quit")) {
printf("客户端[%d]退出了...\n", i);
close(i);
FD_CLR(i, &readfds);
continue;
}
printf("客户端[%d]发来数据[%s]\n", i, buf);
strcat(buf, "1115");
if (-1 == send(i, buf, sizeof(buf), 0)) {
ERRLOG("send error");
}
}
}
}
}
}
close(sockfd);
return 0;
}
3.poll实现TCP并发
3.1poll函数说明
功能:
实现多路IO复用
头文件:
#include <poll.h>
函数原型:
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
参数:
fds:要监视的文件描述符集合的结构体数组的首地址
使用的下面的结构体
struct pollfd {
int fd;
short events;
short revents;
};
events:使用位运算来或起来的 感兴趣的事件
POLLIN 读事件 --我们一般只关注读
POLLOUT 写事件
POLLERR 异常事件
revents:实际发生的事件
nfds:数组中实际有效的文件描述符的个数
timeout:超时时间 单位毫秒
-1 永久阻塞
0 非阻塞
5000 阻塞 5 秒
返回值:
成功 就绪的文件描述符的个数
失败 -1
超时 0
3.2poll函数代码说明
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <poll.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#define ERRLOG(msg) \
do { \
printf("%s %s %d \n", __FILE__, __func__, __LINE__); \
perror(msg); \
exit(-1); \
} while (0)
int main(int argc, const char* argv[])
{
if (3 != argc) {
printf("Usage:%s <Ip> <Port>", argv[0]);
return -1;
}
int sockfd = 0;
if (-1 == (sockfd = socket(AF_INET, SOCK_STREAM, 0))) {
ERRLOG("socket error");
}
struct sockaddr_in serveraddr;
memset(&serveraddr, 0, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = inet_addr(argv[1]);
serveraddr.sin_port = htons(atoi(argv[2]));
socklen_t serveraddr_len = sizeof(serveraddr);
if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, serveraddr_len)) {
ERRLOG("bind error");
}
if (-1 == listen(sockfd, 5)) {
ERRLOG("listen error");
}
char buf[128] = { 0 };
int acceptfd = 0;
int max_fd = 0;
int nbytes = 0;
int i = 0;
int j = 0;
int ret = 0;
struct pollfd fds[1024];
for (i = 0; i < 1024; i++) {
fds[i].fd = -1;
}
fds[0].fd = sockfd;
fds[0].events = POLLIN;
max_fd = max_fd > sockfd ? max_fd : sockfd;
while (1) {
if (-1 == (ret = poll(fds, max_fd, 5000))) {
ERRLOG("poll error");
} else if (0 == ret) {
printf("poll timeout ...\n");
continue;
} else {
for (i = 0; i < max_fd && ret != 0; i++) {
if (fds[i].revents & POLLIN != 0) {
ret--;
if (fds[i].fd == sockfd) {
if (-1 == (acceptfd = accept(fds[i].fd, NULL, NULL))) {
ERRLOG("accept error");
}
printf("客户端[%d]连接到服务器..\n", acceptfd);
for (j = 0; j < 1024; j++) {
if (fds[j].fd == -1) {
fds[j].fd = acceptfd;
fds[j].events = POLLIN;
break;
}
}
if (j == 1024) {
close(acceptfd);
}
max_fd = max_fd > acceptfd ? max_fd : acceptfd;
} else {
memset(buf, 0, sizeof(buf));
if (-1 == (nbytes = recv(fds[i].fd, buf, sizeof(buf), 0))) {
ERRLOG("recv error");
} else if (0 == nbytes) {
printf("客户端[%d]断开了连接..\n", fds[i].fd);
close(fds[i].fd);
fds[i].fd = -1;
continue;
}
if (!strcmp(buf, "quit")) {
printf("客户端[%d]退出了..\n", fds[i].fd);
close(fds[i].fd);
fds[i].fd = -1;
continue;
}
printf("客户端[%d]发来数据:[%s]\n", fds[i].fd, buf);
strcat(buf, "--hqyj");
if (-1 == send(fds[i].fd, buf, sizeof(buf), 0)) {
ERRLOG("send error");
}
}
}
}
}
}
close(sockfd);
return 0;
}
4.客户端代码
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#define ERRLOG(msg) do{\
printf("%s %s %d:", __FILE__, __func__, __LINE__);\
perror(msg);\
exit(-1);\
}while(0)
#define N 128
int main(int argc, const char *argv[]){
if(3 != argc){
printf("Usage : %s <IP> <PORT>\n", argv[0]);
return -1;
}
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(-1 == sockfd){
ERRLOG("socket error");
}
struct sockaddr_in serveraddr;
memset(&serveraddr, 0, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
serveraddr.sin_port = htons(atoi(argv[2]));
serveraddr.sin_addr.s_addr = inet_addr(argv[1]);
socklen_t serveraddr_len = sizeof(serveraddr);
char buff[128] = {0};
int nbytes = 0;
if(-1 == connect(sockfd, (struct sockaddr *)&serveraddr, serveraddr_len)){
ERRLOG("connect error");
}
printf("与服务器建立连接成功..\n");
while(1){
memset(buff, 0, sizeof(buff));
fgets(buff, N, stdin);
buff[strlen(buff)-1] = '\0';
if(-1 == send(sockfd, buff, sizeof(buff), 0)){
ERRLOG("send error");
}
if(-1 == (nbytes = recv(sockfd, buff, sizeof(buff), 0))){
ERRLOG("recv error");
}
if(0 == nbytes){
break;
}
printf("应答为:[%s]\n", buff);
}
close(sockfd);
return 0;
}
|