IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ课程笔记 -> 正文阅读

[大数据]RabbitMQ课程笔记

               RabbitMQ消息中间件

1.MQ引言

1.1 什么是MQ

MQ----Message Queue消息队列,别名 消息中间件,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间的解耦。别名为消息中间件。通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

1.2 MQ有哪些

? 当今市面上有很多主流的消息中间件,如老牌的ActiveMQ,RabbitMQ,阿里巴巴自主开发的RocketMQ,炙手可热的Kafka。

1.3 不同MQ的特点

1.ActiveMQ

? ActiveMQ是Apache出品的,最流行的,能力最强的开源消息总线,它是一个安全支持JMS规范的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小学企业颇受欢迎!

2.RabbitMQ ----追求数据一致性 数据可靠 Spring默认集成 不会丢失数据

? RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性,安全,AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

3.RocketMQ —阿里巴巴 开源的功能相对比较少,官方的版本功能不错

? RocketMQ是阿里巴巴开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。

4.Kafa —追求高吞吐量,抗非常高的并发,会丢失数据

? Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复,丢失,错误没有严格的要求,适合产生大量数据的互联网服务的数据收集业务。

 RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。

2.RabbitMQ的引言

2.1 RabbitMQ

基于AMQP协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。

SpringBoot默认集成RabbitMQ消息中间件,

RabbitMQ不丢失任何的数据

? AMQP协议是在2003年时被提出的,最早用于解决金融领域不同平台之间的消息传递交互问题。顾名思义,AMQP是一种协议,更准确的说是一种binary wirelevel protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是之间定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:

2.2 RabbitMQ的安装

2.2.1 下载

Erlang下载地址(各版本都可下载)

? http://www.erlang.org/downloads

? 我下载的是19.3,地址:http://erlang.org/download/otp_win64_19.3.exe

RabbitMQ下载地址:

? Github仓库:

? https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.7/rabbitmq-server-3.7.7.exe
? Bintray仓库:

? https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.7/rabbitmq-server-3.7.7.exe

下载完如下图:

image-20211017194322372

2.2.2 安装

Erlang安装:

? 安装过程简单粗暴,以管理员身份运行,然后一直next即可

img

? 安装

img

? 完成

?

配置ERLANG_HOME:

image-20211017195959014

配置Erlang的path:

image-20211017200312257

RabbitMQ安装:

? img

安装

img

完成

配置RABBITMQ_HOME:

image-20211017200420627

配置RabbitMQ的path:

image-20211017200636424

到此,RabbitMQ已经安装完毕了,打开【任务管理器】中的【服务】项,即可看到有一个【RabbitMQ】服务正在执行。

img

以管理员身份运行cmd.exe,进入目录D:\Program Files\rabbitmq_server-3.7.9\sbin(RabbitMQ Server安装目录),

运行cmd命令:rabbitmq-plugins.bat enable rabbitmq_management 激活管理插件

img

以管理员身份运行cmd.exe,运行命令:rabbitmq-service install 安装服务,运行 net stop RabbitMQ && net start RabbitMQ。启动RabbitMQ Server:

进入控制台:

地 址:http://localhost:15672/

用户名:guest

密 码:guest

image-20211017201014108

2.2.3 MQ的应用场景

1.异步处理

注册 —> 发送邮件 —>发送短信

? 串行方式:

image-20211017202101763

? 并行方式:

? image-20211017202218950

? 消息队列:

image-20211017202323345

2.应用解耦

? 场景:

image-20211017202413697

? 缺点:当库存系统出现障碍时,订单就会失败。订单系统和库存系统高耦合,引入消息队列。

image-20211017202526230

3.流量削峰

image-20211017202631749

3.RabbitMQ配置

访问RabbitMQ控制台:

? http://localhost:15672/

image-20211004170125942

添加RabbitMQ依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.2</version>
</dependency>

4.点对点模型

在这里插入图片描述

一个生产者,对应一个消费者

在上图的模型中,有以下概念:

。P:生产者,也就是要发送消息的程序

。C: 消费者,消息的接收者,会一直等待消息的到来

。queue:消息队列,图中的红色部分,类似于一个邮箱,可以缓存消息,生产者想其中投递消息,消费者从其中

