IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> redis专题-----11-----redis订阅发布以及stream -> 正文阅读

[大数据]redis专题-----11-----redis订阅发布以及stream

参考文章:
Redis发布订阅模式(publish/subscribe)

一 订阅发布

1 概念

  • 1)为了支持消息的多播机制,redis 引入了发布订阅模块。Redis发布/订阅(Pub/Sub)是一种通信机制,将数据推到某个信息管道中,其他客户端可通过订阅这些管道来获取推送信息,以此用于消息的传输。
  • 2)发布者发布的消息分到不同的频道,不需要知道什么样的订阅者者订阅。订阅者对一个或多个频道感兴趣,只需要接收感兴趣的消息,不需要知道什么样的发布者发布。主要目的是解除消息的发布者与订阅者之间的耦合关系。
  • 3)发布者和订阅者都是Redis客户端,频道则是服务器端。
  • 4)由三部分组成:发布者(Publisher)、频道(Channel)、订阅者(Subscriber)。

2 原理

2.1 底层结构原理
Redis通过PUBLISH、PUBSUB、SUBSCRIBE、PSUBSCRIBE、UNSUBSCRIBE和PUNSUBSCRIBE等命令实现发布和订阅功能。

在Redis底层结构中,客户端和频道的订阅关系是通过一个字典加链表的结构保存的,如下图:

在这里插入图片描述

在 Redis 的底层结构中,redis服务器中定义了一个pubsub_channels字典,用于保存所有频道的订阅关系,在这个字典中,key为所有频道名称,value结构是一个链表,其中存放的是所有订阅这个频道的订阅者客户端。subscribe命令的实质即为在key中添加value的订阅链。

若频道首次被订阅说明在字典中并不存在该渠道的信息,那么程序首先要新建一个对应的 key,并且要赋值一个空链表,然后将对应的客户端加入到链表中。此时链表只有一个元素。若该渠道已经被其他客户端订阅过:这个时候就直接找到key值对应的value客户端信息添加到链表的末尾即可。

2.2 消费者消费消息原理
生产者生产一次消息,由redis负责将消息复制到多个消息队列中,每个消息队列由相应的消费者进行消费。
它是分布式系统中常用的一种解耦方式,用于将多个消费者的逻辑进行拆分。多个消费者的逻辑就可以放到不同的子系统中完成。

看下图,此时m1队列可以认为是上面结构原理的频道,多个subscriber及其内部的队列相当于上面结构原理的客户端链表。
在这里插入图片描述

3 订阅时服务器推送的消息格式

所有订阅接收的消息均为由三个元素组成的多块响应,其中第一个元素是消息类型,有四种类型:

  • 1)subscribe:该类型表示成功订阅到频道响应,第一个元素是消息类型,第二个元素为订阅的频道名称,第三个元素为已订阅的频道数量,例:
    在这里插入图片描述

  • 2)unsubscribe:该类型表示成功取消订阅到的频道响应,第一个元素是消息类型,第二个元素为订阅的频道名称,第三个元素为已订阅的频道数量,例:
    在这里插入图片描述

  • 3)message:该类型表示订阅者客户端接收到其他客户端发出的发布命令结果,第一个元素是消息类型,第二个元素表示来源频道的名称,第三个元素是实际的消息内容,例:
    首先在一个redis客户端连接随便订阅一到多个通道,我这里订阅两个。
    在这里插入图片描述
    然后开启一个新的客户端连接。用于往上面的某个通道发布消息。此时原来的客户端即可看到发布的消息。
    在这里插入图片描述

  • 4)pmessage:这个类型是使用订阅模式频道时,redis返回推送消息的类型。以下面的例子的截图为例,第一行元素是消息类型,第二个是订阅模式,第三个是具体的订阅频道,第四个是时基的消息内容。和message相比,只多了第二行的订阅模式的信息返回。
    在这里插入图片描述

4 发布订阅命令

4.1 发布订阅命令语法

