IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> C++知识库 -> C语言实现简单的线程池项目(附完整工程文件) -> 正文阅读

[C++知识库]C语言实现简单的线程池项目(附完整工程文件)

一、引言:通常情况下,多线程编程都是在需要的时候,创建一个新线程,然后让这个线程去完成指定的任务,
?? ?完成之后退出(线程相当于是短工),一般情况下是能够满足我们的需求。
?? ?但是当我们需要创建大量的线程的时候,并且执行一个非常简单的任务后线程需要销毁时,比如:
?? ??? ?文件夹的COPY
?? ??? ?网页服务器的响应
?? ??? ?Email邮件服务器等等
?? ??? ?那么像上面这些情况下,我们的程序需要面对大量的请求,但是同时这些请求执行的任务又非常的
?? ??? ?简单,占用的时间非常少,程序就有可能处于不停的创建线程和销毁线程的状态。
?? ??? ?
?? ??? ?如果我们需要频繁的创建或者销毁线程,并每一个线程的任务时间占比非常少,则
?? ??? ?程序的大部分CPU都有可能浪费在线程的开销上面。
?? ??? ?
?? ??? ?为了解决这个问题,我们引入了线程池:
?? ??? ??? ?线程池可以降低频繁创建和销毁线程带来的实际开销。

二、简介:线程池是一种多线程处理形式,大多用于高并发服务器上,它能合理有效的利用高并发服务器上的线程资源;处理方式是将需要线程执行的任务添加到“任务队列”。?然后创建一些线程去自动完成“任务队列”上的任务。

?一个线程池包括以下四个基本组成部分:
??????????????? 1、管理者线程(admin_thread):用于定时判断任务队列中任务个数以及线程个数,创建或销毁线程使其与任务队列达到平衡。主要功能包括创建,销毁线程,后续也可根据开发者需求添加新的任务,如任务队列执行完为空时检测任务队列为空为其添加新任务
??????????????? 2、工作线程(Pthread_pool):线程池中线程,个数不定,在没有任务时处于等待状态,可以循环的执行任务;
??????????????? 3、任务调配(Routine):每个线程必须必须执行的函数,以供任务调配函数调度任务的执行,它主要规定了任务的入口,任务的执行状态,以及唤醒线程,分配任务等功能;
??????????????? 4、任务队列(tasklist):用于存放没有处理的任务。以链式或队列形式存放。可动态增加。
????????????????

三、技术思路:一般采用预创建线程技术,也就是在应用启动的时候,就创建一定数量的线程以线程数组的形式存放,以及初始化一个管理者线程。
?? ??? ??? ?线程固定的执行一个“任务调配函数”,当“任务队列”中没有任务的时候,线程自动的休眠,
?? ??? ??? ?等待一个新的任务,当任务来到的时候,去唤醒陷入休眠的线程,再去执行任务。
? ? ? ? ? ? 同时由管理者线程判断任务队列中任务的个数,若任务较少线程过多则自动销毁多余线程(线程数不会少于初始创建线程个数),若任务队列中任务过多则增加线程个数,来达到线程与任务的一个动态平衡,起到合理利用资源的目的。
?? ??? ??? ?线程完成任务之后,不会被销毁,而是自动的去执行任务队列中的下一个任务。? ?
? ? ? ? ? ?

四、线程池所需要的基础信息:

#define MIN_WAIT_TASK_NUM 5       /*当任务数超过了它,就该添加新线程了*/
#define DEFAULT_THREAD_NUM 2      /*每次创建或销毁的线程个数*/

//任务节点
typedef struct task_list
{
	//do_task是一个指针,指向任务函数
	//所谓的执行一个任务实际上就是去执行一个函数
	
	void (*do_task)(void*);				/* 这个函数实际上就是我们的拷贝任务函数 */
	void * arg;							/* 任务函数需要的参数 */

}Task;

