IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 《Reids 设计与实现》第十七章 发布与订阅 -> 正文阅读

[大数据]《Reids 设计与实现》第十七章 发布与订阅

《Reids 设计与实现》第十七章 发布与订阅

一、简介

Redis 的发布与订阅功能由 PUBLISH、SUBSCRIBE、PSUBSCRIBE 等命令组成

通过执行 SUBSCRIBE 命令,客户端可以订阅一个或多个频道,从而成为这些频道的订阅者(subscriber):每当有其他客户端向被订阅的频道发送消息(message)时,频道的所有订阅者都会收到这条消息

举个例子,假设 A、B、C 三个客户端都执行了命令:

SUBSCRIBE “news.it”

那么这三个客户端就是 “news.it” 频道的订阅者,如图 18-1 所示

在这里插入图片描述

如果这时某个客户端执行命令

PUBLISH “news.it” "hello"

向 “news.it” 频道发送消息 “hello”,那么 “news.it” 的三个订阅者都将收到这条消息,如图 18-2 所示

在这里插入图片描述

除了订阅频道之外,客户端还可以通过执行 PSUBSCRIBE 命令订阅一个或多个模式,从而成为这些模式的订阅者:每当有其他客户端向某个频道发送消息时,消息不仅会被发送给这个频道的所有订阅者,它还会被发送给所有与这个频道相匹配的模式订阅者

在这里插入图片描述

举个例子,假设如图 18-3 所示:

  • 客户端 A 正在订阅频道 “news.it”
  • 客户端 B 正在订阅频道 “news.et”
  • 客户端 C 和客户端 D 正在订阅与 “news.it” 频道和 “news.et” 频道相匹配的模式 “news.[ie]t”

如果这时某个客户端执行命令

PUBLISH "news.it" "hello"

向 “news.it” 频道发送消息 “hello”,那么不仅正在订阅 “news.it” 频道的客户端 A 会收到消息,客户端 C 和客户端 D 也同样会收到消息,因为这两个客户端正在订阅匹配 “news.it” 频道的 “news.[ie]t” 模式,如图 18-4 所示

在这里插入图片描述

二、频道的订阅与退订

当一个客户端执行 SUBSCRIBE 命令订阅某个或某些频道的时候,这个客户端与被订阅频道之间就建立起了一种订阅关系

Redis 将所有频道的订阅关系都保存在服务器状态的 pubsub_channels 字典里面,这个字典的键时某个被订阅的频道,而键的值则是一个链表,链表里面记录了所有订阅这个频道的客户端

struct redisServer{
	//...
	//保存所有频道的订阅关系
	dict *pubsub_channels;
	//...
};

在这里插入图片描述

比如说,图 18-6 就展示了一个 pubsub_channels 字典示例,这个字典记录了以下信息:

  • client-1、client-2、client-3 三个客户端正在订阅 “news.it” 频道
  • 客户端 client-4 正在订阅 “news.sport” 频道
  • client-5 和 client-6 两个客户端正在订阅 “news.business” 频道

1.订阅频道

每当客户端执行 SUBSCRIBE 命令订阅某个或某些频道的时候,服务器都会将客户端与被订阅的频道在 pubsub_channels 字典中进行关联

根据频道是否已经有其他订阅者,关联操作分为两种情况执行:

  • 如果频道已经有其他订阅者,那么它在 pubsub_channels 字典中必然有相应的订阅者链表,程序唯一要做的就是将客户端添加到订阅者链表的末尾
  • 如果频道还未有任何订阅者,那么它必然不存在与 pubsub_channels 字典,程序首先要在 pubsub_channels 字典中为频道创建一个键,并将这个键的值设置为空链表,然后再将客户端添加到链表,成为链表的第一个元素

举个例子,假设服务器 pubsub_channels 字典的当前状态如图 18-6 所示,那么当客户端 client-10086 执行命令

SUBSCRIBE "news.sport" "news.movie"

在这里插入图片描述

之后,pubsub_channels 字典将更新至图 19-7 所示的状态,其中用虚线包围的是新添加的节点:

  • 更新后的 pubsub_channels 字典新增了 “news.movie” 键,该键对应的链表值只包含一个 client-20086 节点,表示目前只有 client-10086 一个客户端在订阅 “news.movie” 频道
  • 至于原本就已经有客户端在订阅的 “news.sport” 频道,client-10086 的节点放在了频道对应链表的末尾,排在 client-4 节点的后面

SUBSCRIBE 命令的实现可以用以下伪代码来描述:

def subscribe(*all_input_channels):
	#遍历输入的所有频道
	for channel in all_input_channels:
		
		#如果 channel 不存在于 pubsub_channels 字典(没有任何订阅者)
		#那么在字典中添加 channel 键,并设置它的值为空链表
		if channel not in server.pubsub_channels:
			server.pubsub_channels[channel] = []

		#将订阅者添加到频道所对应的链表的末尾
		server.pubsub_channels[channel].append(client)

2.退订频道

UNSUBSCRIBE 命令的行为和 SUBSCRIBE 命令的行为正好相反,当一个客户端退订某个或某些频道的时候,服务器将从 pubsub_channels 中解除客户端与退订频道之间的关联:

  • 程序会根据被退订频道的名字,在 pubsub_channels 字典中找到频道对应的订阅者链表,然后从订阅者链表中删除退订客户端的信息
  • 如果删除退订客户端之后,频道的订阅者链表变成了空链表,那么说明这个频道已经没有任何订阅者了,程序将从 pubsub_channels 字典中删除频道对应的键

在这里插入图片描述

举个例子,假设 pubsub_channels 的当前状态如图 18-8 所示,那么当客户端 client-10086 执行命令

UNSUBSCRIBE "news.sport" "news.movie"

之后,图中用虚线包围的两个节点将被删除:

  • 在 pubsub_channels 字典更新之后,client-10086 的信息已经从 “news.sport” 频道和 “news.movie” 频道的订阅者链表中被删除了
  • 另外,因为删除 client-10086 之后,频道 “news.movie” 已经没有任何订阅者,因此键 “news.movie” 也从字典中被删了

UNSUBSCRIBE 命令的实现可以用以下伪代码来描述:

def unsubscribe(*all_input_channels):
	#遍历要退订的所有频道
	for channel in all_input_channels:
		
		#在订阅者链表中删除退订的客户端
		server.pubsub_channels[channel].remove(client)

		#如果频道已经没有任何订阅者了(订阅者链表为空)
		#那么将频道从字典中删除
		if len(server.pubsub_channels[channel]) == 0:
			server.pubsub_channels.remove(channel)

三、模式的订阅与退订

前面说过,服务器将所有频道的订阅关系都保存在服务器状态的 pubsub_channels 属性里面,与此类似,服务器也将所有模式的订阅关系都保存在服务器状态的 pubsub_patterns 属性里面:

struct redisServer{
	//...
	//保存所有模式订阅关系
	list *pubsub_patterns;
	//...
};

pubsub_patterns 属性是一个链表,链表中的每个节点都包含一个 pubsub patern 结构,这个结构的 pattern 属性记录了被订阅的模式,而 client 属性则记录了订阅模式的客户端:

typedef struct pubsubPattern{
	//订阅模式的客户端
	redisClient %client;

	//被订阅的模式
	robj *pattern;
}pubsubPattern;

图 18-10 是一个 pubsubPattern 结构示例,它显示客户端 client-9 正在订阅模式 “news.*”

在这里插入图片描述

在这里插入图片描述

图 18-11 展示了一个 pubsub_patternts 链表示例,这个链表记录了以下信息:

  • 客户端 client-7 正在订阅模式 “music.*”
  • 客户端 client-8 正在订阅模式 “book.*”
  • 客户端 client-9 正在订阅模式 “news.*”

1.订阅模式

每当客户端执行 PSUBSCRIBE 命令订阅某个或某些模式的时候,服务器会对每隔被订阅的模式执行以下两个操作:

  1. 新建一个 pubsubPattern 结构,将结构的 pattern 属性设置为被订阅的模式,client 属性设置为订阅模式的客户端
  2. 将 pubsubPattern 结构添加到 pubsub_patterns 链表的表尾。举个例子,假设服务器中 pubsub_patterns 链表的当前状态如图 18-12 所示
    在这里插入图片描述

那么当客户端 client-9 执行命令

PSUBSCRIBE "news.*"

之后,pubsub-patterns 链表将更新至 18-13 所示的状态,其中用虚线包围的是新添加的 pubsubPattern 结构

在这里插入图片描述

PSUBSCRIBE 命令的实现原理可以用以下伪代码来描述:

def psubscribe(*all_input_pattern):
	#遍历输入的所有模式
	for pattern in all_input_patterns:
		
		#创建新的 pubsubPattern 结构
		#记录被订阅的模式,以及订阅模式的客户端
		pubsubPattern = create_new_pubsubPattern()
		pubsubPattern.client = client;
		pubsubPattern.pattern = pattern

		#将新的 pubsubPattern 追加到 pubsub_patterns 链表末尾
		server.pubsub_patterns.append(pubsubPattern)