1. 订阅频道
SUBSCRIBE命令为订阅频道,返回值如上面的消息格式所述。

SUBSCRIBE channel [channel ...]

2. 订阅模式频道
PSUBSCRIBE是订阅给定的模式(patterns)。

PSUBSCRIBE pattern [pattern ...]

例如:

h?llo subscribes to hello, hallo and hxllo
h*llo subscribes to hllo and heeeello
h[ae]llo subscribes to hello and hallo, but not hillo
如果想输入普通的字符,可以在前面添加\

3. 取消订阅频道
指示客户端退订给定的频道,若没有指定频道,则退订所有频道。

如果没有频道被指定,即,一个无参数的 UNSUBSCRIBE 调用被执行,那么客户端使用 SUBSCRIBE 命令订阅的所有频道都会被退订。 在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。

UNSUBSCRIBE [channel [channel ...]]

4. 取消订阅模式频道

指示客户端退订指定模式,若果没有提供模式则退出所有模式。

如果没有模式被指定,即一个无参数的 PUNSUBSCRIBE 调用被执行,那么客户端使用 PSUBSCRIBE 命令订阅的所有模式都会被退订。 在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式。

PUNSUBSCRIBE [pattern [pattern ...]]

5. 发布具体频道或模式频道的内容
将信息 message 发送到指定的频道channel。

返回值integer-reply: 收到消息的客户端数量。

PUBLISH channel message

6. pubsub
PUBSUB 是自省命令,能够检测PUB/SUB子系统的状态。它由分别详细描述的子命令组成。
返回值array-reply: 活跃的信道列表,或者符合指定模式的信道。

PUBSUB subcommand [argument [argument ...]]

4.2 发布订阅命令例子

简单的发布订阅命令就不列举了,主要使用订阅模式频道以及发布模式频道来举例。
例子1:

# 1. 首先一个客户端先订阅模式频道
 PSUBSCRIBE new.*

在这里插入图片描述

# 2. 然后开启另一个客户端,发布模式频道消息。
publish new.showbiz 'c++ c go'

结果如下:
在这里插入图片描述

例子2:多个客户端订阅,其中一个客户端订阅方式为订阅频道,另一个客户端订阅方式为订阅模式频道。

这个例子想说明的是:当不同的订阅客户端使用 订阅某种模式 或者 符合该模式的具体某个频道 时,那么这些客户端都会接收到发布者推送的信息,但是两次接收的信息格式不同,一个为message类型,另一个为pmessage类型,消息内容一致。
例子和上面例子1差不多。

# 1. 首先一个客户端先订阅模式频道
 PSUBSCRIBE news.*
# 2. 然后再开启另一个客户端,但是使用订阅频道
 subscribe news.a news.b news.c
# 3. 然后开启另一个客户端,发布模式频道消息。总共使用了3个客户端。
publish news.a 'c++ c go'

下面看到,两个客户端都能收到发布的消息。
在这里插入图片描述

例子3:
这个想说明的是pubsub命令的简单使用。
下面用到两个客户端,直接看图操作即可。
在这里插入图片描述

关于更详细的发布订阅命令说明,请参考中文手册:http://redis.cn/commands.html。想要看什么命令,直接在 “直接搜索” 中输入命令即可。

5 注意

发布订阅功能一般要区别命令连接重新开启一个连接,因为命令连接严格遵循请求回应模式。
而pub/sub能收到redis主动推送的内容,所以实际项目中如果支持pub/sub的话,需要另开一条连接用于处理发布订阅。

在这里插入图片描述

6 缺点

发布订阅的生产者传递过来一个消息,redis会直接找到相应的消费者并传递过去。这个传递可能发生以下情况:

  • 1)假如没有消费者,消息直接丢弃。
  • 2)假如开始有2个消费者,一个消费者突然挂掉了,另外一个消费者依然能收到消息,但是如果刚挂掉的消费者重新连上后,在断开连接期间的消息对于该消费者来说彻底丢失了。
  • 3)若订阅者订阅了频道,但自己读取消息的速度很慢的话,那么不断积压的消息会使redis输出缓冲区的体积变得越来越大,这可能使得redis本身的速度变慢,甚至直接崩溃。
  • 4)另外,redis停机重启,pubsub的消息是不会持久化的,所有的消息被直接丢弃。

