目录
线程安全(下)?
同步的引入
同步
同步的概念
条件变量原理
条件变量接口
加上条件变量后的代码
条件变量夺命追问
死锁
死锁的两种场景
死锁的必要条件
怎样预防死锁
生产消费模型
123规则
代码实现
优点
信号量
信号量原理
信号量接口
信号量的使用
使用信号量完成生产消费模型代码
线程池?
概念
原理
代码实现
线程安全(下)?
同步的引入
现在来一小段代码,来演示一下多个线程访问临界资源的合理性问题。假如有两个工作线程,一个工作线程做面,一个工作线程吃面,我们来约定几个规则:1.同一时刻吃面线程去吃面的时候,做面线程不能去做面,反之亦是这样,也就是保证互斥。2.没有面时,吃面线程不能去吃面,有面时,做面线程不能去做面,也就是保证面的数量只能是1或者0(合理访问临界区资源)3.做面吃面尽可能的快。代码如下:
1 #include <stdio.h>
2 #include <pthread.h>
3 #include <unistd.h>
4 #define N 1
5 int noodles=0;//假定初始时,面的数量为0
6 pthread_mutex_t g_lock;//互斥锁对象
7 void* Eat(void* arg){//吃面线程执行的代码
8 while(1){
9 pthread_mutex_lock(&g_lock);//加锁
10 if(noodles<1){//没有面时,解锁并且跳出当前代码执行逻辑
11 pthread_mutex_unlock(&g_lock);
12 continue;
13 }
14 printf("我是吃面线程,我吃了:%d碗面\n",noodles--);//有面时就吃面
15 pthread_mutex_unlock(&g_lock);//吃完面后解锁
16 }
17 return NULL;
18 }
19 void* Make(void* arg){//做面线程执行的代码
20 while(1){
21 pthread_mutex_lock(&g_lock);//加锁
22 if(noodles>=1){//有面时,解锁并跳出循环
23 pthread_mutex_unlock(&g_lock);
24 continue;
25 }
26 printf("我是做面线程,我做了:%d碗面\n",++noodles);
27 pthread_mutex_unlock(&g_lock);//做完面后解锁
28 }
29 return NULL;
30 }
31 int main(){
32 pthread_mutex_init(&g_lock,NULL);//初始化互斥锁变量
33 pthread_t eat[N];//保存吃面线程的标识符
34 pthread_t make[N];//保存做面线程的标识符
35 int i=0;
36 for(;i<N;i++){//创建两个工作线程,分别去做面、吃面
37 pthread_create(&eat[i],NULL,Eat,NULL);
38 pthread_create(&make[i],NULL,Make,NULL);
39 }
40 i=0;
41 for(;i<2;i++){//回收吃面线程与做面线程的退出状态信息
42 pthread_join(eat[i],NULL);
43 pthread_join(make[i],NULL);
44 }
45 pthread_mutex_destroy(&g_lock);//销毁互斥锁
46 return 0;
47 }
代码逻辑很简单,主线程创建两个工作线程,定义全局变量noodles,两个线程分别对其进行加1减1操作(保证互斥),模拟吃面做面的过程,并且保证合理访问临界区资源。
?保存编译运行,一段时间过,按"ctrl+c"终止进程,查看运行结果:
运行后,执行结果符合我们的预期,遵从做一碗面,吃一碗面的逻辑,但是这段代码真的没有问题吗??接下来在吃面线程和做面线程的代码中分别添加一行代码:
?保存结果编译运行,将打印结果重定向到文件1.txt,便于观察现象:
打开"1.txt"文件,第一次抢到互斥锁的是吃面线程,:
增加了这两行代码后,吃面线程打印了208602行“碗里没有面,我就不吃了”,做面线程才做了一碗面。
这个结果充分说明了一个问题:吃面线程在碗里没有面时,还一直在占用CPU资源,做了很多无效的操作。深入分析一下,就是说吃面线程在加锁后,发现碗里没有面,进行了解锁并且continue,而之后这把锁又被吃面线程拿到了,持续拿到这把锁208601次。
那么做面线程为什么208601次后才抢到这把锁?
首先这台机器是单核CPU的,同一时刻只有一个线程在CPU上执行自己的代码,在吃面线程的时间片范围内,做面线程不可能来抢这把锁。所以在吃面线程的时间片内,它自己一直在做加锁解锁的操作。直到被CPU剥离下去,并且此时锁恰好被释放,做面线程才拿到了这把锁。做面线程拿到锁后,同样会做大量无效操作。即便是多核数CPU,同一个线程也有可能多次获取互斥锁,做很多无效操作,只不过这种情况在单核CPU机器上尤为严重。
怎样才能让吃面线程和做面线程保证做一碗,吃一碗,大量减少无效操作,如何解决这个问题?同步。
同步
同步的概念
多个线程保证了互斥之后,也就保证了同一时刻只有一个线程能够独占访问临界资源。但是并不能保证每个线程对临界资源的访问都是合理的。同步是为了保证多个线程对临界资源访问的合理性,这个合理性建立在多个线程保证互斥的情况下。
同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问 题,叫做同步。
条件变量原理
线程在加锁后,判断临界资源是否可用,如果可用,直接访问临界区资源,如果不可用,则调用等待接口,让线程等待。本质上是将线程PCB放到PCB等待队列中。
条件变量接口
1.动态初始化
此函数与pthread_cond_destory()的返回值相似,调用成功返回0,失败返回错误码
2.静态初始化
3.等待接口
?谁调用谁等待,就将谁放到条件变量对应的PCB等待队列当中
4.唤醒接口
调用pthread_cond_signal可以唤醒PCB等待队列中至少一个线程
?调用pthread_cond_broadcast可以唤醒PCB等待队列当中所有线程
?5.条件变量的销毁
动态初始化条件变量,在进程退出时,需要调用pthread_cond_destroy进行销毁。
加上条件变量后的代码
使用条件变量后,可以让吃面线程在发现碗里没有面(临界资源不可用)时,就将自己放到PCB等待队列中,直到做面线程做好了一碗面去通知吃面线程,此时吃面线程会被唤醒,做面线程发现碗里有面,就将自己放到PCB等待队列中,而吃面线程在吃完这碗面后又去唤醒做面线程,然后吃面线程又发现碗里没有面,又将自己放到PCB等待队列中。一直重复上述过程。
简单来说,加上条件变量后,保证吃面线程和做面线程都不可能连续两次以上加锁成功,这样就提高了做面吃面的效率。代码改进如下:
1 #include <stdio.h>
2 #include <pthread.h>
3 #include <unistd.h>
4 #define N 1
5 int noodles=0;//假定初始时,面的数量为0
6 pthread_cond_t g_cond;//定义条件变量
7 pthread_mutex_t g_lock;//互斥锁对象
8 void* Eat(void* arg){//吃面线程执行的代码
9 while(1){
10 pthread_mutex_lock(&g_lock);//加锁
11 if(noodles<1){
12 printf("碗里没有面,我就不吃了\n");
13 pthread_cond_wait(&g_cond,&g_lock);//条件变量不可用,将当前线程放到等待队列中
14 }
15 printf("我是吃面线程,我吃了:%d碗面\n",noodles--);//有面时就吃面
16 pthread_mutex_unlock(&g_lock);//吃完面后解锁
17 pthread_cond_signal(&g_cond);//唤醒做面线程
18 }
19 return NULL;
20 }
21 void* Make(void* arg){//做面线程执行的代码
22 while(1){
23 pthread_mutex_lock(&g_lock);//加锁
24 if(noodles>=1){
25 printf("碗里有面,我就不做了\n");
26 pthread_cond_wait(&g_cond,&g_lock);//条件变量不可用,将当前线程放到等待队列中
27 }
28 printf("我是做面线程,我做了:%d碗面\n",++noodles);
29 pthread_mutex_unlock(&g_lock);//做完面后解锁
30 pthread_cond_signal(&g_cond);//唤醒吃面线程
31 }
32 return NULL;
33 }
34 int main(){
35 pthread_mutex_init(&g_lock,NULL);//初始化互斥锁变量
36 pthread_cond_init(&g_cond,NULL);//初始化条件变量
37 pthread_t eat[N];//保存吃面线程的标识符
38 pthread_t make[N];//保存做面线程的标识符
39 int i=0;
40 for(;i<N;i++){//创建两个工作线程,分别去做面、吃面
41 pthread_create(&eat[i],NULL,Eat,NULL);
42 pthread_create(&make[i],NULL,Make,NULL);
43 }
44 i=0;
45 for(;i<2;i++){//回收吃面线程与做面线程的退出状态信息
46 pthread_join(eat[i],NULL);
47 pthread_join(make[i],NULL);
48 }
49 pthread_mutex_destroy(&g_lock);//销毁互斥锁
50 pthread_cond_destroy(&g_cond);//销毁条件变量
51 return 0;
52 }
编译运行,并且将打印结果重定向到"2.txt"当中:
观察运行结果,还是非常规律的,做面线程和吃面线程都是连续两次拿到互斥锁,访问临界区资源,这也符合在单核CPU下运行预期结果,但是如果在多核CPU下运行,运行结果不一定会这么规律,但是运行结果一定是对的。
那么以上代码就真的没有问题了吗,做如下修改,将N的数量改成2,也就是说分别创建两个做面线程(称为做面线程1和2),两个吃面线程(称为吃面线程1和2):
?编译运行,并且将打印结果重定向到"3.txt"当中,打开文件后:
?程序运行结果又不对了,出现了吃2碗面、吃0碗面的情况。为什么?
先来了解一下pthread_cond_wait这个函数,此函数调用成功后,会将当前线程的PCB放到PCB等待队列中,然后释放互斥锁,给其它线程抢锁的机会。函数返回时,当前线程会被从PCB等待队列中拿出来,并且还要去抢夺互斥锁,加锁成功后,代码才能继续往下执行。
分析其中的一种不符合条件的情况:如果一开始互斥锁被吃面线程1和吃面线程2分别拿到,那么这两个线程都会被放到等待队列中,直到任一做面线程将他们都通知出来,此时这两个吃面线程的代码执行逻辑都是从pthread_cond_wait返回后开始执行,假如说互斥锁又分别被这两个线程抢到,那就是说,这两个线程会按顺序去吃面,但是只有一碗面,所以就会把面吃成负数。那么代码如何改进?
很简单,只需要在pthread_wait函数返回后再去判断临界资源可不可用即可,也就是将if改为while,做出修改如下:
此时在编译运行,将结果打印到屏幕,观察结果:
运行后没有在出现面的数量大于或者小于1,也就是说吃面做面的逻辑正常,但是代码直接卡死了,明明是死循环,它应该一直打印,但是此时却停止打印了,是某一个工作线程退出时将锁带走导致死锁了吗?
使用命令pstack [进程号]来查看四个工作线程的状态:
四个工作线程都在,但是都被放到PCB等待队列中去了,所以代码卡死并不是因为死锁导致的,是因为此时四个工作线程都在等待,如果不去人为干预,那么当前进程就不会退出了。为什么会出现这种情况?
分析:如果一开始互斥锁被吃面线程1和吃面线程2分别拿到,那么这两个线程都会被放到等待队列中,如果此时做面线程1做好了面,然后去通知等待队列中的吃面线程,但是只有吃面线程1被唤醒,吃面线程1的pthread_cond_wait函数返回时,吃面线程1也要去抢互斥锁,但是恰巧这把互斥锁又分别被做面线程2和做面线程1(因为做面线程1执行的代码是死循环,在单核CPU下这把锁必定会被做面线程1连续两次拿到)抢到,两个做面线程发现碗里有面,就都将自己放到等待队列中去了。此时,三个线程都在等待队列中了,只有吃面线程1执行自己的代码,吃面线程1加锁吃面后,去通知PCB等待队列,恰好只是将吃面线程2唤醒,然后此时吃面线程1和2都去抢这把互斥锁,但是不管谁先抢到,这两个吃面线程都会将自己放到PCB等待队列中,所以代码就卡死了。
总结一下以上问题:也就是说吃面线程可能会将吃面线程从PCB等待队列中唤醒,做面线程可能会将做面线程从PCB等待队列中唤醒,只要保证吃面线程唤醒的是做面线程,做面线程唤醒的是吃面线程,那么上述问题就不会出现了。
如何解决?很简单,只需要初始化两个条件变量,也就是说吃面线程和做面线程有各自的PCB等待队列,而pthread_cond_signal函数调用时,分别去通知对方的PCB等待队列即可。如下图所示:
最终版本代码如下,顺便打印线程的标识符:
1 #include <stdio.h>
2 #include <pthread.h>
3 #include <unistd.h>
4 #define N 2
5 int noodles=0;//假定初始时,面的数量为0
6 pthread_cond_t g_eat_cond;//定义吃面线程的条件变量
7 pthread_cond_t g_make_cond;//定义吃做线程的条件变量
8 pthread_mutex_t g_lock;//互斥锁对象
9 void* Eat(void* arg){//吃面线程执行的代码
10 while(1){
11 pthread_mutex_lock(&g_lock);//加锁
12 while(noodles<1){
13 printf("碗里没有面,我就不吃了,%lu\n",pthread_self());
14 pthread_cond_wait(&g_eat_cond,&g_lock);//条件变量不可用,将当前线程放到等待队列中
15 }
16 printf("我是吃面线程,我吃了:%d碗面,%lu\n",noodles--,pthread_self());//有面时就吃面
17 pthread_mutex_unlock(&g_lock);//吃完面后解锁
18 pthread_cond_signal(&g_make_cond);
19 }
20 return NULL;
21 }
22 void* Make(void* arg){//做面线程执行的代码
23 while(1){
24 pthread_mutex_lock(&g_lock);//加锁
25 while(noodles>=1){
26 printf("碗里有面,我就不做了,%lu\n",pthread_self());
27 pthread_cond_wait(&g_make_cond,&g_lock);//条件变量不可用,将当前线程放到等待队列中
28 }
29 printf("我是做面线程,我做了:%d碗面,%lu\n",++noodles,pthread_self());
30 pthread_mutex_unlock(&g_lock);//做完面后解锁
31 pthread_cond_signal(&g_eat_cond);//唤醒吃面线程
32 }
33 return NULL;
34 }
35 int main(){
36 pthread_mutex_init(&g_lock,NULL);//初始化互斥锁变量
37 pthread_cond_init(&g_eat_cond,NULL);//初始化吃面线程条件变量
38 pthread_cond_init(&g_make_cond,NULL);//初始化做面线程条件变量
39 pthread_t eat[N];//保存吃面线程的标识符
40 pthread_t make[N];//保存做面线程的标识符
41 int i=0;
42 for(;i<N;i++){//创建两个工作线程,分别去做面、吃面
43 pthread_create(&eat[i],NULL,Eat,NULL);
44 pthread_create(&make[i],NULL,Make,NULL);
45 }
46 i=0;
47 for(;i<2;i++){//回收吃面线程与做面线程的退出状态信息
48 pthread_join(eat[i],NULL);
49 pthread_join(make[i],NULL);
50 }
51 pthread_mutex_destroy(&g_lock);//销毁互斥锁
52 pthread_cond_destroy(&g_eat_cond);//销毁吃面线程条件变量
53 pthread_cond_destroy(&g_make_cond);//销毁做面线程条件变量
54 return 0;
55 }
编译运行:
条件变量夺命追问
1.条件变量的等待接口第二个参数为什么会有互斥锁?
答:要进行解锁。线程进入PCB等待队列中就不会在执行自己的代码了,如果不解锁,那么其它线程就永远访问不了临界区资源。
2.调用pthread_cond_wait函数后是先释放互斥锁还是先将线程放到PCB等待队列中?
答:先将线程PCB放到PCB等待队列中,在进行解锁。假设有两个线程,线程1和线程2,线程1调用了pthread_cond_wait函数,如果说先释放互斥锁,那么在线程1进入PCB等待队列之前,互斥锁可能就被线程2拿到了,线程2访问完临界资源后会去通知线程1所在的PCB等待队列,尝试将其唤醒,但是此时线程1还没有在自己的PCB等待队列中,而线程2在访问完临界资源后可能也会进入PCB等待队列,最终就会造成线程1和线程2都被放到PCB等待队列中。
3.线程被唤醒了之后需要在获取互斥锁吗?
答:需要。pthread_cond_wait函数在返回时会在其内部进行加锁操作,以保证线程被唤醒后访问临界区资源是合法的。在加锁时:
抢到了:pthread_cond_wait函数返回,代码向下执行。
没抢到:一直处于抢锁逻辑,直到抢到互斥锁,pthread_cond_wait函数返回。
死锁
死锁的两种场景
第一种场景:线程加锁后,不释放互斥锁
第二种场景:两个线程分别拿着一把锁,还想请求对方的锁
死锁的必要条件
1.不可剥夺:线程获取到互斥锁之后,除了自己释放,其他线程不能进行释放
2.循环等待:线程A拿着1锁请求2锁,同时线程B拿着2锁,请求1锁
3.互斥条件:一个互斥锁同一时间只能被一个线程拥有
4.请求与保持:吃着碗里的,看着锅里的
怎样预防死锁
1.破坏必要条件:循环等待,请求与保持
2.加锁顺序一致,都先加1锁在加2锁
3.避免锁没有被释放:在所有可能线程退出的地方都进行解锁
4.资源一次性分配,多个资源在代码中有可能每一个资源都需要使用不同的锁进行保护,例如:全局变量A,需要1锁,全局变量B,需要2锁,就有可能多个线程在使用这两个资源的时候,出现循环等待的情况。
生产消费模型
123规则
1个线程安全队列:保证先进先出特性的数据结构
2种角色的线程:生产者&消费者
3个规则:生产者与生产者互斥,消费者与消费者互斥,生产者与消费者互斥+同步
代码实现
可以利用C++中的queue,对其进行包装,来实现线程安全队列,有了前面的学习,那么该代码是非常好写的。代码如下:
1 #include <stdio.h>
2 #include <queue>
3 #include <pthread.h>
4 #define N 2
5 using namespace std;
6 class SaftyQueue{
7 public:
8 SaftyQueue(){//做一些初始化工作
9 _capacity=5;
10 pthread_mutex_init(&que_lock,NULL);
11 pthread_cond_init(&cons_cond,NULL);
12 pthread_cond_init(&prod_cond,NULL);
13 }
14 void Push(int val){
15 pthread_mutex_lock(&que_lock);//加锁保护
16 while(_que.size()>=_capacity){
17 pthread_cond_wait(&prod_cond,&que_lock);//将当前生产者线程放到自己的PCB等待队列中
18 }
19 _que.push(val);
20 printf("我是生产线程:%lu,我生产了:%d\n",pthread_self(),val);//printf不是原子性的,需要加锁保护
21 pthread_mutex_unlock(&que_lock);//解锁
22 pthread_cond_signal(&cons_cond);//去通知消费者的PCB等待队列
23 }
24 void Pop(){
25 pthread_mutex_lock(&que_lock);//加锁保护
26 while(_que.size()<=0){
27 pthread_cond_wait(&cons_cond,&que_lock);//将当前消费者线程放到自己的PCB等待队列中
28 }
29 int data=_que.front();
30 _que.pop();
31 printf("我是消费线程:%lu,我消费了:%d\n",pthread_self(),data);//printf不是原子性的,需要加锁保护
32 pthread_mutex_unlock(&que_lock);//解锁
33 pthread_cond_signal(&prod_cond);//去通知生产者的PCB等待队列
34 }
35 ~SaftyQueue(){//销毁互斥锁对象与条件变量
36 pthread_mutex_destroy(&que_lock);
37 pthread_cond_destroy(&cons_cond);
38 pthread_cond_destroy(&prod_cond);
39 }
40 private:
41 queue<int> _que;//使用STL提供的队列即可
42 size_t _capacity;//队列的大小
43 pthread_mutex_t que_lock;//同一时刻保证只有一个线程对队列进行操作
44 pthread_cond_t cons_cond;//消费者条件变量,将消费者线程放到PCB等待队列中
45 pthread_cond_t prod_cond;//生产者条件变量,将生产者线程放到PCB等待队列中
46 };
47 int g_data=0;
48 pthread_mutex_t g_lock = PTHREAD_MUTEX_INITIALIZER;//静态初始化互斥锁变量
49 void* start_prod(void* arg){//生产者往队列中生产
50 SaftyQueue* que=(SaftyQueue*)arg;
51 while(1){
52 pthread_mutex_lock(&g_lock);//对全局变量g_data进行加锁保护,防止多个生产者同时访问g_data
53 que->Push(g_data);
54 g_data++;
55 pthread_mutex_unlock(&g_lock);//进行解锁,给其他生产者线程访问g_data的机会
56 }
57 }
58 void* start_cons(void* arg){//消费者从队列中消费
59 SaftyQueue* que=(SaftyQueue*)arg;
60 while(1){
61 que->Pop();
62 }
63 }
64 int main(){
65 SaftyQueue* que=new SaftyQueue();//que指向申请出来的线程安全队列
66 pthread_t prod[N];//存放生产者线程的标识符
67 pthread_t cons[N];//存放消费者线程的标识符
68 int i=0;
69 for(;i<N;i++){//创建N个生产与消费线程
70 int ret=pthread_create(&prod[i],NULL,start_prod,(void*)que);//将指针que传递过去
71 if(ret!=0){
72 perror("pthread_create");
73 return 0;
74 }
75 ret=pthread_create(&cons[i],NULL,start_cons,(void*)que);//将指针que传递过去
76 if(ret!=0){
77 perror("pthread_create");
78 return 0;
79 }
80 }
81 i=0;
82 for(;i<N;i++){//等待工作线程
83 pthread_join(prod[i],NULL);
84 pthread_join(cons[i],NULL);
85 }
86 delete que;
87 return 0;
88 }
编译运行:
观察现象,生产者线程将队列写满后消费者线程才来消费,消费者线程将队列中的元素全部消费完后,生产者线程才来生产。这个现象好像与代码的逻辑不符合,因为代码实现中,生产者和消费者并不关心对方的状态,只关心对列中有没有元素,也就是说生产和消费不一定要等到对列为空或者队列满时才进行。那么现在出现这种现象正确吗?正确,因为当前机器为单核CPU,同一时刻拿到CPU的只有一个线程,无论是哪一种线程,在自己的时间片范围内,都能执行到临界区资源不可用将自己放到PCB等待队列为止,所以说这种现象是正确的。
优点
1.忙闲不均
2.生产者与消费者解耦:生产者与消费者只关心线程安全队列中有没有元素,互不干扰
3.支持高并发
信号量
信号量原理
使用条件变量可以保证同步,它本质就是一个PCB等待队列。而信号量也可以保证同步,与条件变量不同的是,信号量多了一个资源计数器,而这个资源计数器的作用就是用来判断临界资源是否可用,在代码实现中就不用程序员自己来判断临界资源是否可用了。
信号量接口
1.初始化
用来初始化信号量
2.等待接口
调用sem_wait后会做两件事:
1.对资源计数器进行减1操作
2.判断资源计数器的值是否小于0? ?是:阻塞等待,将当前线程放到PCB等待队列。否:接口返回
3.释放接口
调用sem_post后会做两件事:
1.对资源计数器进行加1操作
2.判断资源计数器的值是否小于等于0? ?是:去通知PCB等待队列(说明没有加1之前资源计数器的值一定是负数,根据sem_wait的特性,一定有线程因为临界资源不可用被放到了PCB等待队列中。此时加1后,临界资源就可以使用了,所以要唤醒正在等待的线程)。?否:不用通知PCB等待队列,因为此时没有线程在等待。
4.销毁接口
用来销毁信号量
信号量的使用
1.使用信号量时,是先保证互斥还是先获取信号量?
如果先保证互斥,观察信号量的接口,在等待接口中并没有与互斥锁相关的参数,也就是说如果使用了互斥锁,调用sem_wait后如果发现临界资源不可用,将线程放到PCB等待队列中后不会进行解锁,那么就造成了死锁。所以说:先获取信号量在保正互斥,与条件变量的使用正好相反
2.信号量可以用于互斥吗?
可以,把信号量的资源计数器初始化为1,那么当某一个线程将资源计数器的值减为0后,其他线程就都会被阻塞掉,直到当前线程将其释放。
使用信号量完成生产消费模型代码
我们考虑将vector作为线程安全队列的底层容器,那么此时可以将该队列假想为一个环形队列。则生产消费模型如下图所示:
代码实现如下:
1 #include <stdio.h>
2 #include <vector>
3 #include <semaphore.h>
4 #include <pthread.h>
5 #define CAPACITY 5//线程安全队列的大小
6 #define N 2//创建某种线程的数量
7 using namespace std;
8 class SemQueue{
9 public:
10 SemQueue()
11 :_que(CAPACITY)
12 {
13 _capacity=CAPACITY;
14 sem_init(&_sem_lock,0,1);//初始化保证互斥的信号量,注意资源计数器的值为1
15 sem_init(&_sem_cons,0,0);//初始化消费者信号量,注意资源计数器的值为0
16 sem_init(&_sem_prod,0,CAPACITY);//初始化生产者信号量,注意资源计数器的值为队列的大小
17 front=0;//队头
18 back=0;//队尾
19 }
20 void Push(int val){
21 sem_wait(&_sem_prod);//获取生产者信号量
22 sem_wait(&_sem_lock);//保证互斥
23 _que[back]=val;
24 printf("我是生产者线程:%lu,我生产了:%d\n",pthread_self(),_que[back]);
25 back=(back+1)%_capacity;
26 sem_post(&_sem_lock);//释放
27 sem_post(&_sem_cons);//释放消费者信号量,有可能去通知消费者等待队列
28 }
29 void Pop(){
30 sem_wait(&_sem_cons);//获取消费者信号量,可能会阻塞
31 sem_wait(&_sem_lock);//保证互斥
32 printf("我是消费者线程:%lu,我消费了:%d\n",pthread_self(),_que[front]);
33 front=(front+1)%_capacity;
34 sem_post(&_sem_lock);//释放
35 sem_post(&_sem_prod);//释放生产者信号量,可能会通知生产者等待队列
36 }
37 ~SemQueue(){//销毁所有信号量
38 sem_destroy(&_sem_lock);
39 sem_destroy(&_sem_cons);
40 sem_destroy(&_sem_prod);
41 }
42 private:
43 sem_t _sem_lock;//保证互斥的信号量
44 sem_t _sem_cons;//消费者信号量
45 sem_t _sem_prod;//生产者信号量
46 vector<int> _que;
47 int _capacity;
48 int front;
49 int back;
50 };
51 pthread_mutex_t g_lock = PTHREAD_MUTEX_INITIALIZER;
52 int val=0;
53 void* prod_start(void* arg){
54 SemQueue* que=(SemQueue*)arg;
55 while(1){
56 pthread_mutex_lock(&g_lock);//对val加锁保护
57 que->Push(val);
58 val++;
59 pthread_mutex_unlock(&g_lock);//解锁
60 }
61 }
62 void* cons_start(void* arg){
63 SemQueue* que=(SemQueue*)arg;
64 while(1){
65 que->Pop();
66 }
67 }
68 int main(){
69 SemQueue* que=new SemQueue();
70 pthread_t cons[N];
71 pthread_t prod[N];
72 int i=0;
73 for(;i<N;i++){//创建两种线程,并将线程安全队列的地址传递过去
74 int ret=pthread_create(&prod[i],NULL,prod_start,(void*)que);
75 if(ret<0){
76 perror("pthread_create");
77 return 0;
78 }
79 ret=pthread_create(&cons[i],NULL,cons_start,(void*)que);
80 if(ret<0){
81 perror("pthread_create");
82 return 0;
83 }
84 }
85 for(i=0;i<N;i++){//线程等待,回收两种线程的退出状态信息
86 pthread_join(prod[i],NULL);
87 pthread_join(cons[i],NULL);
88 }
89 return 0;
90 }
使用信号量完成的生产消费模型的代码比使用条件变量更简单,只不过底层稍微复杂一些,两种都可以达到预期效果。编译运行后:
线程池?
概念
? ?一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线 程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够 保证内核的充分利用,还能防止过分调度。
为什么要有线程池?
? ? ?如果每次都只创建一个线程,首先当用户请求过多时,每次都需要创建一个线程,创建线程需要时间和调度开销,这样会影响缓存的局部性和整体的性能。其次,如果无上限一直创建线程,还会导致CPU的过分调度。
? ? ? 线程池已经创建好了一定数量的线程,等待着分配任务,这样避免了处理任务时的线程创建和销毁。线程池里线程个数确定,能够保证内核的充分利用,还能防止过分调度。
?线程池中可用线程数量取决于可用额并发处理器,处理器内核,内存,网络socket等的数量。
线程池的应用场景:
? ? ? ?需要大量的线程来完成任务,且完成人物的时间比较短。比如:WEB服务器完成网页请求这样的任务,因为当个任务小,并且任务量巨大,你可以想象一个热门网站的请求次数。但是对于长时间的任务,线程池的优先就不明显了。比如:一个Telnet连接请求,因为Telnet会话时间比线程创建时间大多了。 ? ? ? ? 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。 ? ? ? ? 接收突发性的大量请求,但是不至于使服务器因此产生大量线程应用。突发性大量客户请求,在没有线程池的情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量的线程可能使内存到达极限,出现错误。
原理
线程池中包含一个线程安全队列与一些线程,如图:
线程池会先创建一定数量的线程,然后循环从任务队列中获取任务对象,在执行任务对象指定的接口即可。?
代码实现
线程池的代码对比生产消费模型,就是将消费线程的创建放在了类的内部,而生产线程变成了主函数。代码的难度并不大,如下:
1 #include <stdio.h>
2 #include <queue>
3 #include <pthread.h>
4 using namespace std;
5 typedef void (*Handler)(int);//Handler是类型不是指针,该类型的变量会指向无返回值,参数为int的函数
6 class QueueData{//将要处理的数据和处理该数据要执行的函数(指针)封装起来
7 public:
8 QueueData(){}
9 QueueData(int data,Handler handler){
10 _data=data;
11 _handler=handler;
12 }
13 void run(){//该类对象调用run方法后,执行数据对应的函数
14 _handler(_data);
15 }
16 private:
17 int _data;//要处理的数据
18 Handler _handler;//处理该数据将要使用到的函数的地址
19 };
20 class ThreadPool{
21 public:
22 ThreadPool(int capacity,int thread_num){//初始化
23 _capacity=capacity;
24 _thread_num=thread_num;
25 pthread_mutex_init(&_lock,NULL);
26 pthread_cond_init(&_cond_cons,NULL);
27 pthread_cond_init(&_cond_prod,NULL);
28 _exitflag=0;
29 }
30 int OnIint(){//在线程池中创建线程
31 int i=0;
32 int count=0;
33 pthread_t tid;
34 for(;i<_thread_num;i++){
35 int ret=pthread_create(&tid,NULL,PthreadStart,(void*)this);//注意将this指针传递给线程入口函数
36 if(ret<0){
37 count++;
38 }
39 }
40 return _thread_num-=count;//判断一共成功创建了几个线程
41 }
42 static void* PthreadStart(void* arg){
43 pthread_detach(pthread_self());//线程分离,消费线程退出后,不用被其他线程回收资源
44 ThreadPool* p=(ThreadPool*)arg;//强转为ThreadPool*
45 while(1){//消费线程不断消费即可
46 pthread_mutex_lock(&p->_lock);//从队列中拿,必须先加锁
47 while(p->_que.empty()){//判断队列是否为空
48 if(p->_exitflag){//此时代表要退出了
49 p->_thread_num--;//_thread_num表示还有几个线程没有退出
50 pthread_mutex_unlock(&p->_lock);//在线程退出之前解锁
51 pthread_exit(NULL);
52 }
53 pthread_cond_wait(&p->_cond_cons,&p->_lock);//队列为空,当前线程放到PCB等待队列中
54 }
55 QueueData q;
56 p->Pop(&q);
57 pthread_mutex_unlock(&p->_lock);//拿到数据后解锁
58 pthread_cond_signal(&p->_cond_prod);//通知生产者
59 q.run();//处理该数据即可
60
61 }
62 return NULL;
63 }
64 void Push(QueueData q){//把数据放入队列中,注意互斥与同步即可
65 pthread_mutex_lock(&_lock);
66 while(_que.size()>=_capacity){
67 pthread_cond_wait(&_cond_prod,&_lock);
68 }
69 _que.push(q);
70 pthread_mutex_unlock(&_lock);
71 pthread_cond_signal(&_cond_cons);
72 }
73 void Pop(QueueData* p){//从队列中拿出数据,并且将其从队列中删除
74 *p=_que.front();
75 _que.pop();
76 }
77 void ThreadExit(){//主线程调用该方法,说明此时Push已经全部完成,等待消费线程全部退出即可
78 _exitflag=1;
79 while(_thread_num){//防止有消费线程在PCB等待队列中
80 pthread_cond_signal(&_cond_cons);
81 }
82 }
83 ~ThreadPool(){//销毁互斥锁、条件变量
84 pthread_mutex_destroy(&_lock);
85 pthread_cond_destroy(&_cond_cons);
86 pthread_cond_destroy(&_cond_prod);
87 }
88 private:
89 queue<QueueData> _que;//使用STL队列作为线程安全队列的底层
90 size_t _capacity;//线程安全队列的大小
91 int _thread_num;//线程池中线程的个数
92 pthread_mutex_t _lock;//互斥锁,保证同一时刻只用一个线程在操作线程安全队列
93 pthread_cond_t _cond_cons;//消费者条件变量,即线程池中的线程将会使用到的条件变量
94 pthread_cond_t _cond_prod;//生产者条件变量,此代码中生产者即为主线程
95 int _exitflag;//标志位,保证将队列中元素处理完成后生产者才退出
96 };
97 void hand(int i){
98 printf("pthread_t=%lu,data=%d\n",pthread_self(),i);
99 }
100 int main(){
101 ThreadPool* tp=new ThreadPool(10,5);//定义线程池对象,给定队列大小为10,5个消费线程
102 if(tp==NULL){
103 printf("create threadpool failed!\n");
104 return 0;
105 }
106 if(tp->OnIint()<=0){//创建消费线程
107 printf("create threadpool failed!\n");
108 }
109 int i=0;
110 for(;i<10000;i++){
111 QueueData qd(i,hand);//注意hand就是要执行的函数
112 tp->Push(qd);//往队列中插入元素
113 }
114 tp->ThreadExit();//保证所有消费线程全部退出
115 delete tp;
116 return 0;
117 }
注意:
1.线程池内部的线程一开始都会执行PthreadStart函数,需要将该函数给成类的静态成员函数。如果不是静态成员函数,那么第一个参数编译器会默认传递this,但是该函数的第一个参数必须是void*,所以将函数给成静态的,就没有this了,防止冲突。
2.主线程在Push完所有数据后会立即退出,并不关心消费线程是否将数据全部处理完了,所以在线程池中加上_exitflag标志位,根据代码的执行逻辑,此时主线程退出之前,所有的消费线程都会退出,也就是说所有数据都被处理完了。
编译运行:
出现乱序的结果是正确的,因为不同的线程在处理数据执行run()方法时互斥锁已经释放了。当某一个线程刚准备处理数据时,可能被从CPU上剥离,而其他线程在抢到锁之后拿到数据并处理,就会出现乱序的结果。
|