一、 简介
发布订阅类似于关注公众号,关注公众号就如同进行了订阅,当公众号发布新文章时,将消息推送给所有关注公众号的用户,用户就能看到新的文章。 redis的发布订阅类似,某个用户通过subscribe订阅某个channel,当某个用户通过publish对相应的channel发布消息,订阅了此channel的用户都会收到此消息。
源码文件pubsub.c,redis6.2.6
二、订阅消息(关注公众号)
订阅主要通过subscribe命令设置,redis服务器将使用subscribeCommand函数进行处理。
struct redisCommand redisCommandTable[] = {
...
{"subscribe",subscribeCommand,-2,
"pub-sub no-script ok-loading ok-stale",
0,NULL,0,0,0,0,0,0},
...
};
对于client一旦执行了此命令将会进入订阅模式,以后就只能执行SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, PING, RESET and QUIT 命令,其他命令都不能执行了。可以通过RESET命令进行退出此模式,因此那些希望每个命令都有响应的client不能执行此命令。
2.1 首先排除拒绝阻塞的client
对于有CLIENT_DENY_BLOCKING标志的client,这些client期望每个命令都有响应数据,所以不能执行这个命令,但是为了向后兼容,对MULTI命令做了特殊处理。
if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client");
return;
}
2.2 订阅
遍历每个channel,进行实际的订阅操作,并且回复client的订阅信息。
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]);
2.2.1 将channel加入到client的订阅
将订阅的channel插入到client的hash表pubsub_channels中,如果已经存在则添加失败,返回DICT_ERR,继续后续操作。
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
...
}
2.2.2 将channel加入到全局的channel中
当channel加入到client的订阅hash中成功,说明此channel是此client新订阅的,然后判断此channel是否存在于全局channel的hash中,如果不存在,则加入,而此全局hash的channel对象对应的值是一个链表,存放的是订阅当前channel的所有client。可以看到加到client的channel对象和全局hash中的channel对象是同一个,节省空间。
retval = 1;
incrRefCount(channel);
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
2.2.3 将client加入到channel的队列
订阅同一个channel的所有client都在一个双向链表中保存,当发布消息时,就能快速根据channel找到所有的client。
listAddNodeTail(clients,c);
2.2.4 通知client
将回复client使用的命令,订阅的channel,以及当前client以及订阅的channel总个数。
addReplyPubsubSubscribed(c,channel);
void addReplyPubsubSubscribed(client *c, robj *channel) {
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
addReplyPushLen(c,3);
addReply(c,shared.subscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,clientSubscriptionsCount(c));
}
int clientSubscriptionsCount(client *c) {
return dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns);
}
2.3 设置client发布订阅标志
c->flags |= CLIENT_PUBSUB;
2.3 实际操作
- 使用redis-cli进行连接
可以看出直接使用subscribe命令后,就阻塞了,一致等待读取channel的数据,无法继续操作。
127.0.0.1:6379> subscribe mychannel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "mychannel"
3) (integer) 1
- 使用telnet登录
可以看到进入发布订阅模式的client只能执行那几个命令。
$ telnet 127.0.0.1 6379
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
subscribe mychannel mychannel2
*3
$9
subscribe
$9
mychannel
:1
*3
$9
subscribe
$10
mychannel2
:2
get name
-ERR Can't execute 'get': only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context
int processCommand(client *c) {
...
if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand &&
c->cmd->proc != resetCommand) {
rejectCommandFormat(c,
"Can't execute '%s': only (P)SUBSCRIBE / "
"(P)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context",
c->cmd->name);
return C_OK;
}
...
}
三、发布消息(公众号发布更新)
通过publish命令向某个channel写入数据, redis服务器将通过publishCommand函数进行处理,将这些数据发送给订阅了这个channel的client。
struct redisCommand redisCommandTable[] = {
...
{"publish",publishCommand,3,
"pub-sub ok-loading ok-stale fast may-replicate",
0,NULL,0,0,0,0,0,0},
...
};
3.1 对通道发送数据
void publishCommand(client *c) {
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
...
}
3.1.1 从全局hash中找到channel对象
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
dictIterator *di;
listNode *ln;
listIter li;
de = dictFind(server.pubsub_channels,channel);
3.1.2 遍历client队列
如果找到channel对象,则从对象中获取到client链表,然后遍历链表,逐一发送消息。
if (de) {
list *list = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
...
}
}
3.1.3 逐一发送消息
...
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
addReplyPubsubMessage(c,channel,message);
receivers++;
}
...
void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
addReplyPushLen(c,3);
addReply(c,shared.messagebulk);
addReplyBulk(c,channel);
if (msg) addReplyBulk(c,msg);
}
3.2 数据的传播
具体的还没有梳理清楚,后续再看。
...
if (server.cluster_enabled)
clusterPropagatePublish(c->argv[1],c->argv[2]);
else
forceCommandPropagation(c,PROPAGATE_REPL);
...
3.3 响应发布数据的client
响应发布数据的client当前有多少个订阅此channel的client接收了数据。
addReplyLongLong(c,receivers);
3.4 实际操作
四、取消订阅(取消公众号)
对于channel数据不再关心时,可以取消订阅。通过unsubscribe命令进行取消订阅,redis服务器通过unsubscribeCommand函数进行处理。
struct redisCommand redisCommandTable[] = {
...
{"unsubscribe",unsubscribeCommand,-1,
"pub-sub no-script ok-loading ok-stale",
0,NULL,0,0,0,0,0,0},
...
};
由处理函数可以看出,当没有指定取消的channel时,将取消所有的channel。
void unsubscribeCommand(client *c) {
if (c->argc == 1) {
pubsubUnsubscribeAllChannels(c,1);
} else {
int j;
for (j = 1; j < c->argc; j++)
pubsubUnsubscribeChannel(c,c->argv[j],1);
}
if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}
4.1 从client中的hash中删除channel
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
...
if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
...
4.2 从全局的链表中删除当前client
将当前client从全局的hash的channel的链表中删除此client。如果整个链表为空了,则将此hash项从全局hash表中删除。
retval = 1;
de = dictFind(server.pubsub_channels,channel);
serverAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de);
ln = listSearchKey(clients,c);
serverAssertWithInfo(c,NULL,ln != NULL);
listDelNode(clients,ln);
if (listLength(clients) == 0) {
dictDelete(server.pubsub_channels,channel);
}
4.3 通知client
和订阅时通知client一样,只是返回的命令不同。
if (notify) addReplyPubsubUnsubscribed(c,channel);
void addReplyPubsubUnsubscribed(client *c, robj *channel) {
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
addReplyPushLen(c,3);
addReply(c,shared.unsubscribebulk);
if (channel)
addReplyBulk(c,channel);
else
addReplyNull(c);
addReplyLongLong(c,clientSubscriptionsCount(c));
}
4.4 实际操作
未指定取消的channel,则取消了所有的订阅。 指定删除某个channel, 可以通过返回看到已经从3个channel变成了2个channel。
五 、组织结构
client1和client2分别订阅了channel1, client2还订阅了channel2, 整个组织结构如下图所示。
|