架构图
安装
https://www.jianshu.com/p/0e7dad0bed0e
两种方式:
1. 下载源码,maven打包后,进行安装
2. 直接下载二进制码,减压后就行
/usr/local/rocketmq
/root/logs/rocketmq
rocke有9876
非vip通道端口:10911
vip通道端口:10909
10909是VIP通道对应的端口,在JAVA中的消费者对象或者是生产者对象中关闭VIP通道即可无需开放10909端口
如果是broker集群的话,还要开放10912,否则master的消息将无法复制到slave节点
集成
https://www.cnblogs.com/qdhxhz/p/11109696.html
两种方式:
1.引用rocketmq-client,自己进行封装
2.下载rocketmq-sprintboot集成代码,因为没有上传中央仓库,需要自己install,才可以引用
坑
参考:
https://www.jianshu.com/p/12aa00215dd6
https://blog.csdn.net/qq_16241519/article/details/103926356
https://www.cnblogs.com/smail-bao/p/6905460.html?utm_source=itdadao&utm_medium=referral
RemotingTooMuchRequestException: sendDefaultImpl call timeout
namesrvAddr = 192.168.118.162:9876
brokerIP1 = 192.168.118.162
nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.properties &
三种发送模式
? 同步发送 sync
rocketMQTemplate.syncSend("topic-name", "send sync message !");
? 异步发送 async
发送消息采用异步发送模式,消息发送后立刻返回,当消息完全完成发送后,会调用回调函数sendCallback来告知发送者本次发送是成功或者失败。异步模式通常用于响应时间敏感业务场景,即承受不了同步发送消息时等待返回的耗时代价。
public void async() {
rocketMQTemplate.asyncSend("topic-name", "send async message!", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("send successful");
}
@Override
public void onException(Throwable throwable) {
log.info("send fail; {}", throwable.getMessage());
}
});
}
? 直接发送 one-way
rocketMQTemplate.sendOneWay("topic-name", "send one-way message");
发送端发送完消息后会立即返回,不会等待来自broker的ack来告知本次消息发送是否完全完成发送。这种方式吞吐量很大,但是存在消息丢失的风险,所以其适用于不重要的消息发送,比如日志收集。
1. 当发送的消息不重要时,采用one-way方式,以提高吞吐量;
2. 当发送的消息很重要是,且对响应时间不敏感的时候采用sync方式;
3. 当发送的消息很重要,且对响应时间非常敏感的时候采用async方式;
延迟消息
RocketMQ 目前只支持固定精度的定时消息。
延迟级别(18个等级)
1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h Message message = MessageBuilder.withPayload(msg).build();
rocketMQTemplate.syncSend(topic, message,1000,2);//表示延时5秒
去重
1. 幂等性
2. 保存已处理的信息,新消息进行判重
3. 如果消息有顺序性,则可以添加时间戳,客户端维护一个最后处理的时间戳,如果是之前消息则抛弃,否则处理更新
幂等性
业务 关联
查询:天然的幂等性
删除:删除多次也是删除成功
添加:
1. 业务主键 的分析
2. DB 对此表创建唯一索引
3. 插入的时候,先判断 ,然后在插入
- 查询的时候用分布式锁子
- 更新的时候查询
insert into sys_user(name,password) select 'admin', '123456' from dual where not exists(select 1 from sys_user where name='admin');
更新:
1. 乐观锁,添加一个version
- 查询出的version=1,update的时候 需要将version添加到where version=1为条件
update sys_user set age=age+1, version=version+1 where version=xx
这个版本号也可以使用更新时间来判断。
2. 更新前查询
- 分布式锁
- 悲观锁,select for update
- 获取数据,判断是否修改
备注:insert,update 及时没有 业务唯一序列号,也需要手动添加一个。
拉取消息
推拉是站在MQ端。
push 推,MQ给客户端推送
pull 拉,客户端主动定时去轮训MQ,RocketMQMessageListener就是一种拉模式
实际没有推模式,push是通过pull来实现。
顺序消息
保证发送的是同一个队列。
1. 所有消息都放在同一个队列
2. 将一个订单的消息放在同一个队列(√)
发送在同一个消息队列中,根据先到先出的原则,消息就会按照发送的顺序挨个消费。及时是多个消费者也是挨个消费。
这是默认的消息队列选择器:
public class SelectMessageQueueByHash implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value = value % mqs.size();
return mqs.get(value);
}
}
事务消息
? 设置事务监听
package com.xxx.listener;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "rocket")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("本地事务和消息发送:" + JSON.toJSONString(message));
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
System.out.println("回查信息:" + JSON.toJSONString(message));
return RocketMQLocalTransactionState.COMMIT;
}
}
? 发送消息
package com.xxx;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
@Slf4j
@SpringBootApplication
public class SpringBootRocketMqApplication {
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext context = SpringApplication.run(SpringBootRocketMqApplication.class, args);
RocketMQTemplate template = context.getBean(RocketMQTemplate.class);
while (true) {
String msg = "demo msg test";
log.info("开始发送消息:"+msg);
Message message = MessageBuilder.withPayload(msg).build();
TransactionSendResult result = template.sendMessageInTransaction("rocket", "ts", message, null);
log.info("消息发送响应信息:"+result.toString());
Thread.sleep(10);
}
}
}
广播消息
发送消息不变,消费端设置消息类型(默认负载均衡)
@RocketMQMessageListener(messageModel=MessageModel.BROADCASTING)
end
|