? 取出消息。

4.1.生产者:发送消息

package com.tangguanlin.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import com.tangguanlin.rabbitmq.Utils.RabbitMQUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 说明:点对点模型
 * 作者:汤观林
 * 日期:2021年10月04日 17时
 */
public class Producer {

    //生产消息
    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip
        connectionFactory.setPort(5672);   //设置端口号
        connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机
        connectionFactory.setUsername("ems");  //访问虚拟主机的用户名
        connectionFactory.setPassword("123");  //访问虚拟主机的密码

        //2.创建连接
        Connection connection = connectionFactory.newConnection();

        //3.获取连接中的通道
        Channel channel = connection.createChannel();

        /**
         * 4.通道绑定对应消息队列
         * 参数1: queue 队列的名称 (不存在自动创建)
         * 参数2: durable 用来定义队列的特性是否要持久化 true:持久化  false:不持久化 重启后 队列还在
         * 参数3: exclusive 是否独占队列  true:独占  false:不独占
         * 参数4: autoDelete 是否在消费完成后自动删除队列 true:自动删除 false:不自动删除
         * 参数5: 额外附加参数
         */
        channel.queueDeclare("hello",false ,false,false,null);

        /**
         * 5.发布消息
         * 参数1: exchange 交换机名称
         * 参数2: routingKey 队列名称
         * 参数3: props 传递消息额外设置  MessageProperties.PERSISTENT_TEXT_PLAIN 消息重启后不会丢失
         * 参数4:  消息的具体内容
         */
        channel.basicPublish("","hello", null,"hello rabbitmq".getBytes());

        //6.释放资源
        channel.close();;
        connection.close();

        //RabbitMQUtils.closeConnectionAndChanel(channel,connection);
    }
}

4.2.消费者:接收消息

package com.tangguanlin.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 说明:接收消息
 * 作者:汤观林
 * 日期:2021年10月04日 18时
 */
public class Customer {
    public static void main(String[] args) throws IOException, TimeoutException {

         //1.创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip
        connectionFactory.setPort(5672);   //设置端口号
        connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机
        connectionFactory.setUsername("ems");  //访问虚拟主机的用户名
        connectionFactory.setPassword("123");  //访问虚拟主机的密码

        //2.创建连接
        Connection connection = connectionFactory.newConnection();

        //3.获取连接中的通道
        Channel channel = connection.createChannel();

        /**
         * 4.通道绑定对应消息队列
         * 参数1: queue 队列的名称 (不存在自动创建)
         * 参数2: durable 用来定义队列的特性是否要持久化 true:持久化  false:不持久化 重启后 队列还在
         * 参数3: exclusive 是否独占队列  true:独占  false:不独占
         * 参数4: autoDelete 是否在消费完成后自动删除队列 true:自动删除 false:不自动删除
         * 参数5: 额外附加参数
         */
        channel.queueDeclare("hello",false ,false,false,null);

        /**
         * 5.接收消息
         * 参数1:消息那个队列的消息  队列名称
         * 参数2:开始消息的自动确认机制  true:自动确认  false:不自动确认
         * 参数3:消费时的回调接口
         */
        channel.basicConsume("hello",true,new DefaultConsumer(channel){
            /**
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body 消息队列中取出消息
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                System.out.println(new String(body));
            }
        });

        //6.释放资源
        channel.close();;
        connection.close();
    }
}

5.工作队列work

? img

? work queue,任务模型,当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work模型,让多个消费者绑定到一个队列同事消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

? 类似于1个CPU,多个打印机

角色: 一对多

。p:生产者 任务的发布者

。C1:消费者1 领取任务并且完成任务,假完成任务速度较慢

。C2:消费者2 领取任务并且完成任务,假设完成任务速度快

5.1 工作队列平均分配

image-20211017201824631

平均分配的缺点:

? RabbitMQ给消费者1分配5个,消费者2分配5个,当消费者1执行完第3个时,宕机了,这是消费者1的第4个,第5个消息丢失,这是业务逻辑无法接受的。业务功能不希望失去任何消息。

5.1.1 生产者

package com.tangguanlin.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 说明:工作队列  生产者
 * 作者:汤观林
 * 日期:2021年10月05日 11时
 */
public class WorkQueueProducer {

    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip
        connectionFactory.setPort(5672);   //设置端口号
        connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机
        connectionFactory.setUsername("ems");  //访问虚拟主机的用户名
        connectionFactory.setPassword("123");  //访问虚拟主机的密码

        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //Connection connection = RabbitMQUtils.getConnection();

        //3.获取连接中的通道
        Channel channel = connection.createChannel();

        //4.通过通道绑定队列
        channel.queueDeclare("work",true,false,false,null);

        //5.发布消息
        for(int i=1;i<=20;i++){
            channel.basicPublish("","work",null,(i+"hello work queue").getBytes());
        }

        //6.释放资源
        channel.close();
        connection.close();
    }
}

5.1.2 消费者1

package com.tangguanlin.rabbitmq;
import com.rabbitmq.client.*;
import lombok.SneakyThrows;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import java.util.logging.Handler;
/**
 * 说明:工作队列 消费者1
 * 作者:汤观林
 * 日期:2021年10月05日 11时
 */
public class WorkQueueCustomer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
    
        //1.创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip
        connectionFactory.setPort(5672);   //设置端口号
        connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机
        connectionFactory.setUsername("ems");  //访问虚拟主机的用户名
        connectionFactory.setPassword("123");  //访问虚拟主机的密码

        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //Connection connection = RabbitMQUtils.getConnection();

        //3.获取连接中的通道
        Channel channel = connection.createChannel();

        //4.通过通道绑定队列
        channel.queueDeclare("work",true,false,false,null);

        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1"+new String(body));
               try{
                   Thread.sleep(2000);  //消费者1  处理速度更慢
               }catch(Exception e){
                   e.printStackTrace();
               }
            }
        });

        channel.close();;
        connection.close();;
    }
}

