IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> redis之辛勤的哨兵(二)时刻关注你 -> 正文阅读

[大数据]redis之辛勤的哨兵(二)时刻关注你

哨兵初始化后,配置加载成功后,整个配置结构图如下:
请添加图片描述
整个哨兵由一个全局的sentinelState结构进行存储组织,其中master字典存储的是此哨兵需要监控的主服务器。
而每个master又由sentinelRedisInstance结构体进行存储表示,其中有几个关键的结构体,比如instanceLink等。

一、如何关注你

在启动时,通过加载配置,知道了需要监控的master信息。那如何监控master呢?
在redis的定时任务处理函数中, 将对监控的master发起请求。

1.1 定时任务触发

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
 /* Run the Sentinel timer if we are in sentinel mode. */
    if (server.sentinel_mode) sentinelTimer();
...
}
void sentinelTimer(void) {
...
 sentinelHandleDictOfRedisInstances(sentinel.masters);
...
}

1.2 遍历所有监控的master

遍历sentinel.masters中的master,逐一进行处理

void sentinelHandleDictOfRedisInstances(dict *instances) {
    dictIterator *di;
    dictEntry *de;
 	...

    /* There are a number of things we need to perform against every master. */
    di = dictGetIterator(instances);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
		//进行处理
       ...
    }
    ...
    dictReleaseIterator(di);
}

1.3 创建master的链接

对于每一个监控的master,将调用sentinelHandleRedisInstance函数进行处理,
sentinelHandleRedisInstance函数中将调用sentinelReconnectInstance函数进行处理重连情况。

sentinelHandleDictOfRedisInstances() ->
	sentinelHandleRedisInstance(ri); ->
		sentinelReconnectInstance()

对于重连函数,首先通过ri->link->disconnected判断链接是否断开,而对于刚启动的哨兵,还没有链接,所有默认链接是断开的,需要进行重连。

instanceLink *createInstanceLink(void) {
    instanceLink *link = zmalloc(sizeof(*link));
	...
    link->disconnected = 1;
	...
	return link;
}

1.3.1 异步建立TCP链接

void sentinelReconnectInstance(sentinelRedisInstance *ri) {
	//1. 判断链接是否断开,断开才进行重连,否则直接返回
    if (ri->link->disconnected == 0) return;
    if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
    instanceLink *link = ri->link;
    mstime_t now = mstime();

	//2. 为了减少重连的频率,重连间隔必须大于等于1000毫秒
    if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
    ri->link->last_reconn_time = now;

    /* Commands connection. */
    if (link->cc == NULL) {
        link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
        ...
    }
  	...
}

1.3.2 设置链接回调函数

当链接建立无错误发生时,设置链接建立成功后的回调函数,以及链接断开的回调函数。

link->pending_commands = 0;
link->cc_conn_time = mstime();
link->cc->data = link;
redisAeAttach(server.el,link->cc);
redisAsyncSetConnectCallback(link->cc,
        sentinelLinkEstablishedCallback);
redisAsyncSetDisconnectCallback(link->cc,
        sentinelDisconnectCallback);

回调函数如下所示

void instanceLinkConnectionError(const redisAsyncContext *c) {
    instanceLink *link = c->data;
    int pubsub;

    if (!link) return;

    pubsub = (link->pc == c);
    if (pubsub)
        link->pc = NULL;
    else
        link->cc = NULL;
    link->disconnected = 1;
}

/* Hiredis connection established / disconnected callbacks. We need them
 * just to cleanup our link state. */
void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
    if (status != C_OK) instanceLinkConnectionError(c);
}

void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
    UNUSED(status);
    instanceLinkConnectionError(c);
}

1.3.3 发送认证信息

如果监控的实例需要密码认证,则需要使用AUTH命令进行认证。而对于sentinelRedisInstance结构,可以表示master,slave,甚至是其他哨兵,所以根据不同的flags判断不同的类型,从不同的地方获取密码进行认证,并且如果有acl,则还需要指定用户名。

 sentinelSendAuthIfNeeded(ri,link->cc);