2.退订模式

模式的退订命令 PUNSUBSCRIBE 是 PSUBSCRIBE 命令的反操作:当一个客户端退订某个或某些模式的时候,服务器将在 pubsub_patterns 链表中查找并删除那些 pattern 属性为被退订模式,并且 client 属性为执行退订命令的客户端的 pubsubPattern 结构

举个例子,假设服务器 pubsub_patterns 链表的当前状态如图 18-14 所示

在这里插入图片描述

那么当客户端 client-9 执行命令

PUNSUBSCRIBE "news.*"

之后,client 属性为 client-9,pattern 属性为 “news.*” 的 pubsubPattern 结构将被删除

PUNSUBSCRIBE 命令的实现原理可以用以下伪代码来描述:

def punsubscribe(*all_input_patterns):
	#遍历所有要退订的模式
	for pattern in all_input_patterns:
		
		#遍历 pubsub_patterns 链表中的所有 pubsubPattern 结构
		for pubsubPattern in server..pubsub_patterns:

		#如果当前客户端和 pubsubPattern 记录的客户端相同
		#并且要退订的模式也和 pubsubPattern 记录的模式相同
		if client == pubsubPattern.client and pattern == pubsubPattern.pattern:
			
			#那么将这个 pubsubPattern 从链表中删除
			server.pubsub_pattern.remove(pubsubPattern)

四、发送消息

当一个 Redis 客户端执行 PUBLISH <channel> <message> 命令将消息 message 发送给频道 channel 的时候,服务器需要执行以下两个动作:

  1. 将消息 message 发送给 channel 频道的所有订阅者
  2. 如果有一个或多个模式 pattern 与频道 channel 相匹配,那么将消息 message 发送给 pattern 模式的订阅者

1.将消息发送给频道订阅者

因为服务器状态中的 pubsub-channels 字典记录了所有频道的订阅关系,所以为了将消息发送给 channel 频道的所有订阅者,PUBLISH 命令要做的就是在 pubsub_channels 字典里找到频道 channel 的订阅者名单(一个链表),然后将消息发送给名单上的所有客户端。举个例子,假设服务器 pubsub_channels 字典当前的状态如图 18-16 所示

在这里插入图片描述

如果这时某个客户端执行命令

PUBLISH "news.it" "hello"

那么 PUBLISH 命令将在 pubsub_channels 字典中查找键 “news.it” 对应的链表值,并通过遍历链表将消息 “hello” 发送给 “news.it” 频道的三个订阅者:client-1、client-2、client-3

PUBLISH 命令将消息发送给频道订阅者的方法可以用以下伪代码来描述:

def channel_publish(channel, message):
	#如果 channel 键不存在于 pubsub_channels 字典中
	#那么说明 channel 频道没有任何订阅者
	#程序不做发送动作,直接返回
	if channel not in server.pubsub_channels:
		return

	#运行到这里,说明 channel 频道至少有一个订阅者
	#程序遍历 channel 频道的订阅者链表
	#将消息发送给所有订阅者
	for subscriber in server.pubsub_channels[channel]:
		send_message(subscriber, message)

2.将消息发送给模式订阅者

因为服务器状态中的 pubsub_patterns 链表记录了所有模式的订阅关系,所以为了将消息发送给所有与 channel 频道相匹配的模式的订阅者,PUBLISH 命令要做的就是遍历整个 pubsub_patterns 链表,查找那些与 channel 频道相匹配的模式,并将消息发送给订阅了这些模式的客户端

举个例子,假设 pubsub_patterns 链表的当前状态如图 18-17 所示

在这里插入图片描述

如果这时某个客户端执行命令

PUBLISH "news.it" "hello"

那么 PUBLISH 命令会首先将消息 “hello” 发送给 “news.it” 频道的所有订阅者,然后开始在 pubsub_patterns 链表中查找是否有被订阅的模式与 “news.it” 频道相匹配,结果发现 “enws.it” 频道和客户端 client-9 订阅的 “news.*” 频道匹配,于是命令将消息 “hello” 发送给客户端 client-9

PUBLISH ,命令将消息发送给模式订阅者的方法可以用以下伪代码来描述:

def pattern_publish(channel, message):
	#遍历所有模式订阅消息
	for pubsubPattern in server.pubsub_pattern:
	
		#如果频道和模式相匹配
		if match(channel, pubsubPattern.pattern):

			#那么将消息发送给订阅该模式的客户端
			send_message(pubsubPattern.client, message)

