基础概念
线程池:?当进行并行的任务作业操作时,线程的建立与销毁的开销是,阻碍性能进步的关键,因此线程池,由此产生。使用多个线程,无限制循环等待队列,进行计算和操作。帮助快速降低和减少性能损耗。
线程池的组成
- 线程池管理器:初始化和创建线程,启动和停止线程,调配任务;管理线程池
- 工作线程:线程池中等待并执行分配的任务
- 任务接口:添加任务的接口,以提供工作线程调度任务的执行。
- 任务队列:用于存放没有处理的任务,提供一种缓冲机制,同时具有调度功能,高优先级的任务放在队列前面
线程池工作的四种情况
- 2. 队列中任务数量,小于等于线程池中线程任务数量
线程池的C++实现
线程池的主要组成有上面三个部分:
- 任务队列(Task Quene)
- 线程池(Thread Pool)
- 完成队列(Completed Tasks)
ThreadPool.h
// file: ThreadPool.h
//
// summary: Implements the Thread Pool
/**************************************************************************************************
C++11中的std::future是一个模板类。
std::future提供了一种用于访问异步操作结果的机制。
std::future所引用的共享状态不能与任何其它异步返回的对象共享(与std::shared_future相反)
一个future是一个对象,它可以从某个提供者的对象或函数中检索值,如果在不同的线程中,则它可以正确地同步此访问
**************************************************************************************************/
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
class ThreadPool {
public:
ThreadPool(size_t);
template<class F, class... Args> //返回一个基于F的函数对象,其参数被绑定到Args上。
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
int get_workers_num()
{
return workers.size();
}
queue< std::function<void()> > get_tasks()
{
return tasks;
}
bool get_stop()
{
return stop;
}
~ThreadPool();
private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
// the task queue
std::queue< std::function<void()> > tasks;
// synchronization 同步互斥
std::mutex queue_mutex; //锁
std::condition_variable condition; //条件变量
bool stop;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0;i<threads;++i)
workers.emplace_back(
[this]
{
for(;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
// Add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}
#endif
Test.cpp?
#include <atomic>
#include <iostream>
#include <thread> //需要包含的头
#include <time.h>
#include <vector>
#include <queue>
#include "Thred_Poll.h"
using namespace std;
int st(int i) //线程函数
{
std::cout << "hello " << i << std::endl;
//std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "world " << i << std::endl;
return i * i;
}
int main()
{
ThreadPool pool(4); //池中创建四个线程
std::vector< std::future<int> > results; //std::future 提供访问异步操作结果的机制
pool.enqueue(st, 1);
pool.enqueue(st, 2);
//for (int i = 0; i < 8; ++i) {
// results.emplace_back(
// pool.enqueue([i] {
// return st(i);
// })
// );
//}
for (auto && result : results)
std::cout << result.get() << ' ';
std::cout << std::endl;
return 0;
}
|