void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
    char *auth_pass = NULL;
    char *auth_user = NULL;

    if (ri->flags & SRI_MASTER) {
        auth_pass = ri->auth_pass;
        auth_user = ri->auth_user;
    } else if (ri->flags & SRI_SLAVE) {
        auth_pass = ri->master->auth_pass;
        auth_user = ri->master->auth_user;
    } else if (ri->flags & SRI_SENTINEL) {
        /* If sentinel_auth_user is NULL, AUTH will use default user
           with sentinel_auth_pass to authenticate */
        if (sentinel.sentinel_auth_pass) {
            auth_pass = sentinel.sentinel_auth_pass;
            auth_user = sentinel.sentinel_auth_user;
        } else {
            /* Compatibility with old configs. requirepass is used
             * for both incoming and outgoing authentication. */
            auth_pass = server.requirepass;
            auth_user = NULL;
        }
    }

    if (auth_pass && auth_user == NULL) {
        if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s",
            sentinelInstanceMapCommand(ri,"AUTH"),
            auth_pass) == C_OK) ri->link->pending_commands++;
    } else if (auth_pass && auth_user) {
        /* If we also have an username, use the ACL-style AUTH command
         * with two arguments, username and password. */
        if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s %s",
            sentinelInstanceMapCommand(ri,"AUTH"),
            auth_user, auth_pass) == C_OK) ri->link->pending_commands++;
    }
}

1.3.4 设置哨兵的名字

告诉master,哨兵自己的名字。相对于master,哨兵就是master的一个client,所以通过设置client名字就可以告诉master哨兵自己的名字。
这样就可以通过访问master的client列表,知道哪些哨兵链接到此master。
sentinel-<first_8_chars_of_runid>-<connection_type>
此处的connection_type是cmd

sentinelSetClientName(ri,link->cc,"cmd");
void sentinelSetClientName(sentinelRedisInstance *ri, redisAsyncContext *c, char *type) {
    char name[64];

    snprintf(name,sizeof(name),"sentinel-%.8s-%s",sentinel.myid,type);
    if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri,
        "%s SETNAME %s",
        sentinelInstanceMapCommand(ri,"CLIENT"),
        name) == C_OK)
    {
        ri->link->pending_commands++;
    }
}

1.3.5 发送异步PING命令

重连完成后,发送一个ping命令,其中sentinelPingReplyCallback函数为处理响应数据的回调函数。

/* Send a PING ASAP when reconnecting. */
sentinelSendPing(ri);
int sentinelSendPing(sentinelRedisInstance *ri) {
    int retval = redisAsyncCommand(ri->link->cc,
        sentinelPingReplyCallback, ri, "%s",
        sentinelInstanceMapCommand(ri,"PING"));
    if (retval == C_OK) {
        ri->link->pending_commands++;
        ri->link->last_ping_time = mstime();
        /* We update the active ping time only if we received the pong for
         * the previous ping, otherwise we are technically waiting since the
         * first ping that did not receive a reply. */
        if (ri->link->act_ping_time == 0)
            ri->link->act_ping_time = ri->link->last_ping_time;
        return 1;
    } else {
        return 0;
    }
}

1.3.6 清除重连标志

 /* Clear the disconnected status only if we have both the connections
     * (or just the commands connection if this is a sentinel instance). */
    if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
        link->disconnected = 0;

到这里,哨兵与被监控的master已经建立起链接

二、如何判断你丢失了

哨兵和被监控的master链接建立后,则需要开始监控master是否正常。
那么如何判断master不正常了呢?

哨兵不间断的给master发送PING命令,然后接收PONG响应,当某个PING命令长时间没有响应时,根据判断规则进行推断是否下线。
主要的函数sentinelCheckSubjectivelyDown

2.1 计算发送PING后到现在的时间差

typedef struct instanceLink {
...
	mstime_t last_avail_time; /* Last time the instance replied to ping with
	                                 a reply we consider valid. */
	mstime_t act_ping_time;   /* Time at which the last pending ping (no pong
                                 received after it) was sent. This field is
                                 set to 0 when a pong is received, and set again
                                 to the current time if the value is 0 and a new
                                 ping is sent. */
...
}instanceLink;

根据PING命令的回调处理函数可以看出,和上面的注释一致。当ping收到PONG回复时,act_ping_time为0,last_avail_time为当前处理时间。