总结上面例子的缺点:

  • 1)redis无法对消息持久化存储,消息一旦被发送,如果没有订阅者接收,数据会丢失。(客户端原因)
  • 2)若订阅者订阅了频道,但自己读取消息的速度很慢的话,那么不断积压的消息会使redis输出缓冲区的体积变得越来越大,这可能使得redis本身的速度变慢,甚至直接崩溃。(客户端原因)
  • 3)另外,redis停机重启,pubsub的消息是不会持久化的,所有的消息被直接丢弃。(服务器原因)

二 stream

1 stream的介绍

  • 1)stream支持多播的可持久化消息队列。
  • 2)一个消息链表将加入的消息都串起来,每个消息都有一个唯一的消息ID和对应的内容。消息都是持久化的,redis 重启后,内容还在。
  • 3)每个 stream 对象通过一个 key 来唯一索引。每个 stream 都可以挂多个消费组(consumer group),每个消费组会有个游标 last_delivered_id 在 stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。
  • 4)stream 在第一次使用 xadd 命令后自动创建,而消费组不会自动创建,需要通过命令 xgroup create 进行创建,并且需要指定从 stream 的某个消息 ID 开始消费。
  • 5)每个消费组都是相互独立的,互相不受影响;也就是同一份 stream 内部的消息会被每个消费组都消费到。
  • 6)同一个消费组可以挂接多个消费者,这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标往前移动。
  • 7)消费者内部会有一个状态变量 pending_ids,它记录了当前已经被客户端读取,但是还没有 ack 的消息。当客户端 ack 一条消息后,pending_ids 将会删除该消息 ID。它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了而没有被处理。

上面几点关键是看懂stream、消费组、消费者这三者的作用。stream是用于持久化消息队列即存储消息的。消费组是独立的,不同消费组互不影响。每个消费组包含多个消费者,这些消费者之间是竞争关系。

例如stream、消费组、消费者三者作用的示例图:

在这里插入图片描述

2 基本命令

2.1 XADD、XDEL、XRANGE、XLEN、DEL

# 1. 向 stream 追加消息。
# 1)将指定的流条目追加到指定key的流中。 如果key不存在,作为运行这个命令的副作用,将使用流的条目自动创建key。
# 2)一个条目是由一组键值对组成的,它基本上是一个小的字典。 键值对以用户给定的顺序存储,并且读取流的命令(如XRANGE 或者 XREAD) 可以保证按照通过XADD添加的顺序返回。
# 3)XADD是唯一可以向流添加数据的Redis命令,但是还有其他命令, 例如XDEL和XTRIM,他们能够从流中删除数据。
# 4)将Stream ID指定为参数。流条目ID标识流内的给定条目。 如果指定的ID参数是字符*(星号ASCII字符),XADD命令会自动为您生成一个唯一的ID。 
# 但是,也可以指定一个良好格式的ID,以便新的条目以指定的ID准确存储, 虽然仅在极少数情况下有用。
# ID是由-隔开的两个数字组成的,例如:1526919030474-55。关于手动生成具体看中文文档解释:http://redis.cn/commands/xadd.html。
# 5)返回值bulk-string-reply:
# 该命令返回添加的条目的ID。如果ID参数传的是*,那么ID是自动生成的, 否则,命令仅返回用户在插入期间指定的相同的ID。
# 强调:ID不要自己设置,填*就好。除非用到MYSQL+REDIS、或者有分布式系统集群了REDIS,他们需要一个全局的唯一id,否则统一使用*即可。
XADD key ID field string [field string ...]

