rocketMQ介绍
环境搭建
upzip rocketmq-all-4.9.2-bin-release.zip
编辑:vim runserver.sh
编辑:vim runbroker.sh
如果使用docker进行搭建的话 还需要要修改一个配置broker.conf
sh mqnamesrv
sh mqbroker -n localhost:9876 -c ../conf/broker.conf
tail -f ~/logs/rocketmqlogs/broker.log
基本使用
- 环境搭建
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
- 基本使用
#发送
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("10.199.12.155:9876");
producer.start();
Message msg = new Message("topic1","hello rocketmq".getBytes("UTF-8"));
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);
producer.shutdown();
}
#消费者
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("10.199.12.155:9876");
consumer.subscribe("topic1","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(MessageExt msg : list){
System.out.println("消息:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("接收消息服务已开启运行");
}
# 生产者 跟上面一样
# 消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
System.out.println(consumer.getInstanceName());
consumer.setNamesrvAddr("10.199.12.155:9876");
consumer.subscribe("topic1","*");
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(MessageExt msg : list){
System.out.println("消费者1:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("接收消息服务已开启运行");
广播模式的特点:
- 如果生产者先发送消息,后启动消费者,消息只能被消费一次。
- 如果多个消费者先启动(广播模式),后发消息,才有广播效果。
- 要想有广播模式,必须先启动消费者,然后在启动发送者。
# 同步消息发送/异步消息发送/单向消息
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("10.199.12.155:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
Message msg = new Message("topic2",("同步消息:hello rocketmq "+i).getBytes("UTF-8"));
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);
Message msg2 = new Message("topic2",("异步消息:hello rocketmq "+i).getBytes("UTF-8"));
producer.send(msg, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
public void onException(Throwable t) {
System.out.println(t);
}
});
Message msg3 = new Message("topic2",("单向消息:hello rocketmq "+i).getBytes("UTF-8"));
producer.sendOneway(msg);
}
TimeUnit.SECONDS.sleep(10);
producer.shutdown();
}
# 延迟发送消息,比如取消订单的时候。
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("10.199.12.155:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
Message msg = new Message("topic3",("非延时消息:hello rocketmq "+i).getBytes("UTF-8"));
msg.setDelayTimeLevel(3);
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);
}
producer.shutdown();
}
List<Message> msgList = new ArrayList<Message>();
SendResult send = producer.send(msgList);
# 注意点:
消息的总长度不超过4M
消息内容长度包括如下:
topic(字符串字节数)
body(字节数组长度)
消息追加的属性(key与value对应字符串字节数)
日志(固定20字节)
#发送者
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("10.199.12.155:9876");
producer.start();
Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq 2").getBytes("UTF-8"));
SendResult send = producer.send(msg);
System.out.println(send);
producer.shutdown();
}
#消费者
*代表任意tag
"tag1 || tag2" 代表两个 tag 那个都行
consumer.subscribe("topic6","tag1 || tag2");
默认情况下,MQ开启了多个队列,同时发送多个消息的话,发送给那个队列是不确定的,同时消息的消费者读取消息,每读取一个消息开启一个线程,也不能保证消息的顺序性。要想保证消息的有序性,需要指定消息的队列,同时消息的消费者应该一个队列开启一个线程进行接收而不是一个消息一个线程
#发送者
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("10.199.12.155:9876");
producer.start();
List<Order> orderList = new ArrayList<Order>();
Order order11 = new Order();
order11.setId("a");
order11.setMsg("主单-1");
orderList.add(order11);
Order order12 = new Order();
order12.setId("a");
order12.setMsg("子单-2");
orderList.add(order12);
Order order13 = new Order();
order13.setId("a");
order13.setMsg("支付-3");
orderList.add(order13);
Order order14 = new Order();
order14.setId("a");
order14.setMsg("推送-4");
orderList.add(order14);
Order order21 = new Order();
order21.setId("b");
order21.setMsg("主单-1");
orderList.add(order21);
Order order22 = new Order();
order22.setId("b");
order22.setMsg("子单-2");
orderList.add(order22);
Order order31 = new Order();
order31.setId("c");
order31.setMsg("主单-1");
orderList.add(order31);
Order order32 = new Order();
order32.setId("c");
order32.setMsg("子单-2");
orderList.add(order32);
Order order33 = new Order();
order33.setId("c");
order33.setMsg("支付-3");
orderList.add(order33);
for(final Order order : orderList){
Message msg = new Message("orderTopic",order.toString().getBytes());
SendResult result = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
System.out.println(list.size());
int mqIndex = order.getId().hashCode() % list.size();
return list.get(mqIndex);
}
}, null);
System.out.println(result);
}
producer.shutdown();
}
#接收者
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("10.199.12.155:9876");
consumer.subscribe("orderTopic","*");
consumer.registerMessageListener(new MessageListenerOrderly() {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for(MessageExt msg : list){
System.out.println(Thread.currentThread().getName()+" 消息:"+new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("接收消息服务已开启运行");
}
- 事务消息
MQ的事务流程 MQ 的消息补偿过程(当本地代码执行失败时) 注意点 :事务消息仅与生产者有关系,与消费者无关。
#生产者代码
public static void main1(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("10.199.12.155:9876");
producer.setTransactionListener(new TransactionListener() {
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
return LocalTransactionState.COMMIT_MESSAGE;
}
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return null;
}
});
producer.start();
Message msg = new Message("topic8",("事务消息:hello rocketmq ").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg,null);
System.out.println("返回结果:"+result);
producer.shutdown();
}
# 补偿代码
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("10.199.12.155:9876");
producer.setTransactionListener(new TransactionListener() {
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
return LocalTransactionState.UNKNOW;
}
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("事务补偿过程执行");
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
Message msg = new Message("topic11",("事务消息:hello rocketmq ").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg,null);
System.out.println("返回结果:"+result);
}
- 不用数据库进行存储,而是使用的文件进行存储。
- 通过启动时初始化文件大小来保证,占用固定的磁盘空间,来保证磁盘读写速度
- 零拷贝技术,数据传输由传统的4次复制简化为3次复制,减少了一次复制的次数。
- java语言中使用MappedByteBuffer类实现了该技术
- 刷盘机制
同步刷盘:
1)生产者发送消息到MQ,MQ接到消息数据
2)MQ挂起生产者发送消息的线程
3)MQ将消息数据写入内存
4)内存数据写入硬盘
5)磁盘存储后返回SUCCESS
6)MQ恢复挂起的生产者线程
7)发送ACK到生产者
异步刷盘:
1)生产者发送消息到MQ,MQ接到消息数据
2)MQ将消息数据写入内存
3)发送ACK到生产者
--等消息量多了--
4)内存数据写入硬盘
优缺点对比:
同步刷盘:安全性高,效率低,速度慢(适用于对数据安全要求较高的业务)
异步刷盘:安全性低,效率高,速度快(适用于对数据处理速度要求较高的业务)
配置方式
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
消息服务器:
主从架构(2M-2S) ,即使又一台服务器宕机, 服务依旧可以正常提供
注意: master 一旦宕机,slave 只提供消费服务,不能写入新的消息(slave 不会升级为master)
消息消费:
RocketMQ自身会根据master的压力确认是否由master承担消息读取的功能,当master繁忙时候,自动切换由slave承担数据读取的工作
- 主从数据复制
同步复制:
master接到消息后,先复制到slave,然后反馈给生产者写操作成功
优点:数据安全,不丢数据,出现故障容易恢复
缺点:影响数据吞吐量,整体性能低
异步复制:
master接到消息后,立即返回给生产者写操作成功,当消息达到一定量后再异步复制到slave
优点:数据吞吐量大,操作延迟低,性能高
缺点:数据不安全,会出现数据丢失的现象,一旦master出现故障,从上次数据同步到故障时间的数据将丢失
配置(配置在启动时 -c 指定的配置文件中 broker.conf):
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
- 负载均衡
Producer负载均衡:内部实现了不同broker集群中对同一topic对应消息队列的负载均衡
Consumer负载均衡:平均分配 循环平均分配
- 消息重试
顺序消息重试:
当消费者消费消息失败后,RocketMQ会自动进行消息重试(每次间隔时间为 1 秒)
注意:应用会出现消息消费被阻塞的情况,因此,要对顺序消息的消费情况进行监控,避免阻塞现象的发生
无序消息重试:
无序消息包括普通消息、定时消息、延时消息、事务消息
无序消息重试仅适用于负载均衡(集群)模型下的消息消费,不适用于广播模式下的消息消费
为保障无序消息的消费,MQ设定了合理的消息重试间隔时长
- 死信队列
概念:
当消息消费重试到达了指定次数(默认16次)后,MQ将无法被正常消费的消息称为死信消息(Dead-Letter Message)
死信消息不会被直接抛弃,而是保存到了一个全新的队列中,该队列称为死信队列(Dead-Letter Queue)
特征:
- 归属某一个组(Gourp Id),而不归属Topic,也不归属消费者
- 一个死信队列中可以包含同一个组下的多个Topic中的死信消息
- 死信队列不会进行默认初始化,当第一个死信出现后,此队列首次初始化
- 不会被再次重复消费
- 死信队列中的消息有效期为3天,达到时限后将被清除
死信消息处理:
在监控平台中,通过查找死信,获取死信的messageId,然后通过id对死信进行精准消费
- 消息重复消费以及消息幂等性
重复消息原因:
1 生产者发送了重复的消息
网络闪断
生产者宕机
2 消息服务器投递了重复的消息
网络闪断
3 动态的负载均衡过程
网络闪断/抖动
broker重启
订阅方应用重启(消费者)
客户端扩容
客户端缩容
幂等性:
同一条消息只有一条。
解决方案:
使用业务id作为消息的key 消费的时候进行判断,未使用过则放过,使用过抛弃
rocketMQ与springboot整合
依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
配置
rocketmq.name-server=192.168.31.81:9876
rocketmq.producer.group=tanhua
代码实现
@Autowired
private RocketMQTemplate rocketMQTemplate;
rocketMQTemplate.convertAndSend("tanhua-sso-login", msg);
@Service
@RocketMQMessageListener(consumerGroup = "tanhua",topic = "tanhua-sso-login")
public class loginService implements RocketMQListener {
@Override
public void onMessage(Object o) {
}
}
上文的 kafka介绍
|