RabbitMQ 常用的工作模式有以下五种:
- 简单模式
- 工作队列模式(轮询、公平分发)
- 发布/订阅模式(fanout 路由键在这个模式中没有意义)
- 路由模式(direct)
- 主题模式(topic 路由中的 #:表示0个或多个;*:表示至少只有一个)
简单模式
简单模型如下图:
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来
- Queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息
这种模型,在 RabbitMQ 中是如何使用的呢?
接下来,咱们分为两种方式:直接在后台管理页面进行操作;通过代码进行实现
后台管理页面
对于这种模型,在后台管理页面,是以交换机 Exchange 为生产者去生产消息,队列为消费者去消费消息
首先开启 RabbitMQ 服务,浏览器访问:http://localhost:15672,会显示如下登陆界面: 输入账号、密码:
// 默认都是 guest
guest
guest
登录成功后,显示如下页面: 可以上这个界面上添加交换机、队列、用户等信息。
先通过页面添加一个队列吧:
点击“Queues”,会显示队列的相关信息 然后输入需要添加队列的信息,点击 “Add queue” 按钮,即可添加一个队列。这里以 “simple_queue” 队列为例: 添加完队列后,可在当前页面查看: 点击某个队列的名称,可查看某个队列的具体信息: 队列添加完了,咋们来看看交换机: 得记住一个交换机:AMQP default 交换机,它是默认的交换机。
简单模型中的队列,就是绑定的默认的交换机。
点击这个默认的交换机 “AMQP default”,可查看其详情: 通过交换机发送消息: 点击 “Publish message”,会将内容 “Hello RabbitMQ!!” 发送到队列 “simple_queue”。
然后,咱们去队列中查看: 发现,simple_queue 队列中确实收到一条消息。
那么,我们如何获取这条消息呢?
点击 “simple_queue” 队列,进入到队列详情中,然后,找到 “Get messages”,并点击 “Get message(s)” 按钮: 发现,能获取到之前交换机发送的消息。
【注意】:在获取消息时,有个确认模式 Ack Mode。这里选择 “Nack message requeue true” ,而不要选择 Ack message requeue false ,尤其是在生产环境上。
那么 确认模式 Ack Mode:Nack message requeue true 和 Ack message requeue false 有什么区别呢?
Nack message requeue true :能获取到消息的内容,但不会真正的去消费消息 Ack message requeue false :在获取消息的内容的同时,它会真正的去消费消息。就是,通过这种确认模式,它会真正地去消费消息。
在 RabbitMQ 的页面上也可以直观地看到:
使用 Nack message requeue true 去获取消息时,消息的总数不会变化;使用 Ack message requeue false 去获取消息时,消息的总数会变化,而且是减一呢!!如下图,
代码实现
新建一个 Maven 工程,引入 POM 依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
代码编写步骤:
- 创建连接工厂
- 创建连接 Connection
- 通过连接获取通道 Channel
- 通过通道创建 交换机、声明队列、绑定关系、路由 Key、发送消息、接收消息
- 准备消息内容
- 发送消息给队列 Queue
- 关闭通道
- 关闭连接
新建一个生产者 Producer:
public class Producer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection("生产者");
channel = connection.createChannel();
String queueName = "code_simple_queue1";
channel.queueDeclare(queueName, false, false, false, null);
String message = "Hello RabbitMQ";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("消息发送完成~~~发送的消息为:" + message);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != channel && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (null != connection && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
运行生产者程序后, 控制台显示发送消息成功。然后,去后台管理页面查看: 确实发送了一条消息到 MQ 中去了。
queueDeclare() 方法详解
queueDeclare() 方法有两个重载方法:
com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare() throws IOException;
com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;
queueDeclare() 方法的参数详解说明如下:
- param1:队列名称
- param2:设置是否持久化。持久化:RabbitMQ服务器重启,队列还存在;反之,不存在。非持久化的队列中的消息会存盘,不会随着服务器的重启会消失。
- param3:排他性。是否独占一个队列(一般不会)。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除(即使该队列是持久化的)
- param4:设置是否自动删除。自动删除的前提:至少有一个消费者连接到这个队列,之后,所有与这个队列连接的所有客户端断开时,才会自动删除
- param5:携带一些附加信息 供其它消费者获取
basicPublish() 方法详解
basicPublish() 方法有三个重载方法:
void basicPublish(String var1, String var2, BasicProperties var3, byte[] var4) throws IOException;
void basicPublish(String var1, String var2, boolean var3, BasicProperties var4, byte[] var5) throws IOException;
void basicPublish(String var1, String var2, boolean var3, boolean var4, BasicProperties var5, byte[] var6) throws IOException;
针对第一个方法的参数说明:
- param1:交换机名称。如何设置为空字符串,则消息会被发送到 RabbitMQ 默认的交换机中
- param2:路由键。交换机根据路由键将消息存储到相应的队列中
- param3:消息的基本属性集。
- param4:消息体。真正需要发送的消息
再新建一个消费者:
public class Consumer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection("消费者");
channel = connection.createChannel();
String queueName = "code_simple_queue1";
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
String msg = new String(body,"utf-8");
System.out.println("收到消息:" + msg);
}
};
channel.basicConsume(queueName, true, consumer);
System.out.println("开始接收消息~~~");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != channel && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if (null != connection && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
接收消息时,一般通过实现 com.rabbitmq.client.Consumer 接口或者继承 DefaultConsumer 类来实现。当调用与 Consumer 相关的 API 时,不同的订阅采用不同的消费者标签 ConsumerTag 来区分彼此。在同一个 Channel 中的消费者也需要通过唯一的消费者标签以作区分。
basicConsume() 方法详解
basicConsume() 方法有好多重载方法,这里只描述文中用到的方法:
String basicConsume(String var1, boolean var2, Consumer var3) throws IOException;
basicConsume() 方法参数说明:
- param1:队列名称
- param2:设置是否自动确认。当消费者接收到消息后要告诉 mq 消息已接收,如果将此参数设置为 true 表示会自动回复 mq,如果设置为 false,要通过编程实现回复
- param3:设置消费者的回调函数,用来处理 RabbitMQ 推送过来的消息
上述的监听队列: channel.basicConsume(queueName, true, consumer); 中的 consumer 使用匿名内部类,这里我们可以将其修改为 lambad 表达式进行表示:
...
connection = connectionFactory.newConnection("消费者");
channel = connection.createChannel();
String queueName = "code_simple_queue1";
channel.basicConsume(queueName, true, (String consumerTag, Delivery message) -> {
System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
}, (String consumerTag) -> System.out.println("接收消息失败")
);
...
此时,用到的重载方法如下:
String basicConsume(String var1, boolean var2, DeliverCallback var3, CancelCallback var4) throws IOException;
各参数的说明如下:
- param1:队列名称
- param2:设置是否自动确认。当消费者接收到消息后要告诉 mq 消息已接收,如果将此参数设置为 true 表示会自动回复 mq,如果设置为 false,要通过编程实现回复
- param3:设置消费者获取消息成功的回调函数
- param4:设置消费者获取消息失败的回调函数
当然,运行消费者程序,成功消费消息:
|