IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 【RabbitMQ】RabbitMQ 学习教程(二)RabbitMQ 的简单模型 -> 正文阅读

[大数据]【RabbitMQ】RabbitMQ 学习教程(二)RabbitMQ 的简单模型

RabbitMQ 常用的工作模式有以下五种:

  1. 简单模式
  2. 工作队列模式(轮询、公平分发)
  3. 发布/订阅模式(fanout 路由键在这个模式中没有意义)
  4. 路由模式(direct)
  5. 主题模式(topic 路由中的 #:表示0个或多个;*:表示至少只有一个)

简单模式

简单模型如下图:

在这里插入图片描述
在上图的模型中,有以下概念:

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来
  • Queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

这种模型,在 RabbitMQ 中是如何使用的呢?

接下来,咱们分为两种方式:直接在后台管理页面进行操作;通过代码进行实现

后台管理页面

对于这种模型,在后台管理页面,是以交换机 Exchange 为生产者去生产消息,队列为消费者去消费消息

首先开启 RabbitMQ 服务,浏览器访问:http://localhost:15672,会显示如下登陆界面:
在这里插入图片描述
输入账号、密码:

// 默认都是 guest
guest
guest

登录成功后,显示如下页面:
在这里插入图片描述
可以上这个界面上添加交换机、队列、用户等信息。

先通过页面添加一个队列吧:

点击“Queues”,会显示队列的相关信息
在这里插入图片描述
然后输入需要添加队列的信息,点击 “Add queue” 按钮,即可添加一个队列。这里以 “simple_queue” 队列为例:
在这里插入图片描述
添加完队列后,可在当前页面查看:
在这里插入图片描述
点击某个队列的名称,可查看某个队列的具体信息:
在这里插入图片描述
队列添加完了,咋们来看看交换机:
在这里插入图片描述
得记住一个交换机:AMQP default 交换机,它是默认的交换机。

简单模型中的队列,就是绑定的默认的交换机

点击这个默认的交换机 “AMQP default”,可查看其详情:
在这里插入图片描述
通过交换机发送消息:
在这里插入图片描述
点击 “Publish message”,会将内容 “Hello RabbitMQ!!” 发送到队列 “simple_queue”。

然后,咱们去队列中查看:
在这里插入图片描述
发现,simple_queue 队列中确实收到一条消息。

那么,我们如何获取这条消息呢?

点击 “simple_queue” 队列,进入到队列详情中,然后,找到 “Get messages”,并点击 “Get message(s)” 按钮:
在这里插入图片描述
发现,能获取到之前交换机发送的消息。

【注意】:在获取消息时,有个确认模式 Ack Mode。这里选择 “Nack message requeue true”,而不要选择 Ack message requeue false,尤其是在生产环境上。

那么 确认模式 Ack Mode:Nack message requeue true 和 Ack message requeue false 有什么区别呢?

Nack message requeue true:能获取到消息的内容,但不会真正的去消费消息
Ack message requeue false:在获取消息的内容的同时,它会真正的去消费消息。就是,通过这种确认模式,它会真正地去消费消息。

在 RabbitMQ 的页面上也可以直观地看到:

使用 Nack message requeue true 去获取消息时,消息的总数不会变化;使用 Ack message requeue false 去获取消息时,消息的总数会变化,而且是减一呢!!如下图,
在这里插入图片描述

代码实现

新建一个 Maven 工程,引入 POM 依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

代码编写步骤:

  1. 创建连接工厂
  2. 创建连接 Connection
  3. 通过连接获取通道 Channel
  4. 通过通道创建 交换机、声明队列、绑定关系、路由 Key、发送消息、接收消息
  5. 准备消息内容
  6. 发送消息给队列 Queue
  7. 关闭通道
  8. 关闭连接

新建一个生产者 Producer:

public class Producer {

    public static void main(String[] args) {
        // 1. 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2. 创建连接 Connection
            connection = connectionFactory.newConnection("生产者");
            // 3. 通过连接获取通道 Channel
            channel = connection.createChannel();
            String queueName = "code_simple_queue1";
            // 4. 通过通道创建声明队列
            /**
             * @param1 队列名称
             * @param2 是否持久化 持久化:RabbitMQ服务器重启,队列还存在;反之,不存在。
             *          非持久化的队列中的消息会存盘,不会随着服务器的重启会消失
             * @param3 排他性 是否独占一个队列(一般不会)
             * @param4 是否自动删除 随着最后一个消费者消费消息完毕后,是否自动删除这个队列
             * @param5 携带一些附加信息 供其它消费者获取
             */
            channel.queueDeclare(queueName, false, false, false, null);
            // 5. 准备消息内容
            String message = "Hello RabbitMQ";
            // 6. 发送消息给队列 Queue
            /**
             * @param1 交换机
             * @param2 队列、路由Key
             * @param3 消息的状态控制
             * @param4 消息内容
             */
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("消息发送完成~~~发送的消息为:" + message);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != channel && channel.isOpen()) {
                try {
                    // 7. 关闭通道
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            if (null != connection && connection.isOpen()) {
                try {
                    // 8. 关闭连接
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

运行生产者程序后,
在这里插入图片描述
控制台显示发送消息成功。然后,去后台管理页面查看:
在这里插入图片描述
确实发送了一条消息到 MQ 中去了。

queueDeclare() 方法详解

queueDeclare() 方法有两个重载方法:

com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare() throws IOException;

com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;

queueDeclare() 方法的参数详解说明如下:

  1. param1:队列名称
  2. param2:设置是否持久化。持久化:RabbitMQ服务器重启,队列还存在;反之,不存在。非持久化的队列中的消息会存盘,不会随着服务器的重启会消失。
  3. param3:排他性。是否独占一个队列(一般不会)。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除(即使该队列是持久化的)
  4. param4:设置是否自动删除。自动删除的前提:至少有一个消费者连接到这个队列,之后,所有与这个队列连接的所有客户端断开时,才会自动删除
  5. param5:携带一些附加信息 供其它消费者获取

basicPublish() 方法详解

basicPublish() 方法有三个重载方法:

void basicPublish(String var1, String var2, BasicProperties var3, byte[] var4) throws IOException;

void basicPublish(String var1, String var2, boolean var3, BasicProperties var4, byte[] var5) throws IOException;

void basicPublish(String var1, String var2, boolean var3, boolean var4, BasicProperties var5, byte[] var6) throws IOException;

针对第一个方法的参数说明:

  1. param1:交换机名称。如何设置为空字符串,则消息会被发送到 RabbitMQ 默认的交换机中
  2. param2:路由键。交换机根据路由键将消息存储到相应的队列中
  3. param3:消息的基本属性集。
  4. param4:消息体。真正需要发送的消息

再新建一个消费者:

public class Consumer {

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection("消费者");
            channel = connection.createChannel();

            String queueName = "code_simple_queue1";
			
			// 定义消费者:这里使用了一个匿名内部类
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            	// 这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 交换机
                    String exchange = envelope.getExchange();
                    // 消息id,mq 在 channel 中用来标识消息的 id,可用于确认消息已接收
                    long deliveryTag = envelope.getDeliveryTag();
                    // body 消息体
                    String msg = new String(body,"utf-8");
                    System.out.println("收到消息:" + msg);
                }
            };
  
             // 监听队列
            channel.basicConsume(queueName, true, consumer);

            System.out.println("开始接收消息~~~");
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != channel && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            if (null != connection && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

接收消息时,一般通过实现 com.rabbitmq.client.Consumer 接口或者继承 DefaultConsumer 类来实现。当调用与 Consumer 相关的 API 时,不同的订阅采用不同的消费者标签 ConsumerTag 来区分彼此。在同一个 Channel 中的消费者也需要通过唯一的消费者标签以作区分。

basicConsume() 方法详解

basicConsume() 方法有好多重载方法,这里只描述文中用到的方法:

String basicConsume(String var1, boolean var2, Consumer var3) throws IOException;

basicConsume() 方法参数说明:

  1. param1:队列名称
  2. param2:设置是否自动确认。当消费者接收到消息后要告诉 mq 消息已接收,如果将此参数设置为 true 表示会自动回复 mq,如果设置为 false,要通过编程实现回复
  3. param3:设置消费者的回调函数,用来处理 RabbitMQ 推送过来的消息

上述的监听队列: channel.basicConsume(queueName, true, consumer); 中的 consumer 使用匿名内部类,这里我们可以将其修改为 lambad 表达式进行表示:

...
connection = connectionFactory.newConnection("消费者");
channel = connection.createChannel();

String queueName = "code_simple_queue1";
// 监听队列
channel.basicConsume(queueName, true, (String consumerTag, Delivery message) -> {
            System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));

        }, (String consumerTag) -> System.out.println("接收消息失败")
);
...

此时,用到的重载方法如下:

String basicConsume(String var1, boolean var2, DeliverCallback var3, CancelCallback var4) throws IOException;

各参数的说明如下:

  1. param1:队列名称
  2. param2:设置是否自动确认。当消费者接收到消息后要告诉 mq 消息已接收,如果将此参数设置为 true 表示会自动回复 mq,如果设置为 false,要通过编程实现回复
  3. param3:设置消费者获取消息成功的回调函数
  4. param4:设置消费者获取消息失败的回调函数

当然,运行消费者程序,成功消费消息:
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-29 09:09:56  更:2021-08-29 09:25:59 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 16:44:36-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码