#include <iostream>
#include <deque>
#include <memory>
#include <mutex>
#include <thread>
#include <chrono>
#include <condition_variable>

const int MAX_ITEM = 20;
std::mutex mx;
std::condition_variable cv;

class Queue
{
public:
    Queue() {}
    ~Queue() {}
    // Producer side: block while the queue is full, then push one value.
    void put(int val, int index)
    {
        std::unique_lock<std::mutex> lock(mx);
        while (q.size() == MAX_ITEM)
        {
            cv.wait(lock);   // releases mx while waiting, re-acquires it on wakeup
        }
        q.push_back(val);
        cv.notify_all();     // wake consumers waiting on an empty queue
        std::cout << "producer:" << index << " value:" << val << std::endl;
    }
    // Consumer side: block while the queue is empty, then pop one value.
    int get(int index)
    {
        std::unique_lock<std::mutex> lock(mx);
        while (q.empty())
        {
            cv.wait(lock);
        }
        int val = q.front();
        q.pop_front();
        cv.notify_all();     // wake producers waiting on a full queue
        std::cout << "consumer:" << index << " value:" << val << std::endl;
        return val;
    }
private:
    std::deque<int> q;
};

void producer(std::shared_ptr<Queue> q, int index)
{
    for (int i = 0; i < 100; ++i)
    {
        q->put(i, index);
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void consumer(std::shared_ptr<Queue> q, int index)
{
    for (int i = 0; i < 100; ++i)
    {
        q->get(index);
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}

int main()
{
    std::shared_ptr<Queue> q = std::make_shared<Queue>();
    std::thread p1(producer, q, 1);
    std::thread c1(consumer, q, 1);
    std::thread p2(producer, q, 2);
    std::thread c2(consumer, q, 2);
    p1.join();
    c1.join();
    p2.join();
    c2.join();
    return 0;
}
#include <iostream>
#include <mutex>
#include <thread>
#include <chrono>
#include <condition_variable>

// Fixed-capacity circular queue.
template<class T>
class Queue
{
    enum { QUSIZE = 8 };
    T* data;
    int front;
    int rear;
    int size;
    int maxsize;
public:
    Queue() : data(nullptr), front(0), rear(0), size(0), maxsize(QUSIZE)
    {
        data = new T[maxsize];
    }
    ~Queue()
    {
        delete[] data;       // allocated with new[], so release with delete[]
        data = nullptr;
        front = rear = -1;
        size = 0;
        maxsize = 0;
    }
    int Capt() const { return maxsize; }
    int Size() const { return size; }
    bool Empty() const { return Size() == 0; }
    bool Full() const { return Size() == maxsize; }
    bool push(const T& val)
    {
        if (Full()) return false;
        data[rear] = val;
        rear = (rear + 1) % maxsize;
        size += 1;
        return true;
    }
    // Reads and removes the element at the head of the queue.
    bool Front(T& val)
    {
        if (Empty()) return false;
        val = data[front];
        front = (front + 1) % maxsize;
        size -= 1;
        return true;
    }
};

Queue<int> iq;
std::mutex mx;
std::condition_variable cv;
const int maxsize = iq.Capt();
int number = 0;

void producer(int index)
{
    std::unique_lock<std::mutex> lock(mx);   // the lock is held for the whole loop
    for (int i = 0; i < 100; ++i)
    {
        // Wait until the queue has been drained before producing the next value,
        // so there is never more than one item in the queue.
        cv.wait(lock, []() -> bool { return iq.Empty(); });
        iq.push(++number);
        std::cout << "producer:" << index << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        cv.notify_all();
    }
}

void consumer(int index)
{
    int value = 0;
    std::unique_lock<std::mutex> lock(mx);
    for (int i = 0; i < 100; ++i)
    {
        // Wait until there is something to consume.
        cv.wait(lock, []() -> bool { return !iq.Empty(); });
        iq.Front(value);
        std::cout << "consumer:" << index << " value:" << value << std::endl;
        cv.notify_all();
    }
}

int main()
{
    std::thread p1(producer, 1);
    std::thread p2(producer, 2);
    std::thread c1(consumer, 1);
    std::thread c2(consumer, 2);
    p1.join();
    p2.join();
    c1.join();
    c2.join();
    return 0;
}
In the code above, the two producers never insert into the queue at the same time, so there is no contention between producers: a producer can only push while it holds mx, and cv.wait only returns once the "queue is empty" predicate holds again under the lock.
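Note also that each producer above takes mx once, before the loop, and keeps it for all 100 iterations; the mutex is only released inside cv.wait, and the thread even sleeps while holding it. A minimal sketch of a producer with a narrower critical section, reusing the iq, mx, cv and number globals from the code above (producer_narrow is a hypothetical name, not part of the original program): the lock is taken inside the loop and the sleep happens outside it.

void producer_narrow(int index)
{
    for (int i = 0; i < 100; ++i)
    {
        {
            // Hold mx only while touching the shared queue.
            std::unique_lock<std::mutex> lock(mx);
            cv.wait(lock, []() -> bool { return iq.Empty(); });
            iq.push(++number);
            std::cout << "producer:" << index << std::endl;
            cv.notify_all();
        }   // the lock is released here
        // Sleep without holding the mutex, so other threads are not blocked.
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}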
Lock granularity
For example, suppose several threads access the same table. The first locking scheme is a table lock: when thread A enters, the whole table is locked, and thread B cannot access it until thread A is done.
The second scheme is a row lock, where each row is locked individually; even if thread A and thread B both access the table, they can run at the same time as long as they do not touch the same row (see the sketch after this list).
- The coarser the lock granularity, the lower the concurrency, but the fewer resources the locking consumes.
- The finer the lock granularity, the higher the concurrency, but the more resources the locking consumes.
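A minimal sketch of the two granularities (the Table and Row types here are hypothetical and only serve as an illustration; they are not part of the producer-consumer code): a table lock guards the whole container with one mutex, while a row lock gives every row its own mutex, so threads writing different rows can proceed in parallel.

#include <mutex>

// Hypothetical illustration of lock granularity.
struct Row
{
    std::mutex mx;   // row lock: one mutex per row
    int value = 0;
};

class Table
{
    static const int ROWS = 16;
    std::mutex table_mx;   // table lock: one mutex guarding the whole table
    Row rows[ROWS];
public:
    // Coarse granularity: every thread serializes on table_mx,
    // even when the threads touch different rows.
    void update_table_locked(int i, int v)
    {
        std::lock_guard<std::mutex> lock(table_mx);
        rows[i].value = v;
    }
    // Fine granularity: threads writing different rows hold different
    // mutexes and can run in parallel; only same-row writers contend.
    void update_row_locked(int i, int v)
    {
        std::lock_guard<std::mutex> lock(rows[i].mx);
        rows[i].value = v;
    }
};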
Producer-consumer pattern: malloc
#include <iostream>
#include <cstdlib>
#include <list>
#include <map>
#include <mutex>
#include <thread>
#include <condition_variable>

std::mutex mx;
std::condition_variable cv;
std::list<size_t> s_list;               // pending allocation requests (sizes)
std::multimap<size_t, void*> svmap;     // completed allocations: size -> pointer

// Allocator thread: consumes size requests and produces allocated blocks.
void func_alloc()
{
    std::unique_lock<std::mutex> lock(mx);
    while (1)
    {
        while (s_list.size() == 0)
        {
            cv.wait(lock);              // wait for a new request
        }
        size_t sn = s_list.front();
        s_list.pop_front();
        void* p = malloc(sn);
        svmap.insert(std::multimap<size_t, void*>::value_type(sn, p));
        cv.notify_all();                // tell requesters a block is ready
    }
}

// Requester side: produces a size request, then consumes the resulting block.
void* my_malloc(size_t sz)
{
    std::unique_lock<std::mutex> lock(mx);
    s_list.push_back(sz);
    cv.notify_all();
    while (svmap.count(sz) == 0)
    {
        cv.wait(lock);                  // wait until a block of size sz is available
    }
    auto it = svmap.find(sz);
    void* p = it->second;
    svmap.erase(it);
    cv.notify_all();
    return p;
}

int main()
{
    std::thread tha(func_alloc);
    std::thread thb(func_alloc);
    int* p = (int*)my_malloc(sizeof(int) * 10);
    int* s = (int*)my_malloc(sizeof(int));
    free(p);
    free(s);
    // The allocator threads loop forever, so these joins never return
    // and the program does not exit on its own.
    tha.join();
    thb.join();
    return 0;
}