,在很多情况下,线程需要检查某一条件(condition)满足之后,才会继续运行。例如,父线程需要检查子线程是否执行完毕 [这常被称为 join()]。这种等待如何实现呢?
我们可以尝试用一个共享变量,如图 30.2 所示。这种解决方案一般能工作,但是效率低下,因为主线程会自旋检查,浪费 CPU 时间。我们希望有某种方式让父线程休眠,直到等待的条件满足(即子线程完成执行)。
volatile int done = 0;
void *child(void *arg) {
printf("child\n");
done = 1;
return NULL;
}
int main(int argc, char *argv[]) {
printf("parent: begin\n");
pthread_t c;
Pthread_create(&c, NULL, child, NULL);
while (done == 0)
;
printf("parent: end\n");
return 0;
}
30.1 定义和程序
线程可以使用条件变量(condition variable),来等待一个条件变成真。条件变量是一个显式队列,当某些执行状态(即条件,condition)不满足时,线程可以把自己加入队列,等待(waiting)该条件。。另外某个线程,当它改变了上述状态时,就可以唤醒一个或者多个等待线程(通过在该条件上发信号),让它们继续执行。Dijkstra 最早在“私有信号量”[D01]中提出这种思想。Hoare 后来在关于观察者的工作中,将类似的思想称为条件变量
pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m);
pthread_cond_signal(pthread_cond_t *c);
int done = 0;
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t c = PTHREAD_COND_INITIALIZER;
void thr_exit() {
Pthread_mutex_lock(&m);
done = 1;
Pthread_cond_signal(&c);
Pthread_mutex_unlock(&m);
}
void *child(void *arg) {
printf("child\n");
thr_exit();
return NULL;
}
void thr_join() {
Pthread_mutex_lock(&m);
while (done == 0)
Pthread_cond_wait(&c, &m);
Pthread_mutex_unlock(&m);
}
int main(int argc, char *argv[]) {
printf("parent: begin\n");
pthread_t p;
Pthread_create(&p, NULL, child, NULL);
thr_join();
printf("parent: end\n");
return 0;
}
30.2 生产者/消费者(有界缓冲区)问题
假设有一个或多个生产者线程和一个或多个消费者线程。生产者把生成的数据项放入缓冲区;消费者从缓冲区取走数据项,以某种方式消费。
首先需要一个共享缓冲区,让生产者放入数据,消费者取出数据。简单起见,我们就拿一个整数来做缓冲区(你当然可以想到用一个指向数据结构的指针来代替) ,两个内部函数将值放入缓冲区,从缓冲区取值。如下:
int buffer;
int count = 0;
void put(int value) {
assert(count == 0);
count = 1;
buffer = value;
}
int get() {
assert(count == 1);
count = 0;
return buffer;
}
现在我们需要编写一些函数,知道何时可以访问缓冲区,以便将数据放入缓冲区或从缓冲区取出数据。条件是显而易见的:仅在 count 为 0 时(即缓冲器为空时),才将数据放入缓冲器中。仅在计数为 1 时(即缓冲器已满时),才从缓冲器获得数据。
void *producer(void *arg) {
int i;
int loops = (int) arg;
for (i = 0; i < loops; i++) {
put(i);
}
}
void *consumer(void *arg) {
int i;
while (1) {
int tmp = get();
printf("%d\n", tmp);
}
}
有问题的方案
假设只有一个生产者和一个消费者。显然, put()和 get()函数之中会有临界区,因为 put()更新缓冲区,get()读取缓冲区。但是,给代码加锁没有用,我们还需别的东西。不奇怪,别的东西就是某些条件变量。
cond_t cond;
mutex_t mutex;
void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex);
if (count == 1)
Pthread_cond_wait(&cond, &mutex);
put(i);
Pthread_cond_signal(&cond);
Pthread_mutex_unlock(&mutex);
}
}
void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex);
if (count == 0)
Pthread_cond_wait(&cond, &mutex);
int tmp = get();
Pthread_cond_signal(&cond);
Pthread_mutex_unlock(&mutex);
printf("%d\n", tmp);
}
}
较好但仍有问题的方案:使用 While 语句替代 If
cond_t cond;
mutex_t mutex;
void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex);
while (count == 1)
Pthread_cond_wait(&cond, &mutex);
put(i);
Pthread_cond_signal(&cond);
Pthread_mutex_unlock(&mutex);
}
}
void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex);
while (count == 0)
Pthread_cond_wait(&cond, &mutex);
int tmp = get();
Pthread_cond_signal(&cond);
Pthread_mutex_unlock(&mutex);
printf("%d\n", tmp);
}
}
单值缓冲区的生产者/消费者方案
使用两个条件变量
cond_t empty, fill;
mutex_t mutex;
void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex);
while (count == 1)
Pthread_cond_wait(&empty, &mutex);
put(i);
Pthread_cond_signal(&fill);
Pthread_mutex_unlock(&mutex);
}
}
void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex);
while (count == 0)
Pthread_cond_wait(&fill, &mutex);
int tmp = get();
Pthread_cond_signal(&empty);
Pthread_mutex_unlock(&mutex);
printf("%d\n", tmp);
}
}
在上述代码中,生产者线程等待条件变量 empty,发信号给变量 fill。相应地,消费者线程等待 fill,发信号给 empty。这样做,从设计上避免了上述第二个问题:消费者再也不会唤醒消费者,生产者也不会唤醒生产者。
最终的生产者/消费者方案
|