条件变量(condition variables):
条件变量本身不是锁,它是用来实现并发编程中的一种同步机制。条件变量可以使得线程阻塞等待某个条件发生之后,再继续执行。
为什么需要条件变量:
假设在生产者消费者模型中,我们希望生产者制造出100个产品之后,庆祝一下,那么如果直接使用mutex互斥锁实现的话,那么我们需要不停的在消费者进程上轮询,进行加锁解锁的操作,询问生产者是否产生了100个产品,由于每次加锁解锁是对资源的浪费,而使用条件变量之后,我们可以等待某个条件发生,而不需要主动的轮询,即条件满足的时候生产者进程通知消费者进程,而不需要消费者进程不停的去检测,因此高效了很多。
条件变量的用法
为了防止发生竞争条件,条件变量必须与互斥锁搭配使用。pthread_cond_wait函数的调用和临界资源的访问都需要互斥锁进行保护。
1.创建条件变量
静态创建的方式
pthread_cond_t myCond = PTHREAD_COND_INITIALIZER;
动态创建的方式
int pthread_cond_init(pthread_cond_t * cond, const pthread_condattr_t * attr);
2.阻塞当前线程,等待条件成立
当条件不成立时,条件变量可以阻塞当前线程,所有被阻塞的线程会构成一个等待队列。 阻塞线程可以借助以下两个函数实现:
int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex);
int pthread_cond_timedwait(pthread_cond_t* cond, pthread_mutex_t* mutex, const struct timespec* abstime);
cond 参数表示已初始化好的条件变量;mutex 参数表示与条件变量配合使用的互斥锁;abstime 参数表示阻塞线程的时间。
这两个 wait 函数的调用,都要在获取 mutex 锁后进行。 以上两个函数内部主要进行了如下的操作: 1.阻塞线程,直到接受到条件成立的信号。 2.当线程被添加到等待队列之后,将互斥锁解锁(所以在调用wait之前必须加锁,否则不能保证将线程添加到等待队列中)。 3.等待条件变量被触发,休眠的线程会对互斥锁进行加锁操作,之后解除线程的阻塞,线程会被唤醒。
3.解除线程的“阻塞”状态
对于被 pthread_cond_wait() 或 pthread_cond_timedwait() 函数阻塞的线程,我们可以借助如下两个函数向它们发送“条件成立”的信号,解除它们的“被阻塞”状态,cond参数表示初始化好的条件变量,当函数成功解除线程的被阻塞状态时,函数会返回数字0。
int pthread_cond_signal(pthread_cond_t* cond);
int pthread_cond_broadcast(pthread_cond_t* cond);
两个函数都能解除线程的“被阻塞”状态,区别在于: 1.pthread_cond_signal() 函数至少解除一个线程的“被阻塞”状态,如果等待队列中包含多个线程,优先解除哪个线程将由操作系统的线程调度程序决定。 2.pthread_cond_broadcast() 函数可以解除等待队列中所有线程的“被阻塞”状态。 由于互斥锁的存在,解除阻塞后的线程也不一定能立即执行。当互斥锁处于“加锁”状态时,解除阻塞状态的所有线程会组成等待互斥锁资源的队列,等待互斥锁“解锁”。(等待其他线程解锁之后,该线程才能拿到锁)
4.条件变量的销毁
对于初始化好的条件变量,我们可以调用 pthread_cond_destory() 函数销毁它。
int pthread_cond_destroy(pthread_cond_t *cond)
cond 参数表示要销毁的条件变量。如果函数成功销毁 cond 参数指定的条件变量,返回数字 0
注意事项:
尽可能的使用while(条件不成立)条件不成立的时候一直阻塞,while内部的话调用wait函数,而不要使用if语句。 这是因为wait被唤醒之后,存在虚假唤醒的情况,导致唤醒之后发现条件依旧不成立,因为需要使用while语句循环的等待,直到条件成立。
条件变量应用-生产者消费者模型
生产者消费者问题指的是系统中存在一组生产者进程和一组消费者进程,生产者进程每生产一个产品放入缓冲区,消费者进程每次从缓冲区中拿走一个产品并使用,它是线程同步中的经典问题之一。 为了保证线程对缓冲区内的数据有序访问,需要满足以下3点:
- 缓存区是临界资源,所以线程之间需要互斥访问。
- 只有缓冲区不空的时候,消费者才可以从缓存区中拿出数据,否则阻塞等待。
- 只有缓冲区没有满的时候,生产者才能向缓冲区中放数据,否则阻塞等待。
基于条件变量的生产者消费者代码如下所示: 1.使用结构体来模拟缓冲区,缓冲区之间以链表形式相连接
struct msg
{
struct msg* next;
int num;
};
2.静态的方式定义了一个互斥锁 和 一个条件变量
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t has_product=PTHREAD_COND_INITIALIZER;
总体代码如下:
# include<stdio.h>
# include<stdlib.h>
# include<pthread.h>
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t has_product=PTHREAD_COND_INITIALIZER;
struct msg
{
struct msg* next;
int num;
};
struct msg* head;
void * producer(void* p)
{
struct msg* mp;
while(1)
{
mp=(struct msg*)malloc(sizeof(struct msg));s
mp->num = rand()%1000+1;
printf("produce is %d\n",mp->num);
pthread_mutex_lock(&lock);
mp->next=head;
head=mp;
pthread_mutex_unlock(&lock);
pthread_cond_signal(&has_product);
sleep(rand()%5);
}
}
void* consumer(void* p)
{
struct msg* mp;
while(1)
{
pthread_mutex_lock(&lock);
while(head==NULL)
{
pthread_cond_wait(&has_product,&lock);
}
mp=head;
head=mp->next;
pthread_mutex_unlock(&lock);
printf("Consume is %d\n",mp->num);
free(mp);
mp=NULL;
sleep(rand()%5);
}
}
int main()
{
pthread_t pid,cid;
pthread_create(&pid,NULL,producer,NULL);
pthread_create(&cid,NULL,consumer,NULL);
pthread_join(pid,NULL);
pthread_join(cid,NULL);
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&has_product);
return 0;
}
这个代码要想写的更好一点,可以定义为环形链表,初始化环形链表的空间大小,当产品的个数大于空间的时候,生产者进行被阻塞。这里的话链表头插,没有设置初始空间的大小。
信号量(semaphore)
信号量它是一个变量,可以用信号量来代表某种资源的数量。它的初始值可以是从1到N取值,linux中是一个结构体,可以将其理解为一个整数。 信号量不仅可以解决线程之间的同步,也可以用来解决进程之间的同步。 1.创建信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
其中sem 代表信号量,pshared=0,代表在线程间共享,pshared=1,代表在进程中共享。value:代表信号量的初值。
2.信号量的值减一 当信号量的值小于0的时候,线程(进程)会陷入阻塞。相当于P操作。
int sem_wait(sem_t *sem);
3.信号量的值加1 当信号量加1之后,它的值仍然小于等于0的时候,代表有其他线程(进程)在等待使用该资源,因而会唤醒等待队列中的线程(进程)
int sem_post(sem_t *sem)
4.信号量的销毁
int sem_destroy (sem_t *sem)
信号量实现生产者消费者模型
实现的方法:使用环形队列来充当缓冲区 定义
- empty信号量:初始值为N,代表空闲缓冲区的个数为N。
- full信号量:初始值为0,代表放有产品的缓冲区大小。
- mutex互斥量:保证同一时刻只有生产者或者消费者线程中的一个访问。
其中伪代码如下:如何记忆:记住生产者P操作,对应的V操作位于消费者中。 生产者代码:
P(empty);
P(mutex);
生产者将数据放入缓冲区
V(mutex);
V(full);
消费者代码:
P(full);
P(mutex);
消费者拿走数据
V(mutex);
V(empty);
代码如下:
int queue[N];
sem_t blank_number,product_number;
void * producer(void*arg)
{
int p=0;
while(1)
{
sem_wait(&blank_number);
queue[p]=rand()%1000+1;
printf("product %d \n",queue[p]);
sem_post(&product_number);
p=(p+1)%N;
sleep(rand()%5);
}
}
void * consumer(void* arg)
{
int c=0;
while(1)
{
sem_wait(&product_number);
printf("Consume %d\n",queue[c]);
queue[c]=0;
sem_post(&blank_number);
c=(c+1)%N;
sleep(rand()%5);
}
}
int main()
{
sem_init(&blank_number,0,N);
sem_init(&product_number,0,0);
pthread_t pid,cid;
pthread_create(&pid,NULL,producer,NULL);
pthread_create(&cid,NULL,consumer,NULL);
pthread_join(pid,NULL);
pthread_join(cid,NULL);
}
|