//线程池
typedef struct pthread_pool
{
	pthread_mutex_t lock;				/* 锁住整个任务队列 */
	pthread_mutex_t thread_counter;		/* 用于使用忙线程数时的锁 */
	pthread_cond_t task_not_full;			/* 条件变量,任务队列不为满 */
	pthread_cond_t task_not_empty;			/* 任务队列不为空 */
	
	
	Task *task_list;					/* 任务队列 */
	pthread_t *tids;					/* 指向线程池中所有线程ID的数组 */
	pthread_t admin_tid;                /* 管理者线程tid */
	
	/*线程池信息*/
	int active_threads;					/* 线程池中正在服役的线程数->工作线程的个数 */
	int min_thr_num;           			/* 线程池中最小线程数 */
   	int max_thr_num;          			/* 线程池中最大线程数 */
   	int live_thr_num;          			/* 线程池中存活的线程数 */
   	int wait_exit_thr_num;     			/* 需要销毁的线程数 */
	
	
	/*任务信息*/
	int cur_waiting_tasks;				/* 线程池中任务队列当前的任务数量 */
	int max_waiting_tasks;				/* 线程池任务队列最大的任务数量 */
	int task_front;                      /* 队头 */
   	int task_rear;                       /* 队尾 */

	/*状态信息*/
	int shutdown;						/* 是否退出程序 */
}Pthread_pool;

五、线程池代码的具体实现

(1)线程池初始化:

/*
	Init_Pool:线程池初始化函数
	初始化pool指定的线程池,线程池中有min_thr_num个线程
	返回值:成功返回0,失败的返回-1
*/
Pthread_pool* Init_Pool(int max_thr_num,int min_thr_num,int max_waiting_tasks)
{
	Pthread_pool* pool=NULL;	//初始化线程池结构体
	int i,j;					//启动线程个数计数,任务列表个数计数
	if((pool=(Pthread_pool*)malloc(sizeof(Pthread_pool)))==NULL)
	{
		printf("malloc threadpool false! \n");
		return NULL;
	}
	/*信息初始化*/
	pool->active_threads=0;
    pool->min_thr_num=min_thr_num;
    pool->max_thr_num=max_thr_num;
    pool->live_thr_num=min_thr_num;
    pool->wait_exit_thr_num=0;

    pool->cur_waiting_tasks=0;
    pool->max_waiting_tasks=max_waiting_tasks;

    pool->shutdown=false;

	
	/*初始化互斥锁和条件变量*/
      if ( pthread_mutex_init(&(pool->lock), NULL) != 0 || pthread_mutex_init(&(pool->thread_counter), NULL) !=0  || 
      pthread_cond_init(&(pool->task_not_empty), NULL) !=0  || pthread_cond_init(&(pool->task_not_full), NULL) !=0)
      {
         printf("init lock or cond false;\n");
         return NULL;
      }
	
	/*创建max_thr_num个线程同时要将所有创建出来的线程的ID要保存起来*/
	pool->tids=(pthread_t*)malloc(sizeof(pthread_t)*max_thr_num);	/*根据最大线程数,给工作线程数组开空间*/
	if (pool->tids == NULL)
	{
		printf("malloc threads false;\n");
		return NULL;
	}
	memset(pool->tids, 0, sizeof(pthread_t) * max_thr_num);	/*初始化0*/

	/*任务队列(链表)开空间*/									
	
	pool->task_list = (Task *)malloc(sizeof(Task) * max_waiting_tasks);/*根据最大任务数量开辟任务队列空间*/
	if (pool->task_list == NULL)
	{
		printf("malloc task queue false;\n");
		return NULL;
	}
	memset(pool->task_list, 0, sizeof(Task)*max_waiting_tasks);

	/*启动min_thr_num个工作线程*/
	for (i=0; i<min_thr_num; i++)
	{
		/*pool指向当前线程池*/
		pthread_create(&(pool->tids[i]), NULL,Routine, (void *)pool);//要创建出来的线程去执行任务调配函数(线程函数)
		//打印调试信息
		printf("[%lu] 启动线程[%d] : [%lu] is success!\n",pthread_self(),i,pool->tids[i]);
	}
	/*管理者线程*/
	pthread_create(&(pool->admin_tid), NULL, admin_thread, (void *)pool);
	printf("启动管理线程 : [%lu] is success!\n",pool->admin_tid);
	return pool;	
}

