IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RabbitMQ -> 正文阅读

[Java知识库]RabbitMQ

RabbitMQ

一、首次安装

普通部署

下载地址 - Release RabbitMQ 3.6.14 · rabbitmq/rabbitmq-server

# 安装erlang
yum -y update
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
yum -y install epel-release
yum -y install socat
yum -y install erlang
# 检查erl版本
erl -version
# 安装RabbitMQ - 先去官网下载RPM包 rabbitmq-server-3.6.14-1.el6.noarch.rpm
yum install -y net-tools
rpm -Uvh rabbitmq-server-3.6.14-1.el6.noarch.rpm
# 启动MQ并关闭防火墙
systemctl start rabbitmq-server
systemctl status rabbitmq-server
iptables -F
# Rabbit的默认通讯端口是5672
# 安装Web控制台
rabbitmq-plugins enable rabbitmq_management
# 设置初始用户并设置为管理员
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin “.*” “.*” “.*”
# 访问Web控制台
http://192.168.247.177:15672

Docker部署

两步完事

docker pull rabbitmq:management
docker run -dit --name myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management

二、基本使用

2.1 第一个示例工程

公共部分

依赖导入

 <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <version>2.3.4.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.13</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
                <version>2.3.4.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-test</artifactId>
                <version>2.3.4.RELEASE</version>
            </dependency>
            <!-- 没有集成boot的时候 -->
            <!--        <dependency>-->
            <!--            <groupId>com.rabbitmq</groupId>-->
            <!--            <artifactId>amqp-client</artifactId>-->
            <!--            <version>5.7.3</version>-->
            <!--        </dependency>-->

            <!-- 集成了boot后 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
                <version>2.3.4.RELEASE</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

application.yaml

spring:
  rabbitmq:
    host: 192.168.247.173
    port: 5672
    username: admin
    password: admin
    virtual-host: /

生产者

做常规启动类,然后写个配置类

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    /**
     * 配置交换机/队列的名字
     */
    public static final String EXCHANGE_NAME = "Demo01_direct_exchange";
    public static final String QUEUE_NAME = "Demo01_queue";

    /**
     * Exchange 交换机
     * 路由模式
     *
     * @return
     */
    @Bean("bootExchange")
    public Exchange bootExchange() {
        return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();
    }

    /**
     * Queue 队列
     * 持久化类型的队列
     * 我查看源码,它就两种类型,持久化/非持久化
     *
     * @return
     */
    @Bean("bootQueue")
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    /**
     * 交换机与队列的绑定关系
     * noargs():表示不指定参数
     *
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,
                                     @Qualifier("bootExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("routingKeyForDemo01").noargs();
    }

}

Test发送消息到MQ

import com.ljm.ProducerSpringBootApplication;
import com.ljm.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;


@SpringBootTest(classes = ProducerSpringBootApplication.class)
@RunWith(SpringRunner.class)
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 第一个参数:交换机名字
     * 第二个参数:routingKey
     * 第三个参数:发送的消息
     */
    @Test
    public void testSend01() throws InterruptedException {
        rabbitTemplate.convertAndSend
                (RabbitMQConfig.EXCHANGE_NAME,
                        "routingKeyForDemo01",
                        "is Demo01 Message");

    }
}

消费者

做常规启动类,做监听器监听消息,然后运行启动类

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbimtMQListener {

    @RabbitListener(queues = "Demo01_queue")
    public void listenerQueue(Message message){
        System.out.println(new String(message.getBody()));
    }
}

效果

在消费者上收到消息

总结流程

  1. 做好交换机和队列的绑定关系、交换机和队列的基础配置,可以在提供者上进行设置或走页面配置
  2. 提供者:指定交换机名称、routingKey进行发送消息
  3. 消费者:监听指定队列的消息
  4. 消费者提供者的数据流转在节点的过程,涉及到了序列化,中间件已经帮我们处理

2.2 正式介绍

根据上面的配置,相信大家对消息队列应该有个大概的认识,下面概括一下这玩意

首先我们先说一下消息中间件的主要的作用:

[1]异步处理

[2]解耦服务

