概念模型
?
- Producer:消息发布的角色(消息生产者),支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
- Consumer:消息消费的角色(消息消费者),支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
- NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:
- Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。
- 提供心跳检测机制,检查Broker是否还存活;
- 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
- BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
- Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
- Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
- Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
- HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
- Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
文档:rocketmq/docs/cn at master · apache/rocketmq · GitHub
一、添加环境变量
- 下载 地址Apache Downloads
- 解压
- 配置环境变量
ROCKETMQ_HOME="D:\rocketmq" //RocketMQ安装目录
NAMESRV_ADDR="localhost:9876" //配置RocketMQ本地地址
或者只是在打开的 powershell 中,键入所需的环境变量。
$Env:ROCKETMQ_HOME="D:\rocketmq"
$Env:NAMESRV_ADDR="localhost:9876"
如果你选择powershell方式。您应该为每个新打开的 powershell 窗口执行此操作。
?启动名称服务器:.\bin\mqnamesrv.cmd
.\bin\mqnamesrv.cmd
?启动成功如图
启动Broker?:.\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
.\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
?启动成功如图
?二、使用总结
简单发送接收消息示例:
1、创建maven quickstart项目
2、添加依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>
3、消息生产者(发送)
package com.wnxy.rocletmq.test;
import com.wnxy.rocletmq.commons.AppConstants;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.util.Scanner;
/**
* 消息生产者
*/
public class Sender {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("SenderTest");
producer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR);
//启动
producer.start();
//发送消息
Scanner scanner = new Scanner(System.in);
while (true){
System.out.println("请输入发送消息");
String sendMessage = scanner.next();
if (sendMessage.equals("exit")) {
//关闭
producer.shutdown();
}
Message mesg = new Message(AppConstants.SMS_TOPIC, "register", sendMessage.getBytes("UTF-8"));
SendResult sendResult = producer.send(mesg);
System.out.println("发送内容:"+sendMessage);
System.out.println("sendResult:"+sendResult);
}
}
}
消息消费者(接收)
package com.wnxy.rocletmq.test;
import com.wnxy.rocletmq.commons.AppConstants;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
/**
* 消息消费者
*/
public class Receiver {
public static void main(String[] args) throws MQClientException {
//(1)创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ReceiveTest");
consumer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR);
//(2)订阅指定实例接收特定消息
consumer.subscribe(AppConstants.SMS_TOPIC,"*");
//(3)注册一个监听器监听消息
/**
* msgs 消息列表
* context 消息上下文
*/
consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context) -> {
for (MessageExt msg : msgs) {
try {
System.out.println("接收消息内容:"+new String(msg.getBody(),"utf-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// (4)启动消费者实例
consumer.start();
}
}
?发送的三种模式:
1、同步发送消息(producer发送MQ响应信息)
同步发送是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)。类似于点对点模式
- 应用场景:此种方式应用场景非常广泛,例如重要通知邮件、短信通知等。
2、异步发送消息
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息队列RocketMQ版的异步发送,需要您实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
3、单向发送消息
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
- 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
总结:
发送方式 | 发送TPS | 发送结果反馈 | 可靠性 | 同步发送 | 快 | 有结果反馈 | 消息不丢失 | 异步发送 | 更快 | 有结果反馈 | 消息不丢失 | 单向发送 | 最快 | 无结果反馈 | 消息可能丢失 |
消息模式
两种模式,一种集群模式,一种群发模式
1、集群模式(点对点):
每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
?2、群发模式(广播模式):
同一个ConsumerGroup 里的Consumer 都消费订阅topic 全部信息,也就是一条消息会被每一个Consumer消费(每个消息可以有多个消费者)
//在BROADCASTING模式下,需要设置实例名
consumer.setInstanceName(name);
//设置消息模型
//consumer.setMessageModel(MessageModel.CLUSTERING); //集群模式
consumer.setMessageModel(MessageModel.BROADCASTING); //群发模式(广播)
消息类型
分类:普通消息、顺序消息、延时消息、事务消息
顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析:在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的
延时消息
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
producer.setRetryTimesWhenSendAsyncFailed(0); //异步发送失败后的重试次数
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
// org/apache/rocketmq/store/config/MessageStoreConfig.java
//private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
mesg.setDelayTimeLevel(3); //等级3对应10s
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关
事务消息
MQ与DB之间的数据一致性问题
当DB与MQ需要同时成功或者失败时,无法直接实现。如果DB成功提交事务,MQ失败,DB难以回滚;如果MQ发送成功,但是DB写入失败,MQ无法回滚;
交互流程
名词解释:
半事务消息:是指暂不能被Consumer消费的消息。Producer 已经把消息成功发送到了 Broker 端,但此消息被标记为暂不能投递状态,处于该种状态下的消息称为半消息。需要 Producer对消息的二次确认后,Consumer才能去消费它。 消息回查:由于网络闪段,生产者应用重启等原因。导致 Producer 端一直没有对 Half Message(半消息) 进行 二次确认。这是Brock服务器会定时扫描长期处于半消息的消息,会主动询问 Producer端 该消息的最终状态(Commit或者Rollback),该消息即为 消息回查。
事务消息发送步骤如下:
- 发送方将半事务消息发送至消息队列RocketMQ版服务端。
- 消息队列RocketMQ版服务端将消息持久化成功之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。
- 发送方开始执行本地事务逻辑。
- 发送方根据本地事务执行结果向服务端提交二次确认(Commit或是Rollback),服务端收到Commit状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到Rollback状态则删除半事务消息,订阅方将不会接受该消息。
事务消息回查步骤如下:
- 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对消息发送方即生产者集群中任意一生产者实例发起消息回查。
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。
?发送事务消息必须要实现回查 Check 机制: ????????当发送端发送半事务消息发送完成,但本地事务返回状态为 TransactionStatus.Unknow,或者应用退出导致本地事务未提交任何状态时,从 Broker 的角度看,这条 Half 状态的消息的状态是未知的。因此 Broker 会定期要求发送方 Check 该 Half 状态消息,并上报其最终状态。
在事务消息的 Check 方法中,需要完成两件事情:
- 检查该半事务消息对应的本地事务的状态(committed or rollback)。
- 向 Broker 提交该半事务消息本地事务的状态。
事务消息代码示例(只是简单模拟):
发送者
package com.wnxy.rocletmq.transaction;
import com.wnxy.rocletmq.commons.AppConstants;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 事务消息生产者
*/
public class TransactionSender {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
TransactionMQProducer producer = new TransactionMQProducer("SenderTest");
producer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR);
//设置监听器
producer.setTransactionListener(new MyTxListener());
//设置线程池,执行异步的消息检查
producer.setExecutorService(new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000)));
//启动
producer.start();
//发送事务消息
Scanner scanner = new Scanner(System.in);
while (true){
System.out.println("请输入发送消息");
String sendMessage = scanner.next();
if (sendMessage.equals("exit")) {
//关闭
producer.shutdown();
System.out.println("发送者关闭");
}
for (int i = 0; i < 10; i++) {
Message mesg = new Message(AppConstants.SMS_TOPIC, "register","key"+i, sendMessage.getBytes("UTF-8"));
TransactionSendResult sendResult = producer.sendMessageInTransaction(mesg, null);
System.out.println("sendResult结果:"+sendResult);
//等一会儿再发下一条
Thread.sleep(10);
}
//事务监听器是异步执行的,所以需要等
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
}
}
}
?事务监听器:
package com.wnxy.rocletmq.transaction;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 事务监听器
*/
public class MyTxListener implements TransactionListener {
//临时存储事务状态,真实情况存入数据库
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
/**
* 在半消息发送成功后,执行本地事务
* @param message
* @param o
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("执行本地事务");
//执行本地事务
//存储事务执行结果
int transactionValue = transactionIndex.incrementAndGet();
//0 1 2分别代表 提交 回滚 未知
localTrans.put(message.getTransactionId(),transactionValue);
System.out.println("本地事务结束");
return LocalTransactionState.UNKNOW;
}
/**
* 当事务消息没有确认commit或者rollback时进行消息回查
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("消息回查:"+messageExt.getTransactionId());
Integer status = localTrans.get(messageExt.getTransactionId());
switch (status){
case 0: //提交
return LocalTransactionState.UNKNOW;
case 1: //回滚
return LocalTransactionState.COMMIT_MESSAGE;
case 2: //未响应(未知)
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
|