(2)任务调配(包括增加任务)

/*
	任务调配函数(线程函数)
		所有的线程刚开始都会执行此函数,此函数会不断的从线程池中的任务队列
	中获取任务(摘任务结点)然后交由线程池中的线程去执行
		arg:表示的是线程池的指针,在线程池中有任务队列,任务队列中有任务结点
		每一个任务结点上都包含了线程要执行的任务(函数)的地址和执行函数需要的参数
*/
void *Routine(void *arg)
{
	Pthread_pool* pool=(Pthread_pool*)arg;
	Task task;
	
	while(1)
	{
		//因为任务队列是共享资源所以需要先上锁
		//获取线程互斥锁,上锁
		pthread_mutex_lock(&(pool->lock));
		while(pool->cur_waiting_tasks==0 && pool->shutdown==false)
		{
			//当前线程陷入休眠
			printf("线程 %lu 等待中... \n",pthread_self());
       		pthread_cond_wait(&(pool->task_not_empty), &(pool->lock));
			//判断是否需要清除线程,自杀功能
			if (pool->wait_exit_thr_num > 0)
			{
				pool->wait_exit_thr_num--;
				//判断线程池中的线程数是否大于最小线程数,是则结束当前线程
				if (pool->live_thr_num > pool->min_thr_num)
				{
					printf("线程 %lu 成功退出 \n", pthread_self());
					pool->live_thr_num--;
					pthread_mutex_unlock(&(pool->lock));
					pthread_exit(NULL);//结束线程
				}
   			}
       		
		}	
		if(pool->shutdown==true)
		{
			pthread_mutex_unlock(&(pool->lock));
       		printf("线程 %lu 成功退出 \n",pthread_self());
       		pthread_exit(NULL); //线程自己结束自己
		}
		
		//当条件满足的时候,将任务结点从任务链表中取下来(将任务结点从链表中摘下来)
		task.do_task=pool->task_list[pool->task_front].do_task; //将任务函数取下
		task.arg=pool->task_list[pool->task_front].arg;			//将任务函数的参数取下

		pool->task_front=(pool->task_front + 1) % pool->max_waiting_tasks;
		pool->cur_waiting_tasks--;
		
		//通知可以添加新任务
    	pthread_cond_broadcast(&(pool->task_not_full));
		//获取线程互斥锁,解锁
		pthread_mutex_unlock(&(pool->lock));
		
		//按照任务结点中的任务去执行即可
		printf("线程 %lu working... \n",pthread_self());
		pthread_mutex_lock(&(pool->thread_counter));            //锁住忙线程变量
    	pool->active_threads++;
    	pthread_mutex_unlock(&(pool->thread_counter));

		(*(task.do_task))(task.arg);
		//释放任务结点
		printf("线程 %lu end working \n",pthread_self());
    	pthread_mutex_lock(&(pool->thread_counter));
    	pool->active_threads--;
    	pthread_mutex_unlock(&(pool->thread_counter));
	}
	pthread_exit(NULL);
}
/*
	Add_Task:给任务队列增加任务
		把fun_task指向的函数和fun_arg指向的函数参数保存到一个任务结点中去
		同时将任务结点添加到pool所表示的线程池的任务队列中去
*/
int Add_Task(Pthread_pool *pool,void (*fun_task)(void *),void *fun_arg)
{
	/*需要将fun_task和fun_arg封装到一个任务结点(结构体)中去	
	在往pool指向的线程池的任务队列中添加任务的时候,需要上锁和解锁	
	加入任务后要唤醒等待的线程*/

	pthread_mutex_lock(&(pool->lock));//上锁

	/*如果队列满了,调用wait阻塞*/
	while ((pool->cur_waiting_tasks == pool->max_waiting_tasks) && (!pool->shutdown))
	{
		pthread_cond_wait(&(pool->task_not_full), &(pool->lock));
	}
	/*如果线程池处于关闭状态*/
	if (pool->shutdown)
	{
		pthread_mutex_unlock(&(pool->lock));
		return -1;
	}

	/*清空工作线程的回调函数的参数arg*/
	if (pool->task_list[pool->task_rear].arg != NULL)
	{
		pool->task_list[pool->task_rear].arg = NULL;
		free(pool->task_list[pool->task_rear].arg);	
	}

	/*添加任务到任务队列*/
	pool->task_list[pool->task_rear].do_task =fun_task;
	pool->task_list[pool->task_rear].arg =fun_arg;
	pool->task_rear = (pool->task_rear + 1) % pool->max_waiting_tasks;  /* 逻辑环  */
	pool->cur_waiting_tasks++;

	/*添加完任务后,队列就不为空了,唤醒线程池中的一个线程*/
	pthread_cond_signal(&(pool->task_not_empty));
	pthread_mutex_unlock(&(pool->lock));//解锁

	return 0;
}

