gcc 1.c -o 1 -lpthread
ctrl+f搜索服务端代码和客户端代码获取代码 该方法也可以实现并发服务器
IO多路复用,IO多路转接
简介
IO多路复用中的IO并不是指单纯的标准输入和输出
这里的IO指的是程序和内存之间的IO
从程序将数据写入内存就叫做输出
从内存中将数据读入程序中就叫做输入
socket通信过程中也牵涉到大量的IO操作
在unix中,socket不过就是一个文件描述符,对应内核中的一块缓冲区
缓冲区分为读缓冲区和写缓冲区
IO多路复用具体指的就是对读写缓冲区的操作
IO多路复用能够同时监听多个文件描述符(内核中的缓冲区),可以提高程序的性能
在unix中,实现IO多路复用技术的系统调用有select、poll、epoll
如何知道哪个客户端发来了数据?
这个是阻塞模型,一次只能处理一个,在数据未达到前一直处于阻塞状态
使用多线程或者多进程解决并发问题,但是线程和进程会消耗较多的资源
阻塞模型的代码:
int lfd = socket(...);
bind(lfd, ...);
listen(lfd, ...);
while (1) {
int cfd = accept(lfd, ...);
}
线程和进程本身会消耗资源,对线程和进程进行调度也会消耗资源
但是最根本的问题还是blocking,它终究还是阻塞的
非阻塞、忙轮训模型
将read和accept设置为非阻塞
但是需要进行循环来检测accept和read是否收到数据
但是这种方式需要占用较高的CPU资源
最理想的解决方式就是使用IO多路复用技术select、poll、epoll
非阻塞模型,NIO
int lfd = socket(...);
bind(lfd, ...);
listen(lfd, ...);
int cfd = accept(lfd, ...);
read(cfd, ...);/recv(cfd, ...);
对文件描述符进行遍历,查看是否有数据到达
弊端:如果有一万个客户端进来,那么就要遍历一万个文件描述符
IO多路转接技术
select/poll
让内核帮我们检测哪些文件描述符中有数据到达
但是select并不会告诉你具体是哪一个文件描述符中有数据到达
通过bit位来标识文件描述符是否有数据到达
epoll
epoll会告诉我们具体是哪几个文件描述符中有数据到达
一前需要逐个遍历文件描述符(多路),现在只需要调用一次API,就可以确定哪些文件描述符中有数据到达
select
主旨思想:
- 构建一个文件描述符列表,里面是要监听的文件描述符,我们要检测哪些文件描述符中有数据到达
- 调用一个系统函数,监听该列表中的文件描述符,直到这些文件描述符中的一个或多个进行了IO操作,这个函数才会返回
- 该函数是阻塞的
- 该函数对文件描述符的检测操作是由内核完成的
- 在返回时,该函数会告诉进程有多少描述符要进行IO操作
#include<sys/time.h>
#include<sys/types.h>
#include<unistd.h>
int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);
- nfds : 委托内核检测的最大文件描述符+1
- readfds : 要检测的读文件描述符集合(就是委托内核检测读缓冲区中是否有数据进来)
- writefds : 要检测的写文件描述符集合(委托内核检测写缓冲区是否能写,没满就可以写)
- exceptfds : 没啥用
- timeout : 这个数据类型我们之前已经接触过了(一个结构体,包含s和ms两部分),用于设置超时时间
- 返回值 : -1表示失败, >0(n), 检测的集合中有n个描述符发生了变化
void FD_CLR(int fd, fd_set* set);
void FD_ISSET(int fd, fd_set* set);
void FD_SET(int fd, fd_set* set);
void FD_ZERO(fd_set* set);
使用select API进行代码的编写
poll api
select技术的缺点
每次调用select,都需要fd_set从用户态拷贝到内核态,然后再从内核态拷贝到用户态
而且还需要遍历fd_set
而且只支持1024个文件描述符
且fds集合不能重用,每次都要清空,在之前的代码中:tmp = read_set; ,因为内核会对fd_set进行修改
struct pollfd {
int fd;
short events;
short revents
}
int poll(struct pollfd* fds, nfds_t nfds, int timeout)
- fds : 需要检测的文件描述符集合
- nfds : 这是第一个参数数组中最后一个有效元素的下标+1
- timeout : 阻塞时长,0表示不阻塞,-1表示阻塞,当检测到需要检测的文件描述符发生变化时,接触阻塞,>0(n)阻塞时长n,单位为
返回值:
-1表示失败
>0(n)表示成功,表示检测到了n个文件描述符发生了变化
服务端代码:
#include <sys/mman.h>
#include <pthread.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
int asda =5;
int tickets =100;
pthread_mutex_t mutex;
pthread_mutex_t mutex1;
pthread_mutex_t mutex2;
void *sig_handler(void* arg) {
while(1){
pthread_mutex_lock(&mutex);
pthread_mutex_lock(&mutex);
if(tickets>0){
printf("子线程%ld正在卖地%d张哦票\n",pthread_self(),100-tickets+1);
tickets--;
}else {pthread_mutex_unlock(&mutex);break;}
pthread_mutex_unlock(&mutex);
pthread_mutex_unlock(&mutex);
}
return NULL;
}
pthread_mutex_t asdasdasdasdasdas;
int numm =1;
void *sig_handler1(void* arg) {
while(1){
pthread_mutex_lock(&asdasdasdasdasdas);
numm++;
printf("++谢谢,tid:%ld,nummm:%d\n",pthread_self(),numm);
pthread_mutex_unlock(&asdasdasdasdasdas);
usleep(1);
}
return NULL;
}
void *sig_handler12232(void* arg) {
while(1){
pthread_mutex_lock(&asdasdasdasdasdas);
printf("++读读,tid:%ld,nummm:%d\n",pthread_self(),numm);
pthread_mutex_unlock(&asdasdasdasdasdas);
usleep(1);
}
return NULL;
}
#include<sys/time.h>
#include<time.h>
void sig_handler2(int num){
time_t tm2 = time(NULL);
struct tm* loc = localtime(&tm2);
char* str =asctime(loc);
int fd =open("time.txt",O_RDWR|O_CREAT|O_APPEND,0664);
write(fd,str,strlen(str));
close(fd);
}
#include<sys/shm.h>
pthread_mutex_t mutext;
pthread_cond_t cond;
#include <semaphore.h>
sem_t psem;
sem_t csem;
struct Node {
int num;
struct Node* next;
};
struct Node* head = NULL;
void* producer (void* arg) {
while(1) {
sem_wait(&psem);
pthread_mutex_lock(&mutex);
struct Node* new_node = (struct Node*)malloc(sizeof(struct Node));
new_node->next = head;
head = new_node;
new_node->num = rand() % 1000;
printf("add node, num %d, tid: %ld\n", new_node->num, pthread_self());
pthread_mutex_unlock(&mutex);
sem_post(&csem);
}
return NULL;
}
void* consumer (void* arg) {
while(1) {
sem_wait(&csem);
pthread_mutex_lock(&mutex);
struct Node* tmp = head;
head = head->next;
printf("delete node, num: %d, tid: %ld\n", tmp->num, pthread_self());
free(tmp);
pthread_mutex_unlock(&mutex);
sem_post(&psem);
}
return NULL;
}
#include<arpa/inet.h>
#include<bits/socket.h>
void recyle_child(int arg) {
while(1) {
int ret = waitpid(-1, NULL, WNOHANG);
if (-1 == ret) {
printf("所有的子进程回收完毕\n");
break;
} else if (ret == 0) {
break;
} else if (ret > 0) {
printf("子进程%d被回收了\n", ret);
}
}
}
#include<errno.h>
#include<ctype.h>
#include<ctype.h>
#include<ctype.h>
#include<sys/select.h>
#include<poll.h>
int main()
{
struct sigaction act;
act.sa_flags=0;
act.sa_sigaction = recyle_child;
sigemptyset(&act.sa_mask);
sigaction(SIGCHLD, &act, NULL);
int lfd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == lfd) {
perror("socket");
exit(-1);
}
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(9999);
int ret = bind(lfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
if (-1 == ret) {
perror("bind");
exit(-1);
}
ret = listen(lfd, 128);
if (-1 == ret) {
perror("listen");
exit(-1);
}
struct pollfd fds[1024];
for (size_t i = 0; i < 1024; i++)
{
fds[i].fd = -1;
fds[i].events = POLLIN;
}
fds[0].fd = lfd;
int nfds = 1023;
printf("等待客户端连接...\n");
while(1) {
int ret = poll(fds, nfds+1, -1);
if (-1 == ret) {
perror("poll");
exit(-1);
} else if (ret == 0) {
continue;
} else if (ret > 0) {
if (fds[0].revents & POLLIN) {
struct sockaddr_in client_addr;
int len = sizeof(client_addr);
int cfd = accept(fds[0].fd, (struct sockaddr*)&client_addr, &len);
printf("新的客户端进来了\n");
char client_ip[16];
inet_ntop(AF_INET, &client_addr.sin_addr.s_addr, client_ip, sizeof(client_ip));
unsigned short client_port = ntohs(client_addr.sin_port);
printf("客户端IP:%s\n端口:%d\n", client_ip, client_port);
if (-1 == cfd) {
if (EINTR == errno ) {
continue;
}
perror("accept");
exit(-1);
}
for (size_t i = 0; i < 1024; i++)
{
if (fds[i].fd == -1) {
fds[i].fd = cfd;
fds[i].events = POLLIN;
break;
}
}
}
for (size_t i = 1; i < 1024; i++)
{
if (fds[i].revents & POLLIN) {
char buf[1024] = {0};
int len = read(fds[i].fd, buf, sizeof(buf));
if (-1 == len) {
perror("read");
exit(-1);
} else if (0 == len) {
printf("客户端断开连接了\n");
close(fds[i].fd);
fds[i].fd = -1;
} else if (len > 0) {
printf("读取到数据:%s\n", buf);
write(fds[i].fd, buf, strlen(buf)+1);
}
}
}
}
}
close(lfd);
return 0;
}
|