IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Redis核心技术与实战-学习笔记(十五):消息队列(Redis的解决方案) -> 正文阅读

[大数据]Redis核心技术与实战-学习笔记(十五):消息队列(Redis的解决方案)

一.消息队列

消息队列:分布式系统必备的一个基础软件,能支持组件通信消息快速读写

Redis本身支持数据的快速访问,满足消息队列的读写性能需求

二.Redis适合做消息队列吗?

消息队列的消息存取需求

消息队列存取消息的过程

  • 在分布式系统中,两个组件要基于消息队列进行通信,一个组件就会把要处理的数据以消息的形式传递给消息队列,然后这个组件就可以继续执行其他操作;
  • 远端的另一个组件从消息队列中把消息读取出来,在本地进行处理。

需求:

  • 组件1需要对采集到的数据进行求和计算,并写入数据库
  • 消息到达速度很快,组件1没有办法及时既做采集又做计算,并写入数据库

解决方案:

消息队列:

  • 组件1把数据x和y保存为JSON格式的消息,再把它发送到消息队列,这样就可以继续接受新的数据。
  • 组件2从消息队列中 把数据读取出来在服务器2上进行求和计算上,再写入数据库。

通用的消息队列的架构模型:

? ? ??

消息队列存取消息时候,必须要满足的三个需求:

  • ? ? 消息顺序性?
  • ? ? 消息幂等性
  • ? ? 保证消息的可靠性

消息的顺序性

? ? ?消息顺序被消费者异步处理,但是消费者仍然按照生产者发送消息的顺序来处理消息,避免后被发送的消息先被处理了。

? ? ?需求:对于消息顺序性的场景来看,一旦出现消息乱序处理时,会导致业务逻辑被错误执行,给业务方造成损失。

重复消息处理

? ? ?消费者从 消息队列读取消息时,有时候会因为网络堵塞出现消息重传的情况。此时,消费者可能会收到多条重复消息。对于重复消息,消费者如果多次处理的话,可能造成一个业务逻辑被多次执行,如果业务逻辑正好要修改数据,就会出现数据被多次修改的问题。

消息可靠性

? ? ? 消费者在处理消息的时候,可能出现因为故障 或者宕机导致消息没有处理完就丢失的情况。当消费者重启时候,可以重新读取消息再次进行处理,否则就会 出现消息漏处理的问题。

Redis如何实现消息队列的需求

? ? ?基于List消息队列解决方案

? ? ?List本身就是按照先进先出的顺序对数据进行存取,所以如果使用List作为消息队列保存 消息的话,就可以满足消息的顺序性

? ? 生产者使用LPUSH命令要把发送的消息依次写入list,消费者通过RPOP命令从LIST的另一端按照消息的写入顺序,依次读取消息并处理。? ?

? ?存在问题:

? ? ?生产者往list写入数据时,List并不会主动通知消费者有新消息写入,如果消费者想要及时处理消息,就需要程序不断调用RPOP命令(比如使用一个while(1)循环),如果新消息写入,RPOP就会返回结果,否则,RPOP命令返回空值,再继续循环

? ? ?危害:

? ? ? ? 没有新消息写入LIST消费者也要不停的调用RPOP命令,这就会导致消费者程序cpu一直消耗在执行RPOP命令上,带来不必要的性能损失

? ? 解决:

? ? ? ? ?Redis提供了BRPOP命令。BRPOP命令,也称为阻塞式读取客户端在没有读取到队列数据时,自动阻塞,知道有新的数据写入队列,再开始读取新数据,和消费者程序在自己不停调用RPOP命令相比,这种方式能节省CPU开销。

? ? ? ??

重复消息的处理:消息的幂等性

? ? ? ?消费者程序本身可以对重复消息进行判断

? ? ? 消息队列要能给每个消息提供全局唯一的ID号;另一方面,消费者程序要把已经处理过的消息ID记录下来。当收到一条消息后,消费者程序可以对比收到的消息ID和记录处理过的消息ID。来判断当前收到的消息有么有经过处理。

? ? ?如果已经处理 过了就不再处理了。这种处理特性被称为消息 幂等性。

? ? ?幂等性:对于同一消息,消费者收到生成一次的处理结果和收到多次的处理结果是一致的。

不过List本身不会为每个消息生成ID号的,所以,消息的全局唯一ID号就需要生产者程序发送消息前自行生成,生成之后,我们在用LPUSH命令把消息插入List中,需要在消息中包含这个全局唯一ID。

消息可靠性:

? ? ? List 类型是如何保证消息可靠性---?备份

? ? ?背景:? 消费者List中读取一条消息后,List就不会存留这条消息,所以如果消费者程序在处理消息的过程中出现了故障或者宕机,就会导致消息没有处理完成,那么消费者程序再次启动就会导致消息丢失。

? ? 解决方案:为了存留消息,list提供了BRPOPLUSH命令,这个命令的作用就是让消费者从一个List中读取消息,同时Redis会把这个消息再插入到另一个List(可以叫作备份 List)留存。

? ? ? 如果消费者程序读取了消息但是没能正常处理,等它重启以后就可以从备份List中重新读取消息并进行处理。

? ? ? 生产者消息发送很快,而消费者处理消息的速度缓慢,这就导致List中消息堆积的很多,给Redis内存带来压力

? ? ?启动多个消费者程序组成消费组,一起分担处理 List中消息的消息。但是List类型并不支持消费组的实现。

基于Stream消息队列解决方案

streams是Redis专门为消息队列设计 的数据类型:

  • XADD插入消息,保证有序,可以自动生成全局唯一ID;
  • XREAD用于读取消息,可以按ID读取数据;
  • XREADGROUP按消费组的形式读取消息;
  • XPENDING和XACK:?XPENDING查询每个消费组内所有消费者已读取但是尚未确认消息,ASCK命令用于向消息队列确认消息处理已经完成。