# 2. 从 stream 中删除消息。
# 1)从指定流中移除指定的条目,并返回成功删除的条目的数量,在传递的ID不存在的情况下, 返回的数量可能与传递的ID数量不同。
# 2)返回值integer-reply:返回成功删除的条目的数量。
# 3)理解删除条目的底层细节:
# Redis流以一种使其内存高效的方式表示:使用基数树来索引包含线性数十个Stream条目的宏节点。 通常,当你从Stream中删除一个条目的时候,条目并没有真正被驱逐,只是被标记为删除。
# 最终,如果宏节点中的所有条目都被标记为删除,则会销毁整个节点,并回收内存。 这意味着如果你从Stream里删除大量的条目,比如超过50%的条目,则每一个条目的内存占用可能会增加, 因为Stream将会开始变得碎片化。然而,流的表现将保持不变。
# 在Redis未来的版本中,当一个宏节点内删除条目达到一定数量的时候,我们有可能会触发节点垃圾回收机制。 目前,根据我们对这种数据结构的预期用途,还不太适合增加这样的复杂度。
XDEL key ID [ID ...]

# 3. 获取 stream 中消息列表,会自动过滤已经删除的消息。
# 1)特殊ID:- 和 +。特殊ID-和+分别表示流中可能的最小ID和最大ID。
# 2)不完全ID:具体看中文文档,很简单,但是注意,-18446744073709551615的-只范围而不是减法符号,例如0-10的-代表范围。
# 3)使用COUNT返回最大条目数:具体看中文文档,很简单。
# 4)迭代流:具体看中文文档,很简单。
# 5)获取单个项目:具体看中文文档,很简单。
# 6)返回值array-reply:
# 该命令返回ID与指定范围匹配的条目。返回的条目是完整的,这意味着ID和所有组成条目的字段都将返回。此外,返回的条目及其字段和值的顺序与使用XADD添加它们的顺序完全一致。具体看中文文档解释。
# XRANGE的上面几点都理解了,看文档即可,主要知道如何遍历即可。
XRANGE key start end [COUNT count]

# 4. 获取 stream 消息长度。
# 1)返回流中的条目数。如果指定的key不存在,则此命令返回0,就好像该流为空。 但是请注意,与其他的Redis类型不同,零长度流是可能的,所以你应该调用TYPE 或者 EXISTS 来检查一个key是否存在。
# 2)一旦内部没有任何的条目(例如调用XDEL后),流不会被自动删除,因为可能还存在与其相关联的消费者组。
XLEN key 

# 5. 删除 stream 中所有消息。
# 1)返回值integer-reply:返回被删除的keys的数量。删除指定的一批keys,如果删除中的某些key不存在,则直接忽略。
DEL key [key ...]

2.2 XREAD、XGROUP、XREADGROUP、XACK

# 1. 独立消费。注:XREAD、COUNT、BLOCK、STREAMS这四个是关键字。
# 1)返回值array-reply:该命令返回一个结果数组:返回数组的每个元素都是一个由两个元素组成的数组(键名和为该键报告的条目)。
# 	报告的条目是完整的流条目,具有ID以及所有字段和值的列表。返回的条目及其字段和值的顺序与使用XADD添加它们的顺序完全一致。
# 关于更详细的看中文文档(不过这个命令的中文文档我还不是看得很懂,还是要项目开发时才能理解更深入)。
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

# 2. 创建消费者。
# 该命令实际可以参创建消费者、删除消费者组、删除消费者。具体看中文文档,比较简单,已看懂。
XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

# 3. 消费消息。
# 关于更详细的看中文文档(不过这个命令的中文文档我还不是看得很懂(看下面例子,现在比较熟了),还是要项目开发时才能理解更深入,与XREAD一样)。
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

# 4. 确认消费消息。
# 比较简单,就是用于处理服务器已经发送给客户端,但客户端未处理的消息。
# 返回值integer-reply:该命令返回成功确认的消息数。 某些消息ID可能不再是PEL的一部分(例如因为它们已经被确认), 而且XACK不会把他们算到成功确认的数量中。
XACK key group ID [ID ...]

