IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 系统运维 -> IO多路复用机制——Select -> 正文阅读

[系统运维]IO多路复用机制——Select


select函数族如下:

在这里插入图片描述

服务器端使用select机制监听可读的文件描述符(客户端)的一般流程如下:

  1. 执行fd_set myset; 实例化一个fd_set对象。
  2. FD_ZERO(&myset); 把myset的所有位置为0,例如 0000 0000.
  3. 若新连接的客户端的文件描述符为2,用一个变量fd来记录该文件描述符,执行FD_SET(fd , &myset); 使myset的从右往左的第三位置为1,因此myset变为 0010 0000.
  4. 若此时又有两个新客户端连接,例如fd=3、fd=5,则再次调用FD_SET函数使myset变为 0011 0100.
  5. 执行select(6 , &myset ,NULL, NULL , 0); 第一个参数是此时的最大文件描述符再加1,因为我这里步骤4新增的客户端种fd最大为5,所以第一个参数应该填为6;第二个参数是表示用去监听可读文件描述符的fd_set对象,这里就是myset;第三个参数是监听可写文件描述符的fd_set对象,因为我这里只监听可读的文件描述符,所以这里填NULL;第四个参数是监听异常的,这里也不使用,所以填NULL;第五个参数是阻塞的时间,填0表示永久阻塞,知道有某个文件描述符可读,如果不想永久阻塞,可以填入一个stuct timeval 对象。
  6. 执行完select后,myset的所有位被置为0。此时如果只有客户端fd=2发生了可读事件,则第五步中select的返回值大于0,同时myset从左往右第三位被重新置为1( 0010 0000);如果没有客户端文件描述符发生可读事件,同时又阻塞的时间到,则返回值为0;如果返回值小于0,则表示发生了错误。
  7. select返回之后,最后调用FD_ISSET( 2 , &myset); 来判断fd=2 是否可读,也就是判断myset的第三位是否被置为1。如果fd=2可读(也就是myset的第三位被置为1),则 FD_ISSET返回值为 非零,否则则返回0。
  8. 至此select的使用流程结束。

下面将说下 如何将select来实现多并发的双向通信:

1. 首先主要的思想是启动三个线程。
2. 第一个线程用select阻塞并监听客户端的fd,判断该客户端是否可读。
3. 第二个线程从标准输入获取要发送的信息,并将信息发送给各个客户端。
4. 第三个线程是主线程,用来accpet阻塞,时刻监听是否有客户端发起tcp连接请求。

代码如下:

#include <iostream>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h> //POSIX 操作系统API
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/poll.h>
#include <ctime>
#include <unordered_set>

using namespace std;

pthread_mutex_t mutex;             //锁
unordered_set<int> Client_Fd_Set;  //存放客户端文件描述符的SET

time_t t = time(0);          //记录发送和接收数据的时间


[[noreturn]] void *recv_thread(void *arg) //接收线程
{
    struct timeval tv;
    tv.tv_sec=10;
    tv.tv_usec=0;
    while(1)
    {
        pthread_mutex_lock(&mutex);
        while(Client_Fd_Set.size()==0);
        fd_set Fd_set;
        FD_ZERO(&Fd_set);
        int max_fd=0;
        unordered_set<int>::iterator it;
        for(it = Client_Fd_Set.begin() ; it!=Client_Fd_Set.end() ;++it)
        {
            FD_SET(*it,&Fd_set);
            max_fd=max(max_fd,*it);
        }
        int res = select(max_fd+1,&Fd_set , NULL ,NULL,&tv);
        if(res<0)
        {
            perror("select error");
        }
        else if(res==0)
        {
            cout<<"no new message"<<endl;
        }
        else
        {
            char rx_buf[1024];
            cout<<"have new message"<<endl;
            for(it=Client_Fd_Set.begin() ; it!=Client_Fd_Set.end() ; ++it)
            {
                if( FD_ISSET(*it,&Fd_set)!=0)
                {
                    memset(rx_buf,0,sizeof(rx_buf));
                    int len = recv(*it,rx_buf,sizeof(rx_buf),0);//最后一个参数可改为MSG_WAITALL,会导致一直阻塞直到读取到指定字节
                    if(len<=0){
                        cout<<"len:"<<len<<endl;
                        close(*it);
                        Client_Fd_Set.erase(*it);
                        close(*(int*)arg);
                        exit(0);
                    }
                    t = time(0);
                    struct tm *timemsg = localtime(&t);
                    string time;
                    time+="(";time+= to_string(timemsg->tm_hour);time+=':';time+=to_string(timemsg->tm_min);time+=':';time+=to_string(timemsg->tm_sec);time+=")";
                    cout<<time<<"recv from "<<*it<<":"<<rx_buf<<endl;
                    if(strcmp(rx_buf,"end")==0)
                    {
                        close(*it);
                        cout<<"Close Client fd:"<<*it<<endl;
                        Client_Fd_Set.erase(*it);
//                        close(*(int*)arg);
                        cout<<"recv_thread exit"<<endl;
                        pthread_exit(0);
                    }
                    //数据回传,将接收到的数据回传给客户端,有需要的话把0改成1
                    #if 0
                    send(*it,rx_buf,len ,0);
                    cout<<time<<"send to "<<*it<<":"<<rx_buf<<endl;
                    #endif
                }
            }
        }
        pthread_mutex_unlock(&mutex);
        sleep(0.01);
    }
}

