IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RabbitMQ学习笔记(二) -> 正文阅读

[Java知识库]RabbitMQ学习笔记(二)

上一篇讲的都是基础运用,今天来学习一下高级应用部分。

先提出一个面试题

面试1:消息中间件的优点?

答:解耦、流量削峰、异步。

解耦:可以进行分布式分解,用消息中间件作为各个微服务消息传递的桥梁。

异步:消息中间件是多线程的,可以使原本串行的代码变为并行,使请求被响应的时间成倍的缩减。

流量削峰:当请求大量涌入的时候,可以先保存在消息队列中,等待服务器依次处理,避免服务器崩溃。

RabbitMQ整合SpringBoot

每次到整合SpringBoot的时候我都很开心,哈哈

注意:交换机及队列一旦创建后,重新加载配置也不会修改,只会报错,所以创建交换机或队列时需要深思熟虑,如果必须要换,请直接选择新增队列修改生产者投递消息的队列,不要删除原队列。

RabbitMQ整合fanout类型生产者

1.导入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.编写配置文件

spring:
  rabbitmq:
    username: guest
    password: guest
    virtual-host: /
    # 单机方式连接
    host: 127.0.0.1
    port: 5672
    # 集群方式连接
    addresses: 127.0.0.1:5672

3.编写rabbitMQ配置class文件

package com.haier.stock.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author ME
 * @date 2022/2/4 11:15
 */
@Configuration
public class RabbitMQConfig {
    // 声明注册fanout模式的交换机
    // 返回值确定了交换机类型
    @Bean
    public FanoutExchange fanoutExchange() {
        // 参数:1.交换机名称,2.是否持久化,3.是否自动删除
        return new FanoutExchange("交换机名称", true, false);
    }
    // 声明队列
    @Bean
    public Queue fanoutQueue() {
        // 参数:1.队列名称,2.是否持久化
        return new Queue("队列名称", true);
    }
    // 完成交换机与队列绑定
    @Bean
    public Binding fanoutBinding() {
        // 将fanoutQueue队列绑定到fanoutExchange交换机
        return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
    }
    //
}

4.在Service中进行调用

package com.haier.stock.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author ME
 * @date 2022/2/4 11:25
 */
@Service
public class RabbitMQService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void orderSend() {
        // 调用rabbitTemplate方法进行消息发送
        rabbitTemplate.convertAndSend("交换机名称", "路由 key", "消息内容");
    }
}

RabbitMQ整合fanout类型消费者

package com.haier.stock.config;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * @author ME
 * @date 2022/2/4 14:23
 */
@RabbitListener(queues = {"监听的队列名称"})
// 需要将类交给Spring管理,不然接收不到消息,可用@Service或@Component
@Service
public class RabbitMQConsumerConfig {

    @RabbitHandler
    public void getMessage(String message) {
        System.out.println("接收到了消息" + message);
    }
}

RabbitMQ整合direct类型生产者

与fanout类型生产者类似,只有配置class文件有区别

package com.haier.stock.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author ME
 * @date 2022/2/4 11:15
 */
@Configuration
public class RabbitMQConfig {
    // 声明注册direct模式的交换机
    // 返回值确定了交换机类型
    @Bean
    public DirectExchange directExchange() {
        // 参数:1.交换机名称,2.是否持久化,3.是否自动删除
        return new DirectExchange("交换机名称", true, false);
    }
    // 声明队列
    @Bean
    public Queue directQueue() {
        // 参数:1.队列名称,2.是否持久化
        return new Queue("队列名称", true);
    }
    // 完成交换机与队列绑定
    @Bean
    public Binding directBinding() {
        // 将directQueue队列绑定到directExchange交换机
        // direct模式需要绑定路由key
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("路由key");
    }
    //
}

RabbitMQ的创建交换机、队列可以在生成者中创建,也可以消费者中创建,但是消费者消费时必须能够找到对应的交换机,所以在消费者中创建较为合适。

RabbitMQ整合topic类型,注解方式声明交换机及队列

package com.haier.stock.config;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * @author ME
 * @date 2022/2/4 14:23
 */
// @QueueBinding注解将队列与交换机进行绑定
// value = @Queue(value = "队列名称", durable = "是否持久化", autoDelete = "是否自动删除")注解声明队列
// exchange = @Exchange(value = "交换机名称", type = ExchangeTypes.TOPIC)注解声明交换机
// key表示路由Key
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "队列名称", durable = "是否持久化", autoDelete = "是否自动删除"),
        exchange = @Exchange(value = "交换机名称", type = ExchangeTypes.TOPIC),
        key = "")
)
// 需要将类交给Spring管理,不然接收不到消息,可用@Service或@Component
@Service
public class RabbitMQConsumerConfig {

