IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> redis之关注公众号(发布订阅) -> 正文阅读

[大数据]redis之关注公众号(发布订阅)

一、 简介

发布订阅类似于关注公众号,关注公众号就如同进行了订阅,当公众号发布新文章时,将消息推送给所有关注公众号的用户,用户就能看到新的文章。
redis的发布订阅类似,某个用户通过subscribe订阅某个channel,当某个用户通过publish对相应的channel发布消息,订阅了此channel的用户都会收到此消息。

源码文件pubsub.c,redis6.2.6

二、订阅消息(关注公众号)

订阅主要通过subscribe命令设置,redis服务器将使用subscribeCommand函数进行处理。

struct redisCommand redisCommandTable[] = {
...
 {"subscribe",subscribeCommand,-2,
     "pub-sub no-script ok-loading ok-stale",
     0,NULL,0,0,0,0,0,0},
 ...
 };

对于client一旦执行了此命令将会进入订阅模式,以后就只能执行SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, PING, RESET and QUIT 命令,其他命令都不能执行了。可以通过RESET命令进行退出此模式,因此那些希望每个命令都有响应的client不能执行此命令。

2.1 首先排除拒绝阻塞的client

对于有CLIENT_DENY_BLOCKING标志的client,这些client期望每个命令都有响应数据,所以不能执行这个命令,但是为了向后兼容,对MULTI命令做了特殊处理。

if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
        /**
         * A client that has CLIENT_DENY_BLOCKING flag on
         * expect a reply per command and so can not execute subscribe.
         *
         * Notice that we have a special treatment for multi because of
         * backword compatibility
         */
        addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client");
        return;
    }

2.2 订阅

遍历每个channel,进行实际的订阅操作,并且回复client的订阅信息。

 for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);

2.2.1 将channel加入到client的订阅

将订阅的channel插入到client的hash表pubsub_channels中,如果已经存在则添加失败,返回DICT_ERR,继续后续操作。

/* Add the channel to the client -> channels hash table */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
    ...
    }

2.2.2 将channel加入到全局的channel中

当channel加入到client的订阅hash中成功,说明此channel是此client新订阅的,然后判断此channel是否存在于全局channel的hash中,如果不存在,则加入,而此全局hash的channel对象对应的值是一个链表,存放的是订阅当前channel的所有client。可以看到加到client的channel对象和全局hash中的channel对象是同一个,节省空间。

 		retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table */
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }

2.2.3 将client加入到channel的队列

订阅同一个channel的所有client都在一个双向链表中保存,当发布消息时,就能快速根据channel找到所有的client。

 listAddNodeTail(clients,c);

2.2.4 通知client

将回复client使用的命令,订阅的channel,以及当前client以及订阅的channel总个数。

 /* Notify the client */
    addReplyPubsubSubscribed(c,channel);
/* Send the pubsub subscription notification to the client. */
void addReplyPubsubSubscribed(client *c, robj *channel) {
    if (c->resp == 2)
        addReply(c,shared.mbulkhdr[3]);
    else
        addReplyPushLen(c,3);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,clientSubscriptionsCount(c));
}
/* Return the number of channels + patterns a client is subscribed to. */
int clientSubscriptionsCount(client *c) {
    return dictSize(c->pubsub_channels)+
           listLength(c->pubsub_patterns);
}

2.3 设置client发布订阅标志

c->flags |= CLIENT_PUBSUB;

2.3 实际操作

  • 使用redis-cli进行连接
    可以看出直接使用subscribe命令后,就阻塞了,一致等待读取channel的数据,无法继续操作。
# ./redis-cli 
127.0.0.1:6379> subscribe mychannel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "mychannel"
3) (integer) 1

  • 使用telnet登录
    可以看到进入发布订阅模式的client只能执行那几个命令。
$ telnet 127.0.0.1 6379
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
subscribe mychannel mychannel2
*3
$9
subscribe
$9
mychannel
:1
*3
$9
subscribe
$10
mychannel2
:2
get name
-ERR Can't execute 'get': only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context

int processCommand(client *c) {
...
/* Only allow a subset of commands in the context of Pub/Sub if the
     * connection is in RESP2 mode. With RESP3 there are no limits. */
    if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand &&
        c->cmd->proc != resetCommand) {
        rejectCommandFormat(c,
            "Can't execute '%s': only (P)SUBSCRIBE / "
            "(P)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context",
            c->cmd->name);
        return C_OK;
    }
    ...
}

三、发布消息(公众号发布更新)

