以钟表为背景来描述时间轮的运转:时间轮每走一格,就像钟表的刻度走一下。用 ++tick % 60 来描述:每秒钟 ++tick 表示秒针移动;对 60 取余让秒针永远在 [0, 59] 间移动;对于时钟来说,它的时间精度(最小运行单元)是 1 秒;
时间轮的设计
1. 准备一个数组存储连接数据;那么数组长度设置为多少? 参照时钟表盘的运转规律,可以将定时任务根据触发的紧急程度,分布到不同层级的时间轮中;假设时间精度为 10ms;在第 1 层级每 10ms 移动一格;每移动一格执行该格子当中所有的定时任务;当第 1 层指针从 255 格开始移动,此时层级 2 移动一格;层级 2 移动一格的行为定义为,将该格当中的定时任务重新映射到层级 1 当中;同理,层级 2 当中从 63 格开始移动,层级 3 格子中的定时任务重新映射到层级 2;以此类推层级 4 往层级 3 映射,层级 5 往层级 4 映射;如何重新映射?定时任务的过期时间对上一层级的长度取余,分布在上一层级不同格子当中; 2. 考虑一秒内添加了多条连接,那么可以参考 hash 结构处理冲突的方式,用链表连接起来; 3. 回到 1 中的问题,如果想 2 中链表稀疏,将数组长度设置大一些;如果想紧凑些,则将数组长度设置小一些(但是必须大于 10); 4. 假设我们设置数组的长度为 11;那么检测指针的移动可描述为 ++point % 11;
$$m \% n = m - n \times \mathrm{floor}\left(\frac{m}{n}\right)$$
优化:将 $n$ 替换为 $2^k$,这里 $2^k$ 恰好大于 $n$,这样一来 $m \% 2^k$ 可以转化为 $m \& (2^k - 1)$;所以我们选择 $16\,(2^4)$,那么检测指针移动可优化为 `++point & (16 - 1)`; 5. 考虑到正常情况下 5 秒钟发送一次心跳包,10 秒才检测一次,如为 10 的时候并不能踢掉连接;所以需要每收到一次心跳包则 used++,每检测一次 used--;当检测到 used == 0 则踢掉连接;
添加节点
/*
 * File `node` into the wheel slot that matches its absolute expiry tick.
 *
 * The distance to expiry (computed with unsigned wrap-around) selects the
 * wheel: less than TIME_NEAR ticks goes to the near wheel, otherwise to the
 * first higher level whose span covers the distance (level 3 takes the rest).
 * The slot inside the chosen wheel is taken from the matching bit field of
 * the expiry tick itself.
 */
void add_node(s_timer_t *T, timer_node_t *node) {
    const uint32_t expire = node->expire;
    const uint32_t delta = expire - T->time;

    if (delta < TIME_NEAR) {
        link(&T->near[expire & TIME_NEAR_MASK], node);
        return;
    }

    int level = 0;
    uint32_t span = (uint32_t)TIME_NEAR << TIME_LEVEL_SHIFT; /* reach of level 0 */
    uint32_t bits = expire >> TIME_NEAR_SHIFT;               /* index bits of level 0 */
    while (level < 3 && delta >= span) {
        span <<= TIME_LEVEL_SHIFT;
        bits >>= TIME_LEVEL_SHIFT;
        ++level;
    }
    link(&T->t[level][bits & TIME_LEVEL_MASK], node);
}
重新映射
/*
 * Advance the tick counter by one and cascade higher-level slots downwards.
 *
 * When the low bits of the new tick are all zero a lower wheel has completed
 * a revolution, so the corresponding slot of the next level is emptied and
 * its nodes are re-filed through move_list(). If the 32-bit counter wraps to
 * zero, slot 0 of the top level is re-filed instead.
 */
