#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <functional>
#include <iostream>
#include <limits>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
/// Process-wide thread pool exposed as a singleton.
///
/// Callers obtain the single instance through GetInstance(), optionally set
/// the worker count, start the pool, and then enqueue callables that take no
/// arguments and return nothing. The instance cannot be copied, and it can be
/// neither constructed nor destroyed from outside the class.
class CThreadPool
{
    /// Unit of work stored in the queue (private alias; callers pass any
    /// callable convertible to it).
    using Task = std::function<void()>;

public:
    // Copying is disabled: there is exactly one pool.
    CThreadPool(const CThreadPool&) = delete;
    CThreadPool& operator=(const CThreadPool&) = delete;

    /// Returns the single pool instance.
    static CThreadPool& GetInstance();

    /// Sets how many worker threads OnStart() will create.
    void SetMaxThreadNum(const int iCount);

    /// Marks the pool as running and launches the worker threads.
    void OnStart();

    /// Marks the pool as stopped, wakes the workers and joins them.
    void OnStop();

    /// Enqueues a task and wakes one worker.
    void AppendTask(const Task& _task);

private:
    CThreadPool();
    ~CThreadPool();

    /// Body of every worker thread: fetches tasks from the queue and runs them.
    void DoWork();

    // NOTE: data members keep their original order; it determines the order
    // of initialization and destruction.
    std::atomic<bool> m_bIsRunning{false};   ///< True between OnStart() and OnStop().
    unsigned short m_usThreadNum{2};         ///< Number of workers to launch.
    std::vector<std::thread> m_vctThreads;   ///< Handles of the launched workers.
    std::queue<Task> m_queTasks;             ///< Pending tasks, guarded by m_mtx.
    std::condition_variable m_cv;            ///< Signals new tasks and shutdown.
    std::mutex m_mtx;                        ///< Protects m_queTasks.
};
#endif /* THREADPOOL_H */
#include "../include/threadpool.h"
/// Constructs an idle pool. All state comes from the in-class member
/// initializers; no threads are started here.
CThreadPool::CThreadPool() = default;
/// Shuts the pool down on destruction by delegating to OnStop(), so worker
/// threads are joined before the members are destroyed.
CThreadPool::~CThreadPool()
{
    this->OnStop();
}
/// Returns the single pool instance.
///
/// The instance is a function-local static, so it is constructed on the first
/// call and every later call returns the same object.
CThreadPool& CThreadPool::GetInstance()
{
    static CThreadPool s_pool;
    return s_pool;
}
/// Sets the number of worker threads that OnStart() will launch.
///
/// The value is stored in an unsigned short, so it is clamped to the range
/// [1, max of unsigned short] before being stored. Without the clamp a
/// negative argument would wrap to a huge thread count, and zero would yield a
/// pool with no workers in which queued tasks never run.
///
/// @param _iCount Requested number of worker threads.
void CThreadPool::SetMaxThreadNum(const int _iCount)
{
    const int iUpperBound = std::numeric_limits<unsigned short>::max();

    int iClamped = _iCount;
    if (iClamped < 1)
    {
        iClamped = 1;
    }
    else if (iClamped > iUpperBound)
    {
        iClamped = iUpperBound;
    }

    m_usThreadNum = static_cast<unsigned short>(iClamped);
}
/// Starts the pool by launching m_usThreadNum worker threads running DoWork().
///
/// Calling OnStart() while the pool is already running is a no-op; previously
/// a second call spawned an additional full set of workers.
void CThreadPool::OnStart()
{
    // exchange() flips the flag and reports the previous state in one atomic
    // step, so only the caller that actually performs the false -> true
    // transition goes on to create threads.
    if (m_bIsRunning.exchange(true))
    {
        return;
    }

    m_vctThreads.reserve(m_vctThreads.size() + m_usThreadNum);
    for (unsigned short usIndex = 0; usIndex < m_usThreadNum; ++usIndex)
    {
        // Construct the std::thread in place instead of moving a temporary.
        m_vctThreads.emplace_back(&CThreadPool::DoWork, this);
    }
}
/// Stops the pool: clears the running flag, wakes every worker, joins all
/// worker threads and discards their handles.
///
/// Safe to call more than once; after the first call the thread list is empty
/// and later calls only reset the flag and notify.
void CThreadPool::OnStop()
{
    {
        // The flag is changed while holding m_mtx so that a worker cannot test
        // it and then go to sleep between our store and the notification.
        std::lock_guard<std::mutex> guard(m_mtx);
        m_bIsRunning = false;
    }
    // Notify after releasing the mutex so woken workers do not immediately
    // block on a lock that this thread still holds.
    m_cv.notify_all();

    for (std::thread& worker : m_vctThreads)
    {
        if (worker.joinable())
        {
            worker.join();
        }
    }

    // Drop the joined handles; otherwise every start/stop cycle would leave
    // dead std::thread objects behind and the vector would keep growing.
    m_vctThreads.clear();
}
/// Adds a copy of the given task to the end of the queue and wakes one
/// waiting worker.
///
/// @param _task Callable to enqueue.
void CThreadPool::AppendTask(const Task& _task)
{
    // Queue access and the notification both happen under m_mtx.
    const std::lock_guard<std::mutex> guard(m_mtx);
    m_queTasks.emplace(_task);
    m_cv.notify_one();
}
void CThreadPool::DoWork()
{
while(m_bIsRunning)
{
Task task;
{
std::unique_lock<std::mutex> ulock(m_mtx);
if (!m_queTasks.empty())
{
task = m_queTasks.front();
m_queTasks.pop();
}
else if (m_queTasks.empty() && m_bIsRunning)
{
m_cv.wait(ulock);
}
}
if (task)
{
std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
task();
std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
std::cout << (std::chrono::time_point_cast<std::chrono::microseconds>(end) -
std::chrono::time_point_cast<std::chrono::microseconds>(start)).count() /1000000.00<< std::endl;
}
}
}
#include "include/threadpool.h"
/// Guards std::cout so lines printed by concurrent Test() calls do not interleave.
std::mutex mtx;

/// Demo task: prints the id of the executing thread followed by the given value.
///
/// @param iValue Number appended to the printed line.
void Test(int iValue)
{
    const std::lock_guard<std::mutex> guard(mtx);
    const std::thread::id self = std::this_thread::get_id();
    std::cout << self << " thread execute...." << iValue << std::endl;
}
int main(int argc, char* argv[])
{
CThreadPool::GetInstance().SetMaxThreadNum(4);
CThreadPool::GetInstance().OnStart();
for (int i=0; i<100; i++)
{
CThreadPool::GetInstance().AppendTask(std::bind(Test, i));
}
std::this_thread::sleep_for(std::chrono::seconds(10));
CThreadPool::GetInstance().OnStop();
return 0;
}
|