通过publish命令向某个channel写入数据, redis服务器将通过publishCommand函数进行处理,将这些数据发送给订阅了这个channel的client。

struct redisCommand redisCommandTable[] = {
	...
 	{"publish",publishCommand,3,
     "pub-sub ok-loading ok-stale fast may-replicate",
     0,NULL,0,0,0,0,0,0},
     ...
 };

3.1 对通道发送数据

/* PUBLISH <channel> <message> */
void publishCommand(client *c) {
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
   	...
}

3.1.1 从全局hash中找到channel对象

/* Publish a message */
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    dictIterator *di;
    listNode *ln;
    listIter li;

    /* Send to clients listening for that channel */
    de = dictFind(server.pubsub_channels,channel);

3.1.2 遍历client队列

如果找到channel对象,则从对象中获取到client链表,然后遍历链表,逐一发送消息。

if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {
          ...
        }
    }

3.1.3 逐一发送消息

...		
while ((ln = listNext(&li)) != NULL) {
	client *c = ln->value;
	addReplyPubsubMessage(c,channel,message);
	receivers++;
}
...
void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
    if (c->resp == 2)
        addReply(c,shared.mbulkhdr[3]);
    else
        addReplyPushLen(c,3);
    addReply(c,shared.messagebulk);
    addReplyBulk(c,channel);
    if (msg) addReplyBulk(c,msg);
}

3.2 数据的传播

具体的还没有梳理清楚,后续再看。

...
 if (server.cluster_enabled)
        clusterPropagatePublish(c->argv[1],c->argv[2]);
    else
        forceCommandPropagation(c,PROPAGATE_REPL);
...

3.3 响应发布数据的client

响应发布数据的client当前有多少个订阅此channel的client接收了数据。

addReplyLongLong(c,receivers);

3.4 实际操作

在这里插入图片描述

四、取消订阅(取消公众号)

对于channel数据不再关心时,可以取消订阅。通过unsubscribe命令进行取消订阅,redis服务器通过unsubscribeCommand函数进行处理。

struct redisCommand redisCommandTable[] = {
	...
	{"unsubscribe",unsubscribeCommand,-1,
     "pub-sub no-script ok-loading ok-stale",
     0,NULL,0,0,0,0,0,0},
     ...
};

由处理函数可以看出,当没有指定取消的channel时,将取消所有的channel。

/* UNSUBSCRIBE [channel [channel ...]] */
void unsubscribeCommand(client *c) {
    if (c->argc == 1) {
        pubsubUnsubscribeAllChannels(c,1);
    } else {
        int j;

        for (j = 1; j < c->argc; j++)
            pubsubUnsubscribeChannel(c,c->argv[j],1);
    }
    if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}

4.1 从client中的hash中删除channel

int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
...
	if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
	...

4.2 从全局的链表中删除当前client

将当前client从全局的hash的channel的链表中删除此client。如果整个链表为空了,则将此hash项从全局hash表中删除。

	retval = 1;
       /* Remove the client from the channel -> clients list hash table */
       de = dictFind(server.pubsub_channels,channel);
       serverAssertWithInfo(c,NULL,de != NULL);
       clients = dictGetVal(de);
       ln = listSearchKey(clients,c);
       serverAssertWithInfo(c,NULL,ln != NULL);
       listDelNode(clients,ln);
       if (listLength(clients) == 0) {
           /* Free the list and associated hash entry at all if this was
            * the latest client, so that it will be possible to abuse
            * Redis PUBSUB creating millions of channels. */
           dictDelete(server.pubsub_channels,channel);
       }

4.3 通知client

和订阅时通知client一样,只是返回的命令不同。

 /* Notify the client */
    if (notify) addReplyPubsubUnsubscribed(c,channel);
void addReplyPubsubUnsubscribed(client *c, robj *channel) {
    if (c->resp == 2)
        addReply(c,shared.mbulkhdr[3]);
    else
        addReplyPushLen(c,3);
    addReply(c,shared.unsubscribebulk);
    if (channel)
        addReplyBulk(c,channel);
    else
        addReplyNull(c);
    addReplyLongLong(c,clientSubscriptionsCount(c));
}

4.4 实际操作

未指定取消的channel,则取消了所有的订阅。
在这里插入图片描述
指定删除某个channel, 可以通过返回看到已经从3个channel变成了2个channel。
在这里插入图片描述

五 、组织结构

client1和client2分别订阅了channel1, client2还订阅了channel2, 整个组织结构如下图所示。
请添加图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-03 13:06:19  更:2021-12-03 13:07:18 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 13:50:09-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码