POSIX信号量
POSIX信号量概念
POSIX 信号量和System V 信号量作用相同,都是用于同步和互斥 操作,以达到无冲突的访问共享资源目的。
POSIX版本的信号量主要用于实现线程之间的通信 ,System V信号量只能用于进程间通信
POSIX信号量本质是一个计数器 ,用来描述临界区内的资源数量,对临界资源进行更细粒度 管理
互斥锁 是把临界区整个锁起来 ,只允许一个线程进入临界区 ,而POSIX信号量 则是把临界区分为n个互不干扰的区域,可以允许n个线程同时进入临界区 ,每个区域之间的数据不受影响,可以让更多的线程同时生产和消费,效率更高。
信号量也叫信号灯
信号量的操作
信号量中资源的申请和释放可以用**P 和V **来表示
对信号量的申请操作,实质上是对计数器进行--操作 ,表示临界区资源数量-1,也就是这个临界区中的一个区域被申请。
对信号量的释放操作,实质上是对计数器的++操作 ,表示临界区资源数量+1,临界区内的一个区域被释放。
信号量操作接口
信号量的初始化(sem_init接口)
sem_init接口
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
- 第一个参数sem,要初始化的信号量
- 第二个参数pshared,传入0值表示线程间共享,传入非零值表示进程间共享
- 第三个参数value,信号量的初始值(临界区内资源数目的初始值)
返回值:
初始化信号量成功返回0,失败返回-1。
信号量的申请/等待(sem_wait接口)
sem_wait接口
int sem_wait(sem_t *sem);
参数:
需要申请的信号量
返回值:
申请信号量成功返回0,失败返回-1。
信号量的释放/发布(sem_post接口)
sem_post接口
int sem_post(sem_t *sem);
参数:
需要释放的信号量
返回值:
释放信号量成功返回0,失败返回-1。
信号量的销毁(sem_destroy接口)
sem_destroy接口
int sem_destroy(sem_t *sem);
参数:
要销毁的信号量
返回值:
销毁信号量成功返回0,失败返回-1。
基于环形队列的生产者消费者模型
只要保证生产者和消费者不对同一个区域进行操作,他们之间就不会有线程安全问题,通过信号量来控制,每当生产者生产一个数据,数据信号量就+1,这时消费者才能申请到区域的读取权限,这时消费者已经到下一个区域内了,如果此时生产者生产的很快,放慢了所有区域的数据,这时blank信号量就变为了0,无法申请到新的空格用来放数据,这样的话生产者和消费者永远无法同时进入同一个小区域,保证了生产者和消费者之间的互斥关系。
队列代码实现:
#pragma once
#include<iostream>
#include<unistd.h>
#include<semaphore.h>
#include<vector>
#define NUM 10
class Task {
public:
int _x, _y;
public:
Task(int x, int y) :_x(x), _y(y)
{}
Task() {}
int run() {
return _x + _y;
}
};
template <class T>
class RingQueue {
private:
std::vector<T> _v;
int max_capacity;
sem_t sem_data;
sem_t sem_blank;
int c_index;
int p_index;
public:
RingQueue(int capacity = NUM) :max_capacity(capacity), _v(capacity) {
sem_init(&sem_data, 0, 0);
sem_init(&sem_blank, 0, max_capacity);
c_index = 0;
p_index = 0;
}
void Get(T& out) {
P(sem_data);
out = _v[c_index];
c_index++;
c_index %= max_capacity;
V(sem_blank);
}
void Put(const T& in) {
P(sem_blank);
_v[p_index] = in;
p_index++;
p_index %= max_capacity;
V(sem_data);
}
void P(sem_t& sem) {
sem_wait(&sem);
}
void V(sem_t& sem) {
sem_post(&sem);
}
~RingQueue() {
sem_destroy(&sem_data);
sem_destroy(&sem_blank);
c_index = 0;
p_index = 0;
}
};
测试:
#include "RingQueue.hpp"
using namespace std;
pthread_mutex_t p_lock;
pthread_mutex_t c_lock;
void* Productor_run(void* arg) {
sleep(1);
RingQueue<Task>* rq = (RingQueue<Task>*)arg;
while (true) {
int x = rand() % 10;
int y = rand() % 100;
Task t(x, y);
pthread_mutex_lock(&p_lock);
rq->Put(t);
pthread_mutex_unlock(&p_lock);
sleep(1);
cout << "Productor is [" << pthread_self() << "]" << " Productor Task is " << x << " + " << y << " = ?" << endl;
}
}
void* Consumer_run(void* arg) {
sleep(2);
RingQueue<Task>* rq = (RingQueue<Task>*)arg;
Task t;
while (true) {
pthread_mutex_lock(&c_lock);
rq->Get(t);
pthread_mutex_unlock(&c_lock);
sleep(1);
cout << "Consumer is [" << pthread_self() << "]" << " Consumer Task is " << t._x << " + " << t._y << " = " << t.run() << endl;
}
}
int main() {
RingQueue<Task> rq;
pthread_t p1, p2, p3, c1, c2, c3;
pthread_create(&p1, nullptr, Productor_run, &rq);
pthread_create(&p2, nullptr, Productor_run, &rq);
pthread_create(&p3, nullptr, Productor_run, &rq);
pthread_create(&c1, nullptr, Consumer_run, &rq);
pthread_create(&c2, nullptr, Consumer_run, &rq);
pthread_create(&c3, nullptr, Consumer_run, &rq);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
return 0;
}
|