前言:用一些习题来说明这三个东西怎么写。说实话,编写线程同步代码并不简单。
完成两个线程通过条件变量实现交替打印的控制
锁+条件变量
不管是C++11还是POSIX,套路都是一样的
首先要强调的两点: 1.缓冲区队列是临界资源,因此进入就要加锁 2.条件变量是临界资源,进行判断条件是否满足和修改条件前,必须加锁 3.条件变量是一个具体的条件,我们经常还会定义一个全局的变量来表示一个条件。 这点很重要,就比如这一道题目。
POSIX库的流程
C++11流程:
c++11代码:
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
using namespace std;
int main()
{
mutex mtx;
condition_variable cv;
bool flag = true;
thread t1([&]{
for(int i = 1; i <= 100; i += 2)
{
unique_lock<mutex> lck(mtx);
cv.wait(lck, [&]{return flag == true;});
cout << this_thread::get_id() << ' ' << i << endl;
flag = false;
cv.notify_one();
}
});
thread t2([&]{
for(int i = 2; i <= 100; i += 2)
{
unique_lock<mutex> lck(mtx);
cv.wait(lck, [&]{return flag == false;});
cout << this_thread::get_id() << ' ' << i << endl;
flag = true;
cv.notify_one();
}
});
t1.join(), t2.join();
}
在c++里面,写线程有很多种写法,传给线程的函数可以是lambda,可以是静态成员函数,可以是包装器等等。但是不可以是成员函数。因为线程函数是全局的。并不随对象销毁而消失。
我们可以发现一个显著的特点,哪个线程函数的条件判断一开始为true,哪个线程就先开始执行,先执行的线程通过改变条件,再唤醒下一个线程,从而实现同步机制。
二元信号量
条件变量其实就是二元的信号量封装而来的。参考王道操作系统上面的公式: 使用信号量实现同步机制的操作口诀为:先V后P。
ps:这是实现单次同步的做法。要实现死循环式的同步,要定义两个信号量。
具体如下:
这样就会先执行thread1,再执行thread2.具体原因要看P和V到底干了什么。
P就是sem_wait,意思是申请资源,申请不到就wait。作用是用来判断条件是否满足的。
P是原语(一段代码组成,这段代码是原子的,使用关中断和开中断来实现原子性),它的代码如下: V是还资源的意思。 V也是原语,代码如下: 因此只要初始化sem为0,那么P操作就一定会陷入阻塞。因为没有资源给他申请,先进行V操作的线程就会先执行,从而实现同步。
信号量代码: 我没有删除条件变量的代码,为的就是进行二者对比。
int main()
{
sem_t s, t;
sem_init(&s, 0, 0);
sem_init(&t, 0, 1);
bool flag = true;
thread t1([&]{
for(int i = 1; i <= 100; i += 2)
{
sem_wait(&t);
cout << this_thread::get_id() << ' ' << i << endl;
sem_post(&s);
}
});
thread t2([&]{
for(int i = 2; i <= 100; i += 2)
{
sem_wait(&s);
cout << this_thread::get_id() << ' ' << i << endl;
sem_post(&t);
}
});
t1.join(), t2.join();
}
这里必须要用两个条件变量才可以实现死循环式的同步。一个是不行的。不要问为什么,用信号量来写同步代码并没有什么一成不变的套路,必须得分析。
你可以认为是有两个同步动作,因此要两个二元信号量。
对比一下条件变量和二元信号量
条件变量是由二元信号量演变来的,使用条件变量可以更容易写出线程同步代码,对于新手建议用条件变量来编写同步代码。套路清晰+编码容易,不用那么多PV操作。但是多元信号量的作用还是无法被条件变量替代的。后面讲生产者消费者模型的时候讲。(在写生产者消费者模型的时候,仍然建议你写条件变量的阻塞版本,PV容易写错。)
按序打印
按序打印 这道题不需要循环的同步,只需要一次同步即可。那么用PV也很好解决了,当然,还是推荐些条件变量的版本。
条件变量版
还是上面的模板,唯一改变的就是条件值变了,k == 1代表执行线程1,k == 2代表执行线程2, k == 3代表执行线程3
class Foo {
public:
condition_variable cv;
mutex mtx;
int k = 0;
Foo() {
}
void first(function<void()> printFirst) {
printFirst();
k = 1;
cv.notify_all();
}
void second(function<void()> printSecond) {
unique_lock<mutex> lock(mtx);
cv.wait(lock, [&]{return k == 1;});
printSecond();
k = 2;
cv.notify_all();
}
void third(function<void()> printThird) {
unique_lock<mutex> lock(mtx);
cv.wait(lock, [&]{return k == 2;});
printThird();
}
};
二元信号量版
直接先V后P即可。两个同步动作,因此要两个二元信号量。
#include <semaphore.h>
class Foo {
public:
sem_t s, t;
Foo() {
sem_init(&s, 0, 0);
sem_init(&t, 0, 0);
}
void first(function<void()> printFirst) {
printFirst();
sem_post(&s);
}
void second(function<void()> printSecond) {
sem_wait(&s);
printSecond();
sem_post(&t);
}
void third(function<void()> printThird) {
sem_wait(&t);
printThird();
}
};
生产者消费者阻塞队列版本
条件变量+锁(C++11)
用POSIX还是一样的,这里写C++11版本 还是一样的套路,没有任何区别。直接默写模板就好了。 唯一的区别就在于这里的条件并不是简单的1,2,3这样的东西,而是判断队列数据个数与capacity之间的关系。 贴代码:
struct Task
{
Task()
{
x = rand() % 100 + 1;
}
void handler()
{
cout << "随机数生成任务完成,随机数是 " << x << endl;
}
int x;
};
template<class T>
class BlockingQueue
{
public:
BlockingQueue(int _cap = 5) :cap(_cap){}
static void Productor(BlockingQueue* bq)
{
int cnt = 10;
while(cnt--){
unique_lock<mutex> lck(mtx);
cv.wait(lck, [&]{return bq->q.size() < bq->cap;});
T* task = new T();
bq->Put(task);
cv.notify_one();
}
}
static void Consumer(BlockingQueue* bq)
{
int cnt = 10;
while(cnt--){
unique_lock<mutex> lck(mtx);
cv.wait(lck, [&]{return bq->q.size() > 0;});
T* task = bq->Get();
task->handler();
delete task;
cv.notify_one();
}
}
void Put(T* task)
{
q.push(task);
}
T* Get()
{
T* res = q.front();
q.pop();
return res;
}
private:
static mutex mtx;
static condition_variable cv;
queue<T*> q;
int cap;
};
template<class T>
mutex BlockingQueue<T>::mtx;
template<class T>
condition_variable BlockingQueue<T>::cv;
int main()
{
BlockingQueue<Task>* bq = new BlockingQueue<Task>();
thread t1 = thread(BlockingQueue<Task>::Productor, bq);
thread t2 = thread(BlockingQueue<Task>::Consumer, bq);
t1.join(), t2.join();
}
多元信号量
这个东西有点绕。由于这里的条件并不是简单的1,2,3这样的值,而是判断队列数据个数与capacity之间的关系,因此要用多元信号量。
和第一题轮流打印奇数偶数有点像,都是两个先V后P。
template<class T>
class RingQueue
{
public:
RingQueue(int _ca = 5) :ca(_ca)
{
v.resize(1000);
sem_init(&data, 0, 0);
sem_init(&block, 0, ca);
thread t1([&]{
for(int i = 0; i < 10; i++)
{
T* task = new Task();
Put(task);
}
});
thread t2([&]{
for(int i = 0; i < 10; i++)
{
sleep(1);
T* task = Get();
task->handler();
delete task;
}
});
t1.join(), t2.join();
}
void Put(T* task)
{
sem_wait(&block);
unique_lock<mutex> lck(mtx);
q.push(task);
sem_post(&data);
}
T* Get()
{
sem_wait(&data);
unique_lock<mutex> lck(mtx);
T* res = q.front();
q.pop();
sem_post(&block);
return res;
}
private:
sem_t data;
sem_t block;
int ca;
queue<T*> q;
mutex mtx;
};
int main()
{
BlockingQueue<Task>* rq = new BlockingQueue<Task>();
}
用普通的队列来写是要加锁的(生产者和消费者加同一把锁)。这点参考王道操作系统。
原因很简单,就是因为队列是临界资源,生产者和消费者都看得见。因此要在生产者和消费者之间加锁。其实最重要的是阻塞队列用的是队列,先进先出。生产者和消费者都是从头部拿数据,容易有线程安全问题。
环形队列
环形队列和普通队列的区别是什么?搞清楚这个问题才可以了解下面的东西。
普通队列是先进先出。 环形队列其实就是一个数组。从尾部放数据,从头部拿数据。
条件变量
用条件变量来写环形队列和用条件变量写阻塞队列是一样的。还是要加锁,生产者和消费者还是串行进入缓冲区队列的。
多元信号量(无锁化编程)
重点在这里,用多元信号量写环形队列。生产者消费者之间不需要加锁。这有个专门的术语,叫无锁化编程。
#include <iostream>
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <vector>
#include <unistd.h>
using namespace std;
class Task
{
public:
int x, y;
public:
Task(int _x, int _y) :x(_x), y(_y) {}
int add()
{
return x + y;
}
};
template<class T>
class RingQueue
{
private:
vector<T> v;
sem_t sem_data;
sem_t sem_block;
int c_i;
int p_i;
int cap;
public:
RingQueue(int _cap = 10) :cap(_cap), c_i(0), p_i(0)
{
v.resize(10);
sem_init(&sem_data, 0, 0);
sem_init(&sem_block, 0, cap);
}
~RingQueue()
{
sem_destroy(&sem_data);
sem_destroy(&sem_block);
}
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
void Get(T &out)
{
P(sem_data);
out = v[c_i];
c_i++;
c_i = c_i % cap;
V(sem_block);
}
void Put(T &out)
{
P(sem_block);
v[p_i++] = out;
p_i = p_i % cap;
V(sem_data);
}
};
-
问题来了:这里的缓冲区队列可是临界资源啊,我们这么写不加锁应该不对吧? -
实际上是没有问题的。虽然我们没有实现生产者和消费者之间的互斥,只实现了生产者和消费者的同步关系(生产者先生产,消费者再拿数据)。 -
但是生产者和消费者不会同时拿到同一块数据,因为环形队列是从头部拿数据,从尾部放数据,生产者和消费者不会在同一个区域逗留。那么它们两拿的任务就不是临界资源了(虽然都去访问缓冲区队列这个临界资源了),因此生产者和消费者之间不用加锁。 -
如果有多个生产者和多个消费者,多个生产者之间加同一把锁,多个消费者之间加同一把锁就可以保证临界资源(任务)的安全了。还是不需要在生产者和消费者之间加同一把锁。
上面讲的4条长篇大论,可以说明一个道理,多元信号量是允许多个执行流访问共享资源的。但是其中的线程安全问题要你自己保证。这里由于使用了环形队列,因此线程安全问题得以保证。
当然,多元信号量+环形队列实现生产者消费者还是可以像阻塞队列一样,一旦进入缓冲区队列就加锁,但是这没必要,且影响效率。
因为在访问缓冲区队列中加了锁之后,生产者和消费者只有一个线程能进入缓冲区队列,而做不到不加锁之前,允许一个生产者和一个消费者同时进入缓冲区队列。
这种写法得到的效果和条件变量是一致的。条件变量由于也是加了锁,它只允许一个执行流进入临界资源。(因为在wait和signal之前,都是加了锁的)
条件变量和多元信号量的对比
上面讲那么多,总结一下:
- 如果生产者消费者加了同一把锁,多元信号量就完全等价于条件变量。都是只能有一个执行流访问共享资源,其他拿不到锁的执行流,直接丢到阻塞队列或者挂起队列里面等待。
- 如果生产者消费者允许同时访问缓冲区队列,那么多元信号量就不等价于条件变量。这也是信号量允许多执行流进入共享资源的一个实例。这是无锁化编程的重要组成部分
|