定时器的设计与实现
定时器的应用
应用场景一:keep alive 保活机制(心跳检测)
成千上万个客户端连接服务器,但有些客户端可能连上服务器之后就再也没有发送数据过,为了将最好的资源分配给有效的连接,每个客户端需要定时向服务器发送心跳包,如果服务器一段时间检测不到心跳包,则自动断开连接。
应用场景二:每隔一段时间需要执行的特定操作
1.游戏技能冷却 2.地图野怪刷新 3.倒计时
定时器的设计
在设计定时器的数据结构时需要达到以下几点要求:
- 有序的结构,且增加删除操作不影响该结构有序;
- 能快速查找最小节点;
综上定时器的设计可以使用以下几个方式:红黑树、最小堆、单层级时间轮、多层级时间轮, 接下来我会从算法的层面分析这几个实现方式的优缺点。
算法名称 | 增加节点时间复杂度 | 查找节点时间复杂度 | 删除节点时间复杂度 | 删除最小节点的时间复杂度 |
---|
红黑树 | lg(n) | lg(n) | lg(n) | 即最左下角的那个节点lg(n) | 最小堆 | lg(n) | lg(n) | n | 最上面的节点O(1) | 单层级时间轮 | O(1) | O(1) | O(1) | O(1) | 多层级时间轮 | O(1) | O(1) | O(1) | O(1) |
可见在众多算法中,时间轮算法是最好的。
定时器的实现
红黑数与最小堆
#include <stdio.h>
#include <sys/epoll.h>
#include "mh-timer.h"
void hello_world(timer_entry_t *te) {
printf("hello world time = %u\n", te->time);
}
int main() {
init_timer();
add_timer(3000, hello_world);
int epfd = epoll_create(1);
struct epoll_event events[512];
for (;;) {
int nearest = find_nearest_expire_timer();
int n = epoll_wait(epfd, events, 512, nearest);
for (int i=0; i < n; i++) {
}
expire_timer();
}
return 0;
}
这两者实现的方法类似,只是底层数据结构改变。 二者都采用健值的方式存储,key是到期时间,alue是执行的回调函数。
单层级时间轮
原理
单层级时间轮的数据结构其实就是链表,中间的圆圈是时间链表,外面的是定时任务链表,指针tick会绕着时间轮旋转,每次添加时间事件的时候,将事件放入(tick+delay) % MAX_TIME_WHEEL_SIZE的链表的末尾,每次tick转到相应的时间刻度,取出其中的时间事件并执行。
缺陷
1.可定时的时间长度与精度取决于时间轴的刻度,这也是单层时间轴的局限所在。
解决方案
1.增加round(轮次)属性:就像跑步一样,为每个任务加上round的计数器,当指针转过一圈时,round计数器减1,所以只需取出round=0的任务即需要触发的任务。
优点:可减少时间格过大造成的空间浪费
缺点:每次都需要遍历对应格子里面全部的任务列表,特别是当时间轮刻度粒度很小,任务列表特别长时,耗时将会大大增加。
2.采用层级时间轮:类似时钟一样,有时钟、分钟和秒钟。 层级时间轮可以进一步扩展为天轮、月轮甚至年轮。
代码实现
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#if defined(__APPLE__)
#include <AvailabilityMacros.h>
#include <sys/time.h>
#include <mach/task.h>
#include <mach/mach.h>
#else
#include <time.h>
#endif
#define MAX_TIMER ((1<<17)-1)
#define MAX_CONN ((1<<16)-1)
typedef struct conn_node {
uint8_t used;
int id;
} conn_node_t;
typedef struct timer_node {
struct timer_node *next;
struct conn_node *node;
uint32_t idx;
} timer_node_t;
static timer_node_t timer_nodes[MAX_TIMER] = {0};
static conn_node_t conn_nodes[MAX_CONN] = {0};
static uint32_t t_iter = 0;
static uint32_t c_iter = 0;
timer_node_t * get_timer_node() {
t_iter++;
while (timer_nodes[t_iter & MAX_TIMER].idx > 0) {
t_iter++;
}
timer_nodes[t_iter].idx = t_iter;
return &timer_nodes[t_iter];
}
conn_node_t * get_conn_node() {
c_iter++;
while (conn_nodes[c_iter & MAX_CONN].used > 0) {
c_iter++;
}
return &conn_nodes[c_iter];
}
#define TW_SIZE 16
#define EXPIRE 10
#define TW_MASK (TW_SIZE - 1)
static uint32_t tick = 0;
typedef struct link_list {
timer_node_t head;
timer_node_t *tail;
}link_list_t;
void add_conn(link_list_t *tw, conn_node_t *cnode, int delay) {
link_list_t *list = &tw[(tick+EXPIRE+delay) & TW_MASK];
timer_node_t * tnode = get_timer_node();
cnode->used++;
tnode->node = cnode;
list->tail->next = tnode;
list->tail = tnode;
tnode->next = NULL;
}
void link_clear(link_list_t *list) {
list->head.next = NULL;
list->tail = &(list->head);
}
void check_conn(link_list_t *tw) {
int32_t itick = tick;
tick++;
link_list_t *list = &tw[itick & TW_MASK];
timer_node_t *current = list->head.next;
while (current) {
timer_node_t * temp = current;
current = current->next;
conn_node_t *cn = temp->node;
cn->used--;
temp->idx = 0;
if (cn->used == 0) {
printf("fd:%d kill down\n", cn->id);
temp->next = NULL;
continue;
}
printf("fd:%d used:%d\n", cn->id, cn->used);
}
link_clear(list);
}
static time_t
current_time() {
time_t t;
#if !defined(__APPLE__) || defined(AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER)
struct timespec ti;
clock_gettime(CLOCK_MONOTONIC, &ti);
t = (time_t)ti.tv_sec;
#else
struct timeval tv;
gettimeofday(&tv, NULL);
t = (time_t)tv.tv_sec;
#endif
return t;
}
int main() {
memset(timer_nodes, 0, MAX_TIMER * sizeof(timer_node_t));
memset(conn_nodes, 0, MAX_CONN * sizeof(conn_node_t));
link_list_t tw[TW_SIZE];
memset(tw, 0, TW_SIZE * sizeof(link_list_t));
for (int i = 0; i < TW_SIZE; i++) {
link_clear(&tw[i]);
}
{
conn_node_t *node = get_conn_node();
node->id = 10001;
add_conn(tw, node, 0);
add_conn(tw, node, 5);
}
{
conn_node_t *node = get_conn_node();
node->id = 10002;
add_conn(tw, node, 0);
}
{
conn_node_t *node = get_conn_node();
node->id = 10003;
add_conn(tw, node, 3);
}
time_t start = current_time();
for (;;) {
time_t now = current_time();
if (now - start > 0) {
for (int i=0; i<now-start; i++)
check_conn(tw);
start = now;
printf("check conn tick:%d\n", tick);
}
usleep(20000);
}
return 0;
}
多层级时间轮
多层级时间轮其实就是多个单层级时间轮的集合,就如同我们的时钟一样,秒针转动一圈,分针走一格,分针转动一圈,时针走一格,设计思路也是一样,设计多个链表,每个链表所代表的精度不同,根据设定的时间不同放入不同的链表中,让变量tick++,加到哪里便利那个链表,执行相应任务。 由上图可见,除了第一级时间轮是精确的时间,从第二级到第五级都存储的是一个时间范围,只要落在这个时间范围内的任务都存在这个链表中,等到tick遍历到2-5层的节点时,只需要把这些节点取出来,再做一次插入操作,这样他们就全部变成第一层的节点了。
代码实现
线程间互斥采用自旋锁,因为互斥资源是纯计算类,并不涉及系统调用,所以互斥锁耗费资源相比于自旋锁要大。
#ifndef _SPINLOCK_H
#define _SPINLOCK_H
struct spinlock {
int lock;
};
static void spinlock_init(struct spinlock *lock) {
lock->lock = 0;
}
static void spinlock_lock(struct spinlock *lock) {
while (__sync_lock_test_and_set(&lock->lock, 1)) {}
}
static int spinlock_trylock(struct spinlock *lock) {
return __sync_lock_test_and_set(&lock->lock, 1) == 0;
}
static void spinlock_unlock(struct spinlock *lock) {
__sync_lock_release(&lock->lock);
}
static void spinlock_destroy(struct spinlock *lock) {
(void) lock;
}
#endif
#ifndef TIMEWHEEL_H
#define TIMEWHEEL_H
#include<pthread.h>
#include<iostream>
#include<list>
#include<string.h>
#include"spinlock.h"
using namespace std;
#define TIME_NEAR_SHIFT 8
#define TIME_NEAR (1 << TIME_NEAR_SHIFT)
#define TIME_LEVEL_SHIFT 6
#define TIME_LEVEL (1 << TIME_LEVEL_SHIFT)
#define TIME_NEAR_MASK (TIME_NEAR-1)
#define TIME_LEVEL_MASK (TIME_LEVEL-1)
typedef void (*handler_pt) (struct timeNode *node);
struct timeNode;
struct link_list {
list<timeNode *> head;
};
class timeWheel
{
public:
timeNode* add_timer(int time, handler_pt func, int threadid);
static timeWheel* GetInstance();
void timer_update();
void del_timer(timeNode *node);
void clear_timer();
private:
timeWheel();
uint64_t gettime();
void expire_timer();
void add_node(timeNode *node);
void move_list(int level, int idx);
void link(link_list *list, timeNode *node);
void timer_shift();
void dispatch_list(list<timeNode *> *current);
void timer_execute();
list<timeNode *> link_clear(link_list *list);
private:
static timeWheel *m_Instance;
link_list m_near[TIME_NEAR];
link_list m_t[4][TIME_LEVEL];
struct spinlock m_lock;
uint32_t m_time;
uint64_t m_current;
uint64_t m_current_point;
};
struct timeNode
{
uint32_t m_expire;
handler_pt m_callback;
uint8_t m_cancel;
int id;
};
#endif
#include "timewheel.h"
timeWheel* timeWheel::m_Instance = NULL;
timeWheel::timeWheel()
{
for(int i=0;i<TIME_NEAR;i++)
{
m_near->head.clear();
}
for(int i=0;i<4;i++)
{
for(int j=0;j<TIME_LEVEL;j++)
{
m_t[i][j].head.clear();
}
}
spinlock_init(&m_lock);
m_time = 0;
m_current = 0;
m_current_point = gettime();
}
timeWheel* timeWheel::GetInstance()
{
if (m_Instance == NULL)
m_Instance = new timeWheel();
return m_Instance;
}
timeNode* timeWheel::add_timer(int time, handler_pt func, int threadid)
{
timeNode *node = (timeNode *)malloc(sizeof(*node));
spinlock_lock(&m_lock);
node->m_expire = time+m_time;
node->m_callback = func;
node->id = threadid;
if (time <= 0) {
node->m_callback(node);
spinlock_unlock(&m_lock);
free(node);
return NULL;
}
add_node(node);
spinlock_unlock(&m_lock);
return node;
}
void timeWheel::timer_update()
{
spinlock_lock(&m_lock);
timer_execute();
timer_shift();
timer_execute();
spinlock_unlock(&m_lock);
}
void timeWheel::del_timer(timeNode *node)
{
node->m_cancel=1;
}
uint64_t timeWheel::gettime()
{
uint64_t t;
#if !defined(__APPLE__) || defined(AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER)
struct timespec ti;
clock_gettime(CLOCK_MONOTONIC, &ti);
t = (uint64_t)ti.tv_sec * 100;
t += ti.tv_nsec / 10000000;
#else
struct timeval tv;
gettimeofday(&tv, NULL);
t = (uint64_t)tv.tv_sec * 100;
t += tv.tv_usec / 10000;
#endif
return t;
}
void timeWheel::expire_timer()
{
uint64_t cp = gettime();
if (cp != m_current_point) {
int diff = (uint32_t)(cp - m_current_point);
m_current_point = cp;
int i;
for (i=0; i<diff; i++) {
timer_update();
}
}
}
void timeWheel::clear_timer()
{
for(int i=0;i<TIME_NEAR;i++)
{
m_near->head.clear();
}
for(int i=0;i<4;i++)
{
for(int j=0;j<TIME_LEVEL;j++)
{
m_t[i][j].head.clear();
}
}
}
void timeWheel::link(link_list *list, timeNode *node)
{
list->head.push_back(node);
}
void timeWheel::add_node(timeNode *node)
{
uint32_t time=node->m_expire;
uint32_t current_time=m_time;
uint32_t msec = time - current_time;
if (msec < TIME_NEAR) {
link(&m_near[time&TIME_NEAR_MASK],node);
} else if (msec < (1 << (TIME_NEAR_SHIFT+TIME_LEVEL_SHIFT))) {
link(&m_t[0][((time>>TIME_NEAR_SHIFT) & TIME_LEVEL_MASK)],node);
} else if (msec < (1 << (TIME_NEAR_SHIFT+2*TIME_LEVEL_SHIFT))) {
link(&m_t[1][((time>>(TIME_NEAR_SHIFT + TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);
} else if (msec < (1 << (TIME_NEAR_SHIFT+3*TIME_LEVEL_SHIFT))) {
link(&m_t[2][((time>>(TIME_NEAR_SHIFT + 2*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);
} else {
link(&m_t[3][((time>>(TIME_NEAR_SHIFT + 3*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);
}
}
void timeWheel::move_list(int level, int idx)
{
list<timeNode *> *current = &m_t[level][idx].head;
while (!current->empty()) {
timeNode *temp=current->front();
current->pop_front();
add_node(temp);
}
current->clear();
}
void timeWheel::timer_shift()
{
int mask = TIME_NEAR;
uint32_t ct = ++m_time;
if (ct == 0) {
move_list(3, 0);
} else {
uint32_t time = ct >> TIME_NEAR_SHIFT;
int i=0;
while ((ct & (mask-1))==0) {
int idx=time & TIME_LEVEL_MASK;
if (idx!=0) {
move_list(i, idx);
break;
}
mask <<= TIME_LEVEL_SHIFT;
time >>= TIME_LEVEL_SHIFT;
++i;
}
}
}
void timeWheel::dispatch_list(list<timeNode *> *current)
{
while (!current->empty())
{
timeNode * temp = current->front();
current->pop_front();
if (temp->m_cancel == 0)
temp->m_callback(temp);
free(temp);
}
}
void timeWheel::timer_execute()
{
int idx = m_time & TIME_NEAR_MASK;
while (!m_near[idx].head.empty())
{
list<timeNode *> *current = &m_near[idx].head;
spinlock_unlock(&m_lock);
dispatch_list(current);
current->clear();
spinlock_lock(&m_lock);
}
}
list<timeNode *> timeWheel::link_clear(link_list *list)
{
std::list<timeNode *> head=list->head;
list->head.clear();
return head;
}
测试代码:
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <stdlib.h>
#include "timewheel.h"
struct context {
int quit;
int thread;
};
struct thread_param {
struct context *ctx;
timeWheel *tw;
int id;
};
static struct context ctx = {0};
void do_timer(timeNode *node) {
printf("timer expired:%d - thread-id:%d\n", node->m_expire, node->id);
}
void* thread_worker(void *p) {
struct thread_param *tp = (struct thread_param *)p;
int id = tp->id;
struct context *ctx = tp->ctx;
while (!ctx->quit) {
int expire = rand() % 200;
tp->tw->add_timer(expire, do_timer, id);
usleep(expire*(10-1)*1000);
}
printf("thread_worker:%d exit!\n", id);
return NULL;
}
void do_quit(timeNode * node) {
ctx.quit = 1;
}
int main() {
timeWheel *tw=timeWheel::GetInstance();
srand(time(NULL));
ctx.thread = 8;
pthread_t pid[ctx.thread];
tw->add_timer(600, do_quit, 100);
struct thread_param task_thread_p[ctx.thread];
int i;
for (i = 0; i < ctx.thread; i++) {
task_thread_p[i].id = i;
task_thread_p[i].ctx = &ctx;
task_thread_p[i].tw = tw;
if (pthread_create(&pid[i], NULL, thread_worker, &task_thread_p[i])) {
fprintf(stderr, "create thread failed\n");
exit(1);
}
}
while (!ctx.quit) {
tw->expire_timer();
usleep(2500);
}
tw->clear_timer();
for (i = 0; i < ctx.thread; i++) {
pthread_join(pid[i], NULL);
}
printf("all thread is closed\n");
return 0;
}
运行效果图 有不足、疑问请留言
|