queue实现阻塞队列
#include <iostream>
#include <queue>
#include <unistd.h>
#include <pthread.h>
using namespace std;
template <typename T>
class blockQueue
{
private:
int _capacity;
queue<T> _bq;
pthread_mutex_t _bqmtx;
pthread_cond_t _bqfull;
pthread_cond_t _bqempty;
private:
void _bqLock()
{
pthread_mutex_lock(&_bqmtx);
}
void _bqUnlock()
{
pthread_mutex_unlock(&_bqmtx);
}
void _bqNotifyFull()
{
pthread_cond_signal(&_bqfull);
}
void _bqNotifyEmpty()
{
pthread_cond_signal(&_bqempty);
}
void _bqWaitFull()
{
pthread_cond_wait(&_bqfull, &_bqmtx);
}
void _bqWaitEmpty()
{
pthread_cond_wait(&_bqempty, &_bqmtx);
}
bool _bqIsFull()
{
return _bq.size() == _capacity;
}
bool _bqIsEmpty()
{
return _bq.size() == 0;
}
public:
blockQueue(const int &capacity);
~blockQueue();
void pushBlockQueue(const T &data);
void popBlockQueue(T &data);
};
template <typename T>
blockQueue<T>::blockQueue(const int &capacity):_capacity(capacity)
{
pthread_mutex_init(&_bqmtx, nullptr);
pthread_cond_init(&_bqfull, nullptr);
pthread_cond_init(&_bqempty, nullptr);
}
template <typename T>
blockQueue<T>::~blockQueue()
{
pthread_mutex_destroy(&_bqmtx);
pthread_cond_destroy(&_bqfull);
pthread_cond_destroy(&_bqempty);
}
template <typename T>
void blockQueue<T>::pushBlockQueue(const T &data)
{
_bqLock();
while (_bqIsFull())
{
_bqWaitEmpty();
}
_bq.push(data);
_bqNotifyFull();
_bqUnlock();
}
template <typename T>
void blockQueue<T>::popBlockQueue(T &data)
{
_bqLock();
while (_bqIsEmpty())
{
_bqWaitFull();
}
data = _bq.front();
_bq.pop();
_bqNotifyEmpty();
_bqUnlock();
}
void *Producter(void *arg)
{
blockQueue<int> *bq = (blockQueue<int> *)arg;
srand((unsigned long)time(nullptr));
while (1)
{
int data;
bq->pushBlockQueue(data = rand() % 1024);
printf("Product data : %d\n", data);
usleep(800000);
}
return nullptr;
}
void *Consumer(void *arg)
{
blockQueue<int> *bq = (blockQueue<int> *)arg;
srand((unsigned long)time(nullptr));
while (1)
{
int data;
bq->popBlockQueue(data);
printf("Consumer get data : %d\n", data);
usleep(1000000 + rand() % 1000000);
}
return nullptr;
}
int main(int argc, char *argv[])
{
blockQueue<int> bq(1000000);
pthread_t prod, cons;
pthread_create(&prod, nullptr, Producter, (void *)&bq);
pthread_create(&cons, nullptr, Consumer, (void *)&bq);
pthread_join(prod, nullptr);
pthread_join(cons, nullptr);
return 0;
}
|