概述
1、一个线程 pool,所有线程阻塞等待唤醒(有任务时唤醒) 2、任务队列 queue,队列中添加任务后就唤醒线程,线程从队头取走任务执行,典型的生产者-消费者模型。 3、mutex对队列上锁, 保证队列任务添加和取走的同步性 4、当线程数不足时可以动态增加线程数量。
代码
threadpool.hpp头文件
#pragma once
#include <iostream>
#include<stdlib.h>
#include<thread>
#include<mutex>
#include<condition_variable>
#include<vector>
#include<functional>
#include<queue>
#include<atomic>
#define N 10
#define THREADPOOL_MAX_NUM 500
using namespace std;
class ThreadPool {
public:
using Task = function<void()>;
size_t initnum;
vector<thread> _pool;
queue<Task> _tasks;
mutex _mutex;
condition_variable _cond;
atomic<bool> _run;
atomic<int> _idlThreadNum;
public:
ThreadPool(int cnt = N) :_run(true), _idlThreadNum(cnt)
{
addThread(cnt);
}
~ThreadPool()
{
_run = false;
_cond.notify_all();
for (thread& thrd : _pool) {
if (thrd.joinable())
thrd.join();
}
}
public:
void addTask(const Task& f)
{
if (_run) {
unique_lock<mutex>lk(_mutex);
_tasks.push(f);
cout << "---------Add a task---------" << endl;
if (_idlThreadNum < 1 && _pool.size() < THREADPOOL_MAX_NUM)
{
addThread(1);
cout << "new thread" << endl;
}
_cond.notify_one();
}
}
void addThread(int num)
{
for (; num > 0 && _pool.size() < THREADPOOL_MAX_NUM; num--) {
_pool.push_back(thread(&ThreadPool::runTask, this));
}
}
void runTask()
{
while (_run) {
Task task;
{
unique_lock<mutex> lock(_mutex);
_cond.wait(lock, [this] {
return !_run || !_tasks.empty();
});
if (!_run && _tasks.empty())
return;
task = move(_tasks.front());
_tasks.pop();
}
_idlThreadNum--;
task();
_idlThreadNum++;
}
}
};
测试程序
#include<iostream>
#include"threadpool.hpp"
using namespace std;
void func(int i) {
cout << "pthread_id=" << this_thread::get_id() << endl;
cout << "task id" << "------>" << i << endl;
this_thread::sleep_for(chrono::seconds(rand() % 5));
}
int main()
{
srand(time(nullptr));
ThreadPool p(5);
int i = 0;
int num = 20;
while (num--) {
i++;
this_thread::sleep_for(chrono::milliseconds(100));
auto task = bind(func, i);
p.addTask(task);
}
return 0;
}
测试结果:
|