[[noreturn]] void *send_thread(void *arg)  //发送线程
{
    while(true)
    {
        string temp_buf;
        getline(cin,temp_buf);
        if(temp_buf=="end")             //当标准输入 输入end时,关闭嵌套字,导致主线程里会跳出while
        {
            cout<<"send_thread exit"<<endl;
            close(*(int *)arg);
            pthread_exit(0);
        }
        unordered_set<int>::iterator it;
        for(it = Client_Fd_Set.begin();it!=Client_Fd_Set.end();++it) //这里是往所有客户端发信息,可以再优化
        {
            char *send_buf;
            send_buf = (char *)temp_buf.data();
            send(*it,send_buf,strlen(send_buf) ,0);

            t = time(0);
            struct tm *timemsg = localtime(&t);
            string time;
            time+="(";time+= to_string(timemsg->tm_hour);time+=':';time+=to_string(timemsg->tm_min);time+=':';time+=to_string(timemsg->tm_sec);time+=")";
            cout<<time<<"send to "<<*it<<":"<<send_buf<<endl;
        }
        sleep(0.01);
    }
}


int main() {
    pthread_t thread_write,thread_read;
    std::cout << "Server start!" << std::endl;
    int server_fd = socket(AF_INET, SOCK_STREAM , 0);
    std::cout << "Server fd="<<server_fd << std::endl;
    struct sockaddr_in server_addr , client_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(995);
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 字符串转网络字节序
//    server_addr.sin_addr.s_addr = inet_addr("0,0,0,0"); 跟上面一句等价
    if(-1 == bind(server_fd,(struct  sockaddr*)&server_addr ,sizeof(server_addr)))
    {
        perror("bind error");
        exit(1);
    }

    if(-1 == listen(server_fd,20))
    {
        perror("listen error");
        exit(1);
    }
    pthread_mutex_init(&mutex,NULL);
    pthread_create(&thread_write ,NULL , send_thread ,&server_fd);
    pthread_detach(thread_write);
    pthread_create(&thread_read ,NULL , recv_thread ,&server_fd);
    pthread_detach(thread_read);

    socklen_t client_addr_len = sizeof(client_addr);
    while(true)
    {
        int client_fd = accept(server_fd,(struct sockaddr*)&client_addr ,&client_addr_len);
        if(client_fd < 0 ){
            perror("accept error");
            break;
        }
        cout<<"New Client fd="<<client_fd<<endl;
        pthread_mutex_lock(&mutex);
        Client_Fd_Set.insert(client_fd);
        pthread_mutex_unlock(&mutex);
    }

    unordered_set<int>::iterator  it;
    for(it = Client_Fd_Set.begin() ; it!=Client_Fd_Set.end();++it)
    {
        close(*it);
    }
    close(server_fd);
    cout<<"main_thread exit"<<endl;
    exit(0);
    return 0;
}

  系统运维 最新文章
配置小型公司网络WLAN基本业务(AC通过三层
如何在交付运维过程中建立风险底线意识,提
快速传输大文件,怎么通过网络传大文件给对
从游戏服务端角度分析移动同步(状态同步)
MySQL使用MyCat实现分库分表
如何用DWDM射频光纤技术实现200公里外的站点
国内顺畅下载k8s.gcr.io的镜像
自动化测试appium
ctfshow ssrf
Linux操作系统学习之实用指令(Centos7/8均
上一篇文章      下一篇文章      查看所有文章
加:2022-03-22 21:01:04  更:2022-03-22 21:05:12 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/9 1:24:28-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码