#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>
// Interface class: a unit of work that can be executed through operator().
class IJob
{
public:
    // Virtual destructor: jobs are destroyed through an IJob* (see Worker::run),
    // which is undefined behaviour unless the base destructor is virtual.
    virtual ~IJob() = default;

    // Executes the job.
    virtual void operator()() = 0;
};

// Job that calls a free function taking no parameters.
class ZeroParam : public IJob
{
public:
    typedef void (*Func)();

    // Calls the stored function. Does nothing when init() has not supplied one.
    void operator()() override
    {
        if (m_function != nullptr)
        {
            m_function();
        }
    }

    // Stores the function that operator() will call.
    void init(Func theFunc)
    {
        m_function = theFunc;
    }

private:
    // Starts out null so that an un-initialised job never jumps through garbage.
    Func m_function = nullptr;
};
//一个参数,函数调用 template<typename param> class OneParam : public IJob { public: ?? ?typedef void(*Func)(param); ?? ?void operator()() ?? ?{ ?? ??? ?m_function(m_param); ?? ?} ?? ?void init(Func theFunc, param t) ?? ?{ ?? ??? ?m_function = theFunc; ?? ??? ?m_param = t; ?? ?} private: ?? ?Func m_function; ?? ?param m_param; };
//类的成员函数调用,一个参数 template<typename ClassType,typename param> class OneClassParam : public IJob { public: ?? ?typedef void (ClassType::*Func)(param); ?? ?void operator()() ?? ?{ ?? ??? ?(m_objName->*m_function)(m_param); ?? ?} ?? ?void init(ClassType *objName,Func theFunc, param t) ?? ?{ ?? ??? ?m_function = theFunc; ?? ??? ?m_param = t; ?? ??? ?m_objName = objName; ?? ?} private: ?? ?Func m_function; ?? ?param m_param; ?? ?ClassType* m_objName; }; //工作线程类 class Worker { public: ?? ?void run(); ?? ?bool GetBusy(); ?? ?void SetNumber(int id) ?? ?{ ?? ??? ?m_id = id; ?? ?} ?? ?bool addJob(void(*theFunc)());
?? ?template<typename param1> ?? ?bool addJob(void(*theFunc)(param1), param1 param);
?? ?template<typename ClassType,typename param1> ?? ?bool addJob(ClassType * obj,void(ClassType::*theFunc)(param1), param1 param); private:? ?? ?bool m_busy; ?? ?int m_id; ?? ?std::mutex m_mutex; ?? ?std::condition_variable m_condition; ?? ?std::vector<IJob *> m_vectJob; }; bool Worker::GetBusy() { ?? ?return m_busy; }
bool Worker::addJob(void(*theFunc)()) { ?? ?std::lock_guard<std::mutex> lock{ m_mutex }; ?? ?ZeroParam * zeroParam = new ZeroParam(); ?? ?zeroParam->init(theFunc); ?? ?m_vectJob.push_back(zeroParam); ?? ?m_condition.notify_one(); ?? ?return true; }
template<typename param1> bool Worker::addJob(void(*theFunc)(param1),param1 param) { ?? ?std::lock_guard<std::mutex> lock{ m_mutex }; ?? ?OneParam<param1> * oneParam = new OneParam<param1>; ?? ?oneParam->init(theFunc, param); ?? ?m_vectJob.push_back(oneParam); ?? ?m_condition.notify_one(); ?? ?return true; }
template<typename ClassType, typename param1> bool Worker::addJob(ClassType * obj, void(ClassType::*theFunc)(param1), param1 param) { ?? ?std::lock_guard<std::mutex> lock{ m_mutex }; ?? ?OneClassParam<ClassType,param1> * oneclassParam = new OneClassParam<ClassType,param1>; ?? ?oneclassParam->init(obj,theFunc, param); ?? ?m_vectJob.push_back(oneclassParam); ?? ?m_condition.notify_one(); ?? ?return true; } void Worker::run() { ?? ?while (true) ?? ?{ ?? ??? ?std::unique_lock<std::mutex> lock(m_mutex); ?? ??? ?if (m_vectJob.size() <= 0) ?? ??? ?{ ?? ??? ??? ?m_busy = false; ?? ??? ??? ?m_condition.wait_for(lock, std::chrono::seconds(10)); ?? ??? ?} ?? ??? ?else ?? ??? ?{ ?? ??? ??? ?printf("id=%d\n", m_id); ?? ??? ??? ?m_busy = true; ?? ??? ??? ?IJob * job = m_vectJob.back(); ?? ??? ??? ?(*job)(); ?? ??? ??? ?delete job; ?? ??? ??? ?job = NULL; ?? ??? ??? ?m_vectJob.pop_back(); ?? ??? ?} ?? ?} }
class MThreadPool { public: ?? ?static MThreadPool * GetInstace(); ?? ?void Init(int count); ?? ?void AddTask(void(*thefun)());
?? ?template<typename param1> ?? ?void AddTask(void(*thefun)(param1), param1 param);
?? ?template<typename ClassType, typename param1> ?? ?void AddTask(ClassType *obj,void(ClassType::*thefun)(param1), param1 param); ?? ?bool Start(); ?? ?bool Stop(); private: ?? ?MThreadPool();? private: ?? ?int m_count; ?? ?int m_currentID; ?? ?Worker *m_work;? };
// Returns the single pool instance, created on first use as a function-local
// static. `inline` because this definition lives in a header; without it every
// translation unit including the header would emit its own definition.
inline MThreadPool * MThreadPool::GetInstace()
{
    static MThreadPool pool;
    return &pool;
}
void MThreadPool::Init(int count) { ?? ?this->m_count = count; ?? ?Start(); }
bool MThreadPool::Start() { ?? ?m_work = new Worker[m_count]; ?? ?for (int i = 0; i < m_count; i++) ?? ?{ ?? ??? ?m_work[i].SetNumber(i + 1); ?? ??? ?std::thread th(&Worker::run, &m_work[i]); ?? ??? ?th.detach(); ?? ??? ?std::this_thread::sleep_for(std::chrono::milliseconds(100)); ?? ?} ?? ?return true; }
// Queues a call to a free function without parameters on the next worker.
// Throws std::logic_error when the pool has no workers yet; previously this
// divided by zero or dereferenced a null worker array.
// This function does not synchronise access to m_currentID.
// `inline` because this definition lives in a header.
inline void MThreadPool::AddTask(void(*thefun)())
{
    if (m_work == nullptr || m_count <= 0)
    {
        throw std::logic_error("MThreadPool::AddTask called before Init");
    }
    m_work[m_currentID % m_count].addJob(thefun);
    // Keep the counter inside [0, m_count) so it can never overflow.
    m_currentID = (m_currentID + 1) % m_count;
}
// Queues a call to a free function with one parameter on the next worker.
// Throws std::logic_error when the pool has no workers yet; previously this
// divided by zero or dereferenced a null worker array.
// This function does not synchronise access to m_currentID.
template<typename param1>
void MThreadPool::AddTask(void(*thefun)(param1), param1 param)
{
    if (m_work == nullptr || m_count <= 0)
    {
        throw std::logic_error("MThreadPool::AddTask called before Init");
    }
    m_work[m_currentID % m_count].addJob(thefun, param);
    // Keep the counter inside [0, m_count) so it can never overflow.
    m_currentID = (m_currentID + 1) % m_count;
}
// Queues a call to a member function with one parameter on the next worker.
// Only the pointer `obj` is passed on, so the object must stay alive until the
// task has run.
// Throws std::logic_error when the pool has no workers yet; previously this
// divided by zero or dereferenced a null worker array.
// This function does not synchronise access to m_currentID.
template<typename ClassType, typename param1>
void MThreadPool::AddTask(ClassType * obj, void(ClassType::*thefun)(param1), param1 param)
{
    if (m_work == nullptr || m_count <= 0)
    {
        throw std::logic_error("MThreadPool::AddTask called before Init");
    }
    m_work[m_currentID % m_count].addJob(obj, thefun, param);
    // Keep the counter inside [0, m_count) so it can never overflow.
    m_currentID = (m_currentID + 1) % m_count;
}
// Stub: stopping the pool is not implemented; this only returns false.
// `inline` because this definition lives in a header.
inline bool MThreadPool::Stop()
{
    return false;
}
// Creates an empty pool: no workers until Init()/Start() is called.
// Members are initialised explicitly so that AddTask/Init never read
// indeterminate values. `inline` because this definition lives in a header.
inline MThreadPool::MThreadPool()
    : m_count(0)
    , m_currentID(0)
    , m_work(nullptr)
{
}
// 主函数调用使用示例 (usage example: calling the thread pool from main)
? #include <functional> #include <iostream> #include "MThreadPool.hpp"? #include <thread> #include <chrono>
// Prints "testdata" with a running counter x times, pausing 50 ms after each
// line. Does nothing when x is zero or negative.
void testdata(int x)
{
    // The counter is an int: a size_t counter compared against a negative x
    // would loop practically forever, and "%d" requires an int argument.
    for (int i = 0; i < x; ++i)
    {
        printf("testdata \t %d\n", i);
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}

// Prints "test1" with a running counter ten times, pausing 50 ms after each line.
void test1()
{
    for (int i = 0; i < 10; ++i)
    {
        printf("test1 \t %d\n", i);
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}
// Prints "test2" with a running counter ten times, pausing 50 ms after each line.
void test2()
{
    // int counter: "%d" requires an int argument, not a size_t.
    for (int i = 0; i < 10; ++i)
    {
        printf("test2 \t %d\n", i);
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}

// Returns the larger of x and y (y when they are equal).
int max(int x, int y)
{
    if (x > y)
    {
        return x;
    }
    return y;
}

// Example class whose member function is run as a pool task.
class testClass
{
public:
    testClass();
    ~testClass();

    // Prints "testClass" with a running counter n times, pausing 50 ms after
    // each line. Does nothing when n is zero or negative.
    void calcNumber(int n)
    {
        // int counter: a size_t counter compared against a negative n would
        // loop practically forever, and "%d" requires an int argument.
        for (int i = 0; i < n; ++i)
        {
            printf("testClass,%d\n", i);
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
    }

private:
};
// Default constructor: the body is empty, there is nothing to set up.
testClass::testClass()
{
}
testClass::~testClass() { } int main(int argc, char *argv[]) { ?? ?MThreadPool::GetInstace()->Init(5);
?? ?MThreadPool::GetInstace()->AddTask(&testdata,10); ?? ?MThreadPool::GetInstace()->AddTask(&test1); ?? ?testClass c; ?? ?MThreadPool::GetInstace()->AddTask(&c,&testClass::calcNumber,10); ?? ?getchar(); ? ? return true; }
// 写得很简单,希望大神们多多指正。 (This is a very simple implementation; corrections and suggestions are welcome.)
|