XADD命令

可以往消息队列中插入新消息,消息的格式 是键-值对形式。对于插入的每一条消息,Streams可以自动为其生成一个全局唯一ID。

XADD mqstream * repo 5
"1599203861727-0"

可以往名称为mqstream的消息队列插入一条消息,消息的键为 repo, 值为5;

消息队列中的* ,表示让Redis为插入数据自动生成一个全局唯一的ID,例如"1599203861727-0"

也可以自行设定一个ID号,保证这个ID号是全局唯一的就行。不过使用*号会更加方便高效。

消息的全局唯一ID由两部分组成

  • ? ?第一部分"1599203861727"是指当前时间戳 毫秒级
  • ? ?第二部分表示插入消息在当前毫秒内的消息序列,这是从0开始编号的,
  • ? “1599203861727-0”就表示在“1599203861727”毫秒内的第 1 条消息。

XREAD 命令

? ? ? ?使用XREAD命令从消息队列读取

? ? ? ? XREAD在读取消息时候,可以指定一个消息ID,并从这个消息ID的下一条消息开始进行读取。例如我们可以执行下面的命令,从ID号为 1599203861727-0 的消息开始,读取后续的所有消息:

XREAD BLOCK 100 STREAMS  mqstream 1599203861727-0
1) 1) "mqstream"
   2) 1) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"
      2) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"
      3) 1) "1599274927910-0"
         2) 1) "repo"
            2) "1"

消息者也可以在调用XREAD时设定block配置项,实现类似于BRPOP的阻塞读取操作。

当消息队列中没有消息时,一旦设置了block配置项,XREAD就会阻塞;

阻塞的时长可以在block配置项进行设置。

XREAD block 10000 streams mqstream $
(nil)
(10.00s)

? ? ? ?,命令最后的$符号表示读取最新消息,同时设置block 10000配置项,1000的单位是毫秒,表示XREAD 在读取最新消息时,如果没有消息到来,XREAD 将阻塞 10000 毫秒(即 10 秒),然后再返回。上面命令中XREAD执行后,消息队列命令中mqstream 中一直没有消息XREAD 在 10 秒后返回空值(nil)。

? ? ?

消费组

? ? ??Stream本身可以使用XGROUP创建消费组,创建消费组后,Stream可以使用XREADGROUP命令让消费组内的消费者读取消息

? ? ??

XGROUP create mqstream group1 0
ok

? ?我们再执行一段命令,让GROUP1消费组中的消费者consumer1 从 mqstream 中读取所有消息

XREADGROUP group group1 cinsumer1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599203861727-0"
         2) 1) "repo"
            2) "5"
      2) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"
      3) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"
      4) 1) "1599274927910-0"
         2) 1) "repo"
            2) "1"

让group1消费组里的消费者consumer1从mqstream中读取所有消息,

命令">"表示从第一天尚未被消费的消息开始读取。

因为在consumer1读取消息前,group1并没有其他消费者读取过消息,所以consumer1就得到了mqstream消息队列中的所有消息。

消息队列中的消息一旦被消费组里的一个消息读取了,就不能再被该消费组内的其他消费者读取。

我们继续执行下面命令


XREADGROUP group group1 consumer2  streams mqstream 0
1) 1) "mqstream"
   2) (empty list or set)

比如说,我们执行完刚才的 XREADGROUP 命令后,再执行下面的命令,让 group1 内的 consumer2 读取消息时,consumer2 读到的就是空值,因为消息已经被 consumer1 读取完了?

消费组的目的?

? ? 让组内多个消费者共同分担读取消息,通常会让每个消费者读取部分消息,从而实现让组内的多个消费者共同分担读取消息,实现消息读取负载在多个消费者间是均衡分布的。例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。

XREADGROUP group group2 consumer1 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599203861727-0"
         2) 1) "repo"
            2) "5"

XREADGROUP group group2 consumer2 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"

XREADGROUP group group2 consumer3 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"

保证消费者在发生故障或者宕机再次重启时,让可以读取未处理完的消息,stream会自动使用内部队列(PENDING List)留存消费组里 每个消费者读取的消息;

直到消费者使用XACK命令通知Streams消息已经被处理完成。

如果消费者没有成功处理消息,他就不会给Stream发送XACK命令,消息仍然会留存。

此时消费者可以在重启后,用XPENDING 命令查看已读取、但尚未确认处理完成的消息。

XPEBDING mqstream group2
1) (integer) 3
2) "1599203861727-0"
3) "1599274925823-0"
4) 1) 1) "consumer1"
      2) "1"
   2) 1) "consumer2"
      2) "1"
   3) 1) "consumer3"
      2) "1"

查看group2中各个消费者已读取,但是尚未确认的消息个数。其中,XPENDING返回结果的第二行第三行分别表示group2中所有消费者读取的消息最小ID和最大ID。

XACK mqstream group2 1599274912765-0
(integer) 1
XPENDING mqstream group2 - + 10 consumer2
(empty list or set)

consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除。当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。

基于List基于Streams
消息顺序性LPUSH/RPOPXADD/XREAD
阻塞读取BRPOPXREAD block
重复消息处理生产者自行实现全局唯一IDStreams自动生成全局唯一ID
消息可靠性BRPOPLPUSH使用PENDING List自动存留消息,使用XPENDING查看,使XACK确认
适用场景

Redis 5.0前版本

部署环境消息总量小

Redis 5.0以后版本

部署环境消息总量大,需要以消费组的形式读取数据

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-03 16:21:05  更:2022-03-03 16:21:15 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 21:34:10-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码