(3)线程管理

/*管理线程*/
void *admin_thread(void *arg)
{
   int i;
   Pthread_pool *pool = (Pthread_pool *)arg;
   while (!pool->shutdown)
   {
		printf("管理线程工作!-----------------\n");
		sleep(1);                             /*隔一段时间再管理*/
		pthread_mutex_lock(&(pool->lock));               /*加锁*/ 
		int cur_waiting_tasks = pool->cur_waiting_tasks; /*任务数*/
		int live_thr_num = pool->live_thr_num;           /*存活的线程数*/
		pthread_mutex_unlock(&(pool->lock));             /*解锁*/

		pthread_mutex_lock(&(pool->thread_counter));
		int active_threads = pool->active_threads;           /*忙线程数*/  
		pthread_mutex_unlock(&(pool->thread_counter));

		printf("admin busy:%d live:%d\n",active_threads,live_thr_num);
		/*创建新线程 实际任务数量大于 最小正在等待的任务数量,存活线程数小于最大线程数*/
		if (cur_waiting_tasks >= MIN_WAIT_TASK_NUM && live_thr_num <= pool->max_thr_num)
		{
			printf("管理员添加线程-----------\n");
			pthread_mutex_lock(&(pool->lock));
			int add=0;

			/*一次增加 DEFAULT_THREAD_NUM 个线程*/
			for (i=0; i<pool->max_thr_num && add<DEFAULT_THREAD_NUM 	
					&& pool->live_thr_num < pool->max_thr_num; i++)	/*i小于最大线程数并且添加的线程个数小于默认一次添加的线程并且当前存活线程小于最大线程*/
			{
				if (pool->tids[i] == 0 || !is_thread_alive(pool->tids[i]))
				{
					pthread_create(&(pool->tids[i]),NULL,Routine, (void *)pool);
					add++;
					pool->live_thr_num++;
					printf("新建线程[%d] %lu-----------------------\n",i,pthread_self());
				}
			}

			pthread_mutex_unlock(&(pool->lock));
		}

		/*销毁多余的线程 忙线程x2 都小于 存活线程,并且存活的大于最小线程数*/
		if ((cur_waiting_tasks*2) < live_thr_num  &&  live_thr_num > pool->min_thr_num)
		{
			//printf("admin busy --%d--%d----\n", cur_waiting_tasks, live_thr_num);
			/*一次销毁DEFAULT_THREAD_NUM个线程*/
			pthread_mutex_lock(&(pool->lock));
			pool->wait_exit_thr_num = DEFAULT_THREAD_NUM;
			pthread_mutex_unlock(&(pool->lock));

			for (i=0; i<DEFAULT_THREAD_NUM; i++)
			{
				//通知正在处于空闲的线程,自杀
				pthread_cond_signal(&(pool->task_not_empty));
				printf("销毁空闲线程 --\n");
			}
		}
   }
  
   return NULL;
}

