什么是线程池
线程池就是一个容纳多个线程的容器,对于一线线程我们可以多次对此线程进行重复使用,从而省去频繁创建线程对象的操作。
为什么要使用线程池
频繁的进行进程的创建与销毁将带来很多开销。不但如此,进程间频繁的切换也将减低 CPU 的利用率。 如果能复用之前创建的进程,而不是为每个并发任务创建一个进程,能有效降低进程创建与销毁的开销并减少进程间切换,从而减少对 CPU 资源的浪费。 虽然线程创建与销毁的代价小于进程创建与销毁,隶属同一进程的线程间切换的代价也小于进程间切换,但复用之前创建的线程,也能有效降低线程创建与销毁的开销并减少线程间切换,从而减少对 CPU 资源的浪费。 当任务很多的时,我们就可以调用线程池,从而有效的的对CPU资源的浪费。 图解:
线程池的工作流程图
任务队列
我们应该以任务队列的形式存储任务,线程池中的线程只需要从任务队列中拿任务就可以了
typedef struct task
{
void(*run)(void*arg);
void*arg;
struct task *next;
}task;
这样我们就把任务队列创建好了。
线程池的创建
typedef struct threadpool
{
task*first;
task*end;
int threadNUM;
int tasksize;
pthread_mutex_t mutexpool;
pthread_cond_t notempty;
int shutdown;
}threadpool;
mutexpool是一个互斥锁,他的作用就是,当某一个线程需要拿任务或者要修改线程池中的某个成员时,为了防止其他线程同时也在对线程池进行访问,从而导致数据错乱,我们就需要使用互斥锁,让某个线程在访问线程池的时候将线程池锁住,不让其他线程进行访问。 notempty是一个条件变量,其作用就是当任队列为空时,我们就需要拿该条件变量将线程阻塞,当有新任务添加进来时,再对线程进行唤醒。
此时我们已经创建出了线程池,我们接下来所需要实现的接口。
threadpool*threadpoolinit(int nmberu);
int threadpooldestroy(threadpool*pool);
void threadpoolAdd(threadpool*pool,void(*run)(void*),void*arg);
void*worker(void*arg);
线程池的初始化
我们所要完成的任务就是,使用malloc函数将线程在堆区创建出来,然后对线程池中的互斥锁和条件变量进行初始化时。 然后对一些成员附上初始值。 线程池初始化时,还有很重要的一步,就是创建线程。 代码如下:
threadpool *threadpoolinit(int number)
{
threadpool *pool = (threadpool *)malloc(sizeof(threadpool));
if (pool == NULL)
{
printf("malloc error\n");
return NULL;
}
pool->threadNUM = number;
pool->tasksize=0;
pool->first = NULL;
pool->end = NULL;
pthread_mutex_init(&pool->mutexpool, NULL);
pthread_cond_init(&pool->notempty, NULL);
pool->shutdown=0;
for (int i = 0; i < number; i++)
{
pthread_t tid;
pthread_create(&tid, NULL, worker, pool);
}
return pool;
}
工作者
工作者所担任的任务就是,执行任务。当没有任务时,线程就进入阻塞状态。
void *worker(void *arg)
{
threadpool *pool = (threadpool *)arg;
while (1)
{
pthread_mutex_lock(&pool->mutexpool);
while (pool->first == NULL && !pool->shutdown)
{
pthread_cond_wait(&pool->notempty, &pool->mutexpool);
}
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexpool);
pthread_exit(NULL);
}
task *t = pool->first;
pool->first = t->next;
pool->tasksize--;
pthread_mutex_unlock(&pool->mutexpool);
t->run(t->arg);
free(t);
t=NULL;
}
return NULL;
}
当线程池收到销毁命令时,也就是pool->shutdown==1(后面会有对应接口),解锁(防止进入死锁),然后用pthread_exit函数进行终止线程。
添加任务
void threadpoolAdd(threadpool*pool,void(*run)(void*),void*arg)
{
if(pool->shutdown)
{
return;
}
task*t=(task*)malloc(sizeof(task));
t->arg=arg;
t->run=run;
t->next=NULL;
pthread_mutex_lock(&pool->mutexpool);
if(pool->first==NULL)
{
pool->first=t;
pool->end=t;
}
else
{
pool->end->next=t;
pool->end=t;
}
pool->tasksize++;
pthread_cond_signal(&pool->notempty);
pthread_mutex_unlock(&pool->mutexpool);
}
线程池的销毁
线程池中下线程的销毁,并不是在该函数中销毁的,而是给线程一个信号,也就是 pool->shutdown=1,然后让线程自己销毁
int threadpooldestroy(threadpool*pool)
{
if(pool==NULL)
{
return -1;
}
pool->shutdown=1;
for(int i=0;i<pool->threadNUM;i++)
{
pthread_cond_signal(&pool->notempty);
}
pthread_mutex_destroy(&pool->mutexpool);
pthread_cond_destroy(&pool->notempty);
free(pool);
pool=NULL;
return 0;
}
完整代码
threadpool.h
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
typedef struct task
{
void(*run)(void*arg);
void*arg;
struct task *next;
}task;
typedef struct threadpool
{
task*first;
task*end;
int threadNUM;
int tasksize;
pthread_mutex_t mutexpool;
pthread_cond_t notempty;
int shutdown;
}threadpool;
threadpool*threadpoolinit(int nmberu);
int threadpooldestroy(threadpool*pool);
void threadpoolAdd(threadpool*pool,void(*run)(void*),void*arg);
void*worker(void*arg);
pthreadpool.c
#include "threadpool.h"
void *worker(void *arg)
{
threadpool *pool = (threadpool *)arg;
while (1)
{
pthread_mutex_lock(&pool->mutexpool);
while (pool->first == NULL && !pool->shutdown)
{
pthread_cond_wait(&pool->notempty, &pool->mutexpool);
}
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexpool);
pthread_exit(NULL);
}
task *t = pool->first;
pool->first = t->next;
pool->tasksize--;
pthread_mutex_unlock(&pool->mutexpool);
t->run(t->arg);
free(t);
t=NULL;
}
return NULL;
}
threadpool *threadpoolinit(int number)
{
threadpool *pool = (threadpool *)malloc(sizeof(threadpool));
if (pool == NULL)
{
printf("malloc error\n");
return NULL;
}
pool->threadNUM = number;
pool->tasksize=0;
pool->first = NULL;
pool->end = NULL;
pthread_mutex_init(&pool->mutexpool, NULL);
pthread_cond_init(&pool->notempty, NULL);
pool->shutdown=0;
for (int i = 0; i < number; i++)
{
pthread_t tid;
pthread_create(&tid, NULL, worker, pool);
}
return pool;
}
void threadpoolAdd(threadpool*pool,void(*run)(void*),void*arg)
{
if(pool->shutdown)
{
return;
}
task*t=(task*)malloc(sizeof(task));
t->arg=arg;
t->run=run;
t->next=NULL;
pthread_mutex_lock(&pool->mutexpool);
if(pool->first==NULL)
{
pool->first=t;
pool->end=t;
}
else
{
pool->end->next=t;
pool->end=t;
}
pool->tasksize++;
pthread_cond_signal(&pool->notempty);
pthread_mutex_unlock(&pool->mutexpool);
}
int threadpooldestroy(threadpool*pool)
{
if(pool==NULL)
{
return -1;
}
pool->shutdown=1;
for(int i=0;i<pool->threadNUM;i++)
{
pthread_cond_signal(&pool->notempty);
}
pthread_mutex_destroy(&pool->mutexpool);
pthread_cond_destroy(&pool->notempty);
free(pool);
pool=NULL;
return 0;
}
总结
通过此次任务,学会了线程的创建销毁与使用,学会了互斥锁,条件变量的使用。完成本次任务的过程还算不错,通过查资料,看视频,翻阅大佬们博客,完成本次任务。
|