消息队列经典场景
优点
- 异步
- 原来的下单场景只是用户支付即可结束,现在需要发送成功短信,给用户增加积分,订阅物流信息等等,这就使得用户的 下单时间大大加长,这样就可以使用消息队列,把各个动作发到消息队列,每个服务再去拉取消息队列中的东西进行处理.大大减少时间
- 解耦:增加积分,发送短信这些可以单独拆分出来,需要使用直接发送到知道的消息队列就行,你只需要关注你当前的业务
- 削峰: 如果使用线程池来解决,一个服务一个线程在高峰期你的mysql或者redis可能撑不住,使用mq就可以限制主机每次只拉取多少条进行处理
缺点
- 可用性降低
引入了mq,一旦mq宕机对业务有影响 - 复杂度提高
数据链路变得复杂,如何保证顺序性,不重复消费 - 一致性问题
用户支付了,增加积分出错该怎么处理
整体架构
- nameserver 相当于注册中心,连接从这里取ip
- broker 消息仓库,里面有topic与队列
- product,consumer生产者消费者
安装
- 基本的环境
yum install java-1.8.0-openjdk-devel.x86_64 wget vim unzip -y - 下载mq安装包
wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip - 解压缩
unzip rocketmq-all-4.7.1-bin-release.zip -d /usr/local/ - 启动nameserver服务
vim bin/runserver.sh - 默认堆初始化最大都是4g,新生代2g,测试机没这么内存,不修改无法启动,改为256m,新生代128m
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" - 后台启动
nohup bin/mqnamesrv > n1.out & - 启动broker服务
vim bin/runbroker.sh - 默认堆初始化最大都是8g,新生代4g,测试机没这么内存,不修改无法启动,改为512m,新生代256m
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m" - 暴露namserver地址
echo 'export NAMESRV_ADDR=localhost:9876' >> ~/.bash_profile - 后台启动
nohup bin/runbroker.sh >n2.out & - 日志验证
- n1.out
The Name Server boot success. serializeType=JSON - n2.out
The broker[localhost.localdomain, 192.168.147.134:10911] boot success. serializeType=JSON and name server is localhost:9876 - 发送接收测试
- 发送
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer - 接收
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer - 关闭
- 关闭nameserver服务
bin/mqshutdown namesrv - 关闭broker服务
bin/mqshutdown broker
集群
- 4种高可用集群
- 多master模式
- 优点:配置简单,性能最高
- 缺点:单个宕机,这台机器上违背消费的消息不可订阅
- 多master多salve 异步复制
- 优点:消息丢失少(异步复制),消息实时性不受到影响,master宕机可以从slave上消费,性能与多master基本一致
- 缺点:master宕机下会丢失少量消息
- 多master多salve 同步双写
- 优点:master宕机,消息无延迟,可用性高
- 缺点:性能有所丢失
- dledger模式:4.5版本之前采用master-slave架构部署但是master挂掉都slave无法自动晋升为master,这种模式可以将多个master-slave组成一个组,当组内master挂了将选举一个master继续服务
集群搭建
- 修改vim conf/2m-2s-async/broker-a.properties配置文件
#名字一样一个集群
brokerClusterName=DefaultCluster
#名字一样一个主从
brokerName=broker-a
# 0表示master >0标识slave
brokerId=0
# 删除文件时间
deleteWhen=04
# namesrv集群
namesrvAddr=work1:9876;work2:9876
# 默认创建队列数
defaultTopicQueueNums=4
# 自动创建队列
autoCreateTopicEnable=true
# 对外监听端口
listenPort=10911
#文件保留时间 默认48h
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#强制销毁文件间隔时间
#destroyMapedFileIntervalForcibly=120000
#重载文件时间
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq-all-4.7.1-bin-release/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq-all-4.7.1-bin-release/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq-all-4.7.1-bin-release/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq-all-4.7.1-bin-release/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq-all-4.7.1-bin-release/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq-all-4.7.1-bin-release/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
- 将broker-a.properties写入到broker-b-s.properties修改brokerName,brokerId,brokerRole和几个文件存储路径,同一台虚拟机注意端口号也需要修改
- 克隆当前虚拟机,修改broker-a-s.properties,broker-b.properties文件
- 修改host文件
vim /etc/hosts
192.168.147.134 work1
192.168.147.135 work2
- 启动两台nameserver
nohup bin/mqnamesrv >n1.out & - 启动broker,使用-c指定配置文件
nohup bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties >nb.out & - 关闭防火墙或者开放9876,两个broker服务的端口
firewall-cmd --zone=public --add-port=9876/tcp --add-port=10911/tcp --add-port=11011/tcp --permanent``firewall-cmd --reload - 四个broker服务都启动后验证集群
bin/mqadmin clusterList -n work1:9876
控制台搭建
- 项目地址rocketmq-dashboard
- 项目克隆
git clone https://github.com/apache/rocketmq-dashboard.git - 打开rocketmq-console导入idea,修改application.properties文件
rocketmq.config.namesrvAddr=work1:9876;work2:9876 以实际情况修改 - 打包项目上传jar包,启动
nohup java -jar rocketmq-console-ng-2.0.0.jar & - 打开浏览器访问当前服务器8080端口
dledger集群搭建
- 快速演示
- 在
bin/dledger/fast-try.sh 快速演示的脚本,但脚本给一个broker的内存是1g,虚拟机没有这么大修改一下 - 这里我修改为256m
function startNameserver() {
export JAVA_OPT_EXT=" -Xms256m -Xmx256m "
nohup bin/mqnamesrv &
}
function startBroker() {
export JAVA_OPT_EXT=" -Xms256m -Xmx256m "
conf_name=$1
nohup bin/mqbroker -c $conf_name &
}
- 启动
bin/dledger/fast-try.sh start - 查看集群情况
bin/mqadmin clusterList -n 127.0.0.1:9876 - 查询master节点进程号并把它kill,看slave是否能转为master
- 实际搭建
- 配置文件增加一下几条
#是否启动dledger
enableDLegerCommitLog=true
#组名与brokerName保持一致
dLegerGroup=broker-a
#当前组所有主机-专门监听端口号
dLegerPeers=n0-192.168.147.134:40911;n1-192.168.147.135:49011;n2-192.168.147.134:40912
#当前主机id
dLegerSelfId=n0
- 集群搭建成功
- 直接把135主机关机了
- 切换成功
基本概念
消息模型
producer生产消息,consumer消费消息,broker存储消息,每个broker对于一台服务器,每个broker可以存储多个opic消息,每个topic消息也可以分片存储于不同的broker上,message queue用于存储多个消息的物理地址,每个topic消息存储于多个message queue中
生产者
producer负责生产消费,将消费者消息发送到broker上,有多种发送方式:同步发送,异步发送,顺序发送,单向发送,同步与异步需要broker返回确认消息,单向发送不需要。同一类producer组成一个集合为生产组发送同一类消息且逻辑一致,如果有异常,broker服务器会联系同一生产者组提交或回滚
消费者
consumer消息者形式分为两种:
- 拉取式:主动式消费,消费者调用拉取的方法
- 推动式消费:broker有数据就会推给消费者
消费者组必须订阅同一个topic,消息模式两种: - 集群消费模式:平摊消费
- 广播消费模式:共享消费
主题
每个topic若干个消息,每个消息只能有一个主题,同一个topic下的数据分片保存到不同的broker,每个分片单位是messageQueue
代理服务器
- 几个模块
- remoting module:处理来自clients的请求
- client manager:负责管理客户端和维护消费者的topic订阅信息
- store service:处理消息的存储查询功能
- ha service:高可用服务,负责master与slave的数据同步
- index service:索引服务,以提高查询
- 普通集群
- 每个节点固定角色,master负责响应客户端请求并存储消息,slave负责同步数据并响应客户端部分读请求
- dledger高可用集群
- dledger
- 接管broker的commitlog消息存储
- 选举leader节点
- 完成消息同步
- 多副本消息同步
leader收到消息会将消息标记为uncommited状态,发给follower,follower收到消息后需要给leader返回一个ack,如果有超过半数的follower返回ack就会把消息改为commited状态,发给follower - leader选举机制
- 每个节点有三个状态,leader,follower,candidate(候选人)
- 每个时间点叫做term
- 集群启动时,每个节点都是follower,集群内部发送一个timeout信号,follower转为候选人,发起投票后收到半数以上的投票晋升为leader,
- 选举过程,集群启动,三个节点都是follower,三个节点都给自己投票,term都是1,三个节点随机休眠,a启动term加一为2,第二个节点醒来,发现a的term比自己大,承认a是leader,c同理
名字服务器
充当路由消息的提供者,broker会在启动时向nameserver注册自己的服务信息,后续通过心跳维护当前服务的可用性,生产者或消费者通过名字服务查找各主题消息相应的broker ip列表
消息
每个消息都必须拥有一个topic,每个消息拥有唯一的message id,且可以携带业务标识key, 可以为消息设置一个tag标签
消息存储
消息存储
- 时间
- mq收到消息标记为uncommit状态发给follower,follower收到消息,发给leader一个ack,超过半数follower返回ack,消息改为commit状态,存储,状态同步给follower
- mqpush消息给消费者,等待消费者ack响应,标记为已消费,没有标记消息会重复推送
- mq会定期删除一些过期的消息
- 存储介质:磁盘文件(采用顺序读写,保证存储的速度,采用mmap的方式,省去上下文切换,提高速度)
消息存储结构
- commitlog:存储消息元数据,每个文件1个G
- consumerQueue:消息队列,保存commitlog的索引
- indexFile:提供通过key或时间来查询消息的方法
刷盘机制
- 同步刷盘:消息写入机器的内存时,通知刷盘线程刷盘,等待刷盘线程写入完成后唤醒线程,返回写入完成
- 异步刷盘:消息写入内存后,返回写入完成,当内存累计到一定程度是统一触发刷盘操作
主从复制
- 同步复制:生产者发送消息,只有master与slave(半数slave)写入成功才反馈生产者写入成功
- 异步复制:生产者发送消息,只要master写入消息成功,就反馈生产者写入成功,再异步将消息同步到slave
负载均衡
- 生产者负载均衡:
- 生产者发送消息时,获取当前topic下所有broker集合,采用取模递增算法将消息往不同的broker上发送
- 消费者负载均衡
- 集群模式:六种分配算法
- AllocateMachineRoomNearby:同机房的消费者与broker分配一起
- AllocateMessageQueueAveragely:平均分配,将所有消息队列平均分配给消费者,先算数后分配
- AllocateMessageQueueAveragelyByCircle:先轮流给消费者分配一个队列,后面再增加
- AllocateMessageQueueByConfig:直接指定所有队列
- AllocateMessageQueueByMachineRoom:按逻辑机房进行分配
- AllocateMessageQueueConsistentHash:
- 广播模式:每个消费者分配所有的队列
消息重试
广播模式下不存在消息重试,会直接消费下一条
- 如何重试
消息监听器中配置
- 返回Action.ReconsumeLater
- 返回null
- 抛出异常
不重试返回Action.CommitMessage - 重试处理
重试的消息会进入“%RETRY%”+ConsumeGroup队列,最多16次,16次后会进入死信队列,可配置例如20次,16次后酶促间隔2h 16次每次间隔10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h - messageId
老版本中,无论重试多少次messageId是相同的,4.7.1中每次重试messageId会重建 - 配置覆盖
最大重试次数对同一个消费组实例有效,最后启动的消费者会覆盖之前的配置
死信队列
- 一个死信队列对于一个消费组,而不是一个消费者
- 一个消费组不需要死信队列是不会创建死信队列的
- 一个死信队列包含这个消费组所有无法消费的消息,不区分主题
- 消息无法再被消费者正常消费
- 默认存储3天,不管是否消费被删除
- 默认死信队列中的消息无法读取,需要将权限配置为6
消息幂等
当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就可实现消息幂等。支付时重复提交了多次但最后还是只支付了一次的钱
- 三种实现语义
- at most once:每条消息最多消费一次
- at least once:每条消息至少消费一次
- exactly one:确定消费一次
rocketmq支持at least once语义 - 消息重复情况
- 发送重复:消息发送到服务端并且持久化了,网络断开或者宕机了,生产者判断发送失败了会造次发送
- 投递重复:消费者收到消息并完成业务处理了,准备发送消息接收时宕机了,服务端在恢复后会再次发送一遍这个消息
- 负载均衡时消息重复:broker服务重启,扩容,缩容会触发rebalance造成消费者收到重复的消息
- 解决:
- 业务唯一标识:例如订单号
- 利用数据库唯一索引或主键索引
- 利用redis判断
实操
dledger模式不支持批量发送/升级v4.8+
基础消息
发送者
package cn.jaminye.sample.producer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer product = new DefaultMQProducer("product");
product.setNamesrvAddr("192.168.147.134:9876");
product.start();
Message message = new Message("java-topic", "hello-world".getBytes());
SendResult sendResult = product.send(message);
product.shutdown();
}
}
消费者
package cn.jaminye.sample.consumer;
/**
* 拉
*
* @author Jamin
* @date 2021/8/15 14:48
*/
public class Consumer {
/**
* 拉模式
*
* @param args
* @author Jamin
* @date 2021/8/16 10:13
*/
// public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// /**
// * 拉模式,已弃用方式
// */
// /*DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pull-group");
// consumer.setNamesrvAddr("192.168.147.134:9876");
// consumer.start();
// MessageQueue messageQueue = new MessageQueue();
// messageQueue.setQueueId(2);
// messageQueue.setBrokerName("broker-a");
// messageQueue.setTopic("java-topic");
// PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, null, 0, 2);
// pullResult.getMsgFoundList().forEach(System.out::println);
// consumer.shutdown();*/
// /**
// * 现用
// */
// DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("pull-group");
// consumer.setNamesrvAddr("192.168.147.134:9876");
// consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// consumer.subscribe("java-topic", "*");
// consumer.start();
// List<MessageExt> messageExtList = consumer.poll();
// messageExtList.forEach(System.out::println);
// consumer.shutdown();
// }
/**
* 推模式
*
* @param args
* @author Jamin
* @date 2021/8/16 10:13
*/
/*public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pull-group");
consumer.setNamesrvAddr("192.168.147.134:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("java-topic", "*");
//负载
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), list);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}*/
}
顺序消息
生产者
package cn.jaminye.order.product;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* @author Jamin
* @date 2021/8/16 10:24
*/
public class Product {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("order-group");
producer.setNamesrvAddr("192.168.147.134:9876");
producer.start();
String[] strings = {"下单", "付款", "生成订单"};
for (int i = 0; i < 100; i++) {
for (int j = 0; j < 3; j++) {
String s = "订单__" + i + "___" + strings[j];
Message message = new Message("order-topic", s.getBytes(RemotingHelper.DEFAULT_CHARSET));
//根据id取模入队列使分类消息进一个队列
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
int index = ((Integer) o) % list.size();
return list.get(index);
}
}, i);
}
}
producer.shutdown();
}
}
消费者
package cn.jaminye.order.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @author Jamin
* @date 2021/8/16 10:36
*/
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer");
//消费组订阅的消息未过期从头开始,已过期从当前开始
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("order-topic", "*");
consumer.setNamesrvAddr("192.168.147.134:9876");
//顺序取
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
list.stream().map(messageExt -> new String(messageExt.getBody())).forEach(System.out::println);
return ConsumeOrderlyStatus.SUCCESS;
}
});
/*consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.stream().map(messageExt -> new String(messageExt.getBody())).forEach(System.out::println);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});*/
consumer.start();
}
}
广播消息
consumer.setMessageModel(MessageModel.BROADCASTING);
延迟消息
//1-18 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
message1.setDelayTimeLevel(3);
批量消息
/**
* 批量发送 topic必须相同 官方示例中小于1m 不能是延迟,事务消息
*/
Message message1 = new Message("batch-topic2", "hello-world1".getBytes());
Message message2 = new Message("batch-topic2", "hello-world2".getBytes());
Message message3 = new Message("batch-topic2", "hello-world3".getBytes());
List<Message> messages = new ArrayList<>(8);
messages.add(message1);
messages.add(message2);
messages.add(message3);
SendResult sendResult = product.send(messages);
过滤消息
- 表达式过滤
consumer.subscribe(“filter-topic”, “TAG1 || TAG2”); - sql过滤
- 需要配置
enablePropertyFilter=true message1.putUserProperty("a", "1"); consumer.subscribe("filter-topic", MessageSelector.bySql("TAGS IN ('TAG1','TAG2') AND a between 0 and 1 ")); - 基本语法
>,<,>=,between,in,and,or,not 等
事务消息
- 代码
//组名不能与其他组名相同
TransactionMQProducer transactionGroupProducer = new TransactionMQProducer("transactionGroup");
ExecutorService executorService = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));
transactionGroupProducer.setExecutorService(executorService);
transactionGroupProducer.setNamesrvAddr("192.168.147.134:9876");
TransactionListenerImpl transactionListener = new TransactionListenerImpl();
transactionGroupProducer.setTransactionListener(transactionListener);
transactionGroupProducer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("transaction-topic", String.valueOf(i).getBytes());
message.putUserProperty("name", String.valueOf(i));
TransactionSendResult transactionSendResult = transactionGroupProducer.sendMessageInTransaction(message, null);
System.out.println(transactionSendResult.getSendStatus());
}
}
public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
//开启事务
// do something
if ("1".equals(msg.getProperty("name"))) {
System.out.println("unknow");
return LocalTransactionState.UNKNOW;
}
System.out.println("success");
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception ex) {
System.out.println("回滚事务");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("进入check");
// do something query db
return true ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.UNKNOW;
}
}
- 流程
- 发送消息到服务端,这个消息暂存在服务端,不会被消费者读取到
- 持久化成功后会返回生产者一个ack,确认消息是否成功
- 成功回调执行executeLocalTransaction方法,执行本地事务,持久化到数据库类的操作,这块的回滚自行处理,最终返回本地事务的执行结果
- 根据返回结果进行操作,commit的话会将当前消息移动到实际的topic下,回滚就删除消息
- 如果本地事务返回unknown,服务端会定时调用checkLocalTransaction方法进行查询,最多15次
- 根据checkLocalTransaction方法进行执行回滚或者提交
acl 权限控制
- 开启权限控制
aclEnable=true
- 配置文件
#全局白名单
globalWhiteRemoteAddresses:
#- 192.168.147.*
accounts:
- accessKey: RocketMQ
secretKey: 12345678
#白名单地址
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
#针对每个主题
topicPerms:
- topicA=DENY
- topicB=PUB|SUB
- topicC=SUB
- java-topic=DENY
groupPerms:
# the group should convert to retry topic
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
- product2=DENY
- accessKey: rocketmq2
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
# if it is admin, it could access all resources
admin: true
- 代码
DefaultMQProducer product = new DefaultMQProducer("product2", new AclClientRPCHook(new SessionCredentials("RocketMQ", "12345678")));
product.setNamesrvAddr("192.168.147.134:9876");
product.start();
Message message = new Message("java-topic", "hello-world".getBytes());
SendResult sendResult = product.send(message);
System.out.println(sendResult);
product.shutdown();
springboot整合rocketmq
依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
配置
rocketmq.name-server=192.168.147.134:9876
rocketmq.consumer.group=springboot-group
普通消息发送
@Component
public class SpringProducer {
@Resource
RocketMQTemplate mqTemplate;
public void sendMessage() {
Message<String> message = MessageBuilder.withPayload("12345").build();
//topic:tag
mqTemplate.syncSend("topic-1" + ":" + "TAG1", message, 100000);
mqTemplate.syncSend("topic-1" + ":" + "TAG2", message, 100000);
}
}
@Component
// selectorType 过滤使用tag还是sql selectorExpression tag或者sql consumeMode顺序还是正常的 messageModel广播还是集群
@RocketMQMessageListener(topic = "topic-1", consumerGroup = "springboot-group", selectorType = SelectorType.SQL92, selectorExpression = "TAGS='TAG1'", consumeMode = ConsumeMode.CONCURRENTLY,
messageModel = MessageModel.CLUSTERING)
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println(s);
}
}
事务消息
@Component
public class Producer {
@Resource
RocketMQTemplate rocketMQTemplate;
public void sendMessage() {
Message<String> message1 =
MessageBuilder.withPayload("123").setHeader(RocketMQHeaders.TRANSACTION_ID, "1").setHeader(RocketMQHeaders.TOPIC, "123")
.setHeader(RocketMQHeaders.TAGS, "1231").setHeader("a", 1).build();
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction("springboot-producer1:TAG1", message1, null);
System.out.println(transactionSendResult);
}
}
@RocketMQTransactionListener
public class Listener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("message===============" + message);
// 获取时添加前缀RocketMQHeaders.PREFIX
String tags = message.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS, String.class);
System.out.println("id==================" + tags);
System.out.println("UNKNOWN==================");
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
System.out.println("message===============" + message.getPayload());
return RocketMQLocalTransactionState.COMMIT;
}
}
总结
- 使用RocketMQTemplate进行发送消息,相关属性都以rocketmq_开头
- topic:tags
源码阅读
环境搭建
- 源码地址 源码地址 使用4.7.1版本源码
- 在项目根目录下创建conf文件夹,复制distribution下broker.conf,logback_broker.xml,logback_nameserv.xml三个文件到conf下
- 在本机添加环境变量ROCKETMQ_HOME指向项目根目录
- 启动nameser
- 修改conf目录下的broker.conf 添加namesrvAddr,storePathRootDir,storePathRootDir,storePathCommitLog,storePathConsumeQueue,storePathIndex,storeCheckpoint,abortFile等参数具体可参考上方配置
- 启动broker 配置启动参数-c broker.conf文件地址
namesever
- 配置信息:创建nameseverconfig与nettyserverconfig
- 初始化,启动,监听9876端口,提供给客户端拉取路由信息
- 创建处理请求的线程与定时扫描的线程(10s扫描一次,判断最后最后更新时间+2分钟,超出会删除这个broker并关闭连接)
broker
- 启动了很多组件
- 注册到nameserver,每30s(可以配置修改但最长为60s)发送一次心跳
producer
- DefaultMQProducerImpl:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
- 判断组名是否符合规定
- 启动各种定时任务,缓存nameserver上所有的主题,与broker建立心跳
- 发送消息采用索引自增取模的方式进行
文件存储
- org.apache.rocketmq.store.DefaultMessageStore#putMessage
- 使用零拷贝追加到commitlog,同步或异步刷盘,主从同步
- 定时任务:每10s启动启动一次,
|