EventLoopThread:执行事件循环的线程 可以看到EventLoopThread打包了一个事件循环EventLoop和一个线程Thread
一、Thread类
我们先来看看Thread类 这个线程类就是封装了pthread_create创建的线程,我们使用C++提供的线程类thread,就不使用Linux原生的线程函数了
1. Thread.h
#pragma once
#include "noncopyable.h"
#include <functional>
#include <thread>
#include <memory>
#include <unistd.h>
#include <string>
#include <atomic>
class Thread : noncopyable{
public:
using ThreadFunc = std::function<void()>;
explicit Thread(ThreadFunc, const std::string& name = std::string());
~Thread();
void start();
void join();
bool started() const { return started_; }
pid_t tid() const { return tid_; }
const std::string& name() const { return name_; }
static int numCreated() { return numCreated_; }
private:
void setDefaultName();
bool started_;
bool joined_;
std::shared_ptr<std::thread> thread_;
pid_t tid_;
ThreadFunc func_;
std::string name_;
static std::atomic_int32_t numCreated_;
};
2. Thread::Thread()和Thread::~Thread()
Thread::Thread(ThreadFunc func, const std::string& name)
: started_(false)
, joined_(false)
, tid_(0)
, func_(std::move(func))
, name_(name)
{
setDefaultName();
}
Thread::~Thread(){
if(started_ && !joined_){
thread_->detach();
}
}
3. Thread::start()
Linux的pthread_create一旦调用,线程就直接启动了;同样的,使用C++的thread定义对象,绑定一个线程函数也是直接启动了,我们需要自己控制线程启动的时机,而不是创建线程的时候就直接启动,所以我们使用智能指针+lambda表达式的方式控制线程的启动
void Thread::start(){
started_ = true;
sem_t sem;
sem_init(&sem, false, 0);
thread_ = std::shared_ptr<std::thread>(new std::thread([&](){
tid_ = CurrentThread::tid();
sem_post(&sem);
func_();
}));
sem_wait(&sem);
}
这里涉及两个线程,线程A调用start并创建新的线程B,假如调用start的线程A走得快,很快就执行到了函数最后,如果新的执行func_的线程B还没有创建出来,线程A就会阻塞在sem_wait,等待子线程B创建
这样的话,如果一旦有一个线程调用了start,那就可以放心的使用这个线程的tid,因为此时这个线程是肯定存在的
4. Thread.cc
#include "Thread.h"
#include "CurrentThread.h"
#include <semaphore.h>
std::atomic_int32_t Thread::numCreated_(0);
Thread::Thread(ThreadFunc func, const std::string& name)
: started_(false)
, joined_(false)
, tid_(0)
, func_(std::move(func))
, name_(name)
{
setDefaultName();
}
Thread::~Thread(){
if(started_ && !joined_){
thread_->detach();
}
}
void Thread::start(){
started_ = true;
sem_t sem;
sem_init(&sem, false, 0);
thread_ = std::shared_ptr<std::thread>(new std::thread([&](){
tid_ = CurrentThread::tid();
sem_post(&sem);
func_();
}));
sem_wait(&sem);
}
void Thread::join(){
joined_ = true;
thread_->join();
}
void Thread::setDefaultName(){
int num = ++numCreated_;
if(name_.empty()){
char buff[32] = {0};
snprintf(buff, sizeof(buff), "Thread%d", num);
name_ = buff;
}
}
二、EventLoopThread类
Thread类只关于一个线程,EventLoopThread类用于绑定一个EventLoop和一个Thread,在一个Thread里创建一个EventLoop,让这个Thread执行一个EventLoop,即:one loop per thread
1. EventLoopThread.h
#include "noncopyable.h"
#include "Thread.h"
#include <functional>
#include <mutex>
#include <condition_variable>
#include <string>
class EventLoop;
class EventLoopThread : noncopyable{
public:
using ThreadInitCallback = std::function<void(EventLoop*)>;
EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(), const std::string& name = std::string());
~EventLoopThread();
EventLoop* startLoop();
private:
void threadFunc();
EventLoop* loop_;
bool exiting_;
Thread thread_;
std::mutex mutex_;
std::condition_variable cond_;
ThreadInitCallback callback_;
};
2. EventLoopThread.cc
#include "EventLoopThread.h"
#include "EventLoop.h"
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb, const std::string& name)
: loop_(nullptr)
, exiting_(false)
, thread_(std::bind(&EventLoopThread::threadFunc, this), name)
, mutex_()
, cond_()
, callback_(cb)
{}
EventLoopThread::~EventLoopThread(){
exiting_ = true;
if(loop_ != nullptr){
loop_->quit();
thread_.join();
}
}
EventLoop* EventLoopThread::startLoop(){
thread_.start();
EventLoop* loop = nullptr;
{
std::unique_lock<std::mutex> lock(mutex_);
while(loop_ == nullptr){
cond_.wait(lock);
}
loop = loop_;
}
return loop;
}
void EventLoopThread::threadFunc(){
EventLoop loop;
if(callback_){
callback_(&loop);
}
{
std::unique_lock<std::mutex> lock(mutex_);
loop_ = &loop;
cond_.notify_one();
}
loop.loop();
std::unique_lock<std::mutex> lock(mutex_);
loop_ = nullptr;
}
当我们调用EventLoopThread::startLoop 的时候,底层才创建一个新线程,而这个刚刚创建的新线程需要执行一个线程函数(我们在构造成员变量thread_时传入的EventLoopThread::threadFunc() ),等新创建的线程初始化成员变量EventLoopThread::loop_ 完成后,才会通知调用startLoop 的线程访问成员变量loop_
即调用EventLoopThread::startLoop 就会返回一个EventLoop对象,并且成员变量EventLoopThread::loop_ 也记录了该EventLoop对象的地址
三、EventLoopThreadPool类
EventLoopThreadPoll是一个事件线程池,管理EventLoop,EventLoop绑定的就是一个线程
1. EventLoopThreadPool.h
#pragma once
#include "noncopyable.h"
#include "Thread.h"
#include <functional>
#include <string>
#include <vector>
#include <memory>
class EventLoop;
class EventLoopThread;
class EventLoopThreadPool : noncopyable{
public:
using ThreadInitCallback = std::function<void(EventLoop*)>;
EventLoopThreadPool(EventLoop* baseLoop, const std::string& nameArg);
~EventLoopThreadPool();
void setThreadNum(int numThreads){ numThreads_ = numThreads; }
void start(const ThreadInitCallback& cb = ThreadInitCallback());
EventLoop* getNextLoop();
std::vector<EventLoop*> getAllLoops();
bool started() const{ return started_; }
const std::string name() const { return name_; }
private:
EventLoop* baseLoop_;
std::string name_;
bool started_;
int numThreads_;
long unsigned int next_;
std::vector<std::unique_ptr<EventLoopThread>> threads_;
std::vector<EventLoop*> loops_;
};
-
成员变量baseLoop_:我们使用muduo编写程序的时候,就会定义一个EventLoop变量,这个变量作为TcpServer构造函数的参数,用户创建的就叫做baseLoop -
成员变量numThreads_:表示subReactor的数量。我们使用muduo编写程序的时候,就会定义一个EventLoop变量,这个变量作为TcpServer构造函数的参数,这就是baseLoop。如果我们不使用setThreadNum指定Reactor模型线程数量,那么muduo默认使用单线程模型,这个线程既负责新用户连接,也负责已连接用户的读写事件 -
成员变量loops_:包含了所有创建的subLoop的指针,这些EventLoop对象都是栈上的(见EventLoopThread::threadFunc),不需要我们手动释放。EventLoopThread::threadFunc函数是由线程执行的,EventLoop对象存在于线程栈上
2. EventLoopThreadPool::start()
根据指定的numThreads_创建线程,开启事件循环
void EventLoopThreadPool::start(const ThreadInitCallback& cb){
started_ = true;
for(int i = 0; i < numThreads_; ++i){
char buff[name_.size() + 32];
snprintf(buff, sizeof(buff), "%s%d", name_.c_str());
EventLoopThread* thread = new EventLoopThread(cb, buff);
threads_.push_back(std::unique_ptr<EventLoopThread>(thread));
loops_.push_back(thread->startLoop());
}
if(numThreads_ == 0 && cb){
cb(baseLoop_);
}
}
3. EventLoopThreadPool::getNextLoop()
不使用setThreadNum指定Reactor模型线程数量,那么muduo默认只有一个baseLoop_
EventLoop* EventLoopThreadPool::getNextLoop(){
EventLoop* loop = baseLoop_;
if(!loops_.empty()){
loop = loops_[next_];
++next_;
if(next_ >= loops_.size()){
next_ = 0;
}
}
return loop;
}
4. EventLoopThreadPool.cc
#include "EventLoopThreadPool.h"
#include "EventLoopThread.h"
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const std::string& nameArg)
: baseLoop_(baseLoop)
, name_(nameArg)
, started_(false)
, numThreads_(0)
, next_(0)
{}
EventLoopThreadPool::~EventLoopThreadPool(){}
void EventLoopThreadPool::start(const ThreadInitCallback& cb){
started_ = true;
for(int i = 0; i < numThreads_; ++i){
char buff[name_.size() + 32];
snprintf(buff, sizeof(buff), "%s%d", name_.c_str(), i);
EventLoopThread* thread = new EventLoopThread(cb, buff);
threads_.push_back(std::unique_ptr<EventLoopThread>(thread));
loops_.push_back(thread->startLoop());
}
if(numThreads_ == 0 && cb){
cb(baseLoop_);
}
}
EventLoop* EventLoopThreadPool::getNextLoop(){
EventLoop* loop = baseLoop_;
if(!loops_.empty()){
loop = loops_[next_];
++next_;
if(next_ >= loops_.size()){
next_ = 0;
}
}
return loop;
}
std::vector<EventLoop*> EventLoopThreadPool::getAllLoops(){
if(loops_.empty()){
return std::vector<EventLoop*>(1, baseLoop_);
}
return loops_;
}
|