生产者消费者是极其经典的并发同步模型,描述了在共享固定大小缓冲区下,生产者生产一定量数据放入缓冲区,而消费者则从缓冲区消费取出一定量数据。
生产者消费者问题还有一个名字是有限缓冲问题。
生产者消费者更加强调了两边写缓冲线程的角色,而有限缓冲则将目光聚焦到中间的缓冲
semaphore 实现
semaphore(信号量)是一个变量或者抽象数据类型,被用于控制在并发系统临界区问题多个线程对公共资源的访问。与自旋锁、栅栏一起作为同步手段。一个普通的信号量是单纯取决于程序员定义条件而改变的变量。
可以将信号量看作是特定资源的数量记录,并耦合调整该记录的安全操作。
生产者消费者的有限缓冲实际上就可以看作是特定资源,因此可以使用信号量来记录有限缓冲的数量。
以下是c++版本的实现,当然为了控制对buffer写操作时可能碰到的多个生产者消费者同时取放数据导致的写冲突,也在其中加了互斥锁
#include <thread>
#include <mutex>
#include <semaphore>
std::counting_semaphore<N> number_of_queueing_portions{0};
std::counting_semaphore<N> number_of_empty_positions{N};
std::mutex buffer_manipulation;
void producer(){
for(;;) {
Portion portion = produce_next_portion();
number_of_empty_positions.acquire();
{
std::lock_guard<std::mutex> g(buffer_manipulation);
add_portion_to_buffer(portion);
}
number_of_queueing_portions.release();
}
}
void consumer(){
for(;;) {
number_of_queueing_portions.acquire();
Portion portion;
{
std::lock_guard<std::mutex> g(buffer_manipulation);
portion = take_portion_from_buffer();
}
number_of_empty_positions.release();
process_portion_taken(portion);
}
}
int main(int argc, char const *argv[])
{
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
monitor实现
monitor(管程)是允许线程具有互斥、等待(堵塞)某个条件为false的能力的抽象数据结构。还具有通知其他线程他们特定条件已经满足的机制,以及让他们暂时放弃独占访问,以便等待某些条件满足,然后重新获取独占访问并恢复他们的任务。
管程由互斥锁以及特定条件变量组成。条件变量本质上是等待特定条件的线程的容器
管程和信号量一个明显的区别就是信号量是对共享资源数量的记录,wait(),notify()是仅有能修改共享资源数量的记录的方法,并且这种修改是互斥的
管程则是条件变量(等待满足特定条件线程容器)以及操作该变量的程序
class Bounded_buffer{
Portion buffer[N];
unsigned head, tail;
unsigned count;
std::condition_variable nonempty, nonfull;
std::mutex mtx;
public:
void append(Portion x){
std::unique_lock<std::mutex> lck(mtx);
nonfull.wait(lck, [&]{return !(N==count);});
assert(0<=count && count < N)
buffer[tail++]=x;
tail%=N;
++count;
nonempty.notify_one();
}
Portion remove() {
std::unique_lock<std::mutex> lck(mtx);
nonempty.wait(lck,[&]{return !(0==count);});
assert(0 < count && count <= N)
Portion x = buffer[head++];
head %= N;
--count;
nonfull.notify_one();
return x;
}
Bounded_buffer() {
head = 0; tail = 0; count = 0;
}
}
channel实现
channel是一种通过消息传递实现进程间通信和同步的模型。消息可以通过通道发送,而另一个进程或线程能够接收通过它引用channel发送的消息,比如流。通道不同实现可以被缓冲,也可以不被缓冲,可以是异步或者同步的。
channel实现在golang中是极为优雅的
var element = 0
func produce() int {
element++
return element
}
func consume(e int) {
}
const (
producerCount = 2
consumerCount = 2
bufferSize = 1
)
func main() {
ch:=make(chan int, bufferSize)
for i := 0; i < producerCount; i++ {
go func() {
ch<-produce()
}()
}
for j := 0; j < consumerCount; j++ {
go func() {
consume(<-ch)
}()
}
}
无semaphore、monitors实现
在单生产者和消费者的时候,可以定义一个容量为b(b>=1)的buffer。使k为大于b的常数并为b的倍数,s、r为0到k-1之间的整数。
在初始化时s=r且buffer为空。生产者放入消息到buffer[s mod b],消费者取出消息buffer[r mod b]
生产者消费者最重要的是互斥有限缓冲的写操作以及同步生产者消费者。在原子操作下的buffer中同一个位置不可能同时存在有资源以及没有资源两种情况,因此无需互斥。单生产者消费者,仅仅只需要考虑生产者和消费者之间同步,而忙等待判断是否为空或者超出容量做到了这点
当然还要考虑到在调度器切换时,可能一个线程读取了变量值,切换到第二个线程更改了该值,再切回来,那么第一个线程将使用旧值,而不是当前值,因此需要通过原子操作来解决这个问题
enum {
N = 4
};
Message buffer[N];
std::atomic<unsigned> count{0};
void producer() {
unsigned tail{0};
for (;;) {
Message msg = produceMessage();
while (N == count);
buffer[tail++] = msg;
tail%=N;
count.fetch_add(1,std::memory_order_relaxed);
}
}
void consumer(){
unsigned head {0};
for(;;){
while (count==0);
Message msg = buffer[head++];
head%=N;
count.fetch_sub(1,std::memory_order_relaxed);
consumeMessage(msg);
}
}
注意代码中while的使用,这是为了防止一些竞争情况的出现。比如某消费者在数据放入缓冲区时被唤醒,但另一个消费者在管程上等待了一段时间并移除了该数据。所以如果是if,可能会出现放入缓冲区的数据项过多,或移除空缓冲区中的元素的情况。
Ref
- https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem
- https://en.wikipedia.org/wiki/Monitor_(synchronization)
- https://en.wikipedia.org/wiki/Semaphore
- https://en.wikipedia.org/wiki/Channel_(programming)
|