2.3 演示上面的基本命令

例如添加消息后,再里面读取消息:

# 1. 往stream添加消息,redis服务会返回这个消息的ID。
192.168.1.9:6379> XADD stream * message "hello world"
"1646747759983-0"

# 2. 创建消费者组。因为消费者组必须自己创建,而上面的stream不存在时,XADD会自动创建。
# 此时的ID即0-0代表的是这个消费者组从0-0开始消费消息,即从头开始读取,意思更多的是代表一个范围。
192.168.1.9:6379> XGROUP CREATE stream g1 0-0
OK

# 3. 查看stream消息队列的长度。
192.168.1.9:6379> XLEN stream
(integer) 1

# 4. 查看具体的消息内容。
192.168.1.9:6379> XRANGE stream - +
1) 1) "1646747759983-0"
   2) 1) "message"
      2) "hello world"
      
# 5. 读取消息。
# 1)GROUP代表关键字。g1是上面创建的消费者组的名字。tyy是消费者名字,不存在会自动创建。
# 2)COUNT 1不写时,默认也是1.      
# 3)STREAMS代表关键字。后面stream是我们XADD时创建的流名字,可在XADD自行取名。
# 4)0代表我们要读取已经发送给客户端,但客户端未ACK的消息。因为这里还没有将任何消息发送过给任何客户端(消费者),所以返回空数组。
# 这里强调,这个命令的ID实际一般只有特殊ID">"与其它ID的区别。
#	1. 特殊ID>,意味着消费者希望只接收从未发送给任何其他消费者的消息。
#	2. 任意其他的ID,即0或任意其他有效ID或不完整的ID(只有毫秒时间部分),将具有返回发送命令的消费者的待处理条目的效果。所以,基本上如果ID不是>,命令将让客户端访问它的待处理条目(已发送给它,但尚未确认的条目)。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 0
1) 1) "stream"
   2) (empty array)
# 5)同4一样,ID=1、1111111111111、11111111111111不是特殊ID,所以返回空数组。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 1
1) 1) "stream"
   2) (empty array)
192.168.1.9:6379> 
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 1111111111111
1) 1) "stream"
   2) (empty array)
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 11111111111111
1) 1) "stream"
   2) (empty array)
192.168.1.9:6379>
# 6)传特殊字符>,能够读取从未发送给任何其他消费者的消息。例如上面XADD的message "hello world"消息还没有其它消费者读取过,那么使用特殊字符必定会返回。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream >
1) 1) "stream"
   2) 1) 1) "1646747759983-0"
         2) 1) "message"
            2) "hello world"

# 7)ID传负数会报错。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream -2
(error) ERR Invalid stream ID specified as stream command argument

# 8)由于上面传完特殊ID后,消息被消费者读取过即redis已经发送了,但是目前消费者还没有ACK,所以我们可以使用其它字符可以读取到message "hello world"这条消息。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 2
1) 1) "stream"
   2) 1) 1) "1646747759983-0"
         2) 1) "message"
            2) "hello world"
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 1111111111111
1) 1) "stream"
   2) 1) 1) "1646747759983-0"
         2) 1) "message"
            2) "hello world"
# 不过注意,当我上面传13个1时,可以读取到消息,传14个1返回空数组了。估计与ID的位数有关(具体可自行查看ID的范围)。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 11111111111111
1) 1) "stream"
   2) (empty array)
# 传完14个1返回空数组后,消息正常仍然能传其它ID正常读取的。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 111111111111
1) 1) "stream"
   2) 1) 1) "1646747759983-0"
         2) 1) "message"
            2) "hello world"

# 9)若此时再用特殊字符读取,则会返回nil,因为此时stream中没有这样的消息,即不存在未发送过给客户端的消息。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream >
(nil)

# 10)若此时使用XACK确认处理完这条消息,redis会从待处理的消息队列去掉,那么使用其它任意ID读取消息时,那么不会再像上面第8点一样返回消息,而是返回空数组。
192.168.1.9:6379> XACK stream g1 1646747759983-0
(integer) 1
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 0
1) 1) "stream"
   2) (empty array)