(4)线程销毁

/*线程是否存活*/
int is_thread_alive(pthread_t tid)
{
   int kill_rc = pthread_kill(tid, 0);     //发送0号信号,测试是否存活
   if (kill_rc == ESRCH)  //线程不存在
   {
      return false;
   }
   return true;
}

/*释放线程池*/
int free_Pool(Pthread_pool *pool)
{
   if (pool == NULL)
   {
     return -1;
   }

   if (pool->task_list)
   {
      free(pool->task_list);
   }
   if (pool->tids)
   {
      free(pool->tids);
      pthread_mutex_lock(&(pool->lock));               /*先锁住再销毁*/
      pthread_mutex_destroy(&(pool->lock));
      pthread_mutex_lock(&(pool->thread_counter));
      pthread_mutex_destroy(&(pool->thread_counter));
      pthread_cond_destroy(&(pool->task_not_empty));
      pthread_cond_destroy(&(pool->task_not_full));
   }
   free(pool);
   pool = NULL;

   return 0;
}

/*
	销毁线程池
*/
int Destory_Pool(Pthread_pool *pool)
{
	int i;
	if (pool == NULL)
	{
		return -1;
	}
	pool->shutdown = true;
	
	/*销毁管理者线程*/
   pthread_join(pool->admin_tid, NULL);

   //通知所有线程去自杀(在自己领任务的过程中)
   for (i=0; i<pool->live_thr_num; i++)
   {
   		pthread_cond_broadcast(&(pool->task_not_empty));
   }

   /*等待线程结束 先是pthread_exit 然后等待其结束*/
   for (i=0; i<pool->live_thr_num; i++)
   {
     	pthread_join(pool->tids[i], NULL);
   }

   free_Pool(pool);
   return 0;
}

六、任务接口(主函数调用示例)

? ? ? ??本工程中主要是下linux下调用文件拷贝函数,实现多线程多文件拷贝,包括普通文件,目录文件,多层包含的目录文件的拷贝,每一个文件拷贝的过程都是一次函数的调用,于是可以利用多线程,线程池的技术实现

#include "pthread_pool.h"
#include "cpdir.h"
int main(int argc,char *argv[])
{
	if(argc != 3)
	{
		fprintf(stderr,"%s <src_dir> <dest_dir>\n",argv[0]);
		exit(-1);
	}
	Pthread_pool *main_pool = malloc(sizeof(*main_pool));
	main_pool=Init_Pool(50,5,50);
	printf("初始化线程池成功!\n");

	/*将目录argv[1]以及目录下所有的东西保持原有架构的基础上拷贝到argv[2]目录下*/
	Cp_Dir(main_pool,argv[1],argv[2]);
	
	Destory_Pool(main_pool);
	printf("销毁线程池成功!\n");
	return 0;
}

七、效果图?

????????由于Linux环境以及文件拷贝函数编写问题,部分特殊文件没有成功打开进行拷贝,但是几乎所有用户所能访问的文件都成功拷贝了,如图。

?

?

?

完整工程文件下载地址:https://download.csdn.net/download/weixin_56681901/21963165

  C++知识库 最新文章
【C++】友元、嵌套类、异常、RTTI、类型转换
通讯录的思路与实现(C语言)
C++PrimerPlus 第七章 函数-C++的编程模块(
Problem C: 算法9-9~9-12:平衡二叉树的基本
MSVC C++ UTF-8编程
C++进阶 多态原理
简单string类c++实现
我的年度总结
【C语言】以深厚地基筑伟岸高楼-基础篇(六
c语言常见错误合集
上一篇文章      下一篇文章      查看所有文章
加:2021-09-06 10:59:24  更:2021-09-06 10:59:36 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/27 20:46:07-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计