5.1.3 消费者2

package com.tangguanlin.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 说明:工作队列 消费者2
 * 作者:汤观林
 * 日期:2021年10月05日 11时
 */
public class WorkQueueCustomer2 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip
        connectionFactory.setPort(5672);   //设置端口号
        connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机
        connectionFactory.setUsername("ems");  //访问虚拟主机的用户名
        connectionFactory.setPassword("123");  //访问虚拟主机的密码

        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //Connection connection = RabbitMQUtils.getConnection();

        //3.获取连接中的通道
        Channel channel = connection.createChannel();

        //4.通过通道绑定队列
        channel.queueDeclare("work",true,false,false,null);

        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2"+new String(body));
            }
        });

        channel.close();;
        connection.close();;
    }
}

5.2 工作队列多劳多得

多劳多得能解决平均分配模式的缺点。

消息放队列中,每次给通道1个消息,每次只能消费一个消费,取消自动确认。

//一次只接受一条未确认的消息
channel.basicQos(1);
//参数2:关闭自动确认消息
channel.basicConsume("work",false,new DefaultConsumer(channel){
//手动确认  参数1:手动确认消息标识   参数2: false 是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),true);

5.2.1 生产者

package com.tangguanlin.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 说明:工作队列生产者
 * 作者:汤观林
 * 日期:2021年10月05日 11时
 */
public class WorkQueueProducer {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip
        connectionFactory.setPort(5672);   //设置端口号
        connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机
        connectionFactory.setUsername("ems");  //访问虚拟主机的用户名
        connectionFactory.setPassword("123");  //访问虚拟主机的密码

        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //Connection connection = RabbitMQUtils.getConnection();

        //3.获取连接中的通道
        Channel channel = connection.createChannel();

        //4.通过通道绑定队列
        channel.queueDeclare("work",true,false,false,null);

        //5.发布消息
        for(int i=1;i<=20;i++){
            channel.basicPublish("","work",null,(i+"hello work queue").getBytes());
        }

        //6.释放资源
        channel.close();
        connection.close();
    }
}

5.2.2 消费者1

package com.tangguanlin.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 说明:工作队列多劳多得 消费者1
 * 作者:汤观林
 * 日期:2021年10月05日 11时
 */
public class WorkQueueCustomer1 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip
        connectionFactory.setPort(5672);   //设置端口号
        connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机
        connectionFactory.setUsername("ems");  //访问虚拟主机的用户名
        connectionFactory.setPassword("123");  //访问虚拟主机的密码

        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //Connection connection = RabbitMQUtils.getConnection();

        //3.获取连接中的通道
        Channel channel = connection.createChannel();

        //4.通过通道绑定队列
        channel.queueDeclare("work",true,false,false,null);
        //一次只接受一条未确认的消息
        channel.basicQos(1);

        //参数2:关闭自动确认消息
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            /**
             * 读取消息
             * @param consumerTag  队列名称
             * @param envelope  消息自动确认 true:消费者自动向rabbitmq确认消息消费  false:不会自动确认消费
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try{
                    Thread.sleep(2000);
                }catch(Exception e){
                    e.printStackTrace();
                }
                System.out.println("消费者1"+new String(body));
                //手动确认  参数1:手动确认消息标识   参数2: false 是否开启多个消息同时确认
                channel.basicAck(envelope.getDeliveryTag(),true);
            }
        });

        //channel.close();;
        //connection.close();;
    }
}

5.2.3 消费者2

package com.tangguanlin.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 说明:工作队列多劳多得 消费者2
 * 作者:汤观林
 * 日期:2021年10月05日 11时
 */
public class WorkQueueCustomer2 {

    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip
        connectionFactory.setPort(5672);   //设置端口号
        connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机
        connectionFactory.setUsername("ems");  //访问虚拟主机的用户名
        connectionFactory.setPassword("123");  //访问虚拟主机的密码

        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //Connection connection = RabbitMQUtils.getConnection();

        //3.获取连接中的通道
        Channel channel = connection.createChannel();

        //4.通过通道绑定队列
        channel.queueDeclare("work",true,false,false,null);
        //一次只接受一条未确认的消息
        channel.basicQos(1);

        channel.basicConsume("work",false,new DefaultConsumer(channel){
            /**
             * 读取消息
             * @param consumerTag  队列名称
             * @param envelope  消息自动确认 true:消费者自动向rabbitmq确认消息消费  false:不会自动确认消费
             * @param properties
             * @param body
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2"+new String(body));
                //手动确认  参数1:手动确认消息标识   参数2: false 是否开启多个消息同时确认
                channel.basicAck(envelope.getDeliveryTag(),true);
            }
        });

        //channel.close();;
        //connection.close();;
    }
}

6.发布订阅模式fanout

img场景:用户登录成功后,既要发邮件,又要加积分,

? 用户登录成功后,发送一条消息给发邮件队列,再发送一条消息给加积分队列。

? 购物车-------下订单 订单系统

? 下订单--------库存要更新 库存系统

? 在开发中,用得比较多

在广播模式下,消息发送的流程是这样的:

。可以有多个消费者

。每个消费者有自己的队列queue

。每个队列都要绑定到交换机Exchange

。生产者发送的消息,只能发送到交换机,交换机来决定发送给哪个队列,生产者无法决定

。交换机把消息发送给绑定过的所有队列

。队列的消息都能拿到消息,实现一条消息被多个消费者消费

缺点:广播,每个队列接收的消息 都是整个交换机的消息,这个时候,其实分2个队列和1个队列效果是一样的,不能对消息进行筛选处理,不能部分消费。

6.1 生产者

package com.tangguanlin.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 说明:发布订阅模式 生产者
 * 作者:汤观林
 * 日期:2021年10月05日 15时
 */
public class FanoutProducer {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip
        connectionFactory.setPort(5672);   //设置端口号
        connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机
        connectionFactory.setUsername("ems");  //访问虚拟主机的用户名
        connectionFactory.setPassword("123");  //访问虚拟主机的密码

        //2.创建连接
        Connection connection = connectionFactory.newConnection();

        //3.获取连接中的通道
        Channel channel = connection.createChannel();

        /**
         * 4.通道绑定交换机
         * 参数1 exchange 交换机名称
         * 参数2 type  交换机类型 fanout 广播类型
         */
        channel.exchangeDeclare("logs","fanout");

        /**
         *  5.发送消息
         *  参数1 exchange  交换机名称
         *  参数2 routingKey  路由
         *  参数3 额外参数
         *  参数4 消息内容
         */
        channel.basicPublish("logs","",null,"fanout type message".getBytes());

        //6.释放资源
        channel.close();
        connection.close();
    }
}

6.2 消费者1

package com.tangguanlin.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 说明:发布订阅模式 消费者1
 * 作者:汤观林
 * 日期:2021年10月05日 15时
 */
public class FanoutCustomer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip
        connectionFactory.setPort(5672);   //设置端口号
        connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机
        connectionFactory.setUsername("ems");  //访问虚拟主机的用户名
        connectionFactory.setPassword("123");  //访问虚拟主机的密码

        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //Connection connection = RabbitMQUtils.getConnection();

        //3.获取连接中的通道
        Channel channel = connection.createChannel();

        //4.通道绑定交换机
        channel.exchangeDeclare("logs","fanout");

        //临时队列
        String queueName = channel.queueDeclare().getQueue();
        //5.绑定交换机和队列
        channel.queueBind(queueName,"logs","");

        //6.接收消息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+new String(body));
            }
        });

        //7.释放资源
        //channel.close();
        //connection.close();
    }
}