192.168.1.9:6379> 

# 我在进行到下面的第6点的XADD时,发现redis报错:"MISCONF Redis is configured to save RDB snapshotsxxx"。
# 这个错误和一些配置修改看这里:https://blog.csdn.net/xc_zhou/article/details/80893326?spm=1001.2014.3001.5502。	大概意思就是内存和硬盘不足。
# 扩展硬盘大小的文章(内存很简单的,改一下配置即可):https://blog.csdn.net/hanpengyu/article/details/7475645
# 然后下面的命令就可以继续正常执行了,不过上面的命令内容最好重新输入一遍,防止影响正确的结果。

# 6. 然后我们多添加几条消息。
# 注意:message可以相同,因为每个消息可以认为是队列的一个元素,类似上图的id1,id2。
# 	这个元素可认为是一个map,它可以存放多个key-value结构,这里我只存放了一个key-value结构,即message "hello tyy"。
192.168.1.9:6379> XADD stream * message "hello tyy"
"1646747906781-0"
192.168.1.9:6379> XADD stream * message "hello lqq"
"1646747915368-0"
192.168.1.9:6379> XADD stream * message "hello hc"
"1646747918299-0"
# 加上原来那条,总共有4条数据。
192.168.1.9:6379> XRANGE stream - +
1) 1) "1646747759983-0"
   2) 1) "message"
      2) "hello world"
2) 1) "1646747906781-0"
   2) 1) "message"
      2) "hello tyy"
3) 1) "1646747915368-0"
   2) 1) "message"
      2) "hello lqq"
4) 1) "1646747918299-0"
   2) 1) "message"
      2) "hello hc"
192.168.1.9:6379> 

# 7. 然后使用特殊字符读取一条消息。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream >
1) 1) "stream"
   2) 1) 1) "1646747906781-0"
         2) 1) "message"
            2) "hello tyy"
            
# 8. 由于第7点读取了一条消息,且现在还未XACK,所以使用0可以从待处理中读取到这条消息。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 1 STREAMS stream 0
1) 1) "stream"
   2) 1) 1) "1646747906781-0"
         2) 1) "message"
            2) "hello tyy"
192.168.1.9:6379> 
# 注意,COUNT参数不填,默认是返回1条。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy STREAMS stream 0
1) 1) "stream"
   2) 1) 1) "1646747906781-0"
         2) 1) "message"
            2) "hello tyy"
# 如果COUNT填3条,由于此时待处理的消息只有一条1646747906781-0(原先的1646747759983被XACK了,所以不会存在待处理的队列中),所以也会返回一条。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 3 STREAMS stream 0
1) 1) "stream"
   2) 1) 1) "1646747906781-0"
         2) 1) "message"
            2) "hello tyy"

# 9. 使用XACK确认消息。
192.168.1.9:6379> XACK stream g1 1646747906781-0
(integer) 1
# 所以此时再次从待处理的队列中读取消息,便为空了。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 3 STREAMS stream 0
1) 1) "stream"
   2) (empty array)
192.168.1.9:6379>

# 10. 再次读取未向客户端发送过的消息。因为还剩两条,所以我使COUNT=3。实际COUNT大于等于2都行。
192.168.1.9:6379> 
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 3 STREAMS stream >
1) 1) "stream"
   2) 1) 1) "1646747915368-0"
         2) 1) "message"
            2) "hello lqq"
      2) 1) "1646747918299-0"
         2) 1) "message"
            2) "hello hc"
192.168.1.9:6379> 