void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    ...
    if (r->type == REDIS_REPLY_STATUS ||
        r->type == REDIS_REPLY_ERROR) {
        /* Update the "instance available" field only if this is an
         * acceptable reply. */
        if (strncmp(r->str,"PONG",4) == 0 ||
            strncmp(r->str,"LOADING",7) == 0 ||
            strncmp(r->str,"MASTERDOWN",10) == 0)
        {
            link->last_avail_time = mstime();
            link->act_ping_time = 0; /* Flag the pong as received. */
        } 
	...
}

如果上次发送PING还没有接收到PONG响应,则计算时间差,如果链接已经断开,则计算最后一次接收到响应到现在的时间差。

/* Is this instance down from our point of view? */
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
    mstime_t elapsed = 0;

    if (ri->link->act_ping_time)
        elapsed = mstime() - ri->link->act_ping_time;
    else if (ri->link->disconnected)
        elapsed = mstime() - ri->link->last_avail_time;
  	...
}

2.2 关闭低活动的链接

如果链接建立,并且建立的时间大于15秒,并且ping还未收到响应,并且PING命令已经发送超过down_after_period/2, 并且最后一次响应时间超过down_after_period/2, 则关闭此链接,等下次重连。

#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
 /* Check if we are in need for a reconnection of one of the
     * links, because we are detecting low activity.
     *
     * 1) Check if the command link seems connected, was connected not less
     *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have a
     *    pending ping for more than half the timeout. */
    if (ri->link->cc &&
        (mstime() - ri->link->cc_conn_time) >
        SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        ri->link->act_ping_time != 0 && /* There is a pending ping... */
        /* The pending ping is delayed, and we did not receive
         * error replies as well. */
        (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
        (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
    {
        instanceLinkCloseConnection(ri->link,ri->link->cc);
    }
/* Disconnect a hiredis connection in the context of an instance link. */
void instanceLinkCloseConnection(instanceLink *link, redisAsyncContext *c) {
    if (c == NULL) return;

    if (link->cc == c) {
        link->cc = NULL;
        link->pending_commands = 0;
    }
    if (link->pc == c) link->pc = NULL;
    c->data = NULL;
    link->disconnected = 1;
    redisAsyncFree(c);
}

2.3 判断下线

下面两个条件,满足一条都认为下线。

  1. 当间隔时间超过了down_after_period(即无响应)
  2. 当前监控的是master,但是通过INFO命令获取到的信息中此实例是replica, 并且超过down_after_period+SENTINEL_INFO_PERIOD*2的时间都没有改变过角色。
#define SENTINEL_INFO_PERIOD 10000
/* Update the SDOWN flag. We believe the instance is SDOWN if:
     *
     * 1) It is not replying.
     * 2) We believe it is a master, it reports to be a slave for enough time
     *    to meet the down_after_period, plus enough time to get two times
     *    INFO report from the instance. */
    if (elapsed > ri->down_after_period ||
        (ri->flags & SRI_MASTER &&
         ri->role_reported == SRI_SLAVE &&
         mstime() - ri->role_reported_time >
          (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
    {
        /* Is subjectively down */
        if ((ri->flags & SRI_S_DOWN) == 0) {
            sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
            ri->s_down_since_time = mstime();
            ri->flags |= SRI_S_DOWN;
        }
    }

其中重要的一个指标是down_after_period, 是如何设置的呢?

  1. 默认值为30秒
#define SENTINEL_DEFAULT_DOWN_AFTER 30000
sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
...
 ri->down_after_period = master ? master->down_after_period :
                            SENTINEL_DEFAULT_DOWN_AFTER;
...
}
  1. 通过配置修改默认值
#sentinel.conf
sentinel down-after-milliseconds <master-name> <milliseconds>
const char *sentinelHandleConfiguration(char **argv, int argc) {
...
	else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
	        /* down-after-milliseconds <name> <milliseconds> */
	        ri = sentinelGetMasterByName(argv[1]);
	        if (!ri) return "No such master with specified name.";
	        ri->down_after_period = atoi(argv[2]);
	        if (ri->down_after_period <= 0)
	            return "negative or zero time parameter.";
	        sentinelPropagateDownAfterPeriod(ri);
	    } 
...
}

因此不能过大或者过小,如果过大,则检测出异常时间过长;如果过小,则会频繁的检测到异常,很可能是误报。

三、多久看一次你

哨兵在不间断的发送PING命令,那多久发一次呢?

3.1 定时任务调度

在哨兵启动过程中,初始化时,对于定时任务是1毫秒触发一次。

void initServer(void) {
...
    /* Create the timer callback, this is our way to process many background
     * operations incrementally, like clients timeout, eviction of unaccessed
     * expired keys and so forth. */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
...
}

定时任务是一个双向链表,但是链表中的每个任务时间并未按照when进行排序,每个任务直接使用的头插法插入到双向链表头位置。
请添加图片描述
执行定时任务,遍历定时任务链表(可能因为每次都需要遍历所有的节点,所以没有按照时间排序,而且任务个数很少,没必要排序)

  • 如果任务节点标识为删除,则删除节点
  • 如果节点时间满足when小于等于now,则执行
  • 根据执行返回值进行设置任务状态,如果是还需要再次执行的任务,则处理函数返回下一次执行的间隔时间,否则返回-1,返回-1的任务被标识为删除,下一轮将被删除。
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    monotime now = getMonotonicUs();
    while(te) {
        long long id;

        /* Remove events scheduled for deletion. */
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            /* If a reference exists for this timer event,
             * don't free it. This is currently incremented
             * for recursive timerProc calls */
            if (te->refcount) {
                te = next;
                continue;
            }
            if (te->prev)
                te->prev->next = te->next;
            else
                eventLoop->timeEventHead = te->next;
            if (te->next)
                te->next->prev = te->prev;
            if (te->finalizerProc) {
                te->finalizerProc(eventLoop, te->clientData);
                now = getMonotonicUs();
            }
            zfree(te);
            te = next;
            continue;
        }

        /* Make sure we don't process time events created by time events in
         * this iteration. Note that this check is currently useless: we always
         * add new timers on the head, however if we change the implementation
         * detail, this check may be useful again: we keep it here for future
         * defense. */
        if (te->id > maxId) {
            te = te->next;
            continue;
        }

        if (te->when <= now) {
            int retval;

            id = te->id;
            te->refcount++;
            retval = te->timeProc(eventLoop, id, te->clientData);
            te->refcount--;
            processed++;
            now = getMonotonicUs();
            if (retval != AE_NOMORE) {
                te->when = now + retval * 1000;
            } else {
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        te = te->next;
    }
    return processed;
}

3.2 发送PING

每1毫秒都调度一次哨兵处理函数。

serverCron()->
	sentinelTimer() ->
		sentinelHandleDictOfRedisInstances() ->
			sentinelHandleRedisInstance() ->
				sentinelSendPeriodicCommands(ri);
#define SENTINEL_INFO_PERIOD 10000
#define SENTINEL_PING_PERIOD 1000

#define SENTINEL_MAX_PENDING_COMMANDS 100
  1. 对于链接断开的,不发送直接返回
  2. 发送了100个命令还没有响应的链接,不在发送数据,直接返回
  3. 计算ping的间隔时间ping_period
  4. 判断是否需要发送PING
    • 上一次的响应PONG到现在的时间间隔是否大于ping_period, 大于则发送
    • 上一次PING的时间到现在的时间间隔是否大于ping_period/2,大于则发送
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int retval;

    /* Return ASAP if we have already a PING or INFO already pending, or
     * in the case the instance is not properly connected. */
    if (ri->link->disconnected) return;

    /* For INFO, PING, PUBLISH that are not critical commands to send we
     * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
     * want to use a lot of memory just because a link is not working
     * properly (note that anyway there is a redundant protection about this,
     * that is, the link will be disconnected and reconnected if a long
     * timeout condition is detected. */
    if (ri->link->pending_commands >=
        SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;

  	...

    /* We ping instances every time the last received pong is older than
     * the configured 'down-after-milliseconds' time, but every second
     * anyway if 'down-after-milliseconds' is greater than 1 second. */
    ping_period = ri->down_after_period;
    if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;

 ...

    /* Send PING to all the three kinds of instances. */
    if ((now - ri->link->last_pong_time) > ping_period &&
               (now - ri->link->last_ping_time) > ping_period/2) {
        sentinelSendPing(ri);
    }
...
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-13 12:53:31  更:2021-12-13 12:54:20 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 6:00:13-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码