6.3 消费者2

package com.tangguanlin.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 说明:发布订阅模式 消费者2
 * 作者:汤观林
 * 日期:2021年10月05日 15时
 */
public class FanoutCustomer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip
        connectionFactory.setPort(5672);   //设置端口号
        connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机
        connectionFactory.setUsername("ems");  //访问虚拟主机的用户名
        connectionFactory.setPassword("123");  //访问虚拟主机的密码

        //2.创建连接
        Connection connection = connectionFactory.newConnection();
        //Connection connection = RabbitMQUtils.getConnection();

        //3.获取连接中的通道
        Channel channel = connection.createChannel();

        //4.通道绑定交换机
        channel.exchangeDeclare("logs","fanout");

        //临时队列
        String queueName = channel.queueDeclare().getQueue();
        //5.绑定交换机和队列
        channel.queueBind(queueName,"logs","");

        //6.接收消息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2:"+new String(body));
            }
        });

        //7.释放资源
        //channel.close();
        //connection.close();
    }
}

7.路由模式direct

? 可以对发布订阅模式中交换机里的消息进行筛选获取,部分处理。

? 在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这是就要用到Direct类型的Exchange。

? 在Direct模型下:

? 。队列和交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

? 。消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey

? 。Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey完全一致,才会接收到消息。