最后,PUBLISH 命令的实现可以用以下伪代码来描述:

def publish(channel, message):

	#将消息发送给 channel 频道的所有订阅者
	channel_publish(channel, message)

	#将消息发送给所有和 channel 频道相匹配的模式的订阅者
	pattern_publish(channel, message)

五、查看订阅信息

1.PUBSUB CHANNELS

PUBSUB CHANNELS [pattern] 子命令用于返回服务器当前被订阅的频道,其中 pattern 参数是可选的:

  • 如果不给定 pattern 参数,那么命令返回服务器当前被订阅的所有频道
  • 如果给定 pattern 参数,那么命令返回服务器当前被订阅的频道中那些与 pattern 模式相匹配的频道

这个子命令是通过服务器 pubsub_channels 字典的所有键(每个键都是一个被订阅的频道),然后记录并返回所有符合条件的频道来实现的,这个过程可以用以下伪代码来描述:

def pubsub_channels(pattern=None):

	#一个列表,用于记录所有符合条件的频道
	channel_list = []

	#遍历服务器中的所有频道
	#(也即是 pubsub_channels 字典的所有键)
	for channel in server.pubsub_channels:
	
		#当以下两个条件的任意一个满足时,将频道添加到链表里面:
		#1)用户没有指定 pattern 参数
		#2)用户指定了 pattern 参数,并且 channel 和 pattern 匹配
		if (pattern is None) or match(channel, pattern):
			channel_list.append(channel)

#向客户端返回频道列表
return channel_list

在这里插入图片描述

举个例子,对于图 18-18 所示的 pubsub_channels 字典来说,执行 PUBSUB CHANNELS 命令将返回服务器目前被订阅的四个频道:

redis>PUBSUB CHANNELS
1)"news.it"
2)"news.sport"
3)"news.business"
4)"news.movie"

另一方面,执行 PUBSUB CHANNELS “news.[is]*” 命令将返回 “news.it” 和 “news.sport” 两个频道,因为只有这两个频道和 “news.[is]*” 模式相匹配:

redis>PUBSUB CHANNELS "news.[is]"
1)"news.it"
2)"news.sport"

2.PUBSUB NUMSUB

PUBSUB NUMSUB [channel-1 channel-2 … chanel-n] 子命令接受任意多个频道作为输入参数,并返回这些频道的订阅者数量

这个子命令是通过在 pubsub_channels 字典中找到频道对应的订阅者链表,然后返回订阅者链表的长度来实现的(订阅者链表的长度就是频道订阅者的数量),这个过程可以用一下伪代码来描述:

def pubsub_numsub(*all_input_channels):
	#遍历输入的所有频道
	for channel in all_input_channels:

		#如果 pubsub+channels 字典中没有 channel 这个键
		#那么说明 channel 频道没有任何订阅者
		if channel not in server.pubsub_channels:
			
				#返回频道名
				reply_channel_name(channel)
				#订阅者数量为 0
				reply_subscribe_count(0)

		#如果 pubsub_channels 字典中存在 channel 键
		#那么说明 channel 频道至少有一个订阅者
		else:
			#返回频道名
			reply_channel_name(channel)
			#订阅者链表的长度就是订阅者数量
			reply_subscribe_count(len(server.pubsub_channels[channel]))

在这里插入图片描述

举个例子,对于图 18-19 所示的 pubsub_channels 字典来说,对字典中的四个频道执行 PUBSUB NUMSUB 命令将获得以下回复:

redis>PUBSUB NUMSUB news.it news.sport news.business news.movie
1)"news.it"
2)"3"
3)"news.sport"
4)"2"
5)"news.business"
6)"2"
7)"news.movie"
8)"1"

六、重点回顾

  • 服务器状态在 pubsub_channels 字典保存了所有频道的订阅关系:SUBSCRIBE 命令负责将客户端和被订阅的频道关联到这个字典里面,而 UNSUBSCRIBE 命令则负责解除客户端和被退订频道之间的关联
  • 服务器状态在 pubsub_patterns 链表保存了所有模式的订阅关系:PSUBSCRIBE 命令负责移除客户端和被退订模式在链表中的记录
  • PUBLISH 命令通过访问 pubsub_channels 自字典来向频道的所有订阅者发送信息,通过访问 pubsub_patterns 链表来向所有匹配频道的模式的订阅发送消息
  • PUBSUB 命令的子命令都是通过读取 pubsub_channels 字典和 pubusb_patterns 链表中的信息来实现的
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-26 22:16:11  更:2021-12-26 22:17:34 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 3:34:25-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码