1. 概念
- 将一个流从逻辑上划分为多个不同的流,并让消费者组属下的消费者去处理组中的消息
2. 创建消费者组
XGROUP CREATE stream group id : 在流stream中创建一个消费者组group, 该消费者组group只能获取流中指定id之后的元素(创建消费者组时应该保证流存在)
127.0.0.1:6379> XADD s1 * k1 v1
"1645946886436-0"
127.0.0.1:6379> XADD s1 * k2 v2
"1645946890084-0"
127.0.0.1:6379> XADD s1 * k3 v3
"1645946894132-0"
127.0.0.1:6379> XADD s1 * k4 v4
"1645946902211-0"
127.0.0.1:6379> XGROUP CREATE s1 g1 0-0
OK
127.0.0.1:6379> XGROUP CREATE s1 g2 $
OK
127.0.0.1:6379> XINFO GROUPS s1
1) 1) "name"
2) "g1"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "0-0"
2) 1) "name"
2) "g2"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1645946902211-0"
3. 修改消费者组的最后递送消息ID
XGROUP SETID stream group id : 将流stream中的消费者组group的最后递送id修改为新的值
127.0.0.1:6379> XGROUP SETID s1 g1 1645946890084-0
OK
127.0.0.1:6379> XINFO GROUPS s1
1) 1) "name"
2) "g1"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1645946890084-0"
- 如果新ID大于旧ID,那么消费者可能会漏掉一些原本应该读取的消息
- 如果新ID小于旧ID,那么消费者可能会重新读取到一些之前已经被确认过的消息
4. 销毁消费者组
XGROUP DESTROY stream group : 销毁流stream中的group消费者组
127.0.0.1:6379> XGROUP DESTROY s1 g1
(integer) 1
127.0.0.1:6379> XINFO GROUPS s1
1) 1) "name"
2) "g2"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1645946902211-0"
5. 添加/移除消费者
- 消费者不用显式地创建,用户只要在执行XREADGROUP命令时给定消费者的名字,Redis就会自动为新出现的消费者创建相应的数据结构
127.0.0.1:6379> XREADGROUP GROUP g1 c1 COUNT 2 STREAMS s1 >
1) 1) "s1"
2) 1) 1) "1645948838352-0"
2) 1) "k1"
2) "v1"
2) 1) "1645948858829-0"
2) 1) "k1"
2) "v1"
127.0.0.1:6379> XREADGROUP GROUP g1 c2 COUNT 2 STREAMS s1 >
1) 1) "s1"
2) 1) 1) "1645948861917-0"
2) 1) "k2"
2) "v2"
2) 1) "1645948865325-0"
2) 1) "k3"
2) "v3"
- 删除消费者
127.0.0.1:6379> XGROUP DELCONSUMER s1 g1 c1
(integer) 2
6. 读取消费者组中的消息
127.0.0.1:6379> XREADGROUP GROUP g1 c1 COUNT 2 STREAMS s1 >
1) 1) "s1"
2) 1) 1) "1645948838352-0"
2) 1) "k1"
2) "v1"
2) 1) "1645948858829-0"
2) 1) "k1"
2) "v1"
127.0.0.1:6379> XREADGROUP GROUP g1 c1 COUNT 2 STREAMS s1 1645948838352-0
1) 1) "s1"
2) 1) 1) "1645948858829-0"
2) 1) "k1"
2) "v1"
7. 显示待处理消息的相关信息
127.0.0.1:6379> XPENDING s1 g1
1) (integer) 2
2) "1645948838352-0"
3) "1645948858829-0"
4) 1) 1) "c1"
2) "2"
8. 将消息标记为已经处理
127.0.0.1:6379> XPENDING s1 g1 - + 1 c1
1) 1) "1645948858829-0"
2) "c1"
3) (integer) 850504
4) (integer) 2
127.0.0.1:6379> XACK s1 g1 1645948858829-0
(integer) 1
127.0.0.1:6379> XPENDING s1 g1 - + 1 c1
(empty list or set)
9. XCLAIM:转移消息的归属权
超过5s中c1如果没有处理 “1645948838352-0” "1645948858829-0"这两条消息,则将消息转移给c2
127.0.0.1:6379> XREADGROUP GROUP g1 c1 STREAMS s1 >
1) 1) "s1"
2) 1) 1) "1645948838352-0"
2) 1) "k1"
2) "v1"
2) 1) "1645948858829-0"
2) 1) "k1"
2) "v1"
3) 1) "1645948861917-0"
2) 1) "k2"
2) "v2"
4) 1) "1645948865325-0"
2) 1) "k3"
2) "v3"
5) 1) "1645948872253-0"
2) 1) "k4"
2) "v4"
6) 1) "1645948876582-0"
2) 1) "k5"
2) "v5"
127.0.0.1:6379> XCLAIM s1 g1 c2 5000 1645948838352-0 1645948858829-0
1) 1) "1645948838352-0"
2) 1) "k1"
2) "v1"
2) 1) "1645948858829-0"
2) 1) "k1"
2) "v1"
127.0.0.1:6379> XREADGROUP GROUP g1 c1 STREAMS s1 0
1) 1) "s1"
2) 1) 1) "1645948861917-0"
2) 1) "k2"
2) "v2"
2) 1) "1645948865325-0"
2) 1) "k3"
2) "v3"
3) 1) "1645948872253-0"
2) 1) "k4"
2) "v4"
4) 1) "1645948876582-0"
2) 1) "k5"
2) "v5"
127.0.0.1:6379> XREADGROUP GROUP g1 c2 STREAMS s1 0
1) 1) "s1"
2) 1) 1) "1645948838352-0"
2) 1) "k1"
2) "v1"
2) 1) "1645948858829-0"
2) 1) "k1"
2) "v1"
10. XINFO:查看流和消费者组的相关信息
- 打印消费者信息
127.0.0.1:6379> XINFO CONSUMERS s1 g1
1) 1) "name"
2) "c1"
3) "pending"
4) (integer) 4
5) "idle"
6) (integer) 161297
2) 1) "name"
2) "c2"
3) "pending"
4) (integer) 2
5) "idle"
6) (integer) 78816
3) 1) "name"
2) "c3"
3) "pending"
4) (integer) 0
5) "idle"
6) (integer) 1189776
- 打印消费者组信息
127.0.0.1:6379> XINFO GROUPS s1
1) 1) "name"
2) "g1"
3) "consumers"
4) (integer) 3
5) "pending"
6) (integer) 6
7) "last-delivered-id"
8) "1645948876582-0"
- 打印流信息
127.0.0.1:6379> XINFO STREAM s1
1) "length"
2) (integer) 6
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "groups"
8) (integer) 1
9) "last-generated-id"
10) "1645948876582-0"
11) "first-entry"
12) 1) "1645948838352-0"
2) 1) "k1"
2) "v1"
13) "last-entry"
14) 1) "1645948876582-0"
2) 1) "k5"
2) "v5"
11. 小结
- 消费者组是一个逻辑结构,数据可以共享,即各个消费者组之间可以有交集
- 在同一个消费者组中,一个元素只能给1个消费者
- 消费者在消费者组中读取的第一个元素是last-delivered-id的后一个元素
- XACK命令不会改变last-delivered-id
参考文章: 《Redis使用手册》 黄健宏
|