转载:醍醐灌顶全方位击破C++线程池及异步处理 - 知乎 (zhihu.com)
重点:
转载的代码有点乱,他有两种方法,只测试了第二种方法。
代码是看了,但无法验证这个线程池的暂停是否有效。等后续再想想,测试暂停的有效性。
Threadpool.h
#pragma once
#include <functional>
#include <thread>
#include <queue>
#include <condition_variable>
#include <future>
using namespace std;
using Task = function<void()>;
class ThreadPool
{
public:
ThreadPool(size_t size = 4);
~ThreadPool();
public:
template<typename T, typename...Args>
auto Commit(T&& t, Args&&...args)->future<decltype(t(args...))>
{
if (m_stop.load())
{
throw runtime_error("task has closed commit");
}
using ResType = decltype(t(args...));
auto task = make_shared<packaged_task<ResType()>>(
bind(forward<T>(t), forward<Args>(args)...));
unique_lock<mutex> lock(mu);
m_tasks.emplace([task]() {
(*task)();
});
m_cv.notify_all(); //唤醒等待线程
future<ResType> fu = task->get_future();
return fu;
}
public:
void ShutDown(); //停止任务提交
void Restart(); //重启任务提交
private:
Task GetOneTask();//获取一个待执行的task
void Schedual(); //任务调度
private:
vector<thread> m_pool;
mutex mu;
queue<Task> m_tasks;
condition_variable m_cv;
atomic<bool> m_stop;
};
ThreadPool.cpp
#include "ThreadPool.h"
#include <future>
ThreadPool::ThreadPool(size_t size) :m_stop{false}
{
size = size < 1 ? 1 : size;
for (size_t i=0;i<size;++i)
{
m_pool.emplace_back(&ThreadPool::Schedual, this);
}
}
ThreadPool::~ThreadPool()
{
for (auto&t:m_pool)
{
t.detach(); //让线程自身自灭
//t.join(); //等任务结束,前提:线程一定会执行完
}
}
void ThreadPool::ShutDown()
{
m_stop.store(true);//对内存进行访问memory_order_seq_cst,采用store
}
void ThreadPool::Restart()
{
m_stop.store(false);//对内存进行访问memory_order_seq_cst,采用store
}
Task ThreadPool::GetOneTask()
{
unique_lock<mutex> lock(mu);
m_cv.wait(lock, [this] {return !m_tasks.empty(); });
Task task(move(m_tasks.front()));
m_tasks.pop();
return task;
}
void ThreadPool::Schedual()
{
while (true)
{
if (Task task =GetOneTask())
{
task();
}
else
{
return; //结束
}
}
}
Test.cpp
// Test.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//
#include <iostream>
#include <future>
#include "ThreadPool.h"
using namespace std;
void fun()
{
for (int i = 0; i < 100000; ++i)
{
cout << "hello"<<i << endl;
}
}
struct Gan
{
int operator()() {
cout << "hello,gan" << endl;
return 42;
}
};
int main() {
try
{
ThreadPool task(10);
future<void> ff = task.Commit(fun);
future<int> fg = task.Commit(Gan());
future<string> fs = task.Commit([]()->string {
return "hello,fs";
});
task.ShutDown();
ff.get();
cout << "fg.get : " << fg.get ()<< endl;
this_thread::sleep_for(chrono::seconds(5));
task.Restart(); //重启任务
cout << "end " << endl;
return 0;
}
catch (const std::exception& e)
{
cout << "soming is wrong "<< e.what() << endl;
}
return 0;
}
|