void timer_shift(s_timer_t *T) {
    uint32_t now = ++T->time;
    if (now == 0) {
        move_list(T, 3, 0);
        return;
    }

    uint32_t upper = now >> TIME_NEAR_SHIFT; /* tick bits above the near wheel */
    int span = TIME_NEAR;
    int level = 0;
    while ((now & (span - 1)) == 0) {
        int slot = upper & TIME_LEVEL_MASK;
        if (slot != 0) {
            /* First level whose index is non-zero: only this slot cascades. */
            move_list(T, level, slot);
            return;
        }
        span <<= TIME_LEVEL_SHIFT;
        upper >>= TIME_LEVEL_SHIFT;
        ++level;
    }
}
完整代码:
#ifndef _MARK_TIMEWHEEL_
#define _MARK_TIMEWHEEL_
#include <stdint.h>
/* Near wheel: 2^8 = 256 slots, indexed by the low 8 bits of a tick value. */
#define TIME_NEAR_SHIFT 8
#define TIME_NEAR (1 << TIME_NEAR_SHIFT)
/* Each higher-level wheel: 2^6 = 64 slots, indexed by the next 6 bits. */
#define TIME_LEVEL_SHIFT 6
#define TIME_LEVEL (1 << TIME_LEVEL_SHIFT)
/* Sizes are powers of two, so a slot index is `value & MASK` rather than `%`. */
#define TIME_NEAR_MASK (TIME_NEAR-1)
#define TIME_LEVEL_MASK (TIME_LEVEL-1)
typedef struct timer_node timer_node_t;
/* Callback invoked with the node that expired. */
typedef void (*handler_pt) (struct timer_node *node);
/* One pending timer; nodes are chained into singly linked slot lists. */
struct timer_node {
struct timer_node *next; /* next node in the same slot list, or 0 at the end */
uint32_t expire; /* absolute tick at which the timer fires (delay + tick at insertion) */
handler_pt callback; /* function called when the timer fires */
uint8_t cancel; /* non-zero: the callback is skipped when the node is dispatched */
int id; /* caller-supplied identifier (the `threadid` passed to add_timer) */
};
/*
 * Schedule `func` to run `time` ticks from now, tagging the node with
 * `threadid`. When `time` <= 0 the callback runs immediately and NULL is
 * returned; otherwise the queued node is returned. The node is freed by the
 * timer module once it has been dispatched.
 */
timer_node_t* add_timer(int time, handler_pt func, int threadid);
/* Advance the wheel by however many ticks have elapsed since the last call
 * and run the callbacks of every timer that became due. */
void expire_timer(void);
/* Mark `node` as cancelled so its callback is skipped when it is dispatched. */
void del_timer(timer_node_t* node);
/* Allocate the global wheel and record the current time as its origin. */
void init_timer(void);
/* Free every node still queued in the global wheel. */
void clear_timer(void);
#endif
#ifndef SPINLOCK_H
#define SPINLOCK_H
/* Minimal test-and-set spinlock built on the GCC/Clang __sync builtins. */
struct spinlock {
    int lock; /* 0 = free, 1 = held */
};

/*
 * The helpers below are `static inline`: they are defined in a header, and
 * external definitions would collide at link time as soon as a second
 * translation unit included this file.
 */

/* Put the lock into the unlocked state. */
static inline void spinlock_init(struct spinlock *lock) {
    lock->lock = 0;
}

/* Busy-wait until the lock has been acquired. */
static inline void spinlock_lock(struct spinlock *lock) {
    while (__sync_lock_test_and_set(&lock->lock, 1)) {
        /* spin until the previous value was 0 */
    }
}

/* Try to acquire the lock once; returns 1 on success, 0 if it was held. */
static inline int spinlock_trylock(struct spinlock *lock) {
    return __sync_lock_test_and_set(&lock->lock, 1) == 0;
}

/* Release the lock. */
static inline void spinlock_unlock(struct spinlock *lock) {
    __sync_lock_release(&lock->lock);
}

/* Nothing to release; present for API symmetry with spinlock_init(). */
static inline void spinlock_destroy(struct spinlock *lock) {
    (void) lock;
}
#endif
#include "spinlock.h"
#include "timewheel.h"
#include <string.h>
#include <stddef.h>
#include <stdlib.h>
#include <time.h>
/* Singly linked list of timer nodes with O(1) append (one per wheel slot). */
typedef struct link_list {
timer_node_t head; /* embedded dummy node; head.next is the first real node */
timer_node_t *tail; /* last node in the list, or &head when the list is empty */
}link_list_t;
/* Hierarchical timing wheel: one near wheel plus four higher-level wheels. */
typedef struct timer {
link_list_t near[TIME_NEAR]; /* near wheel; the slot for the current tick is dispatched */
link_list_t t[4][TIME_LEVEL]; /* higher levels; slots are re-filed downwards by timer_shift() */
struct spinlock lock; /* taken by add_timer() and timer_update() around wheel access */
uint32_t time; /* current tick, incremented by timer_shift() */
uint64_t current; /* zeroed at creation; not otherwise used in this file */
uint64_t current_point; /* gettime() value seen by the last expire_timer()/init_timer() */
}s_timer_t;
/* The single global wheel; allocated by init_timer(). */
static s_timer_t * TI = NULL;
/*
 * Detach every node from `list` and reset it to the empty state.
 * Returns the former first node (0 if the list was already empty).
 */
