为了在Linux C聊天室项目里植入线程池,特地肝了线程池。后面完善了聊天室也会分享出来。现将代码分享,备注思路详细,本小白的代码小白应该也能看懂!最后,欢迎纠错指正!
直接上代码
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#define LL_ADD(item,list) \
item->prev = NULL; \
item->next = list; \
if(list != NULL)list->prev = item; \
list = item; \
#define LL_REMOVE(item,list) \
if(item->prev != NULL) item->prev->next = item->next; \
if(item->next != NULL) item->next->prev = item->prev; \
if(list == item) list = item->next; \
item->prev = item->next = NULL; \
struct NWORKER{
pthread_t thread;
struct NMANAGER *pool;
int num;
int terminate;
struct NWORKER *prev;
struct NWORKER *next;
};
struct NJOB{
void (*func)(void *arg);
void *user_data;
int num;
struct NJOB *prev;
struct NJOB *next;
};
typedef struct NMANAGER{
struct NWORKER *workers;
struct NJOB *jobs;
int jobs_num;
pthread_cond_t jobs_cond;
pthread_mutex_t jobs_mutex;
}nThreadPool;
static int count = 0;
static void *nThreadCallback(void *arg)
{
struct NWORKER *worker = (struct NWORKER*)arg;
while(1)
{
pthread_mutex_lock(&worker->pool->jobs_mutex);
while(NULL == worker->pool->jobs)
{
if(worker->terminate) break;
pthread_cond_wait(&worker->pool->jobs_cond,&worker->pool->jobs_mutex);
}
if(worker->terminate)
{
pthread_mutex_unlock(&worker->pool->jobs_mutex);
break;
}
struct NJOB *job = worker->pool->jobs;
LL_REMOVE(job,worker->pool->jobs);
count++;
job->user_data = &count;
printf("\033[35mcallBack:打印:%d \n\033[0m",count);
printf("callBack:线程 %d 收到新任务 %d ,开始执行\n",worker->num,job->num);
job->func((void *)job);
printf("callBack:线程 %d 完成任务 %d \n\n",worker->num,job->num);
usleep(500000);
pthread_mutex_unlock(&worker->pool->jobs_mutex);
usleep(100000);
}
printf("线程退出, ptid:%ld\n",pthread_self());
free(worker);
pthread_exit(NULL);
}
int nThreadPoolCreate(nThreadPool *pool,int numWorkers)
{
if(numWorkers < 1) numWorkers = 1;
if(NULL == pool) return -1;
memset(pool, 0, sizeof(nThreadPool));
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&pool->jobs_cond, &blank_cond, sizeof(pthread_cond_t));
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
memcpy(&pool->jobs_mutex, &blank_mutex, sizeof(pthread_mutex_t));
pool->jobs_num = 0;
for(int i = 1; i <= numWorkers; i++)
{
struct NWORKER *worker = (struct NWORKER*)malloc(sizeof(struct NWORKER));
if(worker == NULL)
{
perror("malloc");
return -1;
}
memset(worker, 0, sizeof(struct NWORKER));
worker->pool = pool;
worker->num = i;
int ret = pthread_create(&worker->thread,NULL,nThreadCallback, worker);
if(ret)
{
perror("pthread_create");
free(worker);
return -3;
}
LL_ADD(worker,pool->workers);
}
return 0;
}
int nThreadPoolDestroy(nThreadPool *pool)
{
struct NWORKER *worker = NULL;
for(worker = pool->workers; worker != NULL; worker = worker->next)
{
worker->terminate = 1;
}
pthread_mutex_lock(&pool->jobs_mutex);
pthread_cond_broadcast(&pool->jobs_cond);
pthread_mutex_unlock(&pool->jobs_mutex);
}
void nThreadPoolPush(nThreadPool *pool,struct NJOB *job)
{
pthread_mutex_lock(&pool->jobs_mutex);
LL_ADD(job,pool->jobs);
printf("\n线程池里添加了一个新任务, 任务号:%d\n",job->num);
pthread_cond_signal(&pool->jobs_cond);
pthread_mutex_unlock(&pool->jobs_mutex);
}
#if 1
void func(void *arg)
{
struct NJOB *job = (struct NJOB *)arg;
int count = *(int *)job->user_data;
printf("\033[36mfunc:");
printf(" 打印count:%d ,任务号为:%d\n\033[0m",count,job->num);
}
int main()
{
nThreadPool *pool = (nThreadPool *)malloc(sizeof(nThreadPool));
if(0 != nThreadPoolCreate(pool,10))
{
perror("nThreadPoolCreate");
exit(1);
}
for(int i = 1; i <= 100; i++)
{
struct NJOB *job = (struct NJOB *)malloc(sizeof(struct NJOB));
job->num = i;
job->func = func;
nThreadPoolPush(pool,job);
}
while(pool->jobs != NULL);
usleep(100000);
printf("\n\n\n所有任务已完成!\n");
nThreadPoolDestroy(pool);
usleep(100000);
return 0;
}
#endif
#Ubuntu下运行结果(截取部分):
|