# 11. 最后传指定ID。它会从该ID的下一个ID开始返回消息,返回的个数由COUNT决定,如果待处理的消息不足COUNT的个数,那么全部返回。如果该ID的下一个ID没有消息了,那么返回空数组。
# 不过只有这里有个疑问:为什么中文文档对这个命令的解释不是很准确,它描述说只有特殊字符和其它任意ID的区别吗,不是很理解中文文档想表达的一样。
192.168.1.9:6379> 
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 3 STREAMS stream 1646747915368-0
1) 1) "stream"
   2) 1) 1) "1646747918299-0"
         2) 1) "message"
            2) "hello hc"
192.168.1.9:6379>
# 再次验证第11点,即它会从该ID的下一个ID开始返回消息,返回的个数由COUNT决定,如果待处理的消息不足COUNT的个数,那么全部返回。如果该ID的下一个ID没有消息了,那么返回空数组。
# 结果是没有问题的。
192.168.1.9:6379> del s1
(integer) 1
192.168.1.9:6379> 
192.168.1.9:6379> EXISTS s1
(integer) 0
192.168.1.9:6379> 

# 1. 往stream中添加多个消息。
192.168.1.9:6379> XADD s1 * message "hello tyy"
"1646834322070-0"
192.168.1.9:6379> XADD s1 * message "hello lqq"
"1646834325931-0"
192.168.1.9:6379> XADD s1 * message "hello hc"
"1646834331433-0"
192.168.1.9:6379> XADD s1 * message "hello hrc"
"1646834335236-0"
192.168.1.9:6379> XADD s1 * message "hello ljm"
"1646834338844-0"
192.168.1.9:6379> 

# 2. 创建消费者组,它只能通过手动调XGROUP命令去创建。
192.168.1.9:6379> XGROUP CREATE s1 g1 0-0
OK
192.168.1.9:6379> 

# 3. 查看流消息队列的长度和内容。
192.168.1.9:6379> XLEN s1
(integer) 5
192.168.1.9:6379> XRANGE s1 - +
1) 1) "1646834322070-0"
   2) 1) "message"
      2) "hello tyy"
2) 1) "1646834325931-0"
   2) 1) "message"
      2) "hello lqq"
3) 1) "1646834331433-0"
   2) 1) "message"
      2) "hello hc"
4) 1) "1646834335236-0"
   2) 1) "message"
      2) "hello hrc"
5) 1) "1646834338844-0"
   2) 1) "message"
      2) "hello ljm"
192.168.1.9:6379> 

# 4. 先把消息发送给客户端(这里我COUNT=10可以把5条消息全部发送完),使消息进入待处理队列。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 >
1) 1) "s1"
   2) 1) 1) "1646834322070-0"
         2) 1) "message"
            2) "hello tyy"
      2) 1) "1646834325931-0"
         2) 1) "message"
            2) "hello lqq"
      3) 1) "1646834331433-0"
         2) 1) "message"
            2) "hello hc"
      4) 1) "1646834335236-0"
         2) 1) "message"
            2) "hello hrc"
      5) 1) "1646834338844-0"
         2) 1) "message"
            2) "hello ljm"
192.168.1.9:6379> 