timer_node_t *
link_clear(link_list_t *list) {
    timer_node_t *first = list->head.next;

    list->tail = &list->head;
    list->head.next = NULL;
    return first;
}
/* Append `node` to the end of `list`; the node becomes the new tail. */
void
link(link_list_t *list, timer_node_t *node) {
    timer_node_t *last = list->tail;

    last->next = node;
    list->tail = node;
    node->next = NULL;
}
/*
 * File `node` into the wheel slot that matches its absolute expiry tick.
 *
 * The distance to expiry (computed with unsigned wrap-around) selects the
 * wheel: less than TIME_NEAR ticks goes to the near wheel, otherwise to the
 * first higher level whose span covers the distance (level 3 takes the rest).
 * The slot inside the chosen wheel is taken from the matching bit field of
 * the expiry tick itself.
 */
void
add_node(s_timer_t *T, timer_node_t *node) {
    const uint32_t expire = node->expire;
    const uint32_t delta = expire - T->time;

    if (delta < TIME_NEAR) {
        link(&T->near[expire & TIME_NEAR_MASK], node);
        return;
    }

    int level = 0;
    uint32_t span = (uint32_t)TIME_NEAR << TIME_LEVEL_SHIFT; /* reach of level 0 */
    uint32_t bits = expire >> TIME_NEAR_SHIFT;               /* index bits of level 0 */
    while (level < 3 && delta >= span) {
        span <<= TIME_LEVEL_SHIFT;
        bits >>= TIME_LEVEL_SHIFT;
        ++level;
    }
    link(&T->t[level][bits & TIME_LEVEL_MASK], node);
}
/*
 * Schedule `func` to run `time` ticks from now.
 *
 * time     - delay in ticks; a value <= 0 fires the callback immediately.
 * func     - callback invoked with the node when the timer fires.
 * threadid - caller-supplied tag stored in node->id.
 *
 * Returns the queued node, or NULL when the callback was run immediately or
 * the node could not be allocated. A queued node is freed by dispatch_list()
 * after it has been dispatched, so the returned pointer must not be used
 * once the timer has fired.
 */
timer_node_t*
add_timer(int time, handler_pt func, int threadid) {
    timer_node_t *node = malloc(sizeof(*node));
    if (node == NULL) {
        return NULL;
    }
    node->next = NULL;
    node->callback = func;
    node->cancel = 0; /* malloc does not zero memory; dispatch_list() reads this flag */
    node->id = threadid;

    spinlock_lock(&TI->lock);
    node->expire = (uint32_t)time + TI->time;
    if (time > 0) {
        add_node(TI, node);
        spinlock_unlock(&TI->lock);
        return node;
    }
    spinlock_unlock(&TI->lock);

    /* Fire outside the lock so the callback may itself call add_timer(). */
    node->callback(node);
    free(node);
    return NULL;
}
/*
 * Empty slot `idx` of higher-level wheel `level` and re-file each of its
 * nodes through add_node(), which picks a slot from the node's expiry tick
 * and the current tick.
 */
void
move_list(s_timer_t *T, int level, int idx) {
    timer_node_t *node;
    timer_node_t *following;

    for (node = link_clear(&T->t[level][idx]); node != NULL; node = following) {
        following = node->next; /* saved first: add_node() rewrites node->next */
        add_node(T, node);
    }
}
/*
 * Advance the tick counter by one and cascade higher-level slots downwards.
 *
 * When the low bits of the new tick are all zero a lower wheel has completed
 * a revolution, so the corresponding slot of the next level is emptied and
 * its nodes are re-filed through move_list(). If the 32-bit counter wraps to
 * zero, slot 0 of the top level is re-filed instead.
 */