流程:

在这里插入图片描述

图解:

。P:生产者 向Exchange发送消息,发送消息时,会指定一个Routing Key

。X:Exchange(交换机) 接收生产者的消息,然后把消息递交给与Routing Key完全匹配的队列

。C1:消费者1 其所在队列指定了需要Routing Key为error的消息

。C2:消费者2 其所在队列指定了需要Routing Key为info、error、warning的消息

7.1 生产者

package com.tangguanlin.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 说明:路由模式 生产者
 * 作者:汤观林
 * 日期:2021年10月05日 16时
 */
public class DirectProducer {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip
        connectionFactory.setPort(5672);   //设置端口号
        connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机
        connectionFactory.setUsername("ems");  //访问虚拟主机的用户名
        connectionFactory.setPassword("123");  //访问虚拟主机的密码

        //2.创建连接
        Connection connection = connectionFactory.newConnection();

        //3.获取连接中的通道
        Channel channel = connection.createChannel();

        //4.绑定交换机     参数1:交换机名称   参数2:路由模式
        channel.exchangeDeclare("logs_direct","direct");

        //5.发送消息  指定 routingkey
        String routingKey = "error";
        channel.basicPublish("logs_direct",routingKey,null,("这是direct模型发布的基于routing key:"+routingKey+"的消息").getBytes());

        //6.释放资源
        channel.close();
        connection.close();
    }
}

7.2 消费者1

package com.tangguanlin.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 说明:路由模式消费者1
 * 作者:汤观林
 * 日期:2021年10月05日 16时
 */
