IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ学习笔记 -> 正文阅读

[大数据]RabbitMQ学习笔记

前言:
学习B站UP主狂神说视频笔记整理视频链接

什么是中间件

中间件( Middleware ) 是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件=平台+通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和实用软件区分开来。

为什么要使用消息中间件

中间件( Middleware ) 是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件=平台+通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和实用软件区分开来。

消息中间件的应用场景

1:跨系统数据传递

2:高并发的流量削峰

3:数据的分发和异步处理

4:大数据分析与传递

5:分布式事务

比如你有一个数据要进行迁移或者请求并发过多的时候,比如你有10W的并发请求下订单,我们可以在这些订单入库之前,我们可以把订单请求堆积到消息队列中,让它稳健可靠的入库和执行。

常见的消息中间件

ActiveMQ.RabbitMQ、Kafka、RocketMQ等。

消息队列的本质及设计

它是一种接受数据,接受请求、存储数据、发送数据等功能的技术服务。

MQ消息队列:负责数据的传接受,存储和传递,所以性能要过于普通服务和技术。其背后肯定要遵循某种协议

在这里插入图片描述

消息队列的核心组成部分

1:消息的协议
2:消息的持久化机制
3:消息的分发策略
4:消息的高可用,高可靠
5:消息的容错机制

消息队列协议

在这里插入图片描述
我们知道消息中间件负责数据的传递,存储,和分发消费三个部分,数据的存储和分发的过程中肯定要遵循某种约定成俗的规范,你是采用底层的TCP/IP,UDP协议还是其他的自己取构建等,而这些约定成俗的规范就称之为:协议

所谓协议是指:
1:计算机底层操作系统和应用程序通讯时共同遵守的一组约定,只有遵循共同的约定和规范,系统和底层操作系统之间才能相互交流。
2∶和一般的网络应用程序的不同它主要负责数据的接受和传递,所以性能比较的高。
3:协议对数据格式和计算机之间交换数据都必须严格遵守规范。

而消息中间件采用的并不是http协议,而常见的消息中间件协议有:OpenWire、AMQP、MQTT、Kafka,OpenMessage协议。

面试题:为什么消息中间件不直接使用http协议呢?
1:因为http请求报文头和响应报文头是比较复杂的,包含了cookie,数据的加密解密,状态码,响应码等附加的功能,但是对于一个消息而言,我们并不需要这么复杂,也没有这个必要性,它其实就是负责数据传递,存储,分发就行,一定要追求的是高性能。尽量简洁,快速。
2大部分情况下http大部分都是短链接,在实际的交互过程中,一个请求到响应很有可能会中断,中断以后就不会就行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取消息的过程,出现问题和故障要对数据或消息就行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行。

AMQP协议

AMQP:(全称: Advanced Message Queuing Protocol)是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
特性:
1:分布式事务支持。2:消息的持久化支持。
3:高性能和高可靠的消息处理优势。

在这里插入图片描述

OpenMessage协议

是近几年由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式消息中间件、流处理等领域的应用开发标准。
特点:
1∶结构简单2∶解析速度快
3:支持事务和持久化设计。
在这里插入图片描述

Kafka协议

Kafka协议是基于TCP/IP的二进制协议。消息内部是通过长度来分割,由一些基本数据类型组成。特点是:
1:结构简单2︰解析速度快3:无事务支持4:有持久化设计

在这里插入图片描述

消息队列分发策略

MQ消息队列有如下几个角色
1:生产者
2:存储消息
3:消费者
那么生产者生成消息以后,MQ进行存储,消费者是如何获取消息的呢?一般获取数据的方式无外乎推(push)或者拉(pull)两种方式,典型的git就有推拉机制,我们发送的http请求就是一种典型的拉取数据库数据返回的过程。而消息队列MQ是一种推送的过程,而这些推机制会适用到很多的业务场景也有很多对应推机制策略。
在这里插入图片描述

消息队列高可用及高可靠

什么是高可用

所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力。
当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU,内存,磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署。来达到高可用的目的。

集群模式1-Master-Slave主从数据共享

