1. 消息队列- ConcurrentQueue
消息队列用一个类:ConcurrentQueue进行封装,拥有成员**_queue**,来存放元素;内部实现push,pop函数,来对队列进行入队和出队操作。
message_queue.h
#ifndef NET_FRAME_CONCURRENT_QUEUE_H
#define NET_FRAME_CONCURRENT_QUEUE_H
#include <queue>
#include <mutex>
#include <condition_variable>
template<class Type>
class ConcurrentQueue {
ConcurrentQueue& operator=(const ConcurrentQueue&) = delete;
ConcurrentQueue(const ConcurrentQueue& other) = delete;
public:
ConcurrentQueue() : _queue(), _mutex(), _condition() { }
virtual ~ConcurrentQueue() { }
void Push(Type record) {
std::lock_guard <std::mutex> lock(_mutex);
_queue.push(record);
_condition.notify_one();
}
bool Pop(Type& record, bool isBlocked = true) {
if (isBlocked) {
std::unique_lock <std::mutex> lock(_mutex);
while (_queue.empty()) {
_condition.wait(lock);
}
}
else
{
std::lock_guard <std::mutex> lock(_mutex);
if (_queue.empty()) {
return false;
}
}
record = std::move(_queue.front());
_queue.pop();
return true;
}
int32_t Size() {
std::lock_guard <std::mutex> lock(_mutex);
return _queue.size();
}
bool Empty() {
std::lock_guard <std::mutex> lock(_mutex);
return _queue.empty();
}
private:
std::queue <Type> _queue;
mutable std::mutex _mutex;
std::condition_variable _condition;
};
#endif
2. 线程池-ThreadPool
封装了ThreadPool类,该类需要指定线程的数量_threads,包含用来存放线程的_workers容器。 在创建线程的时候,需要指定线程的处理函数lambda表达式以及元素的处理函数_handler:将消息队列的元素取出,然后作为_handler函数的形参进行处理。
Submit方法用于往消息队列里增加元素。
thread_pool.h
#ifndef NET_FRAME_THREAD_POOL_H
#define NET_FRAME_THREAD_POOL_H
#include "message_queue.h"
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
#define MIN_THREADS 10
template<class Type>
class ThreadPool {
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool(const ThreadPool& other) = delete;
public:
ThreadPool(int32_t threads, std::function<void(Type& record)> handler);
virtual ~ThreadPool();
void Submit(Type record);
private:
private:
bool _shutdown;
int32_t _threads;
std::function<void(Type& record)> _handler;
std::vector <std::thread> _workers;
ConcurrentQueue <Type> _tasks;
};
template<class Type>
ThreadPool<Type>::ThreadPool(int32_t threads, std::function<void(Type &record)> handler)
: _shutdown(false),
_threads(threads),
_handler(handler),
_workers(),
_tasks() {
if (_threads < MIN_THREADS)
_threads = MIN_THREADS;
for (int32_t i = 0; i < _threads; ++i)
_workers.emplace_back(
[this] {
while (!_shutdown) {
Type record;
this->_tasks.Pop(record, true);
this->_handler(record);
}
}
);
}
template<class Type>
ThreadPool<Type>::~ThreadPool() {
for (std::thread &worker: _workers)
worker.join();
}
template<class Type>
void ThreadPool<Type>::Submit(Type record) {
_tasks.Push(record);
}
#endif
3. 测试代码
#include"message_queue.h"
#include"thread_pool.h"
#include<iostream>
std::mutex mutex_;
void threadProcess(int value){
std::unique_lock<std::mutex> lock_(mutex_);
std::cout<<"current threadId:"<<std::this_thread::get_id()<<" value:"<<value<<std::endl;
}
int main(){
std::cout<<"main threadId:"<<std::this_thread::get_id()<<std::endl;
ThreadPool<int> threadPool(15,threadProcess);
for(int i=0;i<100000;++i){
threadPool.Submit(i);
}
return 0;
}
4.运行截图
ps-ef |grep main top -H -p 23796 可以看出有16个进程正在运行。
|