select函数族如下:
服务器端使用select机制监听可读的文件描述符(客户端)的一般流程如下:
- 执行fd_set myset; 实例化一个fd_set对象。
- FD_ZERO(&myset); 把myset的所有位置为0,例如 0000 0000.
- 若新连接的客户端的文件描述符为2,用一个变量fd来记录该文件描述符,执行FD_SET(fd , &myset); 使myset的从右往左的第三位置为1,因此myset变为 0010 0000.
- 若此时又有两个新客户端连接,例如fd=3、fd=5,则再次调用FD_SET函数使myset变为 0011 0100.
- 执行select(6 , &myset ,NULL, NULL , 0); 第一个参数是此时的最大文件描述符再加1,因为我这里步骤4新增的客户端种fd最大为5,所以第一个参数应该填为6;第二个参数是表示用去监听可读文件描述符的fd_set对象,这里就是myset;第三个参数是监听可写文件描述符的fd_set对象,因为我这里只监听可读的文件描述符,所以这里填NULL;第四个参数是监听异常的,这里也不使用,所以填NULL;第五个参数是阻塞的时间,填0表示永久阻塞,知道有某个文件描述符可读,如果不想永久阻塞,可以填入一个stuct timeval 对象。
- 执行完select后,myset的所有位被置为0。此时如果只有客户端fd=2发生了可读事件,则第五步中select的返回值大于0,同时myset从左往右第三位被重新置为1( 0010 0000);如果没有客户端文件描述符发生可读事件,同时又阻塞的时间到,则返回值为0;如果返回值小于0,则表示发生了错误。
- select返回之后,最后调用FD_ISSET( 2 , &myset); 来判断fd=2 是否可读,也就是判断myset的第三位是否被置为1。如果fd=2可读(也就是myset的第三位被置为1),则 FD_ISSET返回值为 非零,否则则返回0。
- 至此select的使用流程结束。
下面将说下 如何将select来实现多并发的双向通信:
1. 首先主要的思想是启动三个线程。 2. 第一个线程用select阻塞并监听客户端的fd,判断该客户端是否可读。 3. 第二个线程从标准输入获取要发送的信息,并将信息发送给各个客户端。 4. 第三个线程是主线程,用来accpet阻塞,时刻监听是否有客户端发起tcp连接请求。
代码如下:
#include <iostream>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/poll.h>
#include <ctime>
#include <unordered_set>
using namespace std;
pthread_mutex_t mutex;
unordered_set<int> Client_Fd_Set;
time_t t = time(0);
[[noreturn]] void *recv_thread(void *arg)
{
struct timeval tv;
tv.tv_sec=10;
tv.tv_usec=0;
while(1)
{
pthread_mutex_lock(&mutex);
while(Client_Fd_Set.size()==0);
fd_set Fd_set;
FD_ZERO(&Fd_set);
int max_fd=0;
unordered_set<int>::iterator it;
for(it = Client_Fd_Set.begin() ; it!=Client_Fd_Set.end() ;++it)
{
FD_SET(*it,&Fd_set);
max_fd=max(max_fd,*it);
}
int res = select(max_fd+1,&Fd_set , NULL ,NULL,&tv);
if(res<0)
{
perror("select error");
}
else if(res==0)
{
cout<<"no new message"<<endl;
}
else
{
char rx_buf[1024];
cout<<"have new message"<<endl;
for(it=Client_Fd_Set.begin() ; it!=Client_Fd_Set.end() ; ++it)
{
if( FD_ISSET(*it,&Fd_set)!=0)
{
memset(rx_buf,0,sizeof(rx_buf));
int len = recv(*it,rx_buf,sizeof(rx_buf),0);
if(len<=0){
cout<<"len:"<<len<<endl;
close(*it);
Client_Fd_Set.erase(*it);
close(*(int*)arg);
exit(0);
}
t = time(0);
struct tm *timemsg = localtime(&t);
string time;
time+="(";time+= to_string(timemsg->tm_hour);time+=':';time+=to_string(timemsg->tm_min);time+=':';time+=to_string(timemsg->tm_sec);time+=")";
cout<<time<<"recv from "<<*it<<":"<<rx_buf<<endl;
if(strcmp(rx_buf,"end")==0)
{
close(*it);
cout<<"Close Client fd:"<<*it<<endl;
Client_Fd_Set.erase(*it);
cout<<"recv_thread exit"<<endl;
pthread_exit(0);
}
#if 0
send(*it,rx_buf,len ,0);
cout<<time<<"send to "<<*it<<":"<<rx_buf<<endl;
#endif
}
}
}
pthread_mutex_unlock(&mutex);
sleep(0.01);
}
}
[[noreturn]] void *send_thread(void *arg)
{
while(true)
{
string temp_buf;
getline(cin,temp_buf);
if(temp_buf=="end")
{
cout<<"send_thread exit"<<endl;
close(*(int *)arg);
pthread_exit(0);
}
unordered_set<int>::iterator it;
for(it = Client_Fd_Set.begin();it!=Client_Fd_Set.end();++it)
{
char *send_buf;
send_buf = (char *)temp_buf.data();
send(*it,send_buf,strlen(send_buf) ,0);
t = time(0);
struct tm *timemsg = localtime(&t);
string time;
time+="(";time+= to_string(timemsg->tm_hour);time+=':';time+=to_string(timemsg->tm_min);time+=':';time+=to_string(timemsg->tm_sec);time+=")";
cout<<time<<"send to "<<*it<<":"<<send_buf<<endl;
}
sleep(0.01);
}
}
int main() {
pthread_t thread_write,thread_read;
std::cout << "Server start!" << std::endl;
int server_fd = socket(AF_INET, SOCK_STREAM , 0);
std::cout << "Server fd="<<server_fd << std::endl;
struct sockaddr_in server_addr , client_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(995);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if(-1 == bind(server_fd,(struct sockaddr*)&server_addr ,sizeof(server_addr)))
{
perror("bind error");
exit(1);
}
if(-1 == listen(server_fd,20))
{
perror("listen error");
exit(1);
}
pthread_mutex_init(&mutex,NULL);
pthread_create(&thread_write ,NULL , send_thread ,&server_fd);
pthread_detach(thread_write);
pthread_create(&thread_read ,NULL , recv_thread ,&server_fd);
pthread_detach(thread_read);
socklen_t client_addr_len = sizeof(client_addr);
while(true)
{
int client_fd = accept(server_fd,(struct sockaddr*)&client_addr ,&client_addr_len);
if(client_fd < 0 ){
perror("accept error");
break;
}
cout<<"New Client fd="<<client_fd<<endl;
pthread_mutex_lock(&mutex);
Client_Fd_Set.insert(client_fd);
pthread_mutex_unlock(&mutex);
}
unordered_set<int>::iterator it;
for(it = Client_Fd_Set.begin() ; it!=Client_Fd_Set.end();++it)
{
close(*it);
}
close(server_fd);
cout<<"main_thread exit"<<endl;
exit(0);
return 0;
}
|