    @RabbitHandler
    public void getMessage(String message) {
        System.out.println("接收到了消息" + message);
    }
}

RabbitMQTTL队列过期时间

过期时间TTL表示可以对消息设置预期的时间,在时间内可以被消费者获取,超过时间则自动删除。

那么该如何指定队列中消息过期时间呢?代码如下

    // 声明队列
    @Bean
    public Queue directQueue() {
        // 参数:1.队列名称,2.是否持久化,3.是否排他性,4.是否自动删除,5.携带参数
        Map<String, Object> args = new HashMap<>(16);
        // 放入ttl键值对,指定过期时间,int类型,单位为毫秒,1000毫秒 = 1秒
        args.put("x-message-ttl", 5000);
        return new Queue("队列名称", true, false, false, args);
    }

如果您觉得队列中消息全部过期影响范围太大了,就想配置单条消息的过期时间也可以,代码如下

package com.haier.stock.service;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

/**
 * @author ME
 * @date 2022/2/4 11:25
 */
@Service
public class RabbitMQService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void orderSend() {
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 配置此条消息过期时间为5000毫秒,1000毫秒 = 1秒
                message.getMessageProperties().setExpiration("5000");
                // 配置消息所用字符集
                message.getMessageProperties().setContentEncoding(StandardCharsets.UTF_8.toString());
                return message;
            }
        };
        // 调用rabbitTemplate方法进行消息发送
        rabbitTemplate.convertAndSend("交换机名称", "路由 key", "消息内容", messagePostProcessor);
    }
}

那么问题来了,如果一个队列设置了过期时间,且单条消息也设置了过期时间,那么以哪个时间为准呢?

答案是以时间短的为准,两者的重叠部分最小时间就为此消息的过期时间。

两者区别在于,给队列设置了过期时间,我们可以将过期的消息放入死信队列中,但是给消息设置了过期时间,那就是真的过期删除掉了。

RabbitMQ死信队列

DLX,全称为(Dead-Letter-Exchange)死信交换机。当消息在一个队列中变成死亡信息,无法被消费的时候,他能被重新发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就被称为死信队列。

消息变为死亡信息可能因为:

1.消息被拒绝

2.消息过期

3.队列达到最大长度

DLX也是一个普通的交换机,只是当我们队列中出现死亡信息后,rabbitMQ会自动将死亡信息发送到另一个交换机的队列中,即死信队列。

使用死信队列只需要在定义队列的时候设置队列参数"x-dead-letter-exchange"指定交换机即可,与TTL设置过期时间方式相同。

    // 声明队列
    @Bean
    public Queue directQueue() {
        // 参数:1.队列名称,2.是否持久化,3.是否排他性,4.是否自动删除,5.携带参数
        Map<String, Object> args = new HashMap<>(16);
        // 放入ttl键值对,指定过期时间,int类型,单位为毫秒,1000毫秒 = 1秒
        args.put("x-message-ttl", 5000);
        // 配置队列的最大长度,int类型,超过此长度则自动放入死信队列
        args.put("x-max-length", 5);
        // 绑定死信队列
        args.put("x-dead-letter-exchange", "死信队列交换机名称");
        // fanout模式不需要配置路由key,只有direct模式与topic模式需要配置
        args.put("x-dead-letter-routing-key", "死信队列绑定到交换机的路由key");
        return new Queue("队列名称", true, false, false, args);
    }

调整RabbitMQ的使用内存

rabbitMQ使用内存可以通过在服务器执行命令进行修改其内存大小。(设置后重启服务器或重启rabbitMQ服务设置会失效)

// 设置内存为相对大小,相当于百分比,建议在0.4-0.7之间
vm_memory_high_watermark.relative = 0.6

// 设置内存为绝对大小,确定具体使用值
vm_memory_high_watermark.absolute = 2GB

// 设置剩余硬盘可用空间小于多少进行预警与队列挂起
rabbitmqctl set_disk_free_limit 100GB

// 内存换页,确定内存中的数据何时写入磁盘(值需要小于1,不然没有意义)
vm_memory_high_watermark_paging_ratio = 0.6

rabbitMQ默认最大使用内存为0.4,大小为(服务器内存大小 * 0.4)。

rabbitMQ默认硬盘可用空间小于50M会进行预警

rabbitMQ默认内存换页是0.5,当内存被用去一半的时候进行硬盘的写入

