一. 什么是POSIX信号量?
POSIX 和System V 都是可移植的操作系统接口标准,它们都定义了操作系统应该为应用程序提供的接口标准。
- POSIX信号量和System V信号量作用相同,都是用于同步和互斥操作,以达到无冲突的访问共享资源目的。
- System V版本的信号量只适用于实现进程间的通信,而POSIX版本的信号量主要用于实现线程之间的通信。
信号量(信号灯)本质是一个是用来对临界资源进行更细粒度地描述和管理的计数器。
二. 为什么要有POSIX信号量?
POSIX信号量主要用于实现线程间的同步。
三. POSIX信号量实现原理
信号量的结构如下:
count :记录还有多少小块的临界资源未被使用。queue :当count为0时,其它未申请到信号量的线程的task_struct地址会被放到信号量等待队列中阻塞挂起。
信号量的PV操作
- P操作:我们把申请信号量得操作称为P操作,申请信号量的本质就是申请获得整块临界资源中某小块资源的使用权限,当申请成功时临界资源中小块资源的数目应该减一,因此P操作的本质就是让
count - -。 - V操作:我们将释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让
count ++。
申请不到信号量的线程被阻塞挂起
当count 为0时,表示不允许其它线程再访问临界资源,这时其它申请信号量的线程会被阻塞到该信号量的等待队列中,直到其它线程释放信号量。
四. POSIX信号量接口函数
1. 创建、初始化信号量
信号量的类型是sem_t ,我们可以根据这个类型自己定义信号量对象: 定义出信号量对象之后,必须用sem_init()函数来初始化这个信号量: 参数说明:
- sem:信号量对象的地址。
- pshared:0表示线程间共享,非零表示进程间共享。
- value:信号量初始值,即count的大小。
函数说明:该函数主要用于设置信号量对象的基本属性。
2. 销毁信号量
函数说明:只需传入信号量对象的地址即可销毁该信号量。
3. 等待(申请)信号量
函数说明:传入信号量对象的地址用于申请该信号量,调用成功返回0,count- -;失败返回-1,count值不变。
4. 发布(释放)信号量
函数说明:传入信号量对象的地址用于释放该信号量,调用成功返回0,count++;失败返回-1,count值不变。
五. 信号量的应用
1. 二元信号量模拟互斥锁
当count = 1 时,说明整块临界资源作为一个整体使用而没有被切分管理,那么这个信号量对象就相当于是一把互斥锁,称为二元信号量。
下面我们用二元信号量模拟互斥锁完成黄牛抢票代码:
- 在主线程中创建4个新线程去抢10张票。
- 此时票是临界资源,我们用二元信号量对其进行保护。
- 每个新线程抢票之前都要先申请二元信号量,没有申请到线程被阻塞挂起。
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
using namespace std;
class MySem
{
public:
MySem(size_t num)
{
sem_init(&_sem, 0, num);
}
~MySem()
{
sem_destroy(&_sem);
}
void P()
{
sem_wait(&_sem);
}
void V()
{
sem_post(&_sem);
}
private:
sem_t _sem;
};
int count = 10;
MySem sem(1);
void* GetTickets(void* arg)
{
while(true)
{
size_t id = (size_t)arg;
sem.P();
if(count > 0)
{
usleep(1000);
cout<<'['<<"thread "<<id<<']'<<" get ticket No."<<count--<<endl;
sem.V();
}
else
{
sem.V();
break;
}
}
return nullptr;
}
int main()
{
pthread_t tids[4];
for(size_t i = 0; i < 4; ++i)
{
pthread_create(&tids[i], nullptr, GetTickets, (void*)(i+1));
}
for(size_t i = 0; i < 4; ++i)
{
pthread_join(tids[i], nullptr);
}
return 0;
}
编译运行,由于我们没有实现同步所以都是第一个创建的1号线程申请到信号量,但是最终票的结果是对的,说明互斥是实现了的:
2. 基于环形队列的生产者消费者模型
2.1 基本规则
- 生产者只关心是否还有格子用来生产数据。
- 消费者只关心环形队列中是否还有数据。
- 一开始没有数据,生产者和消费者指向同一个位置,这时生产者要先执行生产操作,消费者阻塞挂起;数据满时,生产者和消费者也指向同一个位置,这时消费者先执行消费操作再轮到生产者生产。
- 生产者和消费者不能同时访问队列中的同一个位置。
- 生产者和消费者可以并发访问环形队列中的不同位置。
2.2 环形队列的实现
成员变量说明:
- 这里用一个数组来模拟环形队列,因为生产者和消费者要并发执行且不能同时操作相同位置的数据,刚好数组可以通过下标随机访问数据,所以这里我们选用数组。
- 定义了两个无符号整型对象
_proPos 和_cusPos 分别指向生产者要生产数据的格子下标和消费者要拿取数据的位置下标。 - 还定义了
_proSem 和_cusSem 两个信号量对象,分别记录着环形队列中格子数量和以生产数据个数。 - 最后还有必要记录环形队列的容量大小,可以用它来取模更新
_proPos 和_cusPos 的值。
#pragma once
#include <vector>
#include <time.h>
#include <iostream>
#include <unistd.h>
#include <semaphore.h>
using namespace std;
const size_t NUM = 8;
template<class T>
class RingQueue
{
public:
RingQueue(size_t num = NUM)
:_v(num)
,_cusPos(0)
,_proPos(0)
,_capacity(num)
{
sem_init(&_cusSem, 0, 0);
sem_init(&_proSem, 0, num);
}
~RingQueue()
{
sem_destroy(&_cusSem);
sem_destroy(&_proSem);
}
void Push(const T& inData)
{
P(_proSem);
_v[_proPos] = inData;
V(_cusSem);
++_proPos;
_proPos %= _capacity;
}
void Pop(T& outData)
{
P(_cusSem);
outData = _v[_cusPos];
V(_proSem);
++_cusPos;
_cusPos %= _capacity;
}
private:
void P(sem_t& s)
{
sem_wait(&s);
}
void V(sem_t& s)
{
sem_post(&s);
}
sem_t _cusSem;
sem_t _proSem;
size_t _cusPos;
size_t _proPos;
vector<T> _v;
size_t _capacity;
};
成员函数说明:
- 这里特意封装了信号量的PV操作,只需把信号量对象作为参数传入就能完成信号量的申请、释放操作。
- 生产者执行Push()操作生产数据时,需要先申请(减一)
_proSem 信号量,生产完成后释放(加一)_cusPos 信号量,让消费者来消费。反之亦然
2.3 单生产者单消费者
在主线程中创建两个新线程分别代表生产者和消费者,消费者每隔一秒地从环形队列中拿取数据,生产者每隔一秒生产一个数据:
#include "RingQueue.h"
void* Customer(void* arg)
{
RingQueue<int>* q = (RingQueue<int>*)arg;
while(true)
{
sleep(1);
int getData;
q->Pop(getData);
cout<<"[Customer] pop data:"<<getData<<endl;
}
}
void* Producer(void* arg)
{
RingQueue<int>* q = (RingQueue<int>*)arg;
while(true)
{
sleep(1);
int putData = (rand()%100) + 1;
q->Push(putData);
cout<<"[Producer] push data:"<<putData<<endl;
}
}
int main()
{
srand((size_t)time(nullptr));
RingQueue<int>* q = new RingQueue<int>;
pthread_t tid1, tid2;
pthread_create(&tid1, nullptr, Customer, (void*)q);
pthread_create(&tid2, nullptr, Producer, (void*)q);
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
delete q;
return 0;
}
编译运行,由于_proSem 初始值为0,一开始没有数据生产者线程要挂起等待,消费者生产一个数据,生产者就拿取一个数据:
接下来我们让生产者生产得快,消费者消费的慢: 编译运行,发现生产者生产的数据瞬间把队列填满了,接下来消费者拿走一个数据,生产者再生产一个数据,二者串行执行: 如果消费者消费得快,生产者生产得慢的话,可以推测结果是生产者生产完一个数据,消费者马上就拿走,然后继续等待生产者生产数据,这个就不在做演示了。
2.4 多生产者多消费者
这次我们在主线程中分别新建三个生产者线程、三个消费者线程。生产者之间竞争proLock 这把锁,消费者之间竞争cusLock 这把锁,竞争到锁的线程才能去生产或拿取数据,它们完成一次操作后释放锁,然后重新内部竞争:
#include "RingQueue.h"
pthread_mutex_t cusLock;
pthread_mutex_t proLock;
RingQueue<int>* q = new RingQueue<int>;
void* Customer(void* arg)
{
while(true)
{
size_t id = (size_t)arg;
int getData;
pthread_mutex_lock(&cusLock);
q->Pop(getData);
pthread_mutex_unlock(&cusLock);
cout<<'['<<"Customer "<<id<<']'<<" Pop data:"<<getData<<endl;
sleep(1);
}
}
void* Producer(void* arg)
{
size_t id = (size_t)arg;
while(true)
{
int putData = (rand()%100) + 1;
pthread_mutex_lock(&proLock);
q->Push(putData);
pthread_mutex_unlock(&proLock);
cout<<'['<<"Producer "<<id<<']'<<" push data "<<putData<<endl;
sleep(1);
}
}
int main()
{
pthread_mutex_init(&cusLock, nullptr);
pthread_mutex_init(&proLock, nullptr);
srand((size_t)time(nullptr));
pthread_t cusTids[3];
pthread_t proTids[3];
for(size_t i = 0; i < 3; ++i)
{
pthread_create(&cusTids[i], nullptr, Customer, (void*)(i+1));
}
for(size_t i = 0; i < 3; ++i)
{
pthread_create(&proTids[i], nullptr, Producer, (void*)(i+1));
}
for(size_t i = 0; i < 3; ++i)
{
pthread_join(cusTids[i], nullptr);
}
for(size_t i = 0; i < 3; ++i)
{
pthread_join(proTids[i], nullptr);
}
delete q;
pthread_mutex_destroy(&cusLock);
pthread_mutex_destroy(&proLock);
return 0;
}
编译运行,生产和消费操作并发执行:
六. 信号量和条件变量的区别
信号量既可以实现同步还可以实现互斥,而条件变量只能实现同步;条件变量需要搭配互斥锁使用,而信号量通过自身计数器实现同步的条件判断,不需要搭配互斥锁使用。
|