本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。
Intro
- 本文为多生产者多消费者的C语言实现
- 生产者与消费者针对链表的头结点进行操作
- 使用的现场同步方法为信号量
- 本文侧重于理解条件变量的使用
- 为保证代码简洁,本文代码没有对函数的返回值进行检查,聪明的你一定一定知道如何检查这个小细节(?????)? ??
- 关于互斥锁+条件变量的多生产者多消费者模型实现请参考我的这篇博客
analysis
多生产者多消费者有多个线程,当根据信号量定义的资源数不同,所对应的情况也有所不同,所以在使用信号量去实现多生产者多消费者模型要处理好资源数与多线程之间的关系,在拥有多个资源数时可以配合互斥锁来约束多线程对共享资源的访问
单生产者单消费者
- 限于篇幅,本篇博客仅简单提及此模型,不展示具体代码
- 对于单生产者单消费者,资源数只能为1,每次唤醒一个线程,情况并不复杂
- 若资源数大于1,实际运行就会产生段错误(读写了不该访问的地址)
多生产者多消费者
资源数为1
sem_init(&psem, 0, 1);
sem_init(&csem, 0, 0);
如此定义的好处为:
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
struct Node {
int number;
struct Node *next;
};
struct Node *head = NULL;
sem_t psem;
sem_t csem;
void *producer(void *arg);
void *consumer(void *arg);
int main(int argc, char **argv)
{
pthread_t ptid[5];
pthread_t ctid[5];
sem_init(&psem, 0, 1);
sem_init(&csem, 0, 0);
for(int i = 0; i < 5; i++){
pthread_create(&ptid[i], NULL, producer, NULL);
pthread_create(&ctid[i], NULL, consumer, NULL);
}
sem_destroy(&psem);
sem_destroy(&csem);
for(int i = 0; i < 5; i++){
pthread_join(ptid[i], NULL);
pthread_join(ctid[i], NULL);
}
return 0;
}
void *producer(void *arg)
{
while(1){
sem_wait(&psem);
struct Node* pnew = (struct Node*)malloc(sizeof(struct Node));
pnew -> number = rand() % 100;
pnew -> next = head;
head = pnew;
printf("Producer: number = %d, tid = %ld\n", pnew -> number, pthread_self());
sem_post(&csem);
sleep(rand() % 3);
}
return NULL;
}
void *consumer(void *arg)
{
while(1){
sem_wait(&csem);
struct Node *pnode = head;
printf("Consumer: number = %d, tid = %ld\n", pnode -> number, pthread_self());
head = pnode -> next;
free(pnode);
sem_post(&psem);
sleep(rand() % 3);
}
return NULL;
}
资源数大于1
sem_init(&psem, 0, 5);
sem_init(&csem, 0, 0);
如此定义的好处为:
-
使生产者先工作 -
效率更高 -
由于资源数为5,程序运行中有多个线程可被唤醒:
- 多个生产者同时生产
- 多个消费者同时消费
- 生产者和消费者同时运行
-
会出现多个线程同时访问共享资源的的情况 -
需添加互斥锁对共享资源进行保护 -
SHOW ME THE CODE
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
struct Node {
int number;
struct Node *next;
};
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
struct Node *head = NULL;
sem_t psem;
sem_t csem;
void *producer(void *arg);
void *consumer(void *arg);
int main(int argc, char **argv)
{
pthread_t ptid[5];
pthread_t ctid[5];
sem_init(&psem, 0, 5);
sem_init(&csem, 0, 0);
for(int i = 0; i < 5; i++){
pthread_create(&ptid[i], NULL, producer, NULL);
pthread_create(&ctid[i], NULL, consumer, NULL);
}
sem_destroy(&psem);
sem_destroy(&csem);
pthread_mutex_destroy(&mutex);
for(int i = 0; i < 5; i++){
pthread_join(ptid[i], NULL);
pthread_join(ctid[i], NULL);
}
return 0;
}
void *producer(void *arg)
{
while(1){
sem_wait(&psem);
pthread_mutex_lock(&mutex);
struct Node* pnew = (struct Node*)malloc(sizeof(struct Node));
pnew -> number = rand() % 100;
pnew -> next = head;
head = pnew;
printf("Producer: number = %d, tid = %ld\n", pnew -> number, pthread_self());
pthread_mutex_unlock(&mutex);
sem_post(&csem);
sleep(rand() % 3);
}
return NULL;
}
void *consumer(void *arg)
{
while(1){
sem_wait(&csem);
pthread_mutex_lock(&mutex);
struct Node *pnode = head;
printf("Consumer: number = %d, tid = %ld\n", pnode -> number, pthread_self());
head = pnode -> next;
free(pnode);
pthread_mutex_unlock(&mutex);
sem_post(&psem);
sleep(rand() % 3);
}
return NULL;
}
- 在效率提升的同时,我们不仅遇到了要保护共享资源的麻烦,还要处理可能出现死锁的情况,在上述代码中若调换某两行代码,则会产生死锁,很细节,但很重要
sem_wait(&csem);
pthread_mutex_lock(&mutex);
sem_wait(&csem);
pthread_mutex_lock(&mutex);
若将其替换为
pthread_mutex_lock(&mutex);
sem_wait(&csem);
pthread_mutex_lock(&mutex);
sem_wait(&csem);
- 若消费者某一线程先于生产者线程运行:
- 此线程成功给互斥锁加锁,然后运行
sem_wait(&csem) ,因为生产者尚未生产,所以此线程堵塞在sem_wait处 - 其余消费者线程堵塞在互斥锁处
- 对于生产者,任意生产者线程调用
pthread_mutex_lock(&mutex) 时,因为互斥锁已经上锁,所以所有的生产者线程全都被阻塞 - 至此所有的线程都阻塞,死锁产生
参考:
- 线程同步|爱编程的大丙
|