一. 什么是生产者消费者模型
1. 基本概念
生产者消费者模型就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是通过容器来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给容器;消费者不找生产者要数据,而是直接从容器里取。
2. 三种关系
实际中,生产者可能有多个,消费者也可能有多个,它们彼此之间要应该满足什么关系呢?
假设下面的情景:
- 每次一个生产者一次只能生产一个数据,
- 每次一个消费者一次只能消费一个数据
- 唯一的容器容器每次只容许一个生产者push数据或一个消费者pop数据。
在满足上面的情景下,可以推测生产者、消费者彼此之间的关系:
- [生产者和生产者]:互斥与同步关系。互斥体现在所有生产者竞争,只有一个能去容器pop数据。同步的话要保证每一个生产者都有机会到容器中pop数据。
- [消费者和消费者]:互斥与同步关系。互斥体现在所有消费者竞争,只有一个能去容器push数据。同步要求每一个消费者都有机会去容器中push数据。
- [生产者和消费者]:互斥与同步关系。互斥体现在二者只有一个能访问容器,这时另外一个只能阻塞等待。同步体现在容器不能永远只是生产者在push或消费者在pop,生产者生产了一些数据后要告知消费者来消费,反之亦然。
3. 再次理解生产者消费者模型
生产者消费者模型的核心思想在于:众多的生产者和众多的消费者通过唯一的容器进行数据交互,在交互的同时必须维护好彼此之间的互斥与同步的关系。
二. 生产者消费者模型优点
容器就相当于一个缓冲区,平衡了生产者和消费者的数据处理能力。这个容器就是用来给生产者和消费者解耦的。假如只是一对一的生产和消费,快的那方必须等待慢的那方才能完成一次交易,然后继续下一组;而如果它们之间有一个容器可以存储数据,其中一个生产者把数据push到容器后不用等消费者,下一个生产者继续往容器里push数据,也就是说在容器满之前生产者可以一直连续的生产数据,消费者也是一样的道理。
即通过容器使生产者和消费者解耦提高了数据交互的效率。
三. 基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构,它有如下如下几个特点:
- 众多生产者中内部竞争出一个生产者,去阻塞队列中生产一个数据,完成之后重新竞争(维护了生产者和生产者之间的互斥、同步关系)。
- 众多消费者中也是内部竞争出一个消费者,去阻塞队列里拿取一个数据,拿到后重新竞争(维护了消费者和消费者之间的互斥、同步关系)。
- 每次只能有一个线程操作队列(维护了生产者和消费者之间的互斥关系)。
- 当队列为空时,生产者通知消费者来拿取数据,然后自己会被阻塞等待,直到有消费者把它唤醒;当队列满时,消费者通知生产者来生产数据,然后自己被阻塞等待,直到生产者把它唤醒。(维护了生产者和消费者之间的同步关系)。
1. 准备工作
先从最简单的开始,只有一个生产者和一个消费者,创建两个线程代表它们,后续它们将在自己的控制流中完成相应的生产和消费任务;至于它们进行数据交互的容器,使用STL的容器适配器queue即可,交互的数据类型为整数:
int main()
{
srand((unsigned int)time(nullptr));
BlockQueue<int>* p = new BlockQueue<int>;
pthread_t pro, con;
pthread_create(&pro, nullptr, ProducerAction, p);
pthread_create(&con, nullptr, ConsumerAction, p);
pthread_join(pro, nullptr);
pthread_join(pro, nullptr);
delete p;
return 0;
}
2. 阻塞队列实现
基本框架 接下来看看我们封装的阻塞队列的基本框架,它有4个成员变量:
_q ,一个普通队列,用来存储数据。_capacity ,阻塞队列的容量,默认可以存5个数据。full ,一个条件变量。当阻塞队列满时生产者线程在该条件下等待。empty ,一个条件变量。当阻塞队列空时消费者在该条件下等待。mutex ,一把互斥锁。保证所有时间内只有一个线程能操作队列。
构造函数负责初始化两个条件变量和锁,析构函数负责销毁它们:
template<class T>
class BlockQueue
{
public:
BlockQueue(size_t capcity = 5)
:_capacity(capcity)
{
pthread_cond_init(&full, nullptr);
pthread_cond_init(&empty, nullptr);
pthread_mutex_init(&mutex, nullptr);
}
~BlockQueue()
{
pthread_cond_destroy(&full);
pthread_cond_destroy(&empty);
pthread_mutex_destroy(&mutex);
}
void PushData(T data){};
void PopData(T& data);
private:
bool IsFull()
{
return _q.size() >= _capacity;
}
bool IsEmpty()
{
return _q.empty();
}
queue<T> _q;
size_t _capacity;
pthread_cond_t full;
pthread_cond_t empty;
pthread_mutex_t mutex;
};
生产者生产数据 成员函数void PushData(T data) 由生产者调用,功能是插入一个数据到阻塞队列中,下面是该函数的几点说明:
- 该函数一进来就要申请锁,最后插入完成释放锁。
- 插入数据之前要检查阻塞队列是否满了,如果满了就要需要通知消费者来消费,然后自己在
full 条件下等待。
void PushData(T data)
{
pthread_mutex_lock(&mutex);
while(IsFull())
{
cout<<"queue is full"<<endl;
pthread_cond_signal(&empty);
pthread_cond_wait(&full, &mutex);
}
_q.push(data);
pthread_mutex_unlock(&mutex);
}
消费者拿取数据 消费者可以调用阻塞队列里的void PopData(T& data) 成员函数拿走一个阻塞队列里的数据,下面是该函数的几点说明:
- 消费者调用时传入的参数是一个输出型参数。阻塞队列会把队头数据写入到输出型参数的内存。
- 进来的第一步先加锁,拿走数据后解锁。
- 拿取数据之前要检查阻塞队列是否为空,为空的话要通知生产者进行生产,然后自己在
empty 条件下等待。
void PopData(T& data)
{
pthread_mutex_lock(&mutex);
while(IsEmpty())
{
cout<<"queue is empty"<<endl;
pthread_cond_signal(&full);
pthread_cond_wait(&empty, &mutex);
}
data = _q.front();
_q.pop();
pthread_mutex_unlock(&mutex);
}
关于阻塞队列生产、拿取数据操作的几个问题
问题一:判断阻塞队列空满时为什么要用while循环,而不用if判断语句? 拿生产者来说,它在插入前判断队列已经满了,如果用if判断语句的话,在if里面要执行pthread_cond_wait() 等待条件满足(即队列有空位置),当这个生产者被唤醒后执行if外面的push插入数据。但是如果pthread_cond_wait() 等待出错了,会继续往下插入数据,这样就多插入了一个数据导致数据出错;如果使用while循环的话,即使等待出错了,这时还会重新回去判断队列是否满了,避免了数据出错的问题。
问题二:判空和判满逻辑中,能不能先等待再唤醒? 答案是不行的,首先对于访问阻塞队列的锁mutex,生产者和消费者是共同竞争的,如果这个线程先等待的话锁确实被释放了,但是它不会继续往下执行唤醒另一个线程的操作了(因为这个线程自己也处在等待中),这样造成的现象就是锁没人申请,线程都等待各自的条件中等待。
3. 测试阻塞队列
下面是生产者线程的控制流,由于只有一个生产者所以不用在其控制流中加锁和引入条件变量来维护生产者和生产者之间的同步与互斥关系。生产者各每隔一秒生产一个数据:
void* ProducerAction(void* arg)
{
BlockQueue<int>* p = (BlockQueue<int>*)arg;
while(true)
{
int data = rand()%100+1;
p->PushData(data);
cout<<"[producer] push data:"<<data<<endl;
sleep(1);
}
}
消费者每隔一秒拿取一个数据:
void* ConsumerAction(void* arg)
{
BlockQueue<int>* p = (BlockQueue<int>*)arg;
while(true)
{
int data = 0;
p->PopData(data);
cout<<"[consumer] get data:"<<data<<endl;
sleep(1);
}
}
下面是main.cpp的全部代码:
#include "blockqueue.h"
void* ProducerAction(void* arg)
{
BlockQueue<int>* p = (BlockQueue<int>*)arg;
while(true)
{
int data = rand()%100+1;
p->PushData(data);
cout<<"[producer] push data:"<<data<<endl;
}
}
void* ConsumerAction(void* arg)
{
BlockQueue<int>* p = (BlockQueue<int>*)arg;
while(true)
{
int data = 0;
p->PopData(data);
cout<<"[consumer] get data:"<<data<<endl;
sleep(2);
}
}
int main()
{
srand((unsigned int)time(nullptr));
BlockQueue<int>* p = new BlockQueue<int>;
pthread_t pro, con;
pthread_create(&pro, nullptr, ProducerAction, p);
pthread_create(&con, nullptr, ConsumerAction, p);
pthread_join(pro, nullptr);
pthread_join(pro, nullptr);
delete p;
return 0;
}
编译运行,发现每生产一个数据马上又被消费者拿走了,这种情况队列永远都不会满: 另外由于我们是先创建生产者线程,再创建消费者线程。所以生产者先生产,消费者后消费。 如果我们先创建消费者线程的话,消费者线程线程先执行发现队列为空,就会被先阻塞起来并且释放操作队列的锁mutex(注意,如果有多个消费者的话,它们是没有机会抢这把锁的,因为它们在抢操作队列的这个锁之前必须要获得内部竞争的锁即竞争出来一个消费者去消费的锁);等到生产者线程轮流生产完所有数据之后,最后一个生产者会唤醒被一开始等待的消费者来消费;在所有消费者线程拿走完队列数据之前,生产者一直等待: 我们先创建消费者线程,消费者发现队列为空后挂起等待生产者生产完所有数据后唤醒消费者线程:
4. 阻塞队列完整代码
分两个文件,头文件blockqueue.h 里包含阻塞队列的声明;main.cpp 即主线程的控制流,负责创建生产者、消费者线程。
blockqueue.h
#pragma once
#include <queue>
#include <unistd.h>
#include <stdlib.h>
#include <iostream>
#include <pthread.h>
using namespace std;
template<class T>
class BlockQueue
{
public:
BlockQueue(size_t capcity = 5)
:_capacity(capcity)
{
pthread_cond_init(&full, nullptr);
pthread_cond_init(&empty, nullptr);
pthread_mutex_init(&mutex, nullptr);
}
~BlockQueue()
{
pthread_cond_destroy(&full);
pthread_cond_destroy(&empty);
pthread_mutex_destroy(&mutex);
}
void PushData(T data)
{
pthread_mutex_lock(&mutex);
while(IsFull())
{
cout<<"queue is full"<<endl;
pthread_cond_signal(&empty);
pthread_cond_wait(&full, &mutex);
}
_q.push(data);
pthread_mutex_unlock(&mutex);
}
void PopData(T& data)
{
pthread_mutex_lock(&mutex);
while(IsEmpty())
{
cout<<"queue is empty"<<endl;
pthread_cond_signal(&full);
pthread_cond_wait(&empty, &mutex);
}
data = _q.front();
_q.pop();
pthread_mutex_unlock(&mutex);
}
private:
bool IsFull()
{
return _q.size() >= _capacity;
}
bool IsEmpty()
{
return _q.empty();
}
queue<T> _q;
size_t _capacity;
pthread_cond_t full;
pthread_cond_t empty;
pthread_mutex_t mutex;
};
main.cpp
#include "blockqueue.h"
void* ProducerAction(void* arg)
{
BlockQueue<int>* p = (BlockQueue<int>*)arg;
while(true)
{
int data = rand()%100+1;
p->PushData(data);
cout<<"[producer] push data:"<<data<<endl;
sleep(1);
}
}
void* ConsumerAction(void* arg)
{
BlockQueue<int>* p = (BlockQueue<int>*)arg;
while(true)
{
int data = 0;
p->PopData(data);
cout<<"[consumer] get data:"<<data<<endl;
sleep(1);
}
}
int main()
{
srand((unsigned int)time(nullptr));
BlockQueue<int>* p = new BlockQueue<int>;
pthread_t pro, con;
pthread_create(&pro, nullptr, ProducerAction, p);
pthread_create(&con, nullptr, ConsumerAction, p);
pthread_join(pro, nullptr);
pthread_join(pro, nullptr);
delete p;
return 0;
}
5. 关于改进阻塞队列的几点补充
5.1 多生产者多消费者的设计
只有一个生产者和只有一个消费者的情况,只需在阻塞队列push和pop时维护生产者和消费者的同步与互斥关系即可。如果有多个生产者和消费者的话需要在它们各自的控制流中加不同锁和不同的条件变量,确保只有一个消费者和一个生产者能去竞争操作队列。
5.2 阻塞队列所存储数据可以是更复杂的任务
阻塞队列不仅仅可以存简单的整型数字,还可以是复杂任务的结构体指针,这样生产者派发任务,消费者拿到后解决里面的任务。比如生产者派发用户输入的账号密码,消费者拿到后负责传输到数据库中。
|