0 生产者/消费者模式两种实现原理
我们熟知的生产者/消费者模式都是“有界的”,这里的有界是指缓冲区空间是有限的,缓冲区有填满的时刻。而无界限缓冲区是指缓冲区空间理论上是没有无限大的,没有上上限。 因此,第一种缓冲区有限的生产者/消费者模式在进行产品的存取之前,需要借助条件信号量对缓冲空间“为空”、“为满”进行判断。 第二种缓冲区无限的生产者/消费者只需要判断缓冲空间是否为空即可。 下面以队列作为缓冲区存储结构为例对两种实现原理简述。 无界缓冲区
lock(mutex)
queue.push(x)
unlock(mutex)
notEmpty.signal()
while (queue.empty()) {
notEmpty.wait()
}
queue.pop()
notFull.signal()
有界缓冲区
p(semEmpty)
lock(mutex)
queue.push(x)
unlock(mutex)
v(semFull)
p(semFull)
lock(mutex)
queue.pop()
unlock(mutex)
v(semEmpty)
1 无界缓冲区实现
1.1 类图
1.2 实现代码
BlockingQueue.h
#ifndef MUDUO_BASE_BLOCKINGQUEUE_H
#define MUDUO_BASE_BLOCKINGQUEUE_H
#include "muduo/base/Condition.h"
#include "muduo/base/Mutex.h"
#include <deque>
#include <assert.h>
namespace muduo
{
template<typename T>
class BlockingQueue : noncopyable
{
public:
using queue_type = std::deque<T>;
BlockingQueue()
: mutex_(),
notEmpty_(mutex_),
queue_()
{
}
void put(const T& x)
{
MutexLockGuard lock(mutex_);
queue_.push_back(x);
notEmpty_.notify();
}
void put(T&& x)
{
MutexLockGuard lock(mutex_);
queue_.push_back(std::move(x));
notEmpty_.notify();
}
T take()
{
MutexLockGuard lock(mutex_);
while (queue_.empty())
{
notEmpty_.wait();
}
assert(!queue_.empty());
T front(std::move(queue_.front()));
queue_.pop_front();
return front;
}
queue_type drain()
{
std::deque<T> queue;
{
MutexLockGuard lock(mutex_);
queue = std::move(queue_);
assert(queue_.empty());
}
return queue;
}
size_t size() const
{
MutexLockGuard lock(mutex_);
return queue_.size();
}
private:
mutable MutexLock mutex_;
Condition notEmpty_ GUARDED_BY(mutex_);
queue_type queue_ GUARDED_BY(mutex_);
};
}
#endif
2 有界缓冲区实现
2.1 类图
这里的存储结构为一个环形队列。
2.2 实现代码
BoundedBlockingQueue.h
#ifndef MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H
#define MUDUO_BASE_BOUNDEDBLOCKINGQUEUE_H
#include "muduo/base/Condition.h"
#include "muduo/base/Mutex.h"
#include <boost/circular_buffer.hpp>
#include <assert.h>
namespace muduo
{
template<typename T>
class BoundedBlockingQueue : noncopyable
{
public:
explicit BoundedBlockingQueue(int maxSize)
: mutex_(),
notEmpty_(mutex_),
notFull_(mutex_),
queue_(maxSize)
{
}
void put(const T& x)
{
MutexLockGuard lock(mutex_);
while (queue_.full())
{
notFull_.wait();
}
assert(!queue_.full());
queue_.push_back(x);
notEmpty_.notify();
}
void put(T&& x)
{
MutexLockGuard lock(mutex_);
while (queue_.full())
{
notFull_.wait();
}
assert(!queue_.full());
queue_.push_back(std::move(x));
notEmpty_.notify();
}
T take()
{
MutexLockGuard lock(mutex_);
while (queue_.empty())
{
notEmpty_.wait();
}
assert(!queue_.empty());
T front(std::move(queue_.front()));
queue_.pop_front();
notFull_.notify();
return front;
}
bool empty() const
{
MutexLockGuard lock(mutex_);
return queue_.empty();
}
bool full() const
{
MutexLockGuard lock(mutex_);
return queue_.full();
}
size_t size() const
{
MutexLockGuard lock(mutex_);
return queue_.size();
}
size_t capacity() const
{
MutexLockGuard lock(mutex_);
return queue_.capacity();
}
private:
mutable MutexLock mutex_;
Condition notEmpty_ GUARDED_BY(mutex_);
Condition notFull_ GUARDED_BY(mutex_);
boost::circular_buffer<T> queue_ GUARDED_BY(mutex_);
};
}
#endif
3 参考源码
有两份测试代码,参见源码的$MUDUO_HOME/muduo/base/tests/路径下的BlockingQueue_test.cc和BoundedBlockingQueue_test.cc文件。 源码地址:http://github.com/gaoziqiang/gmuduo
|