模型
实现流程:
前面已经介绍了EPoller类,EPoller主要监听的是Channel对象,每一个Channel对象会绑定一个文件描述符(fd_),fd_上绑定要监听的事件。当epoll监听到就绪事件时,会将就绪事件添加到激活队列中,激活队列在事件循环(EventLoop的loop中)被循环调用,当激活队列不为空时,依次调用Channel注册的回调。
源码分析
update
update是对上提供的接口,主要调用的是EventLoop类的updateChannel接口将当前对象更新至poller。
void Channel::update()
{
addedToLoop_ = true;
loop_->updateChannel(this);
}
remove
同理,remove调用的是EventLoop类的removeChannel接口删除当前对象
void Channel::remove()
{
assert(isNoneEvent());
addedToLoop_ = false;
loop_->removeChannel(this);
}
设置回调
void enableReading() { events_ |= kReadEvent; update(); }
void disableReading() { events_ &= ~kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void disableWriting() { events_ &= ~kWriteEvent; update(); }
void disableAll() { events_ = kNoneEvent; update(); }
bool isWriting() const { return events_ & kWriteEvent; }
bool isReading() const { return events_ & kReadEvent; }
什么时候会设置回调?例如上层连接建立成功后,会设置读使能。
void TcpConnection::connectEstablished()
{
loop_->assertInLoopThread();
assert(state_ == kConnecting);
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading(); // 通道读使能
connectionCallback_(shared_from_this()); // 连接回调
}
设置激活事件类型
channel.h中
void set_revents(int revt) { revents_ = revt; }
poller中处理激活队列是会设置通道事件类型,便于Channel处理handleEvent时使用
void PollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
for (PollFdList::const_iterator pfd = pollfds_.begin();
pfd != pollfds_.end() && numEvents > 0; ++pfd)
{
if (pfd->revents > 0)
{
--numEvents;
ChannelMap::const_iterator ch = channels_.find(pfd->fd);
assert(ch != channels_.end());
Channel* channel = ch->second;
assert(channel->fd() == pfd->fd);
channel->set_revents(pfd->revents); // 设置激活通道类型
// pfd->revents = 0;
activeChannels->push_back(channel); // 加入激活队列
}
}
}
handleEvent
处理激活的Channel事件,由Poller更新激活的Channel列表,EventLoop::loop()根据激活Channel列表,逐个执行Channel中已注册好的相应回调。
void Channel::tie(const std::shared_ptr<void>& obj)
{
tie_ = obj; // std::shared_ptr是强引用的智能指针, 可以直接赋值给tie_
tied_ = true; // 设置绑定标志
}
void Channel::handleEvent(Timestamp receiveTime)
{
std::shared_ptr<void> guard;
if (tied_)
{
guard = tie_.lock(); // weak_ptr::lock可将weak_ptr提升为shared_ptr
if (guard) // 通过非空判断可以确定guard是否存在
{
handleEventWithGuard(receiveTime);
}
}
else
{
handleEventWithGuard(receiveTime);
}
}
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
eventHandling_ = true;
LOG_TRACE << reventsToString();
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (logHup_)
{
LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";
}
if (closeCallback_) closeCallback_();
}
if (revents_ & POLLNVAL)
{
LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";
}
// 根据不同的事件类型, 执行不同的回调
if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
if (readCallback_) readCallback_(receiveTime);
}
if (revents_ & POLLOUT)
{
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}
tie_对象
tie_是一个使用std::weak_ptr维护的对象,目的是为了解决循环引用的问题。关于循环引用的问题,可以参考:https://blog.csdn.net/www_dong/category_10702918.html?spm=1001.2014.3001.5482
定义:
std::weak_ptr<void> tie_;
TcpConnection建立连接成功的时候调用tie将自身指针传给Channel。
void TcpConnection::connectEstablished()
{
loop_->assertInLoopThread();
assert(state_ == kConnecting);
setState(kConnected);
channel_->tie(shared_from_this()); // 绑定当前对象, shared_from_this()主要是为了解决TcpConnection先于Channel被析构情况可能带来的问题
channel_->enableReading();
connectionCallback_(shared_from_this());
}
关于shared_from_this,可以参考:https://blog.csdn.net/www_dong/article/details/111999624
|