void
timer_shift(s_timer_t *T) {
    uint32_t now = ++T->time;
    if (now == 0) {
        move_list(T, 3, 0);
        return;
    }

    uint32_t upper = now >> TIME_NEAR_SHIFT; /* tick bits above the near wheel */
    int span = TIME_NEAR;
    int level = 0;
    while ((now & (span - 1)) == 0) {
        int slot = upper & TIME_LEVEL_MASK;
        if (slot != 0) {
            /* First level whose index is non-zero: only this slot cascades. */
            move_list(T, level, slot);
            return;
        }
        span <<= TIME_LEVEL_SHIFT;
        upper >>= TIME_LEVEL_SHIFT;
        ++level;
    }
}
/*
 * Run and free every node of a detached list.
 * `current` must be non-NULL. A node whose `cancel` flag is set is freed
 * without its callback being invoked.
 */
void
dispatch_list(timer_node_t *current) {
    for (;;) {
        timer_node_t *node = current;
        current = node->next; /* read before the node is freed */
        if (!node->cancel) {
            node->callback(node);
        }
        free(node);
        if (current == NULL) {
            break;
        }
    }
}
/*
 * Dispatch everything queued in the near slot of the current tick.
 * Must be entered with T->lock held. The lock is released while the
 * callbacks run and re-acquired afterwards, and the slot is re-checked in
 * case new nodes were linked in the meantime.
 */
void
timer_execute(s_timer_t *T) {
    link_list_t *slot = &T->near[T->time & TIME_NEAR_MASK];

    while (slot->head.next != NULL) {
        timer_node_t *batch = link_clear(slot);
        spinlock_unlock(&T->lock);
        dispatch_list(batch);
        spinlock_lock(&T->lock);
    }
}
/* Advance the wheel by exactly one tick, taking T->lock for the duration. */
void
timer_update(s_timer_t *T) {
spinlock_lock(&T->lock);
/* Dispatch whatever is queued in the near slot of the current tick. */
timer_execute(T);
/* Increment T->time and re-file higher-level slots where a wheel wrapped. */
timer_shift(T);
/* Dispatch the near slot of the new tick, including nodes just re-filed. */
timer_execute(T);
spinlock_unlock(&T->lock);
}
/*
 * Cancel a pending timer by setting its `cancel` flag; the node stays queued
 * and is freed by dispatch_list() without its callback being called.
 * NOTE(review): the node is freed once dispatched, so calling this after the
 * timer has fired touches freed memory; the flag is also written without
 * taking the wheel lock - confirm callers only cancel still-pending timers.
 */
void
del_timer(timer_node_t *node) {
node->cancel = 1;
}
/*
 * Allocate a zero-initialised wheel with every slot list empty and the lock
 * unlocked. Returns NULL if the allocation fails.
 */
s_timer_t *
timer_create_timer() {
    s_timer_t *r = calloc(1, sizeof(*r));
    if (r == NULL) {
        return NULL;
    }

    int i, j;
    /* link_clear() makes each list's tail point at its own dummy head. */
    for (i = 0; i < TIME_NEAR; i++) {
        link_clear(&r->near[i]);
    }
    for (i = 0; i < 4; i++) {
        for (j = 0; j < TIME_LEVEL; j++) {
            link_clear(&r->t[i][j]);
        }
    }
    spinlock_init(&r->lock);
    r->current = 0;
    return r;
}
/* Read CLOCK_MONOTONIC and return it in units of 10 ms (1/100 s). */
uint64_t gettime() {
    struct timespec now;

    clock_gettime(CLOCK_MONOTONIC, &now);
    /* whole seconds -> hundredths, plus the nanosecond part in hundredths */
    return (uint64_t)now.tv_sec * 100 + now.tv_nsec / 10000000;
}
/*
 * Catch the global wheel up with the monotonic clock: run timer_update()
 * once for every 10 ms unit that has elapsed since the previous call.
 */
void
expire_timer(void) {
    uint64_t cp = gettime();
    if (cp == TI->current_point) {
        return;
    }

    uint32_t diff = (uint32_t)(cp - TI->current_point);
    TI->current_point = cp;

    /* Unsigned counter: a signed one would overflow for diff > INT_MAX. */
    uint32_t i;
    for (i = 0; i < diff; i++) {
        timer_update(TI);
    }
}
void
init_timer(void) {
TI = timer_create_timer();
TI->current_point = gettime();
}
/* Free every node queued in `list` and leave the list empty. */
static void
free_pending(link_list_t *list) {
    timer_node_t *node = list->head.next;

    while (node != NULL) {
        timer_node_t *doomed = node;
        node = node->next;
        free(doomed);
    }
    link_clear(list);
}