public class DirectCustomer1 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip
        connectionFactory.setPort(5672);   //设置端口号
        connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机
        connectionFactory.setUsername("ems");  //访问虚拟主机的用户名
        connectionFactory.setPassword("123");  //访问虚拟主机的密码

        //2.创建连接
        Connection connection = connectionFactory.newConnection();

        //3.获取连接中的通道
        Channel channel = connection.createChannel();

        //4.通道绑定交换机
        channel.exchangeDeclare("logs_direct","direct");

        //临时队列
        String queueName = channel.queueDeclare().getQueue();
        //5.绑定交换机和队列
        channel.queueBind(queueName,"logs_direct","error");

        //6.接收消息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+new String(body));
            }
        });

        //7.释放资源
        //channel.close();
        //connection.close();
    }
}

7.3 消费者2

package com.tangguanlin.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 说明:路由模式消费者2
 * 作者:汤观林
 * 日期:2021年10月05日 16时
 */
public class DirectCustomer2 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip
        connectionFactory.setPort(5672);   //设置端口号
        connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机
        connectionFactory.setUsername("ems");  //访问虚拟主机的用户名
        connectionFactory.setPassword("123");  //访问虚拟主机的密码

        //2.创建连接
        Connection connection = connectionFactory.newConnection();

        //3.获取连接中的通道
        Channel channel = connection.createChannel();

        //4.通道绑定交换机
        channel.exchangeDeclare("logs_direct","direct");

        //临时队列
        String queueName = channel.queueDeclare().getQueue();
        //5.绑定交换机和队列  根据routing key匹配 队列
        channel.queueBind(queueName,"logs_direct","error");
        channel.queueBind(queueName,"logs_direct","info");
        channel.queueBind(queueName,"logs_direct","warning");

        //6.接收消息
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2:"+new String(body));
            }
        });

        //7.释放资源
        //channel.close();
        //connection.close();
    }
}

8.通配符模式topic

? Topic类型的Exchange与Direct相比,都是可以根据Routing key把消息路由到不同的队列。只不过topic类型Exchange可以让队列绑定Routing key的时候使用通配符,这种模型都是由一个或多个单词组成,多个单词之间以“.”分隔,例如:item.insert

.* 单个匹配

.# 无限匹配

流程:

这里写图片描述

8.1 生产者

package com.tangguanlin.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 说明:通配符模式  生产者
 * 作者:汤观林
 * 日期:2021年10月05日 18时
 */
public class TopicProducer {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip
        connectionFactory.setPort(5672);   //设置端口号
        connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机
        connectionFactory.setUsername("ems");  //访问虚拟主机的用户名
        connectionFactory.setPassword("123");  //访问虚拟主机的密码

        //2.创建连接
        Connection connection = connectionFactory.newConnection();

        //3.获取连接中的通道
        Channel channel = connection.createChannel();

        //4.绑定交换机
        channel.exchangeDeclare("logs_topic","topic");

        //5.发布消息
        String routkey = "user2.save";
        channel.basicPublish("logs_topic",routkey,null,"这里是topic通配符模式".getBytes());

        //6.释放资源
        channel.close();
        connection.close();
    }
}

8.2 消费者1

package com.tangguanlin.rabbitmq;
import com.rabbitmq.client.*;
import org.jdom2.output.StAXStreamOutputter;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 说明:通配符模式 消费者1
 * 作者:汤观林
 * 日期:2021年10月05日 18时
 */
public class TopicCustomer1 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip
        connectionFactory.setPort(5672);   //设置端口号
        connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机
        connectionFactory.setUsername("ems");  //访问虚拟主机的用户名
        connectionFactory.setPassword("123");  //访问虚拟主机的密码

        //2.创建连接
        Connection connection = connectionFactory.newConnection();

        //3.获取连接中的通道
        Channel channel = connection.createChannel();

        //4.定义交换机
        channel.exchangeDeclare("logs_topic","topic");

        //5.绑定交换机
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue,"logs_topic","user1.*");

        //6.接收消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1"+new String(body));
            }
        });

        //7.释放资源
        //channel.close();
        //connection.close();//7.
    }
}

8.3 消费者2

package com.tangguanlin.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 说明:通配符模式 消费者2
 * 作者:汤观林
 * 日期:2021年10月05日 18时
 */
