本章主要将基于C++11实现的线程池,暂时没有考虑线程的动态申请和注销(可以根据自己需求实现)。线程数量值建议:cpu核数*2+1。线程过多会导致线程切换浪费大量资源,线程数量过少可能导致并发性能下降。
ThreadPool.h
#pragma once
#include <memory>
#include <queue>
#include <vector>
#include <atomic>
#include <thread>
#include <condition_variable>
namespace Jef{
class ITask{
public:
virtual ~ITask(){};
virtual void run() = 0;
};
class Semaphore{
public:
Semaphore(unsigned long init_sem = 0);
bool wait(int wait_milliseconds = 0x7fffffff);
void signal();
private:
std::mutex _mtx;
std::condition_variable _cond;
unsigned long _count;
};
class ThreadPool
{
public:
ThreadPool(int thread_num);
virtual ~ThreadPool();
virtual void on_idle();
void start();
void stop();
void addTask(std::shared_ptr<ITask> task);
protected:
std::shared_ptr<ITask> pop_task();
void add_thread();
void release_thread();
void _run();
private:
Semaphore _sem;
int _thread_num;
std::mutex _mtx;
std::atomic_bool _is_exit;
std::queue<std::shared_ptr<ITask> > _task;
std::vector<std::thread> _threads;
};
};
ThreadPool.cpp
#include "stdafx.h"
#include "ThreadPool.h"
#include <iostream>
namespace Jef{
Semaphore::Semaphore(unsigned long init_sem){
_count = init_sem;
}
bool Semaphore::wait(int wait_milliseconds)
{
std::unique_lock<std::mutex> lk(_mtx);
if (_cond.wait_for(lk, std::chrono::milliseconds(wait_milliseconds), [&](){return _count > 0; }))
{
--_count;
return true;
}
return false;
}
void Semaphore::signal()
{
std::unique_lock<std::mutex> lk(_mtx);
++_count;
_cond.notify_one();
}
ThreadPool::ThreadPool(int thread_num)
{
_thread_num = thread_num;
_is_exit = false;
}
ThreadPool::~ThreadPool()
{
stop();
}
void ThreadPool::start()
{
for (int i = 0; i < _thread_num; ++i)
{
add_thread();
}
}
void ThreadPool::stop()
{
_is_exit = true;
for (auto &it : _threads)
{
if (it.joinable())
{
it.join();
}
}
}
void ThreadPool::addTask(std::shared_ptr<ITask> task)
{
{
std::lock_guard<std::mutex> lk(_mtx);
_task.push(task);
}
_sem.signal();
}
std::shared_ptr<ITask> ThreadPool::pop_task()
{
std::shared_ptr<ITask> task = nullptr;
std::lock_guard<std::mutex> lk(_mtx);
if (!_task.empty())
{
task = _task.front();
_task.pop();
}
return task;
}
void ThreadPool::add_thread()
{
std::thread t = std::thread(std::bind(&Jef::ThreadPool::_run, this));
_threads.push_back(std::move(t));
}
void ThreadPool::release_thread()
{
}
void ThreadPool::_run()
{
while (!_is_exit)
{
if (_sem.wait(100))
{
std::shared_ptr<ITask> task = pop_task();
if (task != nullptr)
{
task->run();
}
}
else
{
on_idle();
}
}
std::ostringstream oss;
oss << std::this_thread::get_id();
std::string stid = oss.str();
unsigned long long tid = std::stoull(stid);
LOG_INFO_F(_T("thread:[%I64d] exit"), tid);
}
void ThreadPool::on_idle()
{
std::ostringstream oss;
oss << std::this_thread::get_id();
std::string stid = oss.str();
unsigned long long tid = std::stoull(stid);
LOG_INFO_F(_T("thread:[%I64d] on_idle"), tid);
}
}
基本使用:
class TaskA :public Jef::ITask{
public:
TaskA(){
}
TaskA(int a, int b)
{
this->a = a;
this->b = b;
}
virtual void run(){
cout << a << "----" << b << "---" << "TaskA" << endl;
}
private:
int a;
int b;
};
Jef::ThreadPool pool(4);
pool.start()
pool.addTask(shared_ptr<TaskA>(new TaskA(a, i)));
|