一、引言:通常情况下,多线程编程都是在需要的时候,创建一个新线程,然后让这个线程去完成指定的任务, ?? ?完成之后退出(线程相当于是短工),一般情况下是能够满足我们的需求。 ?? ?但是当我们需要创建大量的线程的时候,并且执行一个非常简单的任务后线程需要销毁时,比如: ?? ??? ?文件夹的COPY ?? ??? ?网页服务器的响应 ?? ??? ?Email邮件服务器等等 ?? ??? ?那么像上面这些情况下,我们的程序需要面对大量的请求,但是同时这些请求执行的任务又非常的 ?? ??? ?简单,占用的时间非常少,程序就有可能处于不停的创建线程和销毁线程的状态。 ?? ??? ? ?? ??? ?如果我们需要频繁的创建或者销毁线程,并每一个线程的任务时间占比非常少,则 ?? ??? ?程序的大部分CPU都有可能浪费在线程的开销上面。 ?? ??? ? ?? ??? ?为了解决这个问题,我们引入了线程池: ?? ??? ??? ?线程池可以降低频繁创建和销毁线程带来的实际开销。
二、简介:线程池是一种多线程处理形式,大多用于高并发服务器上,它能合理有效的利用高并发服务器上的线程资源;处理方式是将需要线程执行的任务添加到“任务队列”。?然后创建一些线程去自动完成“任务队列”上的任务。
?一个线程池包括以下四个基本组成部分: ??????????????? 1、管理者线程(admin_thread):用于定时判断任务队列中任务个数以及线程个数,创建或销毁线程使其与任务队列达到平衡。主要功能包括创建,销毁线程,后续也可根据开发者需求添加新的任务,如任务队列执行完为空时检测任务队列为空为其添加新任务 ??????????????? 2、工作线程(Pthread_pool):线程池中线程,个数不定,在没有任务时处于等待状态,可以循环的执行任务; ??????????????? 3、任务调配(Routine):每个线程必须必须执行的函数,以供任务调配函数调度任务的执行,它主要规定了任务的入口,任务的执行状态,以及唤醒线程,分配任务等功能; ??????????????? 4、任务队列(tasklist):用于存放没有处理的任务。以链式或队列形式存放。可动态增加。 ????????????????
三、技术思路:一般采用预创建线程技术,也就是在应用启动的时候,就创建一定数量的线程以线程数组的形式存放,以及初始化一个管理者线程。 ?? ??? ??? ?线程固定的执行一个“任务调配函数”,当“任务队列”中没有任务的时候,线程自动的休眠, ?? ??? ??? ?等待一个新的任务,当任务来到的时候,去唤醒陷入休眠的线程,再去执行任务。 ? ? ? ? ? ? 同时由管理者线程判断任务队列中任务的个数,若任务较少线程过多则自动销毁多余线程(线程数不会少于初始创建线程个数),若任务队列中任务过多则增加线程个数,来达到线程与任务的一个动态平衡,起到合理利用资源的目的。 ?? ??? ??? ?线程完成任务之后,不会被销毁,而是自动的去执行任务队列中的下一个任务。? ? ? ? ? ? ? ?
四、线程池所需要的基础信息:
#define MIN_WAIT_TASK_NUM 5 /*当任务数超过了它,就该添加新线程了*/
#define DEFAULT_THREAD_NUM 2 /*每次创建或销毁的线程个数*/
//任务节点
typedef struct task_list
{
//do_task是一个指针,指向任务函数
//所谓的执行一个任务实际上就是去执行一个函数
void (*do_task)(void*); /* 这个函数实际上就是我们的拷贝任务函数 */
void * arg; /* 任务函数需要的参数 */
}Task;
//线程池
typedef struct pthread_pool
{
pthread_mutex_t lock; /* 锁住整个任务队列 */
pthread_mutex_t thread_counter; /* 用于使用忙线程数时的锁 */
pthread_cond_t task_not_full; /* 条件变量,任务队列不为满 */
pthread_cond_t task_not_empty; /* 任务队列不为空 */
Task *task_list; /* 任务队列 */
pthread_t *tids; /* 指向线程池中所有线程ID的数组 */
pthread_t admin_tid; /* 管理者线程tid */
/*线程池信息*/
int active_threads; /* 线程池中正在服役的线程数->工作线程的个数 */
int min_thr_num; /* 线程池中最小线程数 */
int max_thr_num; /* 线程池中最大线程数 */
int live_thr_num; /* 线程池中存活的线程数 */
int wait_exit_thr_num; /* 需要销毁的线程数 */
/*任务信息*/
int cur_waiting_tasks; /* 线程池中任务队列当前的任务数量 */
int max_waiting_tasks; /* 线程池任务队列最大的任务数量 */
int task_front; /* 队头 */
int task_rear; /* 队尾 */
/*状态信息*/
int shutdown; /* 是否退出程序 */
}Pthread_pool;
五、线程池代码的具体实现:
(1)线程池初始化:
/*
Init_Pool:线程池初始化函数
初始化pool指定的线程池,线程池中有min_thr_num个线程
返回值:成功返回0,失败的返回-1
*/
Pthread_pool* Init_Pool(int max_thr_num,int min_thr_num,int max_waiting_tasks)
{
Pthread_pool* pool=NULL; //初始化线程池结构体
int i,j; //启动线程个数计数,任务列表个数计数
if((pool=(Pthread_pool*)malloc(sizeof(Pthread_pool)))==NULL)
{
printf("malloc threadpool false! \n");
return NULL;
}
/*信息初始化*/
pool->active_threads=0;
pool->min_thr_num=min_thr_num;
pool->max_thr_num=max_thr_num;
pool->live_thr_num=min_thr_num;
pool->wait_exit_thr_num=0;
pool->cur_waiting_tasks=0;
pool->max_waiting_tasks=max_waiting_tasks;
pool->shutdown=false;
/*初始化互斥锁和条件变量*/
if ( pthread_mutex_init(&(pool->lock), NULL) != 0 || pthread_mutex_init(&(pool->thread_counter), NULL) !=0 ||
pthread_cond_init(&(pool->task_not_empty), NULL) !=0 || pthread_cond_init(&(pool->task_not_full), NULL) !=0)
{
printf("init lock or cond false;\n");
return NULL;
}
/*创建max_thr_num个线程同时要将所有创建出来的线程的ID要保存起来*/
pool->tids=(pthread_t*)malloc(sizeof(pthread_t)*max_thr_num); /*根据最大线程数,给工作线程数组开空间*/
if (pool->tids == NULL)
{
printf("malloc threads false;\n");
return NULL;
}
memset(pool->tids, 0, sizeof(pthread_t) * max_thr_num); /*初始化0*/
/*任务队列(链表)开空间*/
pool->task_list = (Task *)malloc(sizeof(Task) * max_waiting_tasks);/*根据最大任务数量开辟任务队列空间*/
if (pool->task_list == NULL)
{
printf("malloc task queue false;\n");
return NULL;
}
memset(pool->task_list, 0, sizeof(Task)*max_waiting_tasks);
/*启动min_thr_num个工作线程*/
for (i=0; i<min_thr_num; i++)
{
/*pool指向当前线程池*/
pthread_create(&(pool->tids[i]), NULL,Routine, (void *)pool);//要创建出来的线程去执行任务调配函数(线程函数)
//打印调试信息
printf("[%lu] 启动线程[%d] : [%lu] is success!\n",pthread_self(),i,pool->tids[i]);
}
/*管理者线程*/
pthread_create(&(pool->admin_tid), NULL, admin_thread, (void *)pool);
printf("启动管理线程 : [%lu] is success!\n",pool->admin_tid);
return pool;
}
(2)任务调配(包括增加任务)
/*
任务调配函数(线程函数)
所有的线程刚开始都会执行此函数,此函数会不断的从线程池中的任务队列
中获取任务(摘任务结点)然后交由线程池中的线程去执行
arg:表示的是线程池的指针,在线程池中有任务队列,任务队列中有任务结点
每一个任务结点上都包含了线程要执行的任务(函数)的地址和执行函数需要的参数
*/
void *Routine(void *arg)
{
Pthread_pool* pool=(Pthread_pool*)arg;
Task task;
while(1)
{
//因为任务队列是共享资源所以需要先上锁
//获取线程互斥锁,上锁
pthread_mutex_lock(&(pool->lock));
while(pool->cur_waiting_tasks==0 && pool->shutdown==false)
{
//当前线程陷入休眠
printf("线程 %lu 等待中... \n",pthread_self());
pthread_cond_wait(&(pool->task_not_empty), &(pool->lock));
//判断是否需要清除线程,自杀功能
if (pool->wait_exit_thr_num > 0)
{
pool->wait_exit_thr_num--;
//判断线程池中的线程数是否大于最小线程数,是则结束当前线程
if (pool->live_thr_num > pool->min_thr_num)
{
printf("线程 %lu 成功退出 \n", pthread_self());
pool->live_thr_num--;
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);//结束线程
}
}
}
if(pool->shutdown==true)
{
pthread_mutex_unlock(&(pool->lock));
printf("线程 %lu 成功退出 \n",pthread_self());
pthread_exit(NULL); //线程自己结束自己
}
//当条件满足的时候,将任务结点从任务链表中取下来(将任务结点从链表中摘下来)
task.do_task=pool->task_list[pool->task_front].do_task; //将任务函数取下
task.arg=pool->task_list[pool->task_front].arg; //将任务函数的参数取下
pool->task_front=(pool->task_front + 1) % pool->max_waiting_tasks;
pool->cur_waiting_tasks--;
//通知可以添加新任务
pthread_cond_broadcast(&(pool->task_not_full));
//获取线程互斥锁,解锁
pthread_mutex_unlock(&(pool->lock));
//按照任务结点中的任务去执行即可
printf("线程 %lu working... \n",pthread_self());
pthread_mutex_lock(&(pool->thread_counter)); //锁住忙线程变量
pool->active_threads++;
pthread_mutex_unlock(&(pool->thread_counter));
(*(task.do_task))(task.arg);
//释放任务结点
printf("线程 %lu end working \n",pthread_self());
pthread_mutex_lock(&(pool->thread_counter));
pool->active_threads--;
pthread_mutex_unlock(&(pool->thread_counter));
}
pthread_exit(NULL);
}
/*
Add_Task:给任务队列增加任务
把fun_task指向的函数和fun_arg指向的函数参数保存到一个任务结点中去
同时将任务结点添加到pool所表示的线程池的任务队列中去
*/
int Add_Task(Pthread_pool *pool,void (*fun_task)(void *),void *fun_arg)
{
/*需要将fun_task和fun_arg封装到一个任务结点(结构体)中去
在往pool指向的线程池的任务队列中添加任务的时候,需要上锁和解锁
加入任务后要唤醒等待的线程*/
pthread_mutex_lock(&(pool->lock));//上锁
/*如果队列满了,调用wait阻塞*/
while ((pool->cur_waiting_tasks == pool->max_waiting_tasks) && (!pool->shutdown))
{
pthread_cond_wait(&(pool->task_not_full), &(pool->lock));
}
/*如果线程池处于关闭状态*/
if (pool->shutdown)
{
pthread_mutex_unlock(&(pool->lock));
return -1;
}
/*清空工作线程的回调函数的参数arg*/
if (pool->task_list[pool->task_rear].arg != NULL)
{
pool->task_list[pool->task_rear].arg = NULL;
free(pool->task_list[pool->task_rear].arg);
}
/*添加任务到任务队列*/
pool->task_list[pool->task_rear].do_task =fun_task;
pool->task_list[pool->task_rear].arg =fun_arg;
pool->task_rear = (pool->task_rear + 1) % pool->max_waiting_tasks; /* 逻辑环 */
pool->cur_waiting_tasks++;
/*添加完任务后,队列就不为空了,唤醒线程池中的一个线程*/
pthread_cond_signal(&(pool->task_not_empty));
pthread_mutex_unlock(&(pool->lock));//解锁
return 0;
}
(3)线程管理
/*管理线程*/
void *admin_thread(void *arg)
{
int i;
Pthread_pool *pool = (Pthread_pool *)arg;
while (!pool->shutdown)
{
printf("管理线程工作!-----------------\n");
sleep(1); /*隔一段时间再管理*/
pthread_mutex_lock(&(pool->lock)); /*加锁*/
int cur_waiting_tasks = pool->cur_waiting_tasks; /*任务数*/
int live_thr_num = pool->live_thr_num; /*存活的线程数*/
pthread_mutex_unlock(&(pool->lock)); /*解锁*/
pthread_mutex_lock(&(pool->thread_counter));
int active_threads = pool->active_threads; /*忙线程数*/
pthread_mutex_unlock(&(pool->thread_counter));
printf("admin busy:%d live:%d\n",active_threads,live_thr_num);
/*创建新线程 实际任务数量大于 最小正在等待的任务数量,存活线程数小于最大线程数*/
if (cur_waiting_tasks >= MIN_WAIT_TASK_NUM && live_thr_num <= pool->max_thr_num)
{
printf("管理员添加线程-----------\n");
pthread_mutex_lock(&(pool->lock));
int add=0;
/*一次增加 DEFAULT_THREAD_NUM 个线程*/
for (i=0; i<pool->max_thr_num && add<DEFAULT_THREAD_NUM
&& pool->live_thr_num < pool->max_thr_num; i++) /*i小于最大线程数并且添加的线程个数小于默认一次添加的线程并且当前存活线程小于最大线程*/
{
if (pool->tids[i] == 0 || !is_thread_alive(pool->tids[i]))
{
pthread_create(&(pool->tids[i]),NULL,Routine, (void *)pool);
add++;
pool->live_thr_num++;
printf("新建线程[%d] %lu-----------------------\n",i,pthread_self());
}
}
pthread_mutex_unlock(&(pool->lock));
}
/*销毁多余的线程 忙线程x2 都小于 存活线程,并且存活的大于最小线程数*/
if ((cur_waiting_tasks*2) < live_thr_num && live_thr_num > pool->min_thr_num)
{
//printf("admin busy --%d--%d----\n", cur_waiting_tasks, live_thr_num);
/*一次销毁DEFAULT_THREAD_NUM个线程*/
pthread_mutex_lock(&(pool->lock));
pool->wait_exit_thr_num = DEFAULT_THREAD_NUM;
pthread_mutex_unlock(&(pool->lock));
for (i=0; i<DEFAULT_THREAD_NUM; i++)
{
//通知正在处于空闲的线程,自杀
pthread_cond_signal(&(pool->task_not_empty));
printf("销毁空闲线程 --\n");
}
}
}
return NULL;
}
(4)线程销毁
/*线程是否存活*/
int is_thread_alive(pthread_t tid)
{
int kill_rc = pthread_kill(tid, 0); //发送0号信号,测试是否存活
if (kill_rc == ESRCH) //线程不存在
{
return false;
}
return true;
}
/*释放线程池*/
int free_Pool(Pthread_pool *pool)
{
if (pool == NULL)
{
return -1;
}
if (pool->task_list)
{
free(pool->task_list);
}
if (pool->tids)
{
free(pool->tids);
pthread_mutex_lock(&(pool->lock)); /*先锁住再销毁*/
pthread_mutex_destroy(&(pool->lock));
pthread_mutex_lock(&(pool->thread_counter));
pthread_mutex_destroy(&(pool->thread_counter));
pthread_cond_destroy(&(pool->task_not_empty));
pthread_cond_destroy(&(pool->task_not_full));
}
free(pool);
pool = NULL;
return 0;
}
/*
销毁线程池
*/
int Destory_Pool(Pthread_pool *pool)
{
int i;
if (pool == NULL)
{
return -1;
}
pool->shutdown = true;
/*销毁管理者线程*/
pthread_join(pool->admin_tid, NULL);
//通知所有线程去自杀(在自己领任务的过程中)
for (i=0; i<pool->live_thr_num; i++)
{
pthread_cond_broadcast(&(pool->task_not_empty));
}
/*等待线程结束 先是pthread_exit 然后等待其结束*/
for (i=0; i<pool->live_thr_num; i++)
{
pthread_join(pool->tids[i], NULL);
}
free_Pool(pool);
return 0;
}
六、任务接口(主函数调用示例)
? ? ? ??本工程中主要是下linux下调用文件拷贝函数,实现多线程多文件拷贝,包括普通文件,目录文件,多层包含的目录文件的拷贝,每一个文件拷贝的过程都是一次函数的调用,于是可以利用多线程,线程池的技术实现
#include "pthread_pool.h"
#include "cpdir.h"
int main(int argc,char *argv[])
{
if(argc != 3)
{
fprintf(stderr,"%s <src_dir> <dest_dir>\n",argv[0]);
exit(-1);
}
Pthread_pool *main_pool = malloc(sizeof(*main_pool));
main_pool=Init_Pool(50,5,50);
printf("初始化线程池成功!\n");
/*将目录argv[1]以及目录下所有的东西保持原有架构的基础上拷贝到argv[2]目录下*/
Cp_Dir(main_pool,argv[1],argv[2]);
Destory_Pool(main_pool);
printf("销毁线程池成功!\n");
return 0;
}
七、效果图?
????????由于Linux环境以及文件拷贝函数编写问题,部分特殊文件没有成功打开进行拷贝,但是几乎所有用户所能访问的文件都成功拷贝了,如图。
?
?
?
完整工程文件下载地址:https://download.csdn.net/download/weixin_56681901/21963165
|