public class TopicCustomer2 {
    public static void main(String[] args) throws IOException, TimeoutException {

        //1.创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("127.0.0.1"); //设置连接rabbitmq服务器ip
        connectionFactory.setPort(5672);   //设置端口号
        connectionFactory.setVirtualHost("/ems"); //设置连接虚拟主机
        connectionFactory.setUsername("ems");  //访问虚拟主机的用户名
        connectionFactory.setPassword("123");  //访问虚拟主机的密码

        //2.创建连接
        Connection connection = connectionFactory.newConnection();

        //3.获取连接中的通道
        Channel channel = connection.createChannel();

        //4.定义交换机
        channel.exchangeDeclare("logs_topic","topic");

        //5.绑定交换机
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue,"logs_topic","user2.*");

        //6.接收消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2"+new String(body));
            }
        });

        //7.释放资源
        //channel.close();
        //connection.close();
    }
}

9. SpringBoot整合RabbitMQ

springboot和rabbitMQ集成的包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件:application.yml

rabbitmq:
  host: 127.0.0.1
  virtual-host: /ems
  port: 5672
  username: ems
  password: 123

RabbitTemplate简化操作

9.1 点对点模型

9.1.1 生产者

package com.tangguanlin.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 说明:点对点模式  生产者
 * 作者:汤观林
 * 日期:2021年10月05日 21时
 */
@RestController
public class RabbitProducer {
     @Autowired
     private RabbitTemplate rabbitTemplate;

     @GetMapping("/p2p")
     public void point2pointProducer(){
         rabbitTemplate.convertAndSend("p2p","hello world");

     }
}

9.1.2 消费者

package com.tangguanlin.controller;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RestController;
/**
 * 说明:点对点模式  消费者
 * 作者:汤观林
 * 日期:2021年10月05日 21时
 */
 
@Component
@RabbitListener(queuesToDeclare = @Queue("p2p"))
public class RabbitCustomer {

    @RabbitHandler
    public void receivePoint2point(String message){
       System.out.println("message="+message);
    }
}

9.2 工作队列平均消费

9.2.1 生产者

package com.tangguanlin.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 说明:工作队列 生产者
 * 作者:汤观林
 * 日期:2021年10月05日 21时
 */
@RestController
public class RabbitProducer {

     @Autowired
     private RabbitTemplate rabbitTemplate;

     @GetMapping("/work")
     public void workProducer(){
          rabbitTemplate.convertAndSend("work","work模型");
     }
}

9.2.2 消费者

package com.tangguanlin.controller;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RestController;
/**
 * 说明:工作队列 消费者1 消费者2
 * 作者:汤观林
 * 日期:2021年10月05日 21时
 */

@Component
public class RabbitCustomer {

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receiveWork(String message){
        System.out.println("work1 message = " + message);
    }

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receiveWork2(String message){
        System.out.println("work2 message = " + message);
    }
}

9.3 发布/订阅模式

9.3.1 生产者

package com.tangguanlin.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 说明:发布/订阅模式 生产者
 * 作者:汤观林
 * 日期:2021年10月05日 21时
 */
@RestController
public class RabbitProducer {

     @Autowired
     private RabbitTemplate rabbitTemplate;

     @GetMapping("/fanout")
     public void fanoutProducer(){
          rabbitTemplate.convertAndSend("fanout","","发布/订阅模式发送的消息");
     }
}

9.3.2 消费者

package com.tangguanlin.controller;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RestController;
/**
 * 说明:发布/订阅模式 消费者1 消费者2
 * 作者:汤观林
 * 日期:2021年10月05日 21时
 */

@Component
public class RabbitCustomer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, //创建临时的队列
                    exchange = @Exchange(value = "fanout",type="fanout") //绑定的交换机
            )
    })
    public void receiveFanout(String message){
        System.out.println("fanout1 message ="+ message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, //创建临时的队列
                    exchange = @Exchange(value = "fanout",type="fanout") //绑定的交换机
            )
    })
    public void receiveFanout2(String message){
        System.out.println("fanout2 message ="+ message);
    }
}

9.4 路由模式

9.4.1 生产者

package com.tangguanlin.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 说明:路由模式  生产者
 * 作者:汤观林
 * 日期:2021年10月05日 21时
 */
@RestController
public class RabbitProducer {

     @Autowired
     private RabbitTemplate rabbitTemplate;

     @GetMapping("/direct")
     public void directProducer(){
          rabbitTemplate.convertAndSend("directs","error","发送info的key的路由信息");
     }
}