在这里插入图片描述
生产者讲消费发送到Master节点,所有的都连接这个消息队列共享这块数据区域,Master节点负责写入,一旦Master挂掉,slave节点继续服务。从而形成高可用,

集群模式2-Master-Slave主从数据同步

在这里插入图片描述
这种模式写入消息同样在Master主节点上,但是主节点会同步数据到slave节点形成副本,和zookeeper或者redis主从机制很类同。这样可以达到负载均衡的效果,如果消费者有多个这样就可以去不同的节点就行消费,以为消息的拷贝和同步会占用很大的带宽和网络资源。在后续的rabbtmq中会有使用。

集群模式3-多集群同步部署

在这里插入图片描述
和上面区别不大,但是它的写入是可以任意节点写

集群模式4-多集群转发部署

在这里插入图片描述
如果你插入的数据是broker-1中,元数据信息会存储数据的相关描述和记录存放的位置(队列)。
它会对描述信息也就是元数据信息就行同步,如果消费者在broker-2中进行消费,发现自己几点没有对应的消息,可以从对应的元数据信息中去查询,然后返回对应的消息信息,场景:比如买火车票或者黄牛买演唱会门票,比如第一个黄牛有顾客说要买的演唱会门票,但是没有但是他会去联系其他的黄牛询问,如果有就返回。

集群模式5-Master-slave与Breoker-cluster组合的方案

在这里插入图片描述
反正终归三句话:1:要么消息共享,2︰要么消息同步 3:要么元数据共享

什么是高可靠

所谓高可用是指:是指系统可以无故障低持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠.

在高并发的业务场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的。如何保证中间件消息的可靠性呢?可以从两个方面考虑:
1:消息的传输:通过协议来保证系统间数据解析的正确性。
2:消息的存储可靠:通过持久化来保证消息的可靠性。

RabbitMQ入门

什么是RabbitMQ

简单概述:
RabbitMQ是一个开源的遵循AMQP协议实现的基于Erlang语言编写,支持多种客户端(语言)。用于在分布式系统中存储消息,转发消息,具有高可用,高可扩性,易用性等特征。
在这里插入图片描述

官网地址

RabbitMQ安装

安装包下载

RabbitMQ下载地址:https://www.rabbitmq.com/download.html
环境准备:CentOS7.X+/Erlang

注意:下载RabbitMQ和Erlang要注意版本号对应
在这里插入图片描述
Erlang下载地址:https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm

安装Erlang

上传rpm包到CentOS服务器上
在这里插入图片描述

# 执行
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
# 如果遇到依赖检测失败 则执行 yum -y install epel-release 然后再执行上一条命令
# 安装
sudo yum install erlang
# 查看erlang 退出用 halt().
erl
# 查看安装路径
whereis erlang

socat安装

# RabbitMQ 安装需要依赖socat
yum install -y socat

RabbitMQ安装

# 安装
rpm -Uvh rabbitmq-server-3.8.19-1.el7.noarch.rpm
# 
yum install rabbitmq-server -y
# 启动rabbitmq
systemctl start rabbitmq-server
# 查看rabbitmq状态
systemctl status rabbitmq-server

查看RabbitMQ状态
在这里插入图片描述

# 设置rabbitmq开机自启
systemctl enable rabbitmq-server
# 停止rabbitmq
systemctl stop rabbitmq-server

RabbitMQWeb管理界面

默认情况下,rabbitmq是没有安装web端的客户端插件,需要安装才可以生效

# 开启
rabbitmq-plugins enable rabbitmq_management

说明: rabbitmq有一个默认账号和密码是:guest|默认情况只能在localhost本机下访问,所以需要添加一个远程登录的用户。

在这里插入图片描述

# 重启服务
systemctl restart rabbitmq-server

阿里云等服务器放行15672端口

浏览器访问 http://IP地址:15672

在这里插入图片描述

授权账号和密码

# 新增用户
rabbitmqctl add_user admin admin
# 设置用户分配操作权限
rabbitmqctl set_user_tags admin administrator

