前言
本人菜鸟一枚,最近在学c++11多线程特性(也是在看面试八股文),看了网上很多代码后手写了一个线程池。代码中如有错误之处,希望各位大佬不吝赐教。
一、用到的c++11
- mutex互斥锁
- condition_variable 条件变量
- functional 函数包装器
- atomic 原子操作
- thread 线程类
- 其它一些c++11常用技术
二、代码
1.threadPool.h
代码如下:
#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <queue>
#include <vector>
#include <atomic>
using Task=std::function<void()>;
class Thread_pool{
public:
Thread_pool(int max,int min);
~Thread_pool();
void add_work_queue(Thread_pool* pool,Task task);
static void worker_fun(void* arg);
static void manager_fun(void* arg);
bool destroy();
int get_threads_num();
private:
std::queue<Task> task_queue;
std::vector<std::thread> work_threads;
std::thread manager_thread;
int max_num;
int min_num;
std::atomic_int cur_num;
std::atomic_int cur_work_num;
bool isShutdown;
std::mutex task_queue_mutex;
std::condition_variable task_queue_cond;
};
#endif
2.threadPool.cpp
代码如下:
#include "threadPool.h"
#include <iostream>
Thread_pool::Thread_pool(int max=15,int min=3)
{
this->max_num=max;
this->min_num=min;
this->cur_work_num=0;
this->cur_num=min;
this->isShutdown=false;
manager_thread=std::thread(Thread_pool::manager_fun,this);
for(int i=0;i<min;i++){
work_threads.emplace_back(std::thread(Thread_pool::worker_fun,this));
}
}
void Thread_pool::add_work_queue(Thread_pool* pool,Task task)
{
if(pool->isShutdown)
return;
std::unique_lock<std::mutex> lck(pool->task_queue_mutex);
pool->task_queue.push(task);
pool->task_queue_cond.notify_one();
}
void Thread_pool::manager_fun(void* arg)
{
Thread_pool* p=(Thread_pool*)arg;
while(1){
if(p->cur_num<=0)
break;
std::this_thread::sleep_for(std::chrono::seconds(3));
int thread_num=p->cur_num.load();
int work_num=p->cur_work_num.load();
if(work_num>=thread_num*0.8 && thread_num*1.5<=p->max_num){
int new_num=thread_num*0.5;
while(new_num--){
p->work_threads.emplace_back(std::thread(Thread_pool::worker_fun,p));
p->cur_num++;
}
}
}
std::cout<<"manager_thread exit\n";
}
void Thread_pool::worker_fun(void* arg)
{
Thread_pool* p=(Thread_pool*) arg;
while(1){
if(p->task_queue.empty()&&p->isShutdown)
break;
std::unique_lock<std::mutex> lck(p->task_queue_mutex);
p->task_queue_cond.wait(lck,[=](){return p->task_queue.size()>0||p->isShutdown;});
if(p->task_queue.empty())
break;
auto t=p->task_queue.front();
p->task_queue.pop();
lck.unlock();
p->cur_work_num++;
t();
p->cur_work_num--;
}
p->cur_num--;
std::cout<<"worker_thread exit\n";
}
bool Thread_pool::destroy()
{
this->isShutdown=true;
this->task_queue_cond.notify_all();
for(int i=0;i<this->work_threads.size();i++){
work_threads[i].join();
}
this->manager_thread.join();
return true;
}
int Thread_pool::get_threads_num()
{
return this->cur_num.load();
}
Thread_pool::~Thread_pool()
{
}
3.main.cpp
#include "threadPool.h"
#include <iostream>
#include <pthread.h>
std::mutex m;
void fun()
{
std::unique_lock<std::mutex> lck(m);
std::cout<<"Thread: "<<pthread_self()<<"is working--"<<rand()%20<<std::endl;
lck.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
int main()
{
srand(time(nullptr));
Task Func;
Thread_pool *pool=new Thread_pool(15,3);
std::cout<<pool->get_threads_num()<<std::endl;
for(int i=0;i<1000;i++){
Func=fun;
pool->add_work_queue(pool,fun);
}
pool->destroy();
delete pool;
return 0;
}
总结
期间经历了一个处理死锁的过程: 当主线程发出销毁线程池的信号后,不再给任务队列添加新任务,如果此时任务队列为空,会导致正处于条件变量wait状态的线程死锁!(最后加了个判断操作 得以解决)
|