C++ 实现生产者和消费者(并发)(多线程)
一、解释生产者和消费者模型
就是生产者生产产品,放到缓冲区里面,消费者从缓冲区里面取出产品进行消费。 (假设一次只能生产和消费一个产品。)
主要需要实现的功能是:
-
生产者生产内容。 -
消费者消费内容。 -
(阻塞)缓冲区的大小有限制(超过限制,生产者暂停生产),反之就一直等待。 -
(阻塞)只有缓冲区有内容的时候,消费者才可以进行消费,反之就一直等待。 -
缓冲区需要加锁,才能保证线程安全。 -
有结束条件 —— 生产n份产品并消费n份产品之后就停止。
生产者消费者模型有啥用(真实应用场景举例)??
——我本科的时候,感觉这种有点像是工厂,操作系统那么底层的内容会使用吗? ——其实它的应用场景比我本科的时候想象的更加底层。 比如,操作屏幕输出,可能一个CPU读取数据,另一个CPU操作uart(连接了console和keyboard)的buffer,这时候我们希望过程是串行的,因此需要进行同步。uart和它对应的buffer只有一个。这就是一个典型的生产者消费者模型。生产者可能是一个CPU处理得到的输入,队列是uart对应的buffer(存放待输出数据),消费者是另一个CPU或者说是uart(将CPU传送过来的数据输出到屏幕上)——具体例子可以看uart.c,中的uartputc() 函数。(或者看我的博客 6.S081-7中断 - Interrupts )
二、分析核心代码流程
0. 定义需要的变量
uint64_t num = 0;
std::queue<uint64_t> buffer_q{};
uint64_t max_buffer_q_size = 10;
uint64_t total_production_count = 100;
bool produce_is_not_finished_flag = true;
bool consume_is_not_finished_flag = true;
std::mutex mutexLock{};
std::condition_variable condition{};
1. 生产者生产内容(暂时不考虑锁)
1.1 模拟生产,就是往队列里面添加产品号。
uint64_t data = num++;
buffer_q.push(data);
2.2 生产者生产内容,就是不停生产(不考虑消息队列大小,也不考虑线程安全)
void produce(){
int produced_production_count = 0;
while(true){
uint64_t data = num++;
buffer_q.push(data);
std::cout << "task data = " << data << " produced" << std::endl;
produced_production_count++;
if(produced_production_count > total_production_count){
break;
}
}
}
1.3 阻塞—— 生产者需要考虑消费者也需要访问缓冲区队列—— 加锁。
void produce(){
int produced_production_count = 0;
while(true){
std::unique_lock<std::mutex> lockGuard(mutexLock);
uint64_t data = num++;
buffer_q.push(data);
std::cout << "task data = " << data << " produced" << std::endl;
produced_production_count++;
if(produced_production_count > total_production_count){
break;
}
lockGuard.unlock();
condition.notify_all();
}
}
注意,这里在过程中加入了锁,并在结束前解锁,并在最后唤醒所有等待线程👇
std::unique_lock<std::mutex> lockGuard(mutexLock);
lockGuard.unlock();
condition.notify_all();
-
不加condition.notify_all(); 这句也可以运行,但是过程结束就唤醒其他线程,显然是效率更高的(不然其他进程可能还要“睡一段时间”才会“醒来”)。 -
这里的**lockGuard 有一个特性 —— 可以不进行解锁操作(当跳出作用域的时候lockGuard 会自动调用析构函数进行解锁操作)** -
但是这里我还是加了解锁操作。这是因为,lockGuard 的析构解锁,是在跳出while的时候执行的(跳出作用域),但是我想通过condition.notify_all(); 唤醒所有进等待的线程,因此我要在此之前就释放锁,而不是在此之后才释放锁。
1.4 阻塞 —— 生产过程中超过缓冲区大小,需要阻塞 —— 通过锁进行wait
void produce(){
int produced_production_count = 0;
while(true){
std::unique_lock<std::mutex> lockGuard(mutexLock);
while(buffer_q.size() >= max_buffer_q_size){
condition.wait(lockGuard);
}
uint64_t data = num++;
buffer_q.push(data);
std::cout << "task data = " << data << " produced" << std::endl;
produced_production_count++;
if(produced_production_count > total_production_count){
break;
}
lockGuard.unlock();
condition.notify_all();
}
}
如果当前缓冲区大小大于最大缓冲区大小,那么需要阻塞👇
while(buffer_q.size() >= max_buffer_q_size){
condition.wait(lockGuard);
}
解释一下上面👆代码是如何解决“ 生产过程中超过缓冲区大小,需要阻塞 ”的情况的:
wait会将当前在运行的进程存放于sleep数据中 (我猜应该指的就是阻塞队列中)。sleep传入的参数是需要等待的信号,在这个例子中传入的是lockGuard对应的mutexLock的地址。在while过程中,一旦buffer_q中有了空间,会调用与sleep对应的函数wakeup,传入的也是mutexLock的地址。任何等待在这个地址的进程都会被唤醒。有时候这种机制被称为conditional synchronization。
1.5 锁的经典要求 —— 锁的内容的内部,尽量不要出现跳转命令,比如break , return , goto
考虑前面的代码👇,有个明显的bug:如果我们执行了break 那么我们就没有释放锁——会导致死锁~!
while (true) {
std::unique_lock<std::mutex> lockGuard(mutexLock);
...
if(produced_production_count > total_production_count){
break;
}
lockGuard.unlock();
}
修改方法非常简单,我们将break; 跳出改成👇
while (produce_is_not_finished_flag) {
std::unique_lock<std::mutex> lockGuard(mutexLock);
...
if(produced_production_count > total_production_count){
produce_is_not_finished_flag = false;
}
lockGuard.unlock();
}
1.6 生产者完整代码👇
void produce(){
int produced_production_count = 0;
while(produce_is_not_finished_flag){
std::unique_lock<std::mutex> lockGuard(mutexLock);
while(buffer_q.size() >= max_buffer_q_size){
condition.wait(lockGuard);
}
uint64_t data = num++;
buffer_q.push(data);
std::cout << "task data = " << data << " produced" << std::endl;
produced_production_count++;
if(produced_production_count > total_production_count){
produce_is_not_finished_flag = false;
}
lockGuard.unlock();
condition.notify_all();
}
}
2. 消费者
按照上面流程,可以写出消费者消费代码如下👇
void consume(){
while(consume_is_not_finished_flag){
std::unique_lock<std::mutex> lockGuard(mutexLock);
while(buffer_q.empty()){
condition.wait(lockGuard);
}
uint64_t data = buffer_q.front();
buffer_q.pop();
std::cout << "task data = " << data << " has been consumed" << std::endl;
if(!produce_is_not_finished_flag && buffer_q.empty()){
consume_is_not_finished_flag = false;
}
lockGuard.unlock();
condition.notify_all();
}
}
三、多线程调用
1. 开启多线程bind
bind 是用来绑定函数调用的某些参数,可以将bind函数看作一个通用的函数适配器,它接受一个可调用对象,生成新的可调用对象来适应原对象的参数列表。
在构造函数里面使用👇进行绑定(将参数绑定到可执行的produce / consume 函数)。并开启多线程。
producer_consumer(){
producer = new std::thread(std::bind(&producer_consumer::produce, this));
consumer = new std::thread(std::bind(&producer_consumer::consume, this));
}
2. 多线程结束:join(析构中join)
通常,我们进行多线程的时候,需要等待所有线程执行完毕了,再进行退出。
~producer_consumer(){
if(producer -> joinable()){
producer -> join();
}
if(consumer -> joinable()){
consumer -> join();
}
produce_is_not_finished_flag = false;
std::cout << "producer_consumer end" << std::endl;
}
四、完整代码
1. 完整producer_consumer.cpp 程序
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <functional>
#include <condition_variable>
uint64_t num = 0;
class producer_consumer{
public:
producer_consumer(){
producer = new std::thread(std::bind(&producer_consumer::produce, this));
consumer = new std::thread(std::bind(&producer_consumer::consume, this));
}
~producer_consumer(){
if(producer -> joinable()){
producer -> join();
}
if(consumer -> joinable()){
consumer -> join();
}
produce_is_not_finished_flag = false;
std::cout << "producer_consumer end" << std::endl;
}
void produce(){
int produced_production_count = 0;
while(produce_is_not_finished_flag){
std::unique_lock<std::mutex> lockGuard(mutexLock);
while(buffer_q.size() >= max_buffer_q_size){
condition.wait(lockGuard);
}
uint64_t data = num++;
buffer_q.push(data);
std::cout << "task data = " << data << " produced" << std::endl;
produced_production_count++;
if(produced_production_count > total_production_count){
produce_is_not_finished_flag = false;
}
lockGuard.unlock();
condition.notify_all();
}
}
void consume(){
while(consume_is_not_finished_flag){
std::unique_lock<std::mutex> lockGuard(mutexLock);
while(buffer_q.empty()){
condition.wait(lockGuard);
}
uint64_t data = buffer_q.front();
buffer_q.pop();
std::cout << "task data = " << data << " has been consumed" << std::endl;
if(!produce_is_not_finished_flag && buffer_q.empty()){
consume_is_not_finished_flag = false;
}
lockGuard.unlock();
condition.notify_all();
}
}
private:
std::queue<uint64_t> buffer_q{};
std::mutex mutexLock{};
std::condition_variable condition{};
std::thread * producer;
std::thread * consumer;
bool produce_is_not_finished_flag = true;
bool consume_is_not_finished_flag = true;
uint64_t total_production_count = 100;
uint64_t max_buffer_q_size = 10;
};
static producer_consumer * global_PC;
int main() {
global_PC = new producer_consumer();
delete global_PC;
return 0;
}
2. 编译运行命令
编译的时候需要说明使用多线程的动态库pthread (链接阶段链接这个库) 👇
g++ producer_consumer.cpp -o producer_consumer -l pthread
运行命令👇
./producer_consumer
3. 运行结果示例
levi@LEVI1:~/code$ g++ producer_consumer.cpp -o producer_consumer -l pthread
levi@LEVI1:~/code$ ./producer_consumer
task data = 0 produced
task data = 1 produced
task data = 2 produced
task data = 3 produced
task data = 4 produced
task data = 5 produced
task data = 6 produced
task data = 7 produced
task data = 8 produced
task data = 9 produced
task data = 0 has been consumed
task data = 1 has been consumed
task data = 2 has been consumed
task data = 3 has been consumed
task data = 4 has been consumed
task data = 5 has been consumed
task data = 6 has been consumed
task data = 7 has been consumed
task data = 8 has been consumed
task data = 9 has been consumed
task data = 10 produced
task data = 11 produced
task data = 12 produced
task data = 13 produced
task data = 14 produced
task data = 15 produced
task data = 16 produced
task data = 17 produced
task data = 18 produced
task data = 19 produced
task data = 10 has been consumed
task data = 11 has been consumed
task data = 12 has been consumed
task data = 13 has been consumed
task data = 14 has been consumed
task data = 15 has been consumed
task data = 16 has been consumed
task data = 17 has been consumed
task data = 18 has been consumed
task data = 19 has been consumed
task data = 20 produced
task data = 21 produced
task data = 22 produced
task data = 23 produced
task data = 24 produced
task data = 25 produced
task data = 26 produced
task data = 27 produced
task data = 28 produced
task data = 29 produced
task data = 20 has been consumed
task data = 21 has been consumed
task data = 22 has been consumed
task data = 23 has been consumed
task data = 24 has been consumed
task data = 25 has been consumed
task data = 26 has been consumed
task data = 27 has been consumed
task data = 28 has been consumed
task data = 29 has been consumed
task data = 30 produced
task data = 31 produced
task data = 32 produced
task data = 33 produced
task data = 34 produced
task data = 35 produced
task data = 36 produced
task data = 37 produced
task data = 38 produced
task data = 39 produced
task data = 30 has been consumed
task data = 31 has been consumed
task data = 32 has been consumed
task data = 33 has been consumed
task data = 34 has been consumed
task data = 35 has been consumed
task data = 36 has been consumed
task data = 37 has been consumed
task data = 38 has been consumed
task data = 39 has been consumed
task data = 40 produced
task data = 41 produced
task data = 42 produced
task data = 43 produced
task data = 44 produced
task data = 45 produced
task data = 46 produced
task data = 47 produced
task data = 48 produced
task data = 49 produced
task data = 40 has been consumed
task data = 41 has been consumed
task data = 42 has been consumed
task data = 43 has been consumed
task data = 44 has been consumed
task data = 45 has been consumed
task data = 46 has been consumed
task data = 47 has been consumed
task data = 48 has been consumed
task data = 49 has been consumed
task data = 50 produced
task data = 51 produced
task data = 52 produced
task data = 53 produced
task data = 54 produced
task data = 55 produced
task data = 56 produced
task data = 57 produced
task data = 58 produced
task data = 59 produced
task data = 50 has been consumed
task data = 51 has been consumed
task data = 52 has been consumed
task data = 53 has been consumed
task data = 54 has been consumed
task data = 55 has been consumed
task data = 56 has been consumed
task data = 57 has been consumed
task data = 58 has been consumed
task data = 59 has been consumed
task data = 60 produced
task data = 61 produced
task data = 62 produced
task data = 63 produced
task data = 64 produced
task data = 65 produced
task data = 66 produced
task data = 67 produced
task data = 68 produced
task data = 69 produced
task data = 60 has been consumed
task data = 61 has been consumed
task data = 62 has been consumed
task data = 63 has been consumed
task data = 64 has been consumed
task data = 65 has been consumed
task data = 66 has been consumed
task data = 67 has been consumed
task data = 68 has been consumed
task data = 69 has been consumed
task data = 70 produced
task data = 71 produced
task data = 72 produced
task data = 73 produced
task data = 74 produced
task data = 75 produced
task data = 76 produced
task data = 77 produced
task data = 78 produced
task data = 79 produced
task data = 70 has been consumed
task data = 71 has been consumed
task data = 72 has been consumed
task data = 73 has been consumed
task data = 74 has been consumed
task data = 75 has been consumed
task data = 76 has been consumed
task data = 77 has been consumed
task data = 78 has been consumed
task data = 79 has been consumed
task data = 80 produced
task data = 81 produced
task data = 82 produced
task data = 83 produced
task data = 84 produced
task data = 85 produced
task data = 86 produced
task data = 87 produced
task data = 88 produced
task data = 89 produced
task data = 80 has been consumed
task data = 81 has been consumed
task data = 82 has been consumed
task data = 83 has been consumed
task data = 84 has been consumed
task data = 85 has been consumed
task data = 86 has been consumed
task data = 87 has been consumed
task data = 88 has been consumed
task data = 89 has been consumed
task data = 90 produced
task data = 91 produced
task data = 92 produced
task data = 93 produced
task data = 94 produced
task data = 95 produced
task data = 96 produced
task data = 97 produced
task data = 98 produced
task data = 99 produced
task data = 90 has been consumed
task data = 91 has been consumed
task data = 92 has been consumed
task data = 93 has been consumed
task data = 94 has been consumed
task data = 95 has been consumed
task data = 96 has been consumed
task data = 97 has been consumed
task data = 98 has been consumed
task data = 99 has been consumed
task data = 100 produced
task data = 100 has been consumed
producer_consumer end
levi@LEVI1:~/code$
|