# 5. 传0或者其它ID(不包含消息的唯一ID,即下面的tyy的xxx070、lqq的xxx931等它们自己的ID),代表读取已经发送给客户端但客户端未ACK的消息。这里COUNT=10可以读取待处理队列的全部5条消息。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 0
1) 1) "s1"
   2) 1) 1) "1646834322070-0"
         2) 1) "message"
            2) "hello tyy"
      2) 1) "1646834325931-0"
         2) 1) "message"
            2) "hello lqq"
      3) 1) "1646834331433-0"
         2) 1) "message"
            2) "hello hc"
      4) 1) "1646834335236-0"
         2) 1) "message"
            2) "hello hrc"
      5) 1) "1646834338844-0"
         2) 1) "message"
            2) "hello ljm"
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 1
1) 1) "s1"
   2) 1) 1) "1646834322070-0"
         2) 1) "message"
            2) "hello tyy"
      2) 1) "1646834325931-0"
         2) 1) "message"
            2) "hello lqq"
      3) 1) "1646834331433-0"
         2) 1) "message"
            2) "hello hc"
      4) 1) "1646834335236-0"
         2) 1) "message"
            2) "hello hrc"
      5) 1) "1646834338844-0"
         2) 1) "message"
            2) "hello ljm"
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 11111111111
1) 1) "s1"
   2) 1) 1) "1646834322070-0"
         2) 1) "message"
            2) "hello tyy"
      2) 1) "1646834325931-0"
         2) 1) "message"
            2) "hello lqq"
      3) 1) "1646834331433-0"
         2) 1) "message"
            2) "hello hc"
      4) 1) "1646834335236-0"
         2) 1) "message"
            2) "hello hrc"
      5) 1) "1646834338844-0"
         2) 1) "message"
            2) "hello ljm"
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 1111111111111
1) 1) "s1"
   2) 1) 1) "1646834322070-0"
         2) 1) "message"
            2) "hello tyy"
      2) 1) "1646834325931-0"
         2) 1) "message"
            2) "hello lqq"
      3) 1) "1646834331433-0"
         2) 1) "message"
            2) "hello hc"
      4) 1) "1646834335236-0"
         2) 1) "message"
            2) "hello hrc"
      5) 1) "1646834338844-0"
         2) 1) "message"
            2) "hello ljm"
# 但是这里注意,传14个1时返回了空数组。可能与位数有关。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 11111111111111
1) 1) "s1"
   2) (empty array)
192.168.1.9:6379> 

# 6. 传具体的消息ID。
# 它会从该ID的下一个ID开始返回消息,返回的个数由COUNT决定,如果待处理的消息不足COUNT的个数,那么全部返回。如果该ID的下一个ID没有消息了,那么返回空数组。
# 1)从070开始读,那么从下一个ID即931开始返回,共4条,由于COUNT=10,所以这4条会全部被返回。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 1646834322070-0
1) 1) "s1"
   2) 1) 1) "1646834325931-0"
         2) 1) "message"
            2) "hello lqq"
      2) 1) "1646834331433-0"
         2) 1) "message"
            2) "hello hc"
      3) 1) "1646834335236-0"
         2) 1) "message"
            2) "hello hrc"
      4) 1) "1646834338844-0"
         2) 1) "message"
            2) "hello ljm"
192.168.1.9:6379> 
# 2)从931开始读,那么会从433开始返回,共返回3条。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 1646834325931-0
1) 1) "s1"
   2) 1) 1) "1646834331433-0"
         2) 1) "message"
            2) "hello hc"
      2) 1) "1646834335236-0"
         2) 1) "message"
            2) "hello hrc"
      3) 1) "1646834338844-0"
         2) 1) "message"
            2) "hello ljm"
# 3)从433开始读,那么会从236开始返回,共返回2条。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 1646834331433-0
1) 1) "s1"
   2) 1) 1) "1646834335236-0"
         2) 1) "message"
            2) "hello hrc"
      2) 1) "1646834338844-0"
         2) 1) "message"
            2) "hello ljm"
# 4)从236开始读,那么会从844开始返回,共返回1条。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 1646834335236-0
1) 1) "s1"
   2) 1) 1) "1646834338844-0"
         2) 1) "message"
            2) "hello ljm"
# 5)从844开始读,由于844后面没有待处理的消息了,所以会返回空数组,代表这个ID是这个待处理消息队列的最后一个ID。
# 	但不能说待处理消息队列已经处理完毕,因为这个具体ID的前面可能有消息。判断是否待处理消息队列是否处理完毕,应该使用ID=0和XACK去判断,
#	因为ID=0时代表从头开始读取消息,如果返回空数组,那么这个待处理消息队列肯定是空,说明待处理消息队列已经处理完毕。
192.168.1.9:6379> XREADGROUP GROUP g1 tyy COUNT 10 STREAMS s1 1646834338844-0
1) 1) "s1"
   2) (empty array)
192.168.1.9:6379> 

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-10 22:36:15  更:2022-03-10 22:39:17 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 20:15:56-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码