9.4.2 消费者

package com.tangguanlin.controller;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RestController;
/**
 * 说明:路由模式 消费者1  消费者2
 * 作者:汤观林
 * 日期:2021年10月05日 21时
 */

@Component
public class RabbitCustomer {

   @RabbitListener(bindings = {
           @QueueBinding(
                   value =  @Queue, //创建临时队列
                   exchange = @Exchange(value = "directs",type="direct"), //自定义交换机名称和类型
                   key = {"info"}
           )
   })
    public void receiveDirect(String message){
        System.out.println("direct1 message = "+ message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value =  @Queue, //创建临时队列
                    exchange = @Exchange(value = "directs",type="direct"), //自定义交换机名称和类型
                    key = {"error","warning"}
            )
    })
    public void receiveDirect2(String message){
        System.out.println("direct2 message = "+ message);
    }
}

9.5 通配符模式

9.5.1 生产者

package com.tangguanlin.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 说明:通配符模式  生产者
 * 作者:汤观林
 * 日期:2021年10月05日 21时
 */
@RestController
public class RabbitProducer {

     @Autowired
     private RabbitTemplate rabbitTemplate;

     @GetMapping("/topic")
     public void topicProducer(){
          rabbitTemplate.convertAndSend("topics","user.insert","user.save路由信息");
     }
}

9.5.2 消费者

package com.tangguanlin.controller;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RestController;
/**
 * 说明:通配符模式 消费者1 消费者2
 * 作者:汤观林
 * 日期:2021年10月05日 21时
 */
 
@Component
public class RabbitCustomer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, //创建临时队列
                    exchange = @Exchange(value="topics",type = "topic"), //自定义交换机名称和类型
                    key ={"user.*"}
            )
    })
    public void receiveTopic(String message){
        System.out.println("topic message = "+ message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, //创建临时队列
                    exchange = @Exchange(value="topics",type = "topic"), //自定义交换机名称和类型
                    key ={"user.save"}
            )
    })
    public void receiveTopic2(String message){
        System.out.println("topic2 message = "+ message);
    }
}

10.RabbitMQ的集群

10.1 主从集群

image-20211017202721350

10.2 镜像集群

image-20211017202815225

10.2.1 查看当前策略

rabbitmqctl list_policies

10.2.2 添加策略

rabbitmqctl set_policy ha-all “hello” ‘{“ha-mode”:“all”,“ha-sync-mode”:“automatic”}’

说明:策略正则表达式为 ^ 表示匹配所有队列名称 ^hello:匹配hello开头队列

10.2.3 删除策略

rabbitmqctl clear_policy ha-all
r.save路由信息");
}
}




### 9.5.2 消费者

package com.tangguanlin.controller;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RestController;
/**

  • 说明:通配符模式 消费者1 消费者2
  • 作者:汤观林
  • 日期:2021年10月05日 21时
    */

@Component
public class RabbitCustomer {

@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue, //创建临时队列
                exchange = @Exchange(value="topics",type = "topic"), //自定义交换机名称和类型
                key ={"user.*"}
        )
})
public void receiveTopic(String message){
    System.out.println("topic message = "+ message);
}

@RabbitListener(bindings = {
        @QueueBinding(
                value = @Queue, //创建临时队列
                exchange = @Exchange(value="topics",type = "topic"), //自定义交换机名称和类型
                key ={"user.save"}
        )
})
public void receiveTopic2(String message){
    System.out.println("topic2 message = "+ message);
}

}




# 10.RabbitMQ的集群

## 10.1 主从集群

[外链图片转存中...(img-HBPlSjoh-1634474231305)]





## 10.2 镜像集群

[外链图片转存中...(img-im8DCxmZ-1634474231306)]

### 10.2.1 查看当前策略

 rabbitmqctl     list_policies



### 10.2.2 添加策略

  rabbitmqctl set_policy ha-all "hello" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

说明:策略正则表达式为 ^ 表示匹配所有队列名称  ^hello:匹配hello开头队列



### 10.2.3 删除策略

rabbitmqctl clear_policy ha-all
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-18 17:27:40  更:2021-10-18 17:29:24 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 2:58:28-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码