当内存空间不够或剩余硬盘可用空间预警后队列会被挂起无法继续投递消息

RabbitMQ集群搭建

1.启动第一个节点

// 启动命令
RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=自定义节点名称 rabbitmq-server -detached

// 结束命令
rabbitmqctl -n 自定义节点名称 stop

// 启动第一个节点命令
sudo RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=自定义节点名称 rabbitmq-server start &

2.启动第二个节点

// 启动第二个rabbitMQ节点
// 由于在一台服务器上启动,端口会出现冲突,所以修改了端口号
sudo RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port, 15673}]" RABBITMQ_NODENAME=自定义节点名称 rabbitmq-server start &

3.验证

ps aux|grep rabbitmq

4.设置主节点与从节点

// 主节点

// 停止应用
sudo rabbitmqctl -n 自定义节点名称 stop_app

// 清除节点上的历史数据(如果不清除,无法将节点加入集群)
sudo rabbitmqctl -n 自定义节点名称 reset

// 启动应用
sudo rabbitmqctl -n 自定义节点名称 start_app
// 从节点

// 停止应用
sudo rabbitmqctl -n 自定义节点名称 stop_app

// 清除节点上的历史数据(如果不清除,无法将节点加入集群)
sudo rabbitmqctl -n 自定义节点名称 reset

// 将从节点加入到主节点集群当中
// 单机器部署写主机名称,多机器部署写主机IP
sudo rabbitmqctl -n 自定义节点名称 join_cluster 主节点名称@主机名称

// 启动应用
sudo rabbitmqctl -n 自定义节点名称 start_app

5.验证集群状态

sudo rabbitmqctl cluster_status -n 主节点名称

6.打开Web监控页面

// 默认关闭的,需要使用命令打开
rabbitmq-plugins enable rabbitmq_management

7.添加用户

// 新增用户
rabbitmqctl -n 节点名称 add_user 自定义用户名 自定义用户密码

// 配置用户角色
rabbitmqctl -n 节点名称 set_user_tags 用户名 权限名称(在学习笔记一中有记录)

// 配置用户权限
rabbitmqctl -n 节点名称 set_permissions -p / admin ".*" ".*" ".*"

注意:如果采用多机器部署集群的方式时,需要读取其中一个节点的cookie并复制到其他节点上(主从节点之间用cookie确定是否可通信)。

cookie文件存放在/var/lib/rabbitmq/.erlang.cookie

分布式事务

我之前有介绍过什么是事务,如果有不了解的可以看一下下面连接↓

Spring事务及工作中的使用

我们在工作中的事务都是在同一个JVM下的事务处理,那么跨服务的事务如何来做呢?比如A买了一件衣服,创建好订单后发送消息给库存中心进行库存扣减,但是发现库存中心报错了,那么创建好的订单信息就需要回滚,所以就提出了分布式事务的概念。

实现分布式事务的目的就是为了最终一致性,那么只有生产者与消费者的可靠性提升了,那么就会提升项目的最终一致性。

创建可靠生产者

创建订单冗余表,记录消息投递是否成功,如果消息没有投递到消息队列中,就需要使用定时任务进行重试投递。

创建可靠消费者

当消费者获取到消息后代码报错,RabbitMQ会不断进行重试,引发死循环,所以我们要对RabbitMQ重试的次数进行限制。

然后通过如下几个方案提升可靠性:

1.修改yml文件重试配置,控制重发次数 + 死信队列

2.tryCatch + 手动ack -消息会丢失

3.tryCatch + 手动ack + 死信队列处理 + 人工干预

配置yml文件,修改重试配置

spring:
  rabbitmq:
    username: guest
    password: guest
    virtual-host: /
    host: 127.0.0.1
    port: 5672
    listener:
      simple:
        acknowledge-mode: manual # 手动ack
        retry:
          enabled: true #开启重试
          max-attempts: 3 #最大重试次数,默认为3
          initial-interval: 2000ms #重试间隔时间,单位毫秒,1000毫秒 = 1秒
// 手动应答
channel.basicAck(tag, false);

// 重发
// 参数:1.消息的tag,2.是否多条处理,3.是否重发
// 是否重发为false-不重发,直接发送到死信队列中
// true-重发再加上tryCatch可能会造成死循环
// 如果添加了tryCatch并用此方法确定是否重发,则配置文件中配置的重试规则会失效
channel.basicNack(tag, false, false)

RabbitMQ就学到这里了,分布式部分的内容搞的我有点迷糊,可能写的不太好,希望对您有所帮助。

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-02-05 21:35:51  更:2022-02-05 21:37:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/28 4:22:49-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码