线程安全队列（threadSafeQueue.h）
#pragma once
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <utility>
using namespace std;
// Unbounded FIFO queue whose every operation runs under one internal mutex.
//
// push() enqueues an element and wakes one waiting consumer.
// WaitPop() blocks until an element is available; TryPop() never blocks.
// Elements are moved (not copied) in and out, so move-only element types
// such as std::unique_ptr are supported.
template<typename T>
class ThreadSafe_Queue
{
private:
    // Guards m_queue. Declared mutable so that const members can lock it.
    mutable std::mutex m_mut;
    std::queue<T> m_queue;
    // Notified by push(); waited on by the WaitPop() overloads.
    std::condition_variable m_data_cond;

public:
    ThreadSafe_Queue() {}

    // Not copyable: the mutex and condition variable cannot be copied.
    ThreadSafe_Queue(const ThreadSafe_Queue&) = delete;
    ThreadSafe_Queue& operator=(const ThreadSafe_Queue&) = delete;

    // Appends `data` to the back of the queue and wakes one waiter.
    // The parameter is taken by value and moved into the container, so an
    // rvalue argument is never copied.
    void push(T data)
    {
        std::lock_guard<std::mutex> lg(m_mut);
        m_queue.push(std::move(data));
        m_data_cond.notify_one();
    }

    // Blocks until the queue is non-empty, then moves the front element
    // into `t` and removes it from the queue.
    void WaitPop(T& t)
    {
        std::unique_lock<std::mutex> ul(m_mut);
        // The predicate form re-checks after every wake-up, so spurious
        // wake-ups cannot fall through to front() on an empty queue.
        m_data_cond.wait(ul, [this] { return !m_queue.empty(); });
        t = std::move(m_queue.front());
        m_queue.pop();
    }

    // Blocks until the queue is non-empty, then returns the front element
    // wrapped in a shared_ptr and removes it from the queue.
    std::shared_ptr<T> WaitPop()
    {
        std::unique_lock<std::mutex> ul(m_mut);
        m_data_cond.wait(ul, [this] { return !m_queue.empty(); });
        // Allocate before pop(): if make_shared throws, the element is
        // still in the queue.
        std::shared_ptr<T> res(std::make_shared<T>(std::move(m_queue.front())));
        m_queue.pop();
        return res;
    }

    // Non-blocking pop. Returns false and leaves `t` untouched when the
    // queue is empty; otherwise moves the front element into `t`, removes
    // it, and returns true.
    bool TryPop(T& t)
    {
        std::lock_guard<std::mutex> lg(m_mut);
        if (m_queue.empty())
            return false;
        t = std::move(m_queue.front());
        m_queue.pop();
        return true;
    }

    // Non-blocking pop. Returns an empty shared_ptr when the queue is
    // empty; otherwise returns the removed front element.
    std::shared_ptr<T> TryPop()
    {
        std::lock_guard<std::mutex> lg(m_mut);
        if (m_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res(std::make_shared<T>(std::move(m_queue.front())));
        m_queue.pop();
        return res;
    }

    // Returns whether the queue held no elements at the moment of the call.
    // The answer may be stale as soon as the lock is released.
    bool IsEmpty() const
    {
        std::lock_guard<std::mutex> lg(m_mut);
        return m_queue.empty();
    }
};
线程函数与主函数代码
#include "threadSafeQueue.h"
using namespace std;
// Queue shared by the two worker threads: thread_Fuc2 pushes into it and
// thread_Fuc pops from it.
ThreadSafe_Queue<int> g_queue;

// Counter that thread_Fuc2 increments before each push, so the first value
// placed in the queue is 11.
int g_index{10};
// Consumer thread entry point.
//
// Prints a start banner, then loops forever: blocks until g_queue yields a
// value and reports that value together with the id of the current thread.
// The function never returns.
void thread_Fuc() {
    std::cout << "thread_fuc1 start\n";
    while (true) {
        int value = 0;
        g_queue.WaitPop(value);  // blocks until a value has been pushed
        // std::thread::id is a class type with no printf conversion; passing
        // it through varargs for "%d" is undefined behaviour. Its stream
        // inserter is the supported way to print it.
        std::cout << "wait_and_pop done! value=" << value
                  << " thread id:" << std::this_thread::get_id() << '\n';
    }
}
// Producer thread entry point.
//
// Prints a start banner, then loops forever: sleeps for 500 ms, increments
// g_index and pushes the new value into g_queue. The function never returns.
void thread_Fuc2() {
    std::cout << "thread_fuc2 start\n";
    const std::chrono::milliseconds interval(500);
    for (;;) {
        std::this_thread::sleep_for(interval);
        // Pre-increment: the value pushed is the one after the bump.
        g_queue.push(++g_index);
    }
}
int main() {
thread thd(thread_Fuc);
thd.detach();
thread thd2(thread_Fuc2);
thd2.detach();
int a;
while (cin >> a) { ; }
return 0;
}
运行结果
|