一 简介
RocketMQ是阿里云基于Apache RocketMQ构建的低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列RocketMQ版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
官方文档:https://rocketmq.apache.org/docs/quick-start/ github中文主页:https://github.com/apache/rocketmq/tree/master/docs/cn
1.1 核心概念
- Topic:消息主题,一级消息类型,通过Topic对消息进行分类。更多信息,请参见Topic与Tag最佳实践。
- 消息(Message):消息队列中信息传递的载体。
- Message ID:消息的全局唯一标识,由消息队列RocketMQ版系统自动生成,唯一标识某条消息。
- Message Key:消息的业务标识,由消息生产者(Producer)设置,唯一标识某个业务逻辑。
- Tag:消息标签,二级消息类型,用来进一步区分某个Topic下的消息分类。更多信息,请参见Topic与Tag最佳实践。
- Producer:消息生产者,也称为消息发布者,负责生产并发送消息。
- Producer实例:Producer的一个对象实例,不同的Producer实例可以运行在不同进程内或者不同机器上。Producer实例线程安全,可在同一进程内多线程之间共享。
- Consumer:消息消费者,也称为消息订阅者,负责接收并消费消息。可分为两类:
- (1)Push Consumer:消息由消息队列RocketMQ版推送至Consumer。
- (2)Pull Consumer:该类Consumer主动从消息队列RocketMQ版拉取消息。目前仅TCP Java SDK支持该类Consumer。
- 分区:即Topic Partition,物理上的概念。每个Topic包含一个或多个分区。
- 消费位点:每个Topic会有多个分区,每个分区会统计当前消息的总条数,这个称为最大位点MaxOffset;分区的起始位置对应的位置叫做起始位点MinOffset。消息队列RocketMQ版的Pull Consumer会按顺序依次消费分区内的每条消息,记录已经消费了的消息条数,称为消费位点ConsumerOffset。剩余的未消费的条数(也称为消息堆积量)= 最大位点MaxOffset-消费位点ConsumerOffset。
- Consumer实例:Consumer的一个对象实例,不同的Consumer实例可以运行在不同进程内或者不同机器上。一个Consumer实例内配置线程池消费消息。
- Group:一类Producer或Consumer,这类Producer或Consumer通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。
- Group ID:Group的标识。
- 队列:每个Topic下会由一到多个队列来存储消息。每个Topic对应队列数与消息类型以及实例所处地域(Region)相关,具体的队列数可提交工单咨询。
- Exactly-Once投递语义:Exactly-Once投递语义是指发送到消息系统的消息只能被Consumer处理且仅处理一次,即使Producer重试消息发送导致某消息重复投递,该消息在Consumer也只被消费一次。更多信息,请参见Exactly-Once投递语义。
- 集群消费:一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。更多信息,请参见集群消费和广播消费。
- 广播消费:一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。更多信息,请参见集群消费和广播消费。
- 定时消息:Producer将消息发送到消息队列RocketMQ版服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。更多信息,请参见定时和延时消息。
- 延时消息:Producer将消息发送到消息队列RocketMQ版服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。更多信息,请参见定时和延时消息。
- 事务消息:消息队列RocketMQ版提供类似XA或Open XA的分布事务功能,通过消息队列RocketMQ版的事务消息能达到分布式事务的最终一致。更多信息,请参见事务消息。
- 顺序消息:消息队列RocketMQ版提供的一种按照顺序进行发布和消费的消息类型,分为全局顺序消息和分区顺序消息。更多信息,请参见顺序消息。
- 全局顺序消息:对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。更多信息,请参见顺序消息。
- 分区顺序消息:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Message Key是完全不同的概念。更多信息,请参见顺序消息。
- 消息堆积:Producer已经将消息发送到消息队列RocketMQ版的服务端,但由于Consumer消费能力有限,未能在短时间内将所有消息正确消费掉,此时在消息队列RocketMQ版的服务端保存着未被消费的消息,该状态即消息堆积。
- 消息过滤:Consumer可以根据消息标签(Tag)对消息进行过滤,确保Consumer最终只接收被过滤后的消息类型。消息过滤在消息队列RocketMQ版的服务端完成。更多信息,请参见消息过滤。
- 消息轨迹:在一条消息从Producer发出到Consumer消费处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从Producer发出,经由消息队列RocketMQ版服务端,投递给Consumer的完整链路,方便定位排查问题。更多信息,请参见查询消息轨迹。
- 重置消费位点:以时间轴为坐标,在消息持久化存储的时间范围内(默认3天),重新设置Consumer对已订阅的Topic的消费进度,设置完成后Consumer将接收设定时间点之后由Producer发送到消息队列RocketMQ版服务端的消息。更多信息,请参见重置消费位点。
- 死信队列:死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列RocketMQ版会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明Consumer在正常情况下无法正确地消费该消息。此时,消息队列RocketMQ版不会立刻将消息丢弃,而是将这条消息发送到该Consumer对应的特殊队列中。消息队列RocketMQ版将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
- 消息路由:消息路由常用于不同地域之间的消息同步,保证地域之间的数据一致性。消息队列RocketMQ版的全球消息路由功能依托阿里云优质基础设施实现的高速通道专线,可以高效地实现不同地域之间的消息同步复制。
1.2?消息收发模型
消息队列RocketMQ版支持发布和订阅模型,消息生产者应用创建Topic并将消息发送到Topic。消费者应用创建对Topic的订阅以便从其接收消息。通信可以是一对多(扇出)、多对一(扇入)和多对多。
?
- 生产者集群:用来表示发送消息应用,一个生产者集群下包含多个生产者实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个生产者对象。
一个生产者集群可以发送多个Topic消息。发送分布式事务消息时,如果生产者中途意外宕机,消息队列RocketMQ版服务端会主动回调生产者集群的任意一台机器来确认事务状态。 - 消费者集群:用来表示消费消息应用,一个消费者集群下包含多个消费者实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个消费者对象。
一个消费者集群下的多个消费者以均摊方式消费消息。如果设置的是广播方式,那么这个消费者集群下的每个实例都消费全量数据。一个消费者集群对应一个Group ID,一个Group ID可以订阅多个Topic,如图中的Group 2所示。Group和Topic的订阅关系可以通过直接在程序中设置即可。
1.3?应用场景
- 削峰填谷
诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列RocketMQ版可提供削峰填谷的服务来解决该问题。 - 异步解耦
交易系统作为淘宝和天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列RocketMQ版可实现异步通信和应用解耦,确保主站业务的连续性。 - 顺序收发
细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出FIFO(First In First Out)原理类似,消息队列RocketMQ版提供的顺序消息即保证消息FIFO。 - 分布式事务一致性
交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列RocketMQ版的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。 - 大数据分析
数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用阿里云消息队列RocketMQ版与流式计算引擎相结合,可以很方便的实现业务数据的实时分析。 - 分布式缓存同步
天猫双11大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因带宽瓶颈,限制了商品变更的访问流量,通过消息队列RocketMQ版构建分布式缓存,实时通知商品数据的变化。
1.4?架构设计
1.5?技术架构
- ?Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
- Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
- NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
- BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
????????1. Remoting Module:整个Broker的实体,负责处理来自clients端的请求。 ????????2. Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。 ????????3. Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。 ????????4. HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。 ????????5. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
二 安装部署
使用 Docker 在 CentOS7 系统上安装单机版 RocketMQ,具体步骤见:https://blog.csdn.net/lovelichao12/article/details/118069080
三 Spring Boot2 集成实例
3.1 加入依赖
本文介绍Java SDK的收发消息教程。
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.7.Final</version>
</dependency>
3.2 application.properties 配置
# 生产者配置
rocketmq.producer.isOnOff=on
# 服务地址
rocketmq.producer.namesrvAddr=172.16.2.123:9876
# 消息最大长度 默认1024*4(4M)
rocketmq.producer.maxMessageSize=4096
# 发送消息超时时间,默认3000
rocketmq.producer.sendMsgTimeout=3000
# 发送消息失败重试次数,默认2
rocketmq.producer.retryTimesWhenSendFailed=2
# 消费者配置
rocketmq.consumer.isOnOff=on
# 服务地址
rocketmq.consumer.namesrvAddr=172.16.2.123:9876
# 接收该 Topic 下所有 Tag
rocketmq.consumer.topics=TestTopic~*;
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64
# 设置一次消费消息的条数,默认为1条
rocketmq.consumer.consumeMessageBatchMaxSize=1
### 配置 Group Topic Tag
rocket.group=TestGroup
rocket.topic=TestTopic
rocket.tag=TestTag
3.3 初始化
生产者初始化
package com.modules.common.config;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.Properties;
/**
* 生产者初始化
* lc
*/
@Slf4j
@Configuration
public class ProducerConfig {
@Value("${rocket.group}")
private String producerGroupName;
@Value("${rocketmq.producer.namesrvAddr}")
private String namesrvAddr;
@Value("36Rl3QPMNNXJifNC")
private String accessKey;
@Value("ENpAJPWOnKcSdKcXNkw5XVPGNMTYk0")
private String secretKey;
@Value("${rocketmq.producer.sendMsgTimeout}")
private String sendMsgTimeout;
private static Producer producer;
@PostConstruct
public void init() {
// producer 实例配置初始化
Properties properties = new Properties();
//您在控制台创建的Producer ID
properties.setProperty(PropertyKeyConst.ProducerId,producerGroupName);
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
//设置发送超时时间,单位毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, sendMsgTimeout);
// 设置 TCP 接入域名(此处以公共云生产环境为例),设置 TCP 接入域名,进入 MQ 控制台的消费者管理页面,在左侧操作栏单击获取接入点获取
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
producer = ONSFactory.createProducer(properties);
//在发送消息前,初始化调用start方法来启动Producer,只需调用一次即可,当项目关闭时,自动shutdown
producer.start();
log.info("rocketmq生产者初始化成功");
}
/**
* 初始化生产者
* @return
*/
public Producer getProducer(){
return producer;
}
}
消费者初始化
package com.modules.common.config;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PropertyValueConst;
import com.modules.rocketmq.RocketMqListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.Properties;
import java.util.UUID;
/**
* 消费者初始化
* lc
*/
@Slf4j
@Configuration
public class ConsumerConfig {
@Autowired
private RocketMqListener rocketMqListener;
@Value("${rocket.group}")
private String consumerGroupName;
@Value("${rocketmq.consumer.namesrvAddr}")
private String namesrvAddr;
@Value("36Rl3QPMNNXJifNC")
private String accessKey;
@Value("ENpAJPWOnKcSdKcXNkw5XVPGNMTYk0")
private String secretKey;
@Value("${rocketmq.producer.sendMsgTimeout}")
private String sendMsgTimeout;
@Value("${rocket.topic}")
private String topic;
@Value("${rocket.tag}")
private String tag;
private static Consumer consumer;
@PostConstruct()
public void init() {
// consumer 实例配置初始化
Properties properties = new Properties();
//您在控制台创建的consumer ID
properties.setProperty(PropertyKeyConst.ConsumerId, consumerGroupName);
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
//设置发送超时时间,单位毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, sendMsgTimeout);
// 设置 TCP 接入域名(此处以公共云生产环境为例),设置 TCP 接入域名,进入 MQ 控制台的消费者管理页面,在左侧操作栏单击获取接入点获取
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
properties.setProperty(PropertyKeyConst.InstanceName, UUID.randomUUID().toString());
// 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)。
// 同一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,
// 一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息
properties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
// 广播订阅方式设置。
// 同一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,
// 一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。
//properties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
// 客户端本地的最大缓存消息数据,默认值:1000,单位:条
properties.setProperty(PropertyKeyConst.MaxCachedMessageAmount, "1000");
// 客户端本地的最大缓存消息大小,取值范围:16 MB~2 GB,默认值:512 MB
properties.setProperty(PropertyKeyConst.MaxCachedMessageSizeInMiB, "2048");
// 设置消息消费失败的最大重试次数,默认值:16。
properties.setProperty(PropertyKeyConst.MaxReconsumeTimes, "3");
// 设置消费端线程数固定为20
properties.setProperty(PropertyKeyConst.ConsumeThreadNums,"20");
consumer = ONSFactory.createConsumer(properties);
//------------------------------订阅topic-------------------------------------------------
consumer.subscribe(topic, tag, rocketMqListener);//监听第一个topic,new对应的监听器
// 在发送消息前,必须调用start方法来启动consumer,只需调用一次即可,当项目关闭时,自动shutdown
consumer.start();
log.info("rocketmq消费者初始化成功");
}
/**
* 初始化消费者
* @return
*/
public Consumer getconsumer(){
return consumer;
}
}
3.4 消息发送
package com.modules.service;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.CountDownLatch2;
import com.modules.common.config.ProducerConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
* 生产者 业务处理发送
* lc
*
*/
@Slf4j
@Service
public class ProducerService {
@Value("${rocket.topic}")
private String topic;
@Value("${rocket.tag}")
private String tag;
@Autowired
private ProducerConfig producer;
/**
* 同步发送
* 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
*/
public String syncSendMsg(String json){
Message msg = new Message(topic, tag, json.getBytes());
try {
SendResult result = producer.getProducer().send(msg);
if(result != null){
log.info("同步发送消息成功,topic:" + topic + " messageId : " + result.getMessageId());
} else {
log.info("同步发送消息成功失败!");;
}
return "同步发送消息成功";
} catch (Exception e) {
log.error("同步发送消息异常:" + e);
return "同步发送消息失败";
}
}
/**
* 异步发送
* 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
*/
public void asyncSendMsg(String json) {
Message msg = new Message(topic, tag, json.getBytes());
int messageCount = 100;
// 根据消息数量实例化倒计时计算器
final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
try {
// SendCallback接收异步返回结果的回调
producer.getProducer().sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
log.info("异步发送成功:" + sendResult.getTopic() + "-" + sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext onExceptionContext) {
countDownLatch.countDown();
log.info("异步发送失败:" + onExceptionContext);
}
});
} catch (Exception e) {
log.error("同步发送消息异常:" + e);
}
}
/**
* 单向发送
* 这种方式主要用在不特别关心发送结果的场景,例如日志发送。
*/
public String onewaySendMsg(String json) {
Message msg = new Message(topic, tag, json.getBytes());
try {
producer.getProducer().sendOneway(msg);
return "单向发送消息成功";
} catch (Exception e) {
log.error("单向发送消息异常:" + e);
return "单向发送消息失败";
}
}
/**
* 延时发送
* 消费比存储时间晚10秒
*/
public void delaySendMsg(String json) {
Message msg = new Message(topic, tag, json.getBytes());
try {
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
msg.setStartDeliverTime(3);
producer.getProducer().send(msg);
} catch (Exception e) {
log.error("延时发送消息异常:" + e);
}
}
}
3.5 消息监听
package com.modules.rocketmq;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* 自定义接收消息的监听类
*/
@Slf4j
@Service
public class RocketMqListener implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
try {
byte[] body = message.getBody();
String messageBody = new String(body);// 获取到接收的消息,由于接收到的是byte数组,所以需要转换成字符串
log.info("监听接收到的信息: " + messageBody);
} catch (Exception e) {
log.error("监听接收信息异常:" + e);
}
// 如果想测试消息重投的功能,可以将Action.CommitMessage 替换成Action.ReconsumeLater
return Action.CommitMessage;
}
}
|