用户级别:

  • 1、administrator可以登录控制台、查看所有信息、可以对rabbitmq进行管理·
  • 2、monitoring 监控者登录控制台,查看所有信息
  • 3、policymaker 策略制定者登录控制台,指定策略
  • 4、managment普通管理员登录控制台
  • 5、none 不能访问

授权角色权限

rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

之后就可以通过新账户密码登录图形化界面了!
在这里插入图片描述
其他命令:
在这里插入图片描述

RabbitMQ入门案例-Simple简单模式

创建项目

创建Maven简单项目

由于是入门项目,所以只需要导入RabbitMQ原生依赖即可

         <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.3</version>
        </dependency>

编写生产者

/**
 * 生产者
 * @author Tu_Yooo
 * @Date 2021/7/27 15:10
 */
public class Producer {
    public static void main(String[] args) {
        //所有中间件技术都是基于tcp/ip协议基础上构建的新型协议 RabbitMQ遵循的是amqp
        //只要是tcp/ip协议永远都逃不掉 的是ip 和端口号port

        //1.创建链接工程 设置账号密码等连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.108.204.96");
        factory.setUsername("root"); //用户名 
        factory.setPassword("XXXXX"); //密码
        factory.setPort(5672);
        factory.setVirtualHost("/");//设置虚拟访问节点
        Connection connection = null;
        Channel channel = null;
        try {
            // 2.创建链接connection
            connection = factory.newConnection("生产者");
            // 3.通过连接获取通道Channel
            channel = connection.createChannel();
            // 4.通过创建交换机 队列 绑定关系 路由Key 发生消息 和接收消息
            String queueName = "queue1"; //队列名称

            /**
             * @params1 队列名称
             * @params2 是否持久化
             * @params3 是否具有排他性 是否独占队列
             * @params4 是否自动删除队列 最后一个消息被消费完 是否删除队列
             * @params5 携带附加参数
             */
            channel.queueDeclare(queueName,false,false,false,null);
            // 5.准备消息
            String message = "Hello,RabbitMQ!";
            // 6.发生消息给队列
            /**
             * @params1 交换机
             * @params2 队列名 路由Key
             * @params3 消息是否持久化
             * @params5 消息主体
             */
           channel.basicPublish("",queueName,null,message.getBytes());

            System.out.println("消息发生完毕");
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //7.关闭通道
            if (channel!=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            //8.关闭连接
            if (connection!=null && connection.isOpen()){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }


    }
}

编写消费者

/**
 * 消费者
 * @author Tu_Yooo
 * @Date 2021/7/27 15:10
 */
public class Consumer {

    public static void main(String[] args) {
        //所有中间件技术都是基于tcp/ip协议基础上构建的新型协议 RabbitMQ遵循的是amqp
        //只要是tcp/ip协议永远都逃不掉 的是ip 和端口号port

        //1.创建链接工程 设置账号密码等连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.108.204.96");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
        factory.setVirtualHost("/");//设置虚拟访问节点
        Connection connection = null;
        Channel channel = null;
        try {
            // 2.创建链接connection
            connection = factory.newConnection("生产者");
            // 3.通过连接获取通道Channel
            channel = connection.createChannel();
            // 4.通过创建交换机 队列 绑定关系 路由Key 发生消息 和接收消息
            String queueName = "queue1"; //队列名称

            //消费消息
            channel.basicConsume(queueName, true, new DeliverCallback() { //接收消息成功时执行
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("接收消息:" + new String(message.getBody(), "UTF-8"));
                }
            }, new CancelCallback() { //接收消息失败时执行
                public void handle(String consumerTag) throws IOException {
                    System.out.println("接收消息失败");
                }
            });
            System.in.read();//阻断 不往下执行
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //7.关闭通道
            if (channel!=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            //8.关闭连接
            if (connection!=null && connection.isOpen()){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }


    }
}

RabbitMQ核心组成部分

在这里插入图片描述
核心概念:

Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。安装rabbitmq-server

Connection:连接,应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手

Channel :网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。

Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。

Virtual Host虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若千个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange

Exchange :交换机,接受消息,根据路由键发送消息到绑定的队列。(不具备消息存储的能力)Bindings : Exchange和Queue之问的虚拟连接,binding中可以保护多个routing key.

Routing key :是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。

Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。

在这里插入图片描述

RabbitMQ消息模式

官网教程

Simple简单模式

在这里插入图片描述

代码参考本文入门案例

注意点:
1.从官网图片上来看,虽然没有显示交换机,实则如果没有指定交换机,会有一个默认交换机,来将我们的消息推送到队列中

在这里插入图片描述

fanout发布订阅模式

在这里插入图片描述
生产者 生产消息 每一个消费者都可以收到相同的消息

1.图型化界面创建fanout交换机
在这里插入图片描述
2.交换机绑定队列
在这里插入图片描述
3.如果没有队列,提取创建好队列
在这里插入图片描述
4.编写生产者

/**
 *  发布订阅
 * 生产者
 * @author Tu_Yooo
 * @Date 2021/7/27 15:10
 */
public class Producer {
    public static void main(String[] args) {
        //所有中间件技术都是基于tcp/ip协议基础上构建的新型协议 RabbitMQ遵循的是amqp
        //只要是tcp/ip协议永远都逃不掉 的是ip 和端口号port

        //1.创建链接工程 设置账号密码等连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.108.204.96");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
        factory.setVirtualHost("/");//设置虚拟访问节点
        Connection connection = null;
        Channel channel = null;
        try {
            // 2.创建链接connection
            connection = factory.newConnection("生产者");
            // 3.通过连接获取通道Channel
            channel = connection.createChannel();
            // 4.通过创建交换机 队列 绑定关系 路由Key 发生消息 和接收消息
            String queueName = "queue1"; //队列名称

            /**
             * @params1 队列名称
             * @params2 是否持久化
             * @params3 是否具有排他性 是否独占队列
             * @params4 是否自动删除队列 最后一个消息被消费完 是否删除队列
             * @params5 携带附加参数
             */
            channel.queueDeclare(queueName,false,false,false,null);
            // 5.准备消息
            String message = "Hello,RabbitMQ!";

            //准备交换机
            String exchangeName = "faout-exchange";

            //准备路由Key
            String routeName = "";

            //指定交换机的类型
            String type = "fanout";

            // 6.发生消息给队列
            /**
             * @params1 交换机
             * @params2 队列名 路由Key
             * @params3 消息是否持久化
             * @params5 消息主体
             */
            channel.basicPublish(exchangeName,routeName,null,message.getBytes());

            System.out.println("消息发生完毕");
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //7.关闭通道
            if (channel!=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            //8.关闭连接
            if (connection!=null && connection.isOpen()){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }


    }
}

5.编写消费者

/**
 * 发布订阅
 * 消费者
 * @author Tu_Yooo
 * @Date 2021/7/27 15:10
 */
public class Consumer implements Runnable{

    public static void main(String[] args) {
        new Thread(new Consumer(),"queue1").start();
        new Thread(new Consumer(),"queue2").start();
    }

    public void run() {
        //所有中间件技术都是基于tcp/ip协议基础上构建的新型协议 RabbitMQ遵循的是amqp
        //只要是tcp/ip协议永远都逃不掉 的是ip 和端口号port

        //1.创建链接工程 设置账号密码等连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.108.204.96");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
        factory.setVirtualHost("/");//设置虚拟访问节点
        Connection connection = null;
        Channel channel = null;
        try {
            // 2.创建链接connection
            connection = factory.newConnection("生产者");
            // 3.通过连接获取通道Channel
            channel = connection.createChannel();
            // 4.通过创建交换机 队列 绑定关系 路由Key 发生消息 和接收消息
            String queueName = Thread.currentThread().getName(); //队列名称

            //消费消息
            channel.basicConsume(queueName, true, new DeliverCallback() { //接收消息成功时执行
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("接收消息:" + new String(message.getBody(), "UTF-8"));
                }
            }, new CancelCallback() { //接收消息失败时执行
                public void handle(String consumerTag) throws IOException {
                    System.out.println("接收消息失败");
                }
            });
            System.in.read();//阻断 不往下执行
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //7.关闭通道
            if (channel!=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            //8.关闭连接
            if (connection!=null && connection.isOpen()){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

可以看到,在代码中我们并没有创建交换机,指定队列,那是因为我们在图形化界面中已经提前创建并绑定好了

Routing路由模式

在这里插入图片描述
消息携带路由Key,根据消费者Key匹配消息推送到哪个消费者

1.创建direct交换机,绑定路由Key
在这里插入图片描述
2.编写生产者

/**
 *  路由模式
 * 生产者
 * @author Tu_Yooo
 * @Date 2021/7/27 15:10
 */
public class Producer {
    public static void main(String[] args) {
        //所有中间件技术都是基于tcp/ip协议基础上构建的新型协议 RabbitMQ遵循的是amqp
        //只要是tcp/ip协议永远都逃不掉 的是ip 和端口号port

        //1.创建链接工程 设置账号密码等连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.108.204.96");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
        factory.setVirtualHost("/");//设置虚拟访问节点
        Connection connection = null;
        Channel channel = null;
        try {
            // 2.创建链接connection
            connection = factory.newConnection("生产者");
            // 3.通过连接获取通道Channel
            channel = connection.createChannel();
            // 4.通过创建交换机 队列 绑定关系 路由Key 发生消息 和接收消息
            String queueName = "queue1"; //队列名称

            /**
             * @params1 队列名称
             * @params2 是否持久化
             * @params3 是否具有排他性 是否独占队列
             * @params4 是否自动删除队列 最后一个消息被消费完 是否删除队列
             * @params5 携带附加参数
             */
            channel.queueDeclare(queueName,false,false,false,null);
            // 5.准备消息
            String message = "Hello,RabbitMQ!";

            //准备交换机
            String exchangeName = "direct-exchange";

            //准备路由Key
            String routeName = "email";

            //指定交换机的类型
            String type = "direct";

            // 6.发生消息给队列
            /**
             * @params1 交换机
             * @params2 队列名 路由Key
             * @params3 消息是否持久化
             * @params5 消息主体
             */
            channel.basicPublish(exchangeName,routeName,null,message.getBytes());

            System.out.println("消息发生完毕");
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //7.关闭通道
            if (channel!=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            //8.关闭连接
            if (connection!=null && connection.isOpen()){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }


    }
}

3.消费者不用变动

Topics主题模式

在这里插入图片描述
主题模式在路由模式之上又进行了升级,可以根据路由Key模糊匹配推送到指定队列

#:代表零级或多级,用图上的列子来说它可以是这样的lazy.XXXX.XXX.XXX,lazy
*:代表一级,它有且只能是这样的XXX.orange.XXX

1.创建topic交换机
在这里插入图片描述
2.绑定队列
在这里插入图片描述
3.编写生产者

/**
 *  路由模式
 * 生产者
 * @author Tu_Yooo
 * @Date 2021/7/27 15:10
 */
public class Producer {
    public static void main(String[] args) {
        //所有中间件技术都是基于tcp/ip协议基础上构建的新型协议 RabbitMQ遵循的是amqp
        //只要是tcp/ip协议永远都逃不掉 的是ip 和端口号port

        //1.创建链接工程 设置账号密码等连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.108.204.96");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
        factory.setVirtualHost("/");//设置虚拟访问节点
        Connection connection = null;
        Channel channel = null;
        try {
            // 2.创建链接connection
            connection = factory.newConnection("生产者");
            // 3.通过连接获取通道Channel
            channel = connection.createChannel();
            // 4.通过创建交换机 队列 绑定关系 路由Key 发生消息 和接收消息
            String queueName = "queue1"; //队列名称

            /**
             * @params1 队列名称
             * @params2 是否持久化
             * @params3 是否具有排他性 是否独占队列
             * @params4 是否自动删除队列 最后一个消息被消费完 是否删除队列
             * @params5 携带附加参数
             */
            channel.queueDeclare(queueName,false,false,false,null);
            // 5.准备消息
            String message = "Hello,RabbitMQ!";

            //准备交换机
            String exchangeName = "topic-exchange";

            //准备路由Key
            String routeName = "xxxx.order";

            //指定交换机的类型
            String type = "topic";

            // 6.发生消息给队列
            /**
             * @params1 交换机
             * @params2 队列名 路由Key
             * @params3 消息是否持久化
             * @params5 消息主体
             */
            channel.basicPublish(exchangeName,routeName,null,message.getBytes());

            System.out.println("消息发生完毕");
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //7.关闭通道
            if (channel!=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            //8.关闭连接
            if (connection!=null && connection.isOpen()){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }


    }
}

Headers参数模式

根据参数匹配到对应的队列

1.创建headers交换机,根据参数绑定队列
在这里插入图片描述
2.未来就可以通过携带附加参数,来匹配到对应的队列
在这里插入图片描述

完整的声明创建方式

在以上的案例中,都是通过图形化界面与代码相互配合,通过图形化界面先创建好交换机,绑定上队列,然后在代码中直接使用

那么能不能通过代码的方式创建交换机绑定队列呢?

/**
 *  完整的声明方式
 * 生产者
 * @author Tu_Yooo
 * @Date 2021/7/27 15:10
 */
public class Producer {
    public static void main(String[] args) {
        //所有中间件技术都是基于tcp/ip协议基础上构建的新型协议 RabbitMQ遵循的是amqp
        //只要是tcp/ip协议永远都逃不掉 的是ip 和端口号port

        //1.创建链接工程 设置账号密码等连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.108.204.96");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setPort(5672);
        factory.setVirtualHost("/");//设置虚拟访问节点
        Connection connection = null;
        Channel channel = null;
        try {
            // 2.创建链接connection
            connection = factory.newConnection("生产者");
            // 3.通过连接获取通道Channel
            channel = connection.createChannel();
            // 4.通过创建交换机 队列 绑定关系 路由Key 发生消息 和接收消息
            String queueName = "queue1"; //队列名称

            /**
             * @params1 队列名称
             * @params2 是否持久化
             * @params3 是否具有排他性 是否独占队列
             * @params4 是否自动删除队列 最后一个消息被消费完 是否删除队列
             * @params5 携带附加参数
             */
            channel.queueDeclare(queueName,false,false,false,null);
            // 5.准备消息
            String message = "Hello,RabbitMQ!";

            //准备交换机
            String exchangeName = "direct-message-exchange";

            //准备路由Key
            String routeName = "email";

            //指定交换机的类型
            String type = "direct";

            //声明交换机 参数1:交换机名称 参数2:交换机类型 参数3:是否持久化
            channel.exchangeDeclare(exchangeName,type,true);

            //声明队列
            //参数1: 队列名 参数2:是否持久化 参数3:是否具有排他性  参数4:是否自动删除  参数5:附加参数 headers模式中根据参数绑定队列
            channel.queueDeclare("queue5",true,false,false,null);
            channel.queueDeclare("queue6",true,false,false,null);
            channel.queueDeclare("queue7",true,false,false,null);

            //绑定队列 参数1:队列名 参数2:交换机名 参数3:路由Key direct中是需要绑定路由Key的
            channel.queueBind("queue5",exchangeName,"email");
            channel.queueBind("queue6",exchangeName,"sms");
            channel.queueBind("queue7",exchangeName,"order");

            // 6.发生消息给队列
            /**
             * @params1 交换机
             * @params2 队列名 路由Key
             * @params3 消息是否持久化
             * @params5 消息主体
             */
            channel.basicPublish(exchangeName,routeName,null,message.getBytes());

            System.out.println("消息发生完毕");
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //7.关闭通道
            if (channel!=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            //8.关闭连接
            if (connection!=null && connection.isOpen()){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }


    }
}

Word工作队列模式

在这里插入图片描述

轮询模式

轮询模式的分发:一个消费者—条,按均分配;

公平分发

公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-29 11:43:03  更:2021-07-29 11:43:41 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/6 23:15:23-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码