IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> C++知识库 -> 基于c++11,尽量简单的语法实现线程池 -> 正文阅读

[C++知识库]基于c++11,尽量简单的语法实现线程池


前言

本人菜鸟一枚,最近在学c++11多线程特性(也是在看面试八股文),看了网上很多代码后手写了一个线程池。代码中如有错误之处,希望各位大佬不吝赐教。


一、用到的c++11

  1. mutex互斥锁
  2. condition_variable 条件变量
  3. functional 函数包装器
  4. atomic 原子操作
  5. thread 线程类
  6. 其它一些c++11常用技术

二、代码

1.threadPool.h

代码如下:

#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_

#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <queue>
#include <vector>
#include <atomic>

using Task=std::function<void()>;

class Thread_pool{
public:
    Thread_pool(int max,int min);
    ~Thread_pool();

    //向任务队列中添加新任务
    void add_work_queue(Thread_pool* pool,Task task);
    //工作线程执行函数
    static void worker_fun(void* arg);
    //管理者线程执行函数
    static void manager_fun(void* arg);
    //销毁线程池
    bool destroy();
    //得到当前线程池内线程数量
    int get_threads_num();

private:
    std::queue<Task> task_queue;   //任务队列
    std::vector<std::thread> work_threads;   //工作线程
    std::thread manager_thread;     //管理者线程--根据实际运行可1.5倍动态增长线程数量(模仿vector只增不减,实际工作中可能并不适合 哈哈)
    int max_num;    //线程池中最大、最小线程数量
    int min_num;
    std::atomic_int cur_num;        //线程池当前的线程数量--原子操作
    std::atomic_int cur_work_num;   //当前正在工作的线程数量--原子操作
    bool isShutdown;    //是否开始销毁线程池

    //声明互斥锁和条件变量:用于同步任务队列
    std::mutex task_queue_mutex;
    std::condition_variable task_queue_cond;
};


#endif

2.threadPool.cpp

代码如下:

#include "threadPool.h"
#include <iostream>

Thread_pool::Thread_pool(int max=15,int min=3)
{
    this->max_num=max;
    this->min_num=min;
    this->cur_work_num=0;   //当前正在工作的线程数量为0
    this->cur_num=min;      //起始时创建min个线程
    this->isShutdown=false;
    
    //创建管理者线程
    manager_thread=std::thread(Thread_pool::manager_fun,this);
    //创建工作线程
    for(int i=0;i<min;i++){
        work_threads.emplace_back(std::thread(Thread_pool::worker_fun,this));
    }
}
//向任务队列中添加新任务
void Thread_pool::add_work_queue(Thread_pool* pool,Task task)
{
    if(pool->isShutdown)
        return;
    //注意线程同步
    std::unique_lock<std::mutex> lck(pool->task_queue_mutex);
    pool->task_queue.push(task);
    //唤醒一个阻塞的线程
    pool->task_queue_cond.notify_one();	
}
//管理者线程
void Thread_pool::manager_fun(void* arg)
{
    Thread_pool* p=(Thread_pool*)arg;
    while(1){
        if(p->cur_num<=0)
            break;

        //管理者线程每隔3s苏醒一次,检查当前线程池线程数量
        //管理策略:忙碌线程数量>=线程池线程数量*80%   自动扩容1.5倍
        std::this_thread::sleep_for(std::chrono::seconds(3));
        int thread_num=p->cur_num.load();   //原子取值操作
        int work_num=p->cur_work_num.load();
     
        if(work_num>=thread_num*0.8 && thread_num*1.5<=p->max_num){
            int new_num=thread_num*0.5;
            //此处需加锁???
            while(new_num--){
                p->work_threads.emplace_back(std::thread(Thread_pool::worker_fun,p));
                p->cur_num++;   //原子操作
            }
        }
    }
    std::cout<<"manager_thread exit\n";
}

//工作线程
void Thread_pool::worker_fun(void* arg)
{
    Thread_pool* p=(Thread_pool*) arg;
    while(1){
        if(p->task_queue.empty()&&p->isShutdown)
            break;

        std::unique_lock<std::mutex> lck(p->task_queue_mutex);
        p->task_queue_cond.wait(lck,[=](){return p->task_queue.size()>0||p->isShutdown;});
        //加上p->isShutdown为了解决死锁问题
        if(p->task_queue.empty())
            break;
        //取出任务
        auto t=p->task_queue.front();
        p->task_queue.pop();
        //释放任务队列锁
        lck.unlock(); //在执行任务前必须释放任务队列锁

        p->cur_work_num++;  //忙碌线程数加一
        t();    //执行任务
        p->cur_work_num--;  //忙碌线程数减一
    }
    p->cur_num--;	//线程退出
    std::cout<<"worker_thread exit\n";
}
//销毁线程池
bool Thread_pool::destroy()
{
    this->isShutdown=true;
    this->task_queue_cond.notify_all();	//唤醒所有阻塞的线程,前面添加新任务只唤醒一个
    //销毁之前队列中存在的所有任务
    // while(!this->task_queue.empty()){
    // }
    for(int i=0;i<this->work_threads.size();i++){
        work_threads[i].join();
    }
	this->manager_thread.join();
	
    return true;
}
//得到当前线程池内线程数量
int Thread_pool::get_threads_num()
{
    return this->cur_num.load();    //原子
}

Thread_pool::~Thread_pool()
{

}

3.main.cpp

#include "threadPool.h"
#include <iostream>
#include <pthread.h>
//这里用到了linux的线程库pthread.h,用来获得本线程的id
//我的实验环境是deepin+vscode,若在windows环境运行 可删去
std::mutex m;
void fun()
{
    //cout不是线程安全的
    std::unique_lock<std::mutex> lck(m);
    std::cout<<"Thread: "<<pthread_self()<<"is working--"<<rand()%20<<std::endl;
    lck.unlock();
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
}

int main()
{
    srand(time(nullptr));

    Task Func;
    Thread_pool *pool=new Thread_pool(15,3);
    std::cout<<pool->get_threads_num()<<std::endl;
	//执行1000个任务
    for(int i=0;i<1000;i++){
        Func=fun;
        pool->add_work_queue(pool,fun);
    }
    
    pool->destroy();
    delete pool;

    return 0;
}


总结

期间经历了一个处理死锁的过程: 当主线程发出销毁线程池的信号后,不再给任务队列添加新任务,如果此时任务队列为空,会导致正处于条件变量wait状态的线程死锁!(最后加了个判断操作 得以解决)
  C++知识库 最新文章
【C++】友元、嵌套类、异常、RTTI、类型转换
通讯录的思路与实现(C语言)
C++PrimerPlus 第七章 函数-C++的编程模块(
Problem C: 算法9-9~9-12:平衡二叉树的基本
MSVC C++ UTF-8编程
C++进阶 多态原理
简单string类c++实现
我的年度总结
【C语言】以深厚地基筑伟岸高楼-基础篇(六
c语言常见错误合集
上一篇文章      下一篇文章      查看所有文章
加:2021-10-11 17:20:07  更:2021-10-11 17:21:40 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 3:17:22-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码