1.线程池原理
线程池(thread pool)技术是指能够保证所创建的任一线程都处于繁忙状态,而不需要频繁地为了某一任务而创建和销毁线程,因为系统在创建和销毁线程时所耗费的cpu资源很大。如果任务很多,频率很高,为了单一一个任务而起线程而后销线程,那么这种情况效率相当低下的。线程池技术就是用于解决这样一种应用场景而应运而生的。 pthread线程池实现原理,首先创建多个线程组成线程池,其中每个线程都运行一个被阻塞的循环函数,等待管理者线程唤醒,被唤醒的线程通过函数指针来执行用户所指定的函数,其中线程的阻塞和唤醒使用信号量机制。 ps:该项目改编自知乎文章:https://zhuanlan.zhihu.com/p/44971598 ps:笔者在大佬代码的基础上修改了线程池销毁过程,当线程池中的任务未全部完成时,线程池不会被立刻销毁,直到任务全部执行完成。 ps:在提交的任务函数中不要有sleep这种带有线程取消点的函数,否则在sleep过程中,如果调用destroy_tpool会被主线程默认取消。
2.项目结构
提供的api有
一、创建线程池,create_tpool
二、销毁线程池,destroy_tpool
三、分派任务,add_task_2_tpool
项目源代码
tpool.h
#ifndef T_POOL
#define T_POOL
#include <pthread.h>
#include <ctype.h>
typedef struct tpool_work{
void* (*work_routine)(void*);
void* args;
struct tpool_work* next;
}tpool_work_t;
typedef struct tpool{
size_t shutdown;
size_t maxnum_thread;
pthread_t *thread_id;
tpool_work_t* tpool_head;
pthread_cond_t queue_ready;
pthread_mutex_t queue_lock;
}tpool_t;
int create_tpool(tpool_t** pool,size_t max_thread_num);
void destroy_tpool(tpool_t* pool);
int add_task_2_tpool(tpool_t* pool,void* (*routine)(void*),void* args);
#endif
tpool.c
#include "tpool.h"
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
int is_taskover(tpool_t *pool){
if(pool->tpool_head!=NULL)return 0;
return 1;
}
static void* work_routine(void* args)
{
tpool_t* pool = (tpool_t*)args;
tpool_work_t* work = NULL;
while(1){
pthread_mutex_lock(&pool->queue_lock);
while(!pool->tpool_head && !pool->shutdown){
pthread_cond_wait(&pool->queue_ready,&pool->queue_lock);
}
if(pool->shutdown){
pthread_mutex_unlock(&pool->queue_lock);
pthread_exit(NULL);
}
work = pool->tpool_head;
pool->tpool_head = (tpool_work_t*)pool->tpool_head->next;
pthread_mutex_unlock(&pool->queue_lock);
work->work_routine(work->args);
free(work);
}
return NULL;
}
int create_tpool(tpool_t** pool,size_t max_thread_num)
{
(*pool) = (tpool_t*)malloc(sizeof(tpool_t));
if(NULL == *pool){
printf("in %s,malloc tpool_t failed!,errno = %d,explain:%s\n",__func__,errno,strerror(errno));
exit(-1);
}
(*pool)->shutdown = 0;
(*pool)->maxnum_thread = max_thread_num;
(*pool)->thread_id = (pthread_t*)malloc(sizeof(pthread_t)*max_thread_num);
if((*pool)->thread_id == NULL){
printf("in %s,init thread id failed,errno = %d,explain:%s",__func__,errno,strerror(errno));
exit(-1);
}
(*pool)->tpool_head = NULL;
if(pthread_mutex_init(&((*pool)->queue_lock),NULL) != 0){
printf("in %s,initial mutex failed,errno = %d,explain:%s",__func__,errno,strerror(errno));
exit(-1);
}
if(pthread_cond_init(&((*pool)->queue_ready),NULL) != 0){
printf("in %s,initial condition variable failed,errno = %d,explain:%s",__func__,errno,strerror(errno));
exit(-1);
}
for(int i = 0; i < max_thread_num; i++){
if(pthread_create(&((*pool)->thread_id[i]),NULL,work_routine,(void*)(*pool)) != 0){
printf("pthread_create failed!\n");
exit(-1);
}
}
return 0;
}
int add_task_2_tpool(tpool_t* pool,void* (*routine)(void*),void* args)
{
tpool_work_t* work,*member;
if(!routine){
printf("rontine is null!\n");
return -1;
}
work = (tpool_work_t*)malloc(sizeof(tpool_work_t));
if(!work){
printf("in %s,malloc work error!,errno = %d,explain:%s\n",__func__,errno,strerror(errno));
return -1;
}
work->work_routine = routine;
work->args = args;
work->next = NULL;
pthread_mutex_lock(&pool->queue_lock);
member = pool->tpool_head;
if(!member){
pool->tpool_head = work;
}
else{
while(member->next){
member = (tpool_work_t*)member->next;
}
member->next =(tpool_work_t*)work;
}
pthread_cond_signal(&pool->queue_ready);
pthread_mutex_unlock(&pool->queue_lock);
return 0;
}
void destroy_tpool(tpool_t* pool)
{
tpool_work_t* tmp_work;
while(pool->shutdown || !is_taskover(pool));
pool->shutdown = 1;
pthread_mutex_lock(&pool->queue_lock);
pthread_cond_broadcast(&pool->queue_ready);
pthread_mutex_unlock(&pool->queue_lock);
for(int i = 0; i < pool->maxnum_thread; i++){
pthread_join(pool->thread_id[i],NULL);
}
free(pool->thread_id);
while(pool->tpool_head){
tmp_work = pool->tpool_head;
pool->tpool_head = (tpool_work_t*)pool->tpool_head->next;
free(tmp_work);
}
pthread_mutex_destroy(&pool->queue_lock);
pthread_cond_destroy(&pool->queue_ready);
free(pool);
}
demo.c
#include "tpool.h"
#include <stdio.h>
#include <unistd.h>
#include <time.h>
void* fun(void* args)
{
int thread = (int)args;
printf("running the thread of %d\n",thread);
return NULL;
}
int main(int argc, char* args[])
{
tpool_t* pool = NULL;
if(0 != create_tpool(&pool,5)){
printf("create_tpool failed!\n");
return -1;
}
for(int i = 0; i <= 1000; i++){
add_task_2_tpool(pool,fun,(void*)i);
}
destroy_tpool(pool);
return 0;
}
线程池完整源码见
https://github.com/trriger/pthreadpool
|