定时器的实现一般需要借助系统提供的超时相关接口,比如select、 条件变量、或者sleep, usleep等,sleep,usleep提供的睡眠功能太有限,无法中途唤醒,这就导致他们其实不适合做定时器中的定时方法。而select和条件变量都可以设置定时时长,而且在中途可以唤醒,精度也很高,因此,可以采用select或条件变量作为定时方法,本文讨论使用条件变量如何实现定时任务。
条件变量可以设置超时时间,如果条件变量在没有接收到来自其他线程的条件通知时,将一直阻塞,直到超时。通过这个特性可以按照定时器需要等待的时间让任务线程挂起,直到超时,说明有定时任务可以处理。这里需要有一个专门的线程用来等待超时事件,在这个线程里面每次循环开始,检查超时任务需要等待的时间,将线程通过条件变量的wait_for方法阻塞,当超时时,将超时时间到来的事件进行处理,或者中途如果有新的事件也可以通过notify_one方法唤醒正在阻塞的条件变量,提前结束阻塞,此时可能并没有超时事件需要执行,但是可以将新的定时任务添加到超时等待队列中。
在超时等待任务线程中会处理两种事件,一个是超时任务,一个是添加新的定时任务。
while(1){
};
采用时间轮方式管理定时任务。时间轮类似于一个时钟,时钟的指针每个刻度表示一个时间间隔。本次设计的定时器精度为1ms,那么就确定时间轮每个刻度为1ms。定时任务可以比喻为日程,分布在未来的某个时间点上。时间轮指针每毫秒转动一个刻度,类似于时钟的指针转动。比如在某个时间T1添加了一个5ms的定时任务task1,那么将此任务放置到从T1刻度开始第5个刻度的位置,当时间轮指针转动到第5个刻度位置时,说明定时任务task1超时时间到,此时就可以执行task1,这就是时间轮定时器的原理。从数据结构上看,时间轮类似于一个环形的哈希表,每个哈希桶里面是一个链表,链表里面的定时任务按照超时时间从小到大排列,当时间轮指针转动到某个时间点时,从链表头开始检查时间是否超时,如果超时则执行任务,直到遇到未超时任务,结束检查。
上图表示时间轮的数据结构和不同超时时间的任务被存放的位置,通过这种数据结构,可以使得超时任务的查找效率达到O(1)。当需要插入一个定时任务到时间轮数据结构时,将当前时间+超时时间得到未来时间,对未来时间进行哈希,将任务定位到某个哈希桶里面,再按照从小到大的顺序插入到链表中,插入的时间复杂度依赖于哈希冲突情况,当某一个哈希桶中存在很多任务时,插入效率将会是O(n),这里可以进一步通过红黑树进行优化,提高插入效率。
时间轮定时器可以每个时间刻度检查一次,这种方式,定时器需要经常被唤醒,如果时间刻度为1ms,那么定时器每1ms都会检查一次超时事件,这种方式使得定时器的检查频率很高,存在某些时刻可能并没有超时事件需要处理,但仍然被唤醒的情况。可以采用每次计算下一次超时事件的到来时间,时间轮不需要每ms都去检查,而是根据下一次定时任务的超时时间,这样能够减少无用的唤醒和时间检查,减少无畏的cpu消耗。
时间的获取采用c++11提供的std::chrono::stead_clock::now,stead_clock获取到的是一个单调递增的时间,不会受系统时间的影响。
定时任务是用户传入的,定时任务的运行在定时器线程中,如果用户任务存在耗时操作,那么将影响定时器的精度,因此,定时器中通过线程池的方式将用户的定时任务放到线程池中执行,保证了定时器的精度。
通过测试,该实现可以达到误差1ms
hello world dt:14, timeout:15, count:40001
hello world dt:50, timeout:50, count:40002
hello world dt:37, timeout:37, count:40003
hello world dt:62, timeout:63, count:40004
hello world dt:14, timeout:15, count:40005
hello world dt:2, timeout:2, count:40006
hello world dt:41, timeout:42, count:40007
hello world dt:87, timeout:88, count:40008
hello world dt:50, timeout:51, count:40009
hello world dt:33, timeout:34, count:40010
hello world dt:54, timeout:55, count:40011
hello world dt:115, timeout:116, count:40012
hello world dt:1, timeout:2, count:40013
hello world dt:81, timeout:82, count:40014
hello world dt:110, timeout:111, count:40015
代码如下:
#ifndef __TIMER_H__
#define __TIMER_H__
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
#include <list>
#include <functional>
#include "ThreadPool.h"
struct TimedTask;
namespace timer{
class Timer
{
public:
typedef std::function<void()> Task;
static const int WHELL_LEN = 1024;
static const int INDEX_MASK = WHELL_LEN - 1;
public:
Timer();
~Timer();
public:
void AddTask(const Task& task, int msec);
private:
void ThreadRoutine();
int GetNextWaitMs();
void Tick(long long tickCount);
void AddNewTask(const std::vector<TimedTask*>& taskList);
private:
bool _stop;
long long _tickCount;
std::mutex _mutex;
std::thread _thread;
std::condition_variable _cond;
threadpool::ThreadPool _threadPool;
std::vector<TimedTask*> _inputTaskList;
std::list<TimedTask*> _timedTask[WHELL_LEN];
std::chrono::time_point<std::chrono::steady_clock> _startTime;
};
}
#endif
#include <assert.h>
#include "Timer.h"
using namespace timer;
struct TimedTask
{
TimedTask(const Timer::Task& task, int timeout);
Timer::Task task;
int timeout;
long long deadline;
};
TimedTask::TimedTask(const Timer::Task& task, int timeout)
: task(task)
, timeout(timeout)
, deadline(0)
{
}
Timer::Timer()
: _stop(false)
, _tickCount(0)
{
_startTime = std::chrono::steady_clock::now();
_thread = std::thread(&Timer::ThreadRoutine, this);
}
Timer::~Timer()
{
{
std::lock_guard<std::mutex> lock(_mutex);
_stop = true;
}
_cond.notify_one();
_thread.join();
}
void Timer::AddTask(const Task& task, int msec)
{
if(msec <= 0){
_threadPool.AddTask(task);
return;
}else{
std::lock_guard<std::mutex> lock(_mutex);
_inputTaskList.push_back(new TimedTask(task, msec));
}
_cond.notify_one();
}
void Timer::Tick(long long tickCount)
{
const int index = tickCount & INDEX_MASK;
auto& taskList = _timedTask[index];
for(auto itr = taskList.begin(); itr != taskList.end();){
auto task = *itr;
if(tickCount >= task->deadline){
_threadPool.AddTask(task->task);
itr = taskList.erase(itr);
delete task;
continue;
}
break;
}
}
void Timer::ThreadRoutine()
{
while(!_stop){
long long lastMillSec = _tickCount;
int waitMillSec = GetNextWaitMs();
assert(waitMillSec);
std::vector<TimedTask*> taskList;
std::cv_status status = std::cv_status::no_timeout;
{
std::unique_lock<std::mutex> lock(_mutex);
while(_inputTaskList.size() == 0 && status == std::cv_status::no_timeout && !_stop){
if(waitMillSec > 0){
status = _cond.wait_for(lock, std::chrono::milliseconds(waitMillSec));
}else{
_cond.wait(lock);
}
}
if(_stop){
return;
}
if(_inputTaskList.size() > 0){
taskList.swap(_inputTaskList);
}
}
_tickCount = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - _startTime).count();
long long dt = _tickCount - lastMillSec;
if(dt >= waitMillSec && waitMillSec != -1){
for(int i = 0; i <= dt - waitMillSec; ++i){
Tick(lastMillSec + waitMillSec + i);
}
}
if(taskList.size() > 0){
AddNewTask(taskList);
}
}
}
int Timer::GetNextWaitMs()
{
int minMs = -1;
bool findNext = false;
const int startIndex = _tickCount & INDEX_MASK;
for(int i = 1; i <= WHELL_LEN; ++i){
const int index = (startIndex + i) & INDEX_MASK;
auto& tasks = _timedTask[index];
if(tasks.size() > 0){
long long dt = tasks.front()->deadline - _tickCount;
if(findNext){
if(dt < minMs){
minMs = dt;
}
}else{
assert(dt > 0);
minMs = dt;
findNext = true;
}
if(minMs < WHELL_LEN){
break;
}
}
}
assert(minMs != 0);
return minMs;
}
void Timer::AddNewTask(const std::vector<TimedTask*>& taskList)
{
for(auto& task : taskList){
assert(task->timeout > 0);
task->deadline = _tickCount + task->timeout;
const int index = task->deadline & INDEX_MASK;
auto& bucket = _timedTask[index];
if(bucket.empty() || task->deadline > bucket.back()->deadline){
bucket.push_back(task);
}else{
for(auto itr = bucket.begin(); itr != bucket.end(); ++itr){
auto& t = *itr;
if(task->deadline <= t->deadline){
bucket.insert(itr, task);
break;
}
}
}
}
}
ThreadPool的实现:链接
|