/*
 * Free all nodes still queued in the global wheel (near wheel first, then
 * each higher level). The wheel itself stays allocated; callbacks are not run.
 */
void
clear_timer() {
    int level, slot;

    for (slot = 0; slot < TIME_NEAR; slot++) {
        free_pending(&TI->near[slot]);
    }
    for (level = 0; level < 4; level++) {
        for (slot = 0; slot < TIME_LEVEL; slot++) {
            free_pending(&TI->t[level][slot]);
        }
    }
}
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <stdlib.h>
#include "timewheel.h"
/* State shared between the main thread and the worker threads. */
struct context {
    /* Set to 1 by the quit timer callback and polled by the main loop and
     * every worker; _Atomic so the cross-thread access is not a data race. */
    _Atomic int quit;
    int thread; /* number of worker threads to start */
};
/* Argument block handed to each worker thread. */
struct thread_param {
struct context *ctx; /* shared state; the worker polls ctx->quit */
int id; /* worker index, passed to add_timer() as the thread id */
};
/* Process-wide shared state, zero-initialised (quit == 0). */
static struct context ctx = {0};
/* Timer callback used by the workers: print the expiry tick and the owner id. */
void do_timer(timer_node_t *node) {
    /* expire is a uint32_t, so it is printed as unsigned rather than with %d. */
    printf("timer expired:%u - thread-id:%d\n", (unsigned)node->expire, node->id);
}
/*
 * Worker thread body: until the shared quit flag is set, schedule a timer
 * with a random delay of 0..199 ticks and then sleep for `delay * 9` ms.
 * `p` points to this worker's struct thread_param. Always returns NULL.
 */
void* thread_worker(void *p) {
    struct thread_param *param = p;
    struct context *shared = param->ctx;
    int id = param->id;

    while (shared->quit == 0) {
        int delay = rand() % 200;
        add_timer(delay, do_timer, id);
        usleep(delay*(10-1)*1000);
    }
    printf("thread_worker:%d exit!\n", id);
    return NULL;
}
/* Timer callback that asks the main loop and all workers to stop. */
void do_quit(timer_node_t * node) {
    (void)node; /* the node carries nothing this callback needs */
    ctx.quit = 1;
}
/*
 * Demo driver: start 8 workers that keep adding random timers, drive the
 * wheel from the main thread, and shut down when the 2000-tick quit timer
 * fires.
 */
int main(void) {
    srand(time(NULL));
    ctx.thread = 8;
    pthread_t pid[ctx.thread];
    struct thread_param task_thread_p[ctx.thread];

    init_timer();
    add_timer(2000, do_quit, 100);

    int i;
    for (i = 0; i < ctx.thread; i++) {
        task_thread_p[i].id = i;
        task_thread_p[i].ctx = &ctx;
        if (pthread_create(&pid[i], NULL, thread_worker, &task_thread_p[i])) {
            fprintf(stderr, "create thread failed\n");
            exit(1);
        }
    }

    /* Poll more often than the 10 ms tick so no tick is processed late. */
    while (!ctx.quit) {
        expire_timer();
        usleep(2500);
    }

    /* Join first: a worker may still be inside add_timer(), and clearing the
     * wheel underneath it would free or leak nodes it is linking. */
    for (i = 0; i < ctx.thread; i++) {
        pthread_join(pid[i], NULL);
    }
    clear_timer();

    printf("all thread is closed\n");
    return 0;
}
.PHONY: all clean
# C compile flags. gnu11 keeps the POSIX declarations used by the sources
# (clock_gettime, usleep) visible; -pthread is required for the worker threads.
CFLAGS = -std=gnu11 -Wall -pthread
RM = rm
CC = cc
RMFLAGS = -fr
EXE = tw-timer
SRCS = tw-timer.c timewheel.c
OBJS = $(SRCS:.c=.o)
all: $(EXE)
$(EXE): $(OBJS)
	$(CC) $(CFLAGS) -o $@ $^ -lpthread
# Every object depends on both headers so a header change triggers a rebuild.
%.o: %.c timewheel.h spinlock.h
	$(CC) $(CFLAGS) -o $@ -c $<
clean:
	$(RM) $(RMFLAGS) $(EXE) $(OBJS)
|