?
目录
📚 1.?同步概念
📚 2.?条件变量
条件变量函数
📚 3.?生产者消费者模型
生产者消费者模型优点
生产者消费者的321原则
生产者消费者问题
BlockingQueue的生产者消费者模型
? 🍎作者:努力学习的少年
?🍎个人简介:双非大二,一个正在自学c++和linux操作系统,写博客是总结知识,方便复习
?🍎目标:进大厂
?🍎 如果你觉得文章可以的话,麻烦你给我点个赞和关注,感谢你的关注!
?
📚 1.?同步概念
在并发多线程时,在某些关键点时,需要互相等待消息与互通消息,这种相互制约的等待和互通信息称为线程(进程)同步? 。
举个生活例子:
同步就是操作过程中必须要有先后,比如妈妈做完饭后,儿子才能开始吃饭。
📚 2.?条件变量
互斥量可以防止多个线程同时访问临界资源,而条件变量允许一个线程将某个临界资源的状态变化通知其他线程,在共享资源设定一个条件变量,如果共享资源条件不满足,则让线程到该条件变量下阻塞等待,当条件满足时,其他线程可以唤醒条件变量阻塞等待的线程。
在线程之间有一种情况:线程A需要某个条件才能继续往下执行,如果该条件不成立,此时线程A进行阻塞等待,当线程B运行后使该条件成立后,则唤醒该线程A继续往下执行。
在pthread库中,可以通过条件变量中,可以设定一个阻塞等待的条件,或者唤醒等待条件的线程。
条件变量函数
?在linux操作系统使用pthread_cond_t类型来标识条件变量。
#include <pthread.h>
//条件变量的销毁
int pthread_cond_destroy(pthread_cond_t *cond);
//条件变量的初识化
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
函数执行成功返回0,否则返回错误码,pthread_cond_init是用来初始化一个条件变量,参数为NULL表示默认属性。如果条件变量是静态的可以用
pthread_cond_t cond =PTHREAD_COND_INITIALIZER;,相当于调用函数pthread_cond_init()初始化,并且参数attr为NULL。
条件变量的操作函数
#include <pthread.h>
//唤醒该条件变量的所有线程
int pthread_cond_broadcast(pthread_cond_t *cond);
//唤醒该条件变量中的一个线程
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);
//让该线程在某个条件变量进行等待
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
条件变量总是需要与互斥量结合使用,互斥量能限制一个线程能够访问共享资源,条件变量是在共享变量状态改变时发出通知。一个线程调用pthread_cond_wait函数则让它在一个条件变量下进行等待,pthead_cond_wait函数进行三个步骤:
- 释放互斥量(为什么会释放互斥量,下面生产者和消费者模型中的代码会讲)。
- 阻塞等待
- 当被唤醒时,重新获得互斥量并返回。
📚 3.?生产者消费者模型
生产者消费者模型是通过一个容器来解决生产者和消费者的强耦合问题,生产者和消费者彼此不通信,生产者不需要等待消费者是否消费者处理,生产者直接往阻塞队列中生产数据,消费者不找生产者要数据,直接从阻塞队列中拿数据并处理数据,阻塞队列是一个数据缓冲区,这个阻塞队列将消费数据和生产数据进行了解耦。
生产者:生产数据的线程或进程
消费者:消费数据的进程或线程
生产者消费者模型优点
- 生产数据和消费数据进行了解耦,生产者和消费者之间的相互影响较小。
- ?支持并发,生产者和消费者可以并发执行。
- 支持忙闲不均,如果生产者生产数据快,消费者消费数据慢,那么可以创建较多的消费者去消费数据,使消费数据的速度去匹配生产数据的速度。
生产者消费者的321原则
- 3种关系:生产者与生产者(互斥关系),消费者与消费者(互斥关系),生产者与消费者(互斥和同步的关系)
- 2种角色:生产者和消费者(指特定的线程和进程)
- 1个交易场所:有限的空间的缓冲区。
生产者消费者问题
为了保证线程在访问缓冲区出现数据混乱,所以每次只允许一个线程进入缓冲区,即线程与线程之间具有互斥关系,同步关系是生产者和消费者进入缓冲区前需要有条件判断(用条件变量来维持),例如:
- 当缓冲区为空时,消费者不能进入缓冲区消费数据。
- 当缓冲区为满时,生产者不能进入缓冲区生产数据。
- 任何时刻只能有一个线程能进入缓冲区。
BlockingQueue的生产者消费者模型
在多线程中,阻塞队列(BlockingQueue)是一种常见的生产者消费者模型,每次只能有一个线程进入阻塞队列中生产或消费数据,当阻塞队列为空时,消费者则挂起等待,当有一定的数据时,消费者才能进入阻塞队列中消费数据,当阻塞队列为满时,则生产者挂起等待,当有一定空间时,生产者才能进入阻塞队列中生产数据。
实现一个阻塞队列,生产者往队列生产任务,该任务包含数据和运算符,消费者从阻塞队列中取出任务,并根据运算符类型对数据进行运算,将结果打印出来。
模型如下:
?
在实现BlockiingQueue时,我们需要定义3个变量。
- 互斥量:用于访问缓冲区,一次只能有一个线程进入缓冲区
- full条件变量:当缓冲区满时,用于阻塞生产者。
- empty条件变量:当缓冲区为空是,用户阻塞消费者。
BlockingQueue代码实现
#include<iostream>
#include<queue>
#include<pthread.h>
#include<ctime>
#include<cstdio>
#include<cstdlib>
#include<unistd.h>
using namespace std;
int NUM=32;
template<class T>
class BlockQueue
{
private:
queue<T> q;
int cap=NUM;//允许缓冲队列中存储的最多数据
pthread_mutex_t lock;//互斥量
pthread_cond_t _full;//队列满的条件变量
pthread_cond_t _empty;//队列空的条件变量
bool full()//判断缓冲队列是否满了
{
return q.size()==cap;
}
bool empty()//判断缓冲队列是否为空
{
return q.empty();
}
public:
BlockQueue()
{
pthread_mutex_init(&lock,NULL);
pthread_cond_init(&_full,NULL);
pthread_cond_init(&_empty,NULL);
}
~BlockQueue()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
void push(const T& data)//生产者接口
{
pthread_mutex_lock(&lock);//限制只能一个线程进入缓冲队列
while(full())
{
pthread_cond_wait(&_full,&lock);//缓冲队列为满,则生产者到_full的条件变量挂起等待
}
q.push(data);
pthread_mutex_unlock(&lock);
if(q.size()>16)//当缓冲队列有16个空间时,唤醒消费者
{
pthread_cond_signal(&_empty);
}
}
void pop(T& data)//消费者接口
{
pthread_mutex_lock(&lock);//限制只能有一个线程进入缓冲队列
while(empty())
{
pthread_cond_wait(&_empty,&lock);//当缓冲队列为空时,消费者到_empty中挂起等待
}
data=q.front();//取出队列中的数据
q.pop();//删除取出的数据
pthread_mutex_unlock(&lock);
pthread_cond_signal(&_full);
}
};
计算任务
#include<iostream>
using namespace std;
class task
{
private:
int _x,_y;//计算数据
char _op;//运算符
public:
task(int x,int y,char c)
:_x(x)
,_y(y)
,_op(c) {
}
task() { }
void Run()
{
int ret=0;
switch(_op)//判断运算符
{
case '+':
ret=_x+_y;
break;
case '-':
ret=_x-_y;
break;
case '*':
ret=_x*_y;
break;
case '/':
if(_y==0)
{
cout<<"除法错误"<<endl;
}
else{
ret=_x/_y;
}
break;
}
printf("%d %c %d = %d\n",_x,_op,_y,ret);
}
};
main.cc
#include"BlockQueue.hpp"
#include"task.hpp"
void* comsumer(void* arg)//消费者
{
BlockQueue<task>* q=(BlockQueue<task>*) arg;
while(1)
{
task data;
q->pop(data);//取出任务
data.Run();//对该任务进行运算
sleep(1);
}
return NULL;
}
void* producter(void* arg)//消费者
{
char* arr="+-*/";
BlockQueue<task>* q=(BlockQueue<task>*) arg;
while(1)
{
int x=rand()%100+1;
int y=rand()%30;
char c=arr[rand()%4];
task t(x,y,c);//产生一个任务
q->push(t);//将任务放进阻塞队列中
sleep(1);
}
return NULL;
}
int main()
{
srand((unsigned int)time(NULL));
BlockQueue<task>* q=new BlockQueue<task>();//创建阻塞队列
pthread_t _product,_comsumer;
pthread_create(&_product,NULL,producter,(void*)q);//创建生产者线程
pthread_create(&_comsumer,NULL,comsumer,(void*)q);//创建消费者线程
pthread_join(_product,NULL);
pthread_join(_comsumer,NULL);
return 0;
}
输出结果:
?当然这只是进程中的线程的生产者消费模型。
如果我们学了网络之后,我们可以让客户端中一个一个的下单请求放进我们的阻塞队列中,让服务端取出一个一个下单请求帮我们进行处理,这就是基本的生产者消费者模型。
细节详解:
在使用pthread_cond_wait函数时,我们都需要传入一个互斥量,因为条件变量总是在互斥量之后使用,当我们让某个线程在条件变量中阻塞等待,我们需要释放互斥量,才能使其他线程能够进入阻塞队列中,如果没有释放互斥量,将会造成死锁的现象。?
上一篇:【线程(二)】——互斥量的详细解析
下一篇:【线程(三)】——信号量的详细解析
感谢你的关注和支持!!!
|