参考文章: Redis发布订阅模式(publish/subscribe)。
一 订阅发布
1 概念
- 1)为了支持消息的多播机制,redis 引入了发布订阅模块。Redis发布/订阅(Pub/Sub)是一种通信机制,将数据推到某个信息管道中,其他客户端可通过订阅这些管道来获取推送信息,以此用于消息的传输。
- 2)发布者发布的消息分到不同的频道,不需要知道什么样的订阅者者订阅。订阅者对一个或多个频道感兴趣,只需要接收感兴趣的消息,不需要知道什么样的发布者发布。主要目的是解除消息的发布者与订阅者之间的耦合关系。
- 3)发布者和订阅者都是Redis客户端,频道则是服务器端。
- 4)由三部分组成:发布者(Publisher)、频道(Channel)、订阅者(Subscriber)。
2 原理
2.1 底层结构原理 Redis通过PUBLISH、PUBSUB、SUBSCRIBE、PSUBSCRIBE、UNSUBSCRIBE和PUNSUBSCRIBE等命令实现发布和订阅功能。
在Redis底层结构中,客户端和频道的订阅关系是通过一个字典加链表的结构保存的,如下图:
在 Redis 的底层结构中,redis服务器中定义了一个pubsub_channels字典,用于保存所有频道的订阅关系,在这个字典中,key为所有频道名称,value结构是一个链表,其中存放的是所有订阅这个频道的订阅者客户端。subscribe命令的实质即为在key中添加value的订阅链。
若频道首次被订阅说明在字典中并不存在该渠道的信息,那么程序首先要新建一个对应的 key,并且要赋值一个空链表,然后将对应的客户端加入到链表中。此时链表只有一个元素。若该渠道已经被其他客户端订阅过:这个时候就直接找到key值对应的value客户端信息添加到链表的末尾即可。
2.2 消费者消费消息原理 生产者生产一次消息,由redis负责将消息复制到多个消息队列中,每个消息队列由相应的消费者进行消费。 它是分布式系统中常用的一种解耦方式,用于将多个消费者的逻辑进行拆分。多个消费者的逻辑就可以放到不同的子系统中完成。
看下图,此时m1队列可以认为是上面结构原理的频道,多个subscriber及其内部的队列相当于上面结构原理的客户端链表。
3 订阅时服务器推送的消息格式
所有订阅接收的消息均为由三个元素组成的多块响应,其中第一个元素是消息类型,有四种类型:
-
1)subscribe:该类型表示成功订阅到频道响应,第一个元素是消息类型,第二个元素为订阅的频道名称,第三个元素为已订阅的频道数量,例: -
2)unsubscribe:该类型表示成功取消订阅到的频道响应,第一个元素是消息类型,第二个元素为订阅的频道名称,第三个元素为已订阅的频道数量,例: -
3)message:该类型表示订阅者客户端接收到其他客户端发出的发布命令结果,第一个元素是消息类型,第二个元素表示来源频道的名称,第三个元素是实际的消息内容,例: 首先在一个redis客户端连接随便订阅一到多个通道,我这里订阅两个。 然后开启一个新的客户端连接。用于往上面的某个通道发布消息。此时原来的客户端即可看到发布的消息。 -
4)pmessage:这个类型是使用订阅模式频道时,redis返回推送消息的类型。以下面的例子的截图为例,第一行元素是消息类型,第二个是订阅模式,第三个是具体的订阅频道,第四个是时基的消息内容。和message相比,只多了第二行的订阅模式的信息返回。
4 发布订阅命令
4.1 发布订阅命令语法
1. 订阅频道 SUBSCRIBE命令为订阅频道,返回值如上面的消息格式所述。
SUBSCRIBE channel [channel ...]
2. 订阅模式频道 PSUBSCRIBE是订阅给定的模式(patterns)。
PSUBSCRIBE pattern [pattern ...]
例如:
h?llo subscribes to hello, hallo and hxllo
h*llo subscribes to hllo and heeeello
h[ae]llo subscribes to hello and hallo, but not hillo
如果想输入普通的字符,可以在前面添加\
3. 取消订阅频道 指示客户端退订给定的频道,若没有指定频道,则退订所有频道。
如果没有频道被指定,即,一个无参数的 UNSUBSCRIBE 调用被执行,那么客户端使用 SUBSCRIBE 命令订阅的所有频道都会被退订。 在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。
UNSUBSCRIBE [channel [channel ...]]
4. 取消订阅模式频道
指示客户端退订指定模式,若果没有提供模式则退出所有模式。
如果没有模式被指定,即一个无参数的 PUNSUBSCRIBE 调用被执行,那么客户端使用 PSUBSCRIBE 命令订阅的所有模式都会被退订。 在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式。
PUNSUBSCRIBE [pattern [pattern ...]]
5. 发布具体频道或模式频道的内容 将信息 message 发送到指定的频道channel。
返回值integer-reply: 收到消息的客户端数量。
PUBLISH channel message
6. pubsub PUBSUB 是自省命令,能够检测PUB/SUB子系统的状态。它由分别详细描述的子命令组成。 返回值array-reply: 活跃的信道列表,或者符合指定模式的信道。
PUBSUB subcommand [argument [argument ...]]
4.2 发布订阅命令例子
简单的发布订阅命令就不列举了,主要使用订阅模式频道以及发布模式频道来举例。 例子1:
PSUBSCRIBE new.*
publish new.showbiz 'c++ c go'
结果如下:
例子2:多个客户端订阅,其中一个客户端订阅方式为订阅频道,另一个客户端订阅方式为订阅模式频道。
这个例子想说明的是:当不同的订阅客户端使用 订阅某种模式 或者 符合该模式的具体某个频道 时,那么这些客户端都会接收到发布者推送的信息,但是两次接收的信息格式不同,一个为message类型,另一个为pmessage类型,消息内容一致。 例子和上面例子1差不多。
PSUBSCRIBE news.*
subscribe news.a news.b news.c
publish news.a 'c++ c go'
下面看到,两个客户端都能收到发布的消息。
例子3: 这个想说明的是pubsub命令的简单使用。 下面用到两个客户端,直接看图操作即可。
关于更详细的发布订阅命令说明,请参考中文手册:http://redis.cn/commands.html。想要看什么命令,直接在 “直接搜索” 中输入命令即可。
5 注意
发布订阅功能一般要区别命令连接重新开启一个连接,因为命令连接严格遵循请求回应模式。 而pub/sub能收到redis主动推送的内容,所以实际项目中如果支持pub/sub的话,需要另开一条连接用于处理发布订阅。
6 缺点
发布订阅的生产者传递过来一个消息,redis会直接找到相应的消费者并传递过去。这个传递可能发生以下情况:
- 1)假如没有消费者,消息直接丢弃。
- 2)假如开始有2个消费者,一个消费者突然挂掉了,另外一个消费者依然能收到消息,但是如果刚挂掉的消费者重新连上后,在断开连接期间的消息对于该消费者来说彻底丢失了。
- 3)若订阅者订阅了频道,但自己读取消息的速度很慢的话,那么不断积压的消息会使redis输出缓冲区的体积变得越来越大,这可能使得redis本身的速度变慢,甚至直接崩溃。
- 4)另外,redis停机重启,pubsub的消息是不会持久化的,所有的消息被直接丢弃。
总结上面例子的缺点:
- 1)redis无法对消息持久化存储,消息一旦被发送,如果没有订阅者接收,数据会丢失。(客户端原因)
- 2)若订阅者订阅了频道,但自己读取消息的速度很慢的话,那么不断积压的消息会使redis输出缓冲区的体积变得越来越大,这可能使得redis本身的速度变慢,甚至直接崩溃。(客户端原因)
- 3)另外,redis停机重启,pubsub的消息是不会持久化的,所有的消息被直接丢弃。(服务器原因)
二 stream
1 stream的介绍
- 1)stream支持多播的可持久化消息队列。
- 2)一个消息链表将加入的消息都串起来,每个消息都有一个唯一的消息ID和对应的内容。消息都是持久化的,redis 重启后,内容还在。
- 3)每个 stream 对象通过一个 key 来唯一索引。每个 stream 都可以挂多个消费组(consumer group),每个消费组会有个游标 last_delivered_id 在 stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。
- 4)stream 在第一次使用 xadd 命令后自动创建,而消费组不会自动创建,需要通过命令 xgroup create 进行创建,并且需要指定从 stream 的某个消息 ID 开始消费。
- 5)每个消费组都是相互独立的,互相不受影响;也就是同一份 stream 内部的消息会被每个消费组都消费到。
- 6)同一个消费组可以挂接多个消费者,这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标往前移动。
- 7)消费者内部会有一个状态变量 pending_ids,它记录了当前已经被客户端读取,但是还没有 ack 的消息。当客户端 ack 一条消息后,pending_ids 将会删除该消息 ID。它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了而没有被处理。
上面几点关键是看懂stream、消费组、消费者这三者的作用。stream是用于持久化消息队列即存储消息的。消费组是独立的,不同消费组互不影响。每个消费组包含多个消费者,这些消费者之间是竞争关系。
例如stream、消费组、消费者三者作用的示例图:
2 基本命令
2.1 XADD、XDEL、XRANGE、XLEN、DEL
XADD key ID field string [field string ...]
XDEL key ID [ID ...]
XRANGE key start end [COUNT count]
XLEN key
DEL key [key ...]
2.2 XREAD、XGROUP、XREADGROUP、XACK
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
XACK key group ID [ID ...]
2.3 演示上面的基本命令
例如添加消息后,再里面读取消息:
192.168.1.9:6379> XADD stream * message "hello world"
"1646747759983-0"
192.168.1.9:6379> XGROUP CREATE stream g1 0-0
OK
192.168.1.9:6379> XLEN stream
(integer) 1
192.168.1.9:6379> XRANGE stream - +
1) 1) "1646747759983-0"
2) 1) "message"
2) "hello world"
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 0
1) 1) "stream"
2) (empty array)
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 1
1) 1) "stream"
2) (empty array)
192.168.1.9:6379>
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 1111111111111
1) 1) "stream"
2) (empty array)
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 11111111111111
1) 1) "stream"
2) (empty array)
192.168.1.9:6379>
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream >
1) 1) "stream"
2) 1) 1) "1646747759983-0"
2) 1) "message"
2) "hello world"
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream -2
(error) ERR Invalid stream ID specified as stream command argument
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 2
1) 1) "stream"
2) 1) 1) "1646747759983-0"
2) 1) "message"
2) "hello world"
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 1111111111111
1) 1) "stream"
2) 1) 1) "1646747759983-0"
2) 1) "message"
2) "hello world"
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 11111111111111
1) 1) "stream"
2) (empty array)
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 111111111111
1) 1) "stream"
2) 1) 1) "1646747759983-0"
2) 1) "message"
2) "hello world"
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream >
(nil)
192.168.1.9:6379> XACK stream g1 1646747759983-0
(integer) 1
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 0
1) 1) "stream"
2) (empty array)
192.168.1.9:6379>
192.168.1.9:6379> XADD stream * message "hello tyy"
"1646747906781-0"
192.168.1.9:6379> XADD stream * message "hello lqq"
"1646747915368-0"
192.168.1.9:6379> XADD stream * message "hello hc"
"1646747918299-0"
192.168.1.9:6379> XRANGE stream - +
1) 1) "1646747759983-0"
2) 1) "message"
2) "hello world"
2) 1) "1646747906781-0"
2) 1) "message"
2) "hello tyy"
3) 1) "1646747915368-0"
2) 1) "message"
2) "hello lqq"
4) 1) "1646747918299-0"
2) 1) "message"
2) "hello hc"
192.168.1.9:6379>
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream >
1) 1) "stream"
2) 1) 1) "1646747906781-0"
2) 1) "message"
2) "hello tyy"
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 0
1) 1) "stream"
2) 1) 1) "1646747906781-0"
2) 1) "message"
2) "hello tyy"
192.168.1.9:6379>
192.168.1.9:6379> XREADGROUP GROUP g1 tyy STREAMS stream 0
1) 1) "stream"
2) 1) 1) "1646747906781-0"
2) 1) "message"
2) "hello tyy"
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 3 STREAMS stream 0
1) 1) "stream"
2) 1) 1) "1646747906781-0"
2) 1) "message"
2) "hello tyy"
192.168.1.9:6379> XACK stream g1 1646747906781-0
(integer) 1
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 3 STREAMS stream 0
1) 1) "stream"
2) (empty array)
192.168.1.9:6379>
192.168.1.9:6379>
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 3 STREAMS stream >
1) 1) "stream"
2) 1) 1) "1646747915368-0"
2) 1) "message"
2) "hello lqq"
2) 1) "1646747918299-0"
2) 1) "message"
2) "hello hc"
192.168.1.9:6379>
192.168.1.9:6379>
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 3 STREAMS stream 1646747915368-0
1) 1) "stream"
2) 1) 1) "1646747918299-0"
2) 1) "message"
2) "hello hc"
192.168.1.9:6379>
192.168.1.9:6379> del s1
(integer) 1
192.168.1.9:6379>
192.168.1.9:6379> EXISTS s1
(integer) 0
192.168.1.9:6379>
192.168.1.9:6379> XADD s1 * message "hello tyy"
"1646834322070-0"
192.168.1.9:6379> XADD s1 * message "hello lqq"
"1646834325931-0"
192.168.1.9:6379> XADD s1 * message "hello hc"
"1646834331433-0"
192.168.1.9:6379> XADD s1 * message "hello hrc"
"1646834335236-0"
192.168.1.9:6379> XADD s1 * message "hello ljm"
"1646834338844-0"
192.168.1.9:6379>
192.168.1.9:6379> XGROUP CREATE s1 g1 0-0
OK
192.168.1.9:6379>
192.168.1.9:6379> XLEN s1
(integer) 5
192.168.1.9:6379> XRANGE s1 - +
1) 1) "1646834322070-0"
2) 1) "message"
2) "hello tyy"
2) 1) "1646834325931-0"
2) 1) "message"
2) "hello lqq"
3) 1) "1646834331433-0"
2) 1) "message"
2) "hello hc"
4) 1) "1646834335236-0"
2) 1) "message"
2) "hello hrc"
5) 1) "1646834338844-0"
2) 1) "message"
2) "hello ljm"
192.168.1.9:6379>
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 >
1) 1) "s1"
2) 1) 1) "1646834322070-0"
2) 1) "message"
2) "hello tyy"
2) 1) "1646834325931-0"
2) 1) "message"
2) "hello lqq"
3) 1) "1646834331433-0"
2) 1) "message"
2) "hello hc"
4) 1) "1646834335236-0"
2) 1) "message"
2) "hello hrc"
5) 1) "1646834338844-0"
2) 1) "message"
2) "hello ljm"
192.168.1.9:6379>
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 0
1) 1) "s1"
2) 1) 1) "1646834322070-0"
2) 1) "message"
2) "hello tyy"
2) 1) "1646834325931-0"
2) 1) "message"
2) "hello lqq"
3) 1) "1646834331433-0"
2) 1) "message"
2) "hello hc"
4) 1) "1646834335236-0"
2) 1) "message"
2) "hello hrc"
5) 1) "1646834338844-0"
2) 1) "message"
2) "hello ljm"
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 1
1) 1) "s1"
2) 1) 1) "1646834322070-0"
2) 1) "message"
2) "hello tyy"
2) 1) "1646834325931-0"
2) 1) "message"
2) "hello lqq"
3) 1) "1646834331433-0"
2) 1) "message"
2) "hello hc"
4) 1) "1646834335236-0"
2) 1) "message"
2) "hello hrc"
5) 1) "1646834338844-0"
2) 1) "message"
2) "hello ljm"
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 11111111111
1) 1) "s1"
2) 1) 1) "1646834322070-0"
2) 1) "message"
2) "hello tyy"
2) 1) "1646834325931-0"
2) 1) "message"
2) "hello lqq"
3) 1) "1646834331433-0"
2) 1) "message"
2) "hello hc"
4) 1) "1646834335236-0"
2) 1) "message"
2) "hello hrc"
5) 1) "1646834338844-0"
2) 1) "message"
2) "hello ljm"
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 1111111111111
1) 1) "s1"
2) 1) 1) "1646834322070-0"
2) 1) "message"
2) "hello tyy"
2) 1) "1646834325931-0"
2) 1) "message"
2) "hello lqq"
3) 1) "1646834331433-0"
2) 1) "message"
2) "hello hc"
4) 1) "1646834335236-0"
2) 1) "message"
2) "hello hrc"
5) 1) "1646834338844-0"
2) 1) "message"
2) "hello ljm"
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 11111111111111
1) 1) "s1"
2) (empty array)
192.168.1.9:6379>
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 1646834322070-0
1) 1) "s1"
2) 1) 1) "1646834325931-0"
2) 1) "message"
2) "hello lqq"
2) 1) "1646834331433-0"
2) 1) "message"
2) "hello hc"
3) 1) "1646834335236-0"
2) 1) "message"
2) "hello hrc"
4) 1) "1646834338844-0"
2) 1) "message"
2) "hello ljm"
192.168.1.9:6379>
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 1646834325931-0
1) 1) "s1"
2) 1) 1) "1646834331433-0"
2) 1) "message"
2) "hello hc"
2) 1) "1646834335236-0"
2) 1) "message"
2) "hello hrc"
3) 1) "1646834338844-0"
2) 1) "message"
2) "hello ljm"
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 1646834331433-0
1) 1) "s1"
2) 1) 1) "1646834335236-0"
2) 1) "message"
2) "hello hrc"
2) 1) "1646834338844-0"
2) 1) "message"
2) "hello ljm"
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 1646834335236-0
1) 1) "s1"
2) 1) 1) "1646834338844-0"
2) 1) "message"
2) "hello ljm"
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 1646834338844-0
1) 1) "s1"
2) (empty array)
192.168.1.9:6379>
|