IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Redis之消费者组 -> 正文阅读

[大数据]Redis之消费者组

1. 概念

  • 将一个流从逻辑上划分为多个不同的流,并让消费者组属下的消费者去处理组中的消息

在这里插入图片描述


2. 创建消费者组

XGROUP CREATE stream group id: 在流stream中创建一个消费者组group, 该消费者组group只能获取流中指定id之后的元素(创建消费者组时应该保证流存在)

127.0.0.1:6379> XADD s1 * k1 v1
"1645946886436-0"
127.0.0.1:6379> XADD s1 * k2 v2
"1645946890084-0"
127.0.0.1:6379> XADD s1 * k3 v3
"1645946894132-0"
127.0.0.1:6379> XADD s1 * k4 v4
"1645946902211-0"
127.0.0.1:6379> XGROUP CREATE s1 g1 0-0  # g1消费者组中含有流s1中的所有元素
OK
127.0.0.1:6379> XGROUP CREATE s1 g2 $  # $ 表示只获取流中最新的元素 此时g2中没有元素 后面新添加的元素会进入g2
OK
127.0.0.1:6379> XINFO GROUPS s1
1) 1) "name"
   2) "g1"   # 消费者组g1
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "0-0"  # 最后递送消息ID
2) 1) "name"
   2) "g2"   # 消费者组g2
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1645946902211-0"  # 最后递送消息ID

3. 修改消费者组的最后递送消息ID

XGROUP SETID stream group id: 将流stream中的消费者组group的最后递送id修改为新的值

127.0.0.1:6379> XGROUP SETID s1 g1 1645946890084-0
OK
127.0.0.1:6379> XINFO GROUPS s1
1) 1) "name"
   2) "g1"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1645946890084-0"
  • 如果新ID大于旧ID,那么消费者可能会漏掉一些原本应该读取的消息
  • 如果新ID小于旧ID,那么消费者可能会重新读取到一些之前已经被确认过的消息

4. 销毁消费者组

XGROUP DESTROY stream group: 销毁流stream中的group消费者组

127.0.0.1:6379> XGROUP DESTROY s1 g1
(integer) 1
127.0.0.1:6379> XINFO GROUPS s1
1) 1) "name"
   2) "g2"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1645946902211-0"

5. 添加/移除消费者

  1. 消费者不用显式地创建,用户只要在执行XREADGROUP命令时给定消费者的名字,Redis就会自动为新出现的消费者创建相应的数据结构
127.0.0.1:6379> XREADGROUP GROUP g1 c1 COUNT 2 STREAMS s1 >  # 从g1开始处往后读2个元素
1) 1) "s1"
   2) 1) 1) "1645948838352-0"
         2) 1) "k1"
            2) "v1"
      2) 1) "1645948858829-0"
         2) 1) "k1"
            2) "v1"
127.0.0.1:6379> XREADGROUP GROUP g1 c2 COUNT 2 STREAMS s1 >  # 从1645948858829-0处往后读2个元素
1) 1) "s1"
   2) 1) 1) "1645948861917-0"
         2) 1) "k2"
            2) "v2"
      2) 1) "1645948865325-0"
         2) 1) "k3"
            2) "v3"
  1. 删除消费者
127.0.0.1:6379> XGROUP DELCONSUMER s1 g1 c1
(integer) 2   # 返回2说明c1中有两条消息未被处理

6. 读取消费者组中的消息

127.0.0.1:6379> XREADGROUP GROUP g1 c1 COUNT 2 STREAMS s1 >  # 往c1中加入元素(>表示从没有被发送的消息中开始读)
1) 1) "s1"
   2) 1) 1) "1645948838352-0"
         2) 1) "k1"
            2) "v1"
      2) 1) "1645948858829-0"
         2) 1) "k1"
            2) "v1"
127.0.0.1:6379> XREADGROUP GROUP g1 c1 COUNT 2 STREAMS s1 1645948838352-0  # 读取c1中id大于1645948838352-0的元素
1) 1) "s1"
   2) 1) 1) "1645948858829-0"
         2) 1) "k1"
            2) "v1"

7. 显示待处理消息的相关信息