[3]流量削峰

上面的三点是我们使用消息中间件最主要的目的


什么是QPS,PV , UV , PR

  • QPS:每秒查询率,是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准
  • PV:页面浏览量
  • UV:访问某个站点或点击某条新闻的不同IP地址的人数
  • PR:即PageRank,网页的级别技术,用来标识网页的等级/重要性。级别从1到10级,10级为满分。PR值越高说明该网页越受欢迎(越重要)

MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS

  • AMQP

    即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ

  • JMS

    即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信

  • AMQP 与 JMS 区别

    • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
    • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
    • JMS规定了两种消息模式;而AMQP的消息模式更加丰富

2.3 MQ工作模式

官网说明了有6种工作模式

  • 简单模式

    一比一的发送,在实际生产没有什么大用处

  • Work queues 工作队列模式

    一个提供者,多个消费者,消费者之间对于同一个消息的关系是竞争的关系

    对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度,只需要有一个节点成功发送即可

  • Publish/Subscribe 发布/订阅模式

    类似于广播,谁订阅了都能收到

  • Routing 路由模式

    队列和交换机的routingKey要完全一致才能收到消息

  • Topics 通配符模式

    队列和交换机的routingKey模糊匹配一直就能收到消息

  • RPC模式

    远程调用,我没用到

2.4 交换机类型

  • DIRECT

    定向,把消息发送到与这个交换机绑定的队列里面,路由键要求一致

  • FANOUT

    扇形(广播),发送消息到每一个与之绑定队列。

  • TOPIC

    通配符的方式,其中*用于匹配一个单词,#用于匹配多规格单词(可以为零个)

  • HEADERS

    参数匹配,不依赖路由键的匹配规则,而是根据消息内容的headers属性进行匹配,但是比较消耗性能,一般没人用

三、进阶玩法

3.1 消息的可靠性投递

正常情况下,RabbitMQ提供了两种解决方案

  • 事务机制
  • 发送方确认
    • 自动确认 acknowledge=“none”
    • 手动确认 acknowledge=“manual”

一般我们不推荐使用事务机制,因为那样会严重降低MQ的消息吞吐量

3.2 Return回退机制

在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到

这个时候我们需要监听这种不可达的消息,就要使用return listener

3.3 TTL消息超时

1

3.4 死信队列

什么是死信队列

顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列

成为死信队列的三种情况

  • 队列消息长度到达限制
  • 消费者拒接消费消息
  • 原队列存在消息过期设置,消息到达超时时间未被消费

常规的处理方案大概是以下几种

  • 丢掉
  • 记录死信入库,然后做后续的业务分析或处理
  • 通过死信队列,由负责监听死信的应用程序进行处理

3.5 延迟队列

从字面意思分析,这条队列是消息发送出去以后,消费者不会即刻得到消息,而是到了指定时间后才能拿到消息进行消费

3.6 优先级队列

1

3.7 消息百分百成功投递

对于一些重要的消息,我们是希望它必须被消费的,除了宕机这种无法挽救的结局,所以我们需要采取措施

  1. 生产者:消息信息存入数据库,并将消息记录存到数据库中的消息记录表(表中存在消息投递状态字段status
  2. 生产者:使用confirm手动确认的方式发送消息到MQ节点
  3. 消费者:收到消息,执行回调
  4. 生产者:收到消息成功被消费的回调,修改消息记录表中的消息状态字段status=1

这里可能会发生以下问题

  • 由于网络闪断导致消息未成功发送/收到
  • 消息重复消费

解决方案

  1. 消费未收到

    可以通过定时任务进行反复推送确认,并且,可以采用设置最大努力尝试次数,比如投递了3次,还是失败,那么我们可以将最终状态设置为Status = 2 ,最后 交由人工解决处理此类问题(或者把消息转储到失败表中)

  2. 消息重复消费

    可以采用乐观锁

四、三方框架集成

4.1 SpringBoot

1

1

4.2 SpringCloudStream

1

五、原理分析

1

没写完

over

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-11-19 17:30:07  更:2021-11-19 17:31:58 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 2:38:32-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码