#include <pthread.h>
#include <unistd.h>

#include <atomic>
#include <cstdio>
#include <iostream>
#include <list>
#include <vector>
using namespace std;
/**
 * Pool of POSIX worker threads.
 *
 * Workers are started by Create(), sleep on a condition variable while the
 * message queue is empty, are woken one at a time by Call(), and are told to
 * exit and then joined by StopAll().
 *
 * The mutex, the condition variable and the shutdown flag are static, so they
 * are shared by every CThreadPool instance in the process.
 */
class CThreadPool
{
public:
    /// Starts with no workers; both counters begin at zero.
    CThreadPool() : m_iThreadNum(0), m_iRunningThreadNum(0)
    {
    }

    /// Does not stop or join workers; call StopAll() before destruction.
    ~CThreadPool()
    {
    }

    // Each worker keeps a raw pointer back to its pool, so copying a pool
    // would leave workers pointing at the wrong object.
    CThreadPool(const CThreadPool &) = delete;
    CThreadPool &operator=(const CThreadPool &) = delete;

public:
    /// Starts threadNum workers and waits until each reports itself running.
    bool Create(int threadNum);

    /// Requests shutdown, joins every worker and frees their bookkeeping.
    void StopAll();

    /// Signals the condition variable to wake one waiting worker.
    void Call();

private:
    /// pthread entry point; threadData is the worker's ThreadItem.
    static void* ThreadFunc(void *threadData);

private:
    /// Per-worker bookkeeping, handed to ThreadFunc as its argument.
    struct ThreadItem
    {
        pthread_t _Handle;            // filled in by pthread_create
        CThreadPool *_pThis;          // owning pool
        // Set by the worker, polled by Create() from another thread, hence
        // atomic rather than a plain bool.
        std::atomic<bool> ifrunning;

        explicit ThreadItem(CThreadPool *pthis) : _pThis(pthis), ifrunning(false)
        {
        }
        ~ThreadItem()
        {
        }
    };

private:
    static pthread_mutex_t m_pthreadMutex;    // guards the queue check and m_shutdown
    static pthread_cond_t m_pthreadCond;      // workers wait on this
    static bool m_shutdown;                   // true once StopAll() has been requested

    int m_iThreadNum;                         // worker count requested in Create()
    std::atomic<int> m_iRunningThreadNum;     // workers currently past the wait
    std::vector<ThreadItem *> m_threadVector; // owned; freed in StopAll()
    std::list<char *> m_MsgRecvQueue;         // workers sleep while this is empty
};
bool CThreadPool::Create(int threadNum)
{
ThreadItem *pnew;
m_iThreadNum = threadNum;
int ret = 0;
for(int i = 0; i < m_iThreadNum; i++)
{
m_threadVector.push_back(pnew = new ThreadItem(this));
ret = pthread_create(&pnew->_Handle, NULL, ThreadFunc, pnew);
if(ret !=0 )
{
printf("create error\n");
}
}
std::vector<ThreadItem *>::iterator pos;
lbfor:
for(pos = m_threadVector.begin(); pos != m_threadVector.end(); pos++)
{
if((*pos)->ifrunning == false)
{
usleep(100 * 1000);
goto lbfor;
}
}
return true;
}
/**
 * Worker thread entry point passed to pthread_create.
 *
 * Loops until shutdown: sleeps on the condition variable while the pool's
 * message queue is empty, and exits once m_shutdown is set.
 *
 * @param threadData pointer to this worker's ThreadItem.
 * @return always a null pointer.
 */
void* CThreadPool::ThreadFunc(void* threadData)
{
    ThreadItem *pThread = static_cast<ThreadItem*>(threadData);
    CThreadPool *pThreadPoolObj = pThread->_pThis;

    while (true)
    {
        int err = pthread_mutex_lock(&m_pthreadMutex);
        if (err != 0)
        {
            // Waiting on the condition variable without owning the mutex is
            // undefined, so stop this worker instead of carrying on.
            printf("CThreadPool::ThreadFunc: pthread_mutex_lock failed, err=%d\n", err);
            break;
        }

        // Loop rather than `if`: pthread_cond_wait may wake spuriously.
        while (pThreadPoolObj->m_MsgRecvQueue.empty() && !m_shutdown)
        {
            // Create() polls this flag to learn that the worker is waiting.
            pThread->ifrunning = true;
            pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex);
        }

        if (m_shutdown)
        {
            pthread_mutex_unlock(&m_pthreadMutex);
            break;
        }

        err = pthread_mutex_unlock(&m_pthreadMutex);
        if (err != 0)
        {
            printf("CThreadPool::ThreadFunc: pthread_mutex_unlock failed, err=%d\n", err);
        }

        // NOTE(review): nothing here removes an entry from m_MsgRecvQueue, so a
        // non-empty queue makes this loop spin. Presumably the dequeue and
        // message handling belong between the two counter updates - confirm.
        ++pThreadPoolObj->m_iRunningThreadNum;
        --pThreadPoolObj->m_iRunningThreadNum;
    }
    return NULL;
}
void CThreadPool::StopAll()
{
if(m_shutdown == true)
{
return;
}
m_shutdown = true;
int err = pthread_cond_broadcast(&m_pthreadCond);
if(err != 0)
{
return;
}
std::vector<ThreadItem*>::iterator iter;
for(iter = m_threadVector.begin(); iter != m_threadVector.end(); iter++)
{
pthread_join((*iter)->_Handle, NULL);
}
pthread_mutex_destroy(&m_pthreadMutex);
pthread_cond_destroy(&m_pthreadCond);
for(iter = m_threadVector.begin(); iter != m_threadVector.end(); iter++)
{
if(*iter)
delete *iter;
}
m_threadVector.clear();
return;
}
/**
 * Wakes one worker that is waiting on the pool's condition variable.
 *
 * A failure from pthread_cond_signal is reported on stdout; nothing is
 * returned to the caller.
 */
void CThreadPool::Call()
{
    const int err = pthread_cond_signal(&m_pthreadCond);
    if (err != 0)
    {
        printf("CThreadPool::Call: pthread_cond_signal failed, err=%d\n", err);
    }
}
// Definitions of CThreadPool's static members. The mutex and condition
// variable use the POSIX static initialisers, so no init call is made for them.
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;
// Shutdown has not been requested at program start.
bool CThreadPool::m_shutdown = false;
// Global pool instance; no worker threads exist until Create() is called on it.
CThreadPool g_threadpool;
/// Program entry point. Currently does nothing; g_threadpool is never started.
int main(void)
{
    return 0;
}