127.0.0.1:6379> XPENDING s1 g1
1) (integer) 2  # 消费者组中待处理消息的数量
2) "1645948838352-0"  # 首条待处理消息的id
3) "1645948858829-0"  # 最后一条待处理消息的id
4) 1) 1) "c1"   # 消费者的信息
      2) "2"

8. 将消息标记为已经处理

127.0.0.1:6379> XPENDING s1 g1 - +  1 c1
1) 1) "1645948858829-0"
   2) "c1"
   3) (integer) 850504
   4) (integer) 2
127.0.0.1:6379> XACK s1 g1 1645948858829-0  # 将g1中c1中的消息标记为已处理
(integer) 1
127.0.0.1:6379> XPENDING s1 g1 - +  1 c1
(empty list or set)

9. XCLAIM:转移消息的归属权

超过5s中c1如果没有处理 “1645948838352-0” "1645948858829-0"这两条消息,则将消息转移给c2

127.0.0.1:6379> XREADGROUP GROUP g1 c1 STREAMS s1 >
1) 1) "s1"
   2) 1) 1) "1645948838352-0"
         2) 1) "k1"
            2) "v1"
      2) 1) "1645948858829-0"
         2) 1) "k1"
            2) "v1"
      3) 1) "1645948861917-0"
         2) 1) "k2"
            2) "v2"
      4) 1) "1645948865325-0"
         2) 1) "k3"
            2) "v3"
      5) 1) "1645948872253-0"
         2) 1) "k4"
            2) "v4"
      6) 1) "1645948876582-0"
         2) 1) "k5"
            2) "v5"
127.0.0.1:6379> XCLAIM s1 g1 c2 5000 1645948838352-0 1645948858829-0
1) 1) "1645948838352-0"
   2) 1) "k1"
      2) "v1"
2) 1) "1645948858829-0"
   2) 1) "k1"
      2) "v1"
127.0.0.1:6379> XREADGROUP GROUP g1 c1 STREAMS s1 0
1) 1) "s1"
   2) 1) 1) "1645948861917-0"
         2) 1) "k2"
            2) "v2"
      2) 1) "1645948865325-0"
         2) 1) "k3"
            2) "v3"
      3) 1) "1645948872253-0"
         2) 1) "k4"
            2) "v4"
      4) 1) "1645948876582-0"
         2) 1) "k5"
            2) "v5"
127.0.0.1:6379> XREADGROUP GROUP g1 c2 STREAMS s1 0
1) 1) "s1"
   2) 1) 1) "1645948838352-0"
         2) 1) "k1"
            2) "v1"
      2) 1) "1645948858829-0"
         2) 1) "k1"
            2) "v1"

10. XINFO:查看流和消费者组的相关信息

  1. 打印消费者信息
127.0.0.1:6379> XINFO CONSUMERS s1 g1
1) 1) "name"
   2) "c1"
   3) "pending"
   4) (integer) 4
   5) "idle"
   6) (integer) 161297
2) 1) "name"
   2) "c2"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 78816
3) 1) "name"
   2) "c3"
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 1189776
  1. 打印消费者组信息
127.0.0.1:6379> XINFO GROUPS s1
1) 1) "name"
   2) "g1"
   3) "consumers"
   4) (integer) 3
   5) "pending"
   6) (integer) 6
   7) "last-delivered-id"
   8) "1645948876582-0"
  1. 打印流信息
127.0.0.1:6379> XINFO STREAM s1
 1) "length"
 2) (integer) 6
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 1
 9) "last-generated-id"
10) "1645948876582-0"
11) "first-entry"
12) 1) "1645948838352-0"
    2) 1) "k1"
       2) "v1"
13) "last-entry"
14) 1) "1645948876582-0"
    2) 1) "k5"
       2) "v5"

11. 小结

在这里插入图片描述

  1. 消费者组是一个逻辑结构,数据可以共享,即各个消费者组之间可以有交集
  2. 在同一个消费者组中,一个元素只能给1个消费者
  3. 消费者在消费者组中读取的第一个元素是last-delivered-id的后一个元素
  4. XACK命令不会改变last-delivered-id

参考文章:
《Redis使用手册》 黄健宏

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-28 15:36:32  更:2022-02-28 15:37:32 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 21:55:31-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码