上一篇讲的都是基础运用,今天来学习一下高级应用部分。
先提出一个面试题
面试1:消息中间件的优点?
答:解耦、流量削峰、异步。
解耦:可以进行分布式分解,用消息中间件作为各个微服务消息传递的桥梁。
异步:消息中间件是多线程的,可以使原本串行的代码变为并行,使请求被响应的时间成倍的缩减。
流量削峰:当请求大量涌入的时候,可以先保存在消息队列中,等待服务器依次处理,避免服务器崩溃。
RabbitMQ整合SpringBoot
每次到整合SpringBoot的时候我都很开心,哈哈
注意:交换机及队列一旦创建后,重新加载配置也不会修改,只会报错,所以创建交换机或队列时需要深思熟虑,如果必须要换,请直接选择新增队列,修改生产者投递消息的队列,不要删除原队列。
RabbitMQ整合fanout类型生产者
1.导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.编写配置文件
spring:
rabbitmq:
username: guest
password: guest
virtual-host: /
# 单机方式连接
host: 127.0.0.1
port: 5672
# 集群方式连接
addresses: 127.0.0.1:5672
3.编写rabbitMQ配置class文件
package com.haier.stock.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author ME
* @date 2022/2/4 11:15
*/
@Configuration
public class RabbitMQConfig {
// 声明注册fanout模式的交换机
// 返回值确定了交换机类型
@Bean
public FanoutExchange fanoutExchange() {
// 参数:1.交换机名称,2.是否持久化,3.是否自动删除
return new FanoutExchange("交换机名称", true, false);
}
// 声明队列
@Bean
public Queue fanoutQueue() {
// 参数:1.队列名称,2.是否持久化
return new Queue("队列名称", true);
}
// 完成交换机与队列绑定
@Bean
public Binding fanoutBinding() {
// 将fanoutQueue队列绑定到fanoutExchange交换机
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}
//
}
4.在Service中进行调用
package com.haier.stock.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @author ME
* @date 2022/2/4 11:25
*/
@Service
public class RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void orderSend() {
// 调用rabbitTemplate方法进行消息发送
rabbitTemplate.convertAndSend("交换机名称", "路由 key", "消息内容");
}
}
RabbitMQ整合fanout类型消费者
package com.haier.stock.config;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* @author ME
* @date 2022/2/4 14:23
*/
@RabbitListener(queues = {"监听的队列名称"})
// 需要将类交给Spring管理,不然接收不到消息,可用@Service或@Component
@Service
public class RabbitMQConsumerConfig {
@RabbitHandler
public void getMessage(String message) {
System.out.println("接收到了消息" + message);
}
}
RabbitMQ整合direct类型生产者
与fanout类型生产者类似,只有配置class文件有区别
package com.haier.stock.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author ME
* @date 2022/2/4 11:15
*/
@Configuration
public class RabbitMQConfig {
// 声明注册direct模式的交换机
// 返回值确定了交换机类型
@Bean
public DirectExchange directExchange() {
// 参数:1.交换机名称,2.是否持久化,3.是否自动删除
return new DirectExchange("交换机名称", true, false);
}
// 声明队列
@Bean
public Queue directQueue() {
// 参数:1.队列名称,2.是否持久化
return new Queue("队列名称", true);
}
// 完成交换机与队列绑定
@Bean
public Binding directBinding() {
// 将directQueue队列绑定到directExchange交换机
// direct模式需要绑定路由key
return BindingBuilder.bind(directQueue()).to(directExchange()).with("路由key");
}
//
}
RabbitMQ的创建交换机、队列可以在生成者中创建,也可以消费者中创建,但是消费者消费时必须能够找到对应的交换机,所以在消费者中创建较为合适。
RabbitMQ整合topic类型,注解方式声明交换机及队列
package com.haier.stock.config;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* @author ME
* @date 2022/2/4 14:23
*/
// @QueueBinding注解将队列与交换机进行绑定
// value = @Queue(value = "队列名称", durable = "是否持久化", autoDelete = "是否自动删除")注解声明队列
// exchange = @Exchange(value = "交换机名称", type = ExchangeTypes.TOPIC)注解声明交换机
// key表示路由Key
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "队列名称", durable = "是否持久化", autoDelete = "是否自动删除"),
exchange = @Exchange(value = "交换机名称", type = ExchangeTypes.TOPIC),
key = "")
)
// 需要将类交给Spring管理,不然接收不到消息,可用@Service或@Component
@Service
public class RabbitMQConsumerConfig {
@RabbitHandler
public void getMessage(String message) {
System.out.println("接收到了消息" + message);
}
}
RabbitMQTTL队列过期时间
过期时间TTL表示可以对消息设置预期的时间,在时间内可以被消费者获取,超过时间则自动删除。
那么该如何指定队列中消息过期时间呢?代码如下
// 声明队列
@Bean
public Queue directQueue() {
// 参数:1.队列名称,2.是否持久化,3.是否排他性,4.是否自动删除,5.携带参数
Map<String, Object> args = new HashMap<>(16);
// 放入ttl键值对,指定过期时间,int类型,单位为毫秒,1000毫秒 = 1秒
args.put("x-message-ttl", 5000);
return new Queue("队列名称", true, false, false, args);
}
如果您觉得队列中消息全部过期影响范围太大了,就想配置单条消息的过期时间也可以,代码如下
package com.haier.stock.service;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
/**
* @author ME
* @date 2022/2/4 11:25
*/
@Service
public class RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void orderSend() {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 配置此条消息过期时间为5000毫秒,1000毫秒 = 1秒
message.getMessageProperties().setExpiration("5000");
// 配置消息所用字符集
message.getMessageProperties().setContentEncoding(StandardCharsets.UTF_8.toString());
return message;
}
};
// 调用rabbitTemplate方法进行消息发送
rabbitTemplate.convertAndSend("交换机名称", "路由 key", "消息内容", messagePostProcessor);
}
}
那么问题来了,如果一个队列设置了过期时间,且单条消息也设置了过期时间,那么以哪个时间为准呢?
答案是以时间短的为准,两者的重叠部分最小时间就为此消息的过期时间。
两者区别在于,给队列设置了过期时间,我们可以将过期的消息放入死信队列中,但是给消息设置了过期时间,那就是真的过期删除掉了。
RabbitMQ死信队列
DLX,全称为(Dead-Letter-Exchange)死信交换机。当消息在一个队列中变成死亡信息,无法被消费的时候,他能被重新发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就被称为死信队列。
消息变为死亡信息可能因为:
1.消息被拒绝
2.消息过期
3.队列达到最大长度
DLX也是一个普通的交换机,只是当我们队列中出现死亡信息后,rabbitMQ会自动将死亡信息发送到另一个交换机的队列中,即死信队列。
使用死信队列只需要在定义队列的时候设置队列参数"x-dead-letter-exchange"指定交换机即可,与TTL设置过期时间方式相同。
// 声明队列
@Bean
public Queue directQueue() {
// 参数:1.队列名称,2.是否持久化,3.是否排他性,4.是否自动删除,5.携带参数
Map<String, Object> args = new HashMap<>(16);
// 放入ttl键值对,指定过期时间,int类型,单位为毫秒,1000毫秒 = 1秒
args.put("x-message-ttl", 5000);
// 配置队列的最大长度,int类型,超过此长度则自动放入死信队列
args.put("x-max-length", 5);
// 绑定死信队列
args.put("x-dead-letter-exchange", "死信队列交换机名称");
// fanout模式不需要配置路由key,只有direct模式与topic模式需要配置
args.put("x-dead-letter-routing-key", "死信队列绑定到交换机的路由key");
return new Queue("队列名称", true, false, false, args);
}
调整RabbitMQ的使用内存
rabbitMQ使用内存可以通过在服务器执行命令进行修改其内存大小。(设置后重启服务器或重启rabbitMQ服务设置会失效)
// 设置内存为相对大小,相当于百分比,建议在0.4-0.7之间
vm_memory_high_watermark.relative = 0.6
// 设置内存为绝对大小,确定具体使用值
vm_memory_high_watermark.absolute = 2GB
// 设置剩余硬盘可用空间小于多少进行预警与队列挂起
rabbitmqctl set_disk_free_limit 100GB
// 内存换页,确定内存中的数据何时写入磁盘(值需要小于1,不然没有意义)
vm_memory_high_watermark_paging_ratio = 0.6
rabbitMQ默认最大使用内存为0.4,大小为(服务器内存大小 * 0.4)。
rabbitMQ默认硬盘可用空间小于50M会进行预警。
rabbitMQ默认内存换页是0.5,当内存被用去一半的时候进行硬盘的写入。
当内存空间不够或剩余硬盘可用空间预警后队列会被挂起,无法继续投递消息。
RabbitMQ集群搭建
1.启动第一个节点
// 启动命令
RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=自定义节点名称 rabbitmq-server -detached
// 结束命令
rabbitmqctl -n 自定义节点名称 stop
// 启动第一个节点命令
sudo RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=自定义节点名称 rabbitmq-server start &
2.启动第二个节点
// 启动第二个rabbitMQ节点
// 由于在一台服务器上启动,端口会出现冲突,所以修改了端口号
sudo RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port, 15673}]" RABBITMQ_NODENAME=自定义节点名称 rabbitmq-server start &
3.验证
ps aux|grep rabbitmq
4.设置主节点与从节点
// 主节点
// 停止应用
sudo rabbitmqctl -n 自定义节点名称 stop_app
// 清除节点上的历史数据(如果不清除,无法将节点加入集群)
sudo rabbitmqctl -n 自定义节点名称 reset
// 启动应用
sudo rabbitmqctl -n 自定义节点名称 start_app
// 从节点
// 停止应用
sudo rabbitmqctl -n 自定义节点名称 stop_app
// 清除节点上的历史数据(如果不清除,无法将节点加入集群)
sudo rabbitmqctl -n 自定义节点名称 reset
// 将从节点加入到主节点集群当中
// 单机器部署写主机名称,多机器部署写主机IP
sudo rabbitmqctl -n 自定义节点名称 join_cluster 主节点名称@主机名称
// 启动应用
sudo rabbitmqctl -n 自定义节点名称 start_app
5.验证集群状态
sudo rabbitmqctl cluster_status -n 主节点名称
6.打开Web监控页面
// 默认关闭的,需要使用命令打开
rabbitmq-plugins enable rabbitmq_management
7.添加用户
// 新增用户
rabbitmqctl -n 节点名称 add_user 自定义用户名 自定义用户密码
// 配置用户角色
rabbitmqctl -n 节点名称 set_user_tags 用户名 权限名称(在学习笔记一中有记录)
// 配置用户权限
rabbitmqctl -n 节点名称 set_permissions -p / admin ".*" ".*" ".*"
注意:如果采用多机器部署集群的方式时,需要读取其中一个节点的cookie并复制到其他节点上(主从节点之间用cookie确定是否可通信)。
cookie文件存放在/var/lib/rabbitmq/.erlang.cookie
分布式事务
我之前有介绍过什么是事务,如果有不了解的可以看一下下面连接↓
Spring事务及工作中的使用
我们在工作中的事务都是在同一个JVM下的事务处理,那么跨服务的事务如何来做呢?比如A买了一件衣服,创建好订单后发送消息给库存中心进行库存扣减,但是发现库存中心报错了,那么创建好的订单信息就需要回滚,所以就提出了分布式事务的概念。
实现分布式事务的目的就是为了最终一致性,那么只有生产者与消费者的可靠性提升了,那么就会提升项目的最终一致性。
创建可靠生产者
创建订单冗余表,记录消息投递是否成功,如果消息没有投递到消息队列中,就需要使用定时任务进行重试投递。
创建可靠消费者
当消费者获取到消息后代码报错,RabbitMQ会不断进行重试,引发死循环,所以我们要对RabbitMQ重试的次数进行限制。
然后通过如下几个方案提升可靠性:
1.修改yml文件重试配置,控制重发次数 + 死信队列
2.tryCatch + 手动ack -消息会丢失
3.tryCatch + 手动ack + 死信队列处理 + 人工干预
配置yml文件,修改重试配置
spring:
rabbitmq:
username: guest
password: guest
virtual-host: /
host: 127.0.0.1
port: 5672
listener:
simple:
acknowledge-mode: manual # 手动ack
retry:
enabled: true #开启重试
max-attempts: 3 #最大重试次数,默认为3
initial-interval: 2000ms #重试间隔时间,单位毫秒,1000毫秒 = 1秒
// 手动应答
channel.basicAck(tag, false);
// 重发
// 参数:1.消息的tag,2.是否多条处理,3.是否重发
// 是否重发为false-不重发,直接发送到死信队列中
// true-重发再加上tryCatch可能会造成死循环
// 如果添加了tryCatch并用此方法确定是否重发,则配置文件中配置的重试规则会失效
channel.basicNack(tag, false, false)
RabbitMQ就学到这里了,分布式部分的内容搞的我有点迷糊,可能写的不太好,希望对您有所帮助。
|