RabbitMQ工作原理
RabbitMQ图形化界面
-
点击add创建队列 D:代表持久态队列,随着rabbitmq服务的重启或宕机,队列依然存在,消息也会得到持久化 -
点击创建的队列queue1 overview:队列的基本运行状况 consumer:消费者 bindings:绑定交换机和队列的关系,默认绑定默认交换机 publish messages:往队列发送消息 get messages:获取消息,nack 消息预览;ack消息消费后应答,队列会移除消息 move messages:移动消息到其他队列 delete:删除队列 purge:清空队列 runtime:运行时状态 -
发布消息 -
接收消息 生产者生产消息发送到交换机,交换机将消息推送到队列,消费者监听队列
Hello World 模式
package com.rainhey.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public final static String QueueName="hello";
public static void main(String[] args){
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.100");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
Channel channel = null;
try {
Connection connection = connectionFactory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(QueueName, false,false,false,null);
String message="hello world";
channel.basicPublish("", QueueName, null, message.getBytes());
System.out.println("消息发送完毕!");
}
catch (Exception e){
e.printStackTrace();
System.out.println("发送消息出现异常!!");
}
}
}
package com.rainhey.demo;
import com.rabbitmq.client.*;
public class Consumer {
public static final String QueueName="hello";
public static void main(String[] args){
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.100");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
Channel channel = null;
try {
Connection connection = connectionFactory.newConnection();
channel = connection.createChannel();
System.out.println("等待接受消息中");
DeliverCallback deliverCallback=(consumerTag,delivery)->{
String message= new String(delivery.getBody());
System.out.println(message);
};
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息消费被中断");
};
channel.basicConsume(QueueName, true,deliverCallback,cancelCallback );
} catch (Exception e) {
e.printStackTrace();
}
}
}
工作队列模式
轮询分发消息
同一队列连接多个消费者,默认轮询分发
package com.rainhey.demo1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQUtils {
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.100");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
package com.rainhey.demo1;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Producer0 {
public static final String QueueName="hello1";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(QueueName, false, false, false, null);
Scanner scanner=new Scanner(System.in);
while(scanner.hasNext()){
String next = scanner.next();
channel.basicPublish("" ,QueueName, null, next.getBytes());
System.out.println("发送消息:"+ next);
}
}
}
package com.rainhey.demo1;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer0 {
public static final String QueueName="hello1";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
DeliverCallback deliverCallback=(String var1, Delivery var2)->{
System.out.println("收到消息:"+new String(var2.getBody()));
};
CancelCallback cancelCallback=(String var1)->{
System.out.println("消费中断!!");
};
channel.basicConsume(QueueName, true, deliverCallback, cancelCallback);
}
}
开启一个生产者进程,两个消费者进程,生产者发送六条消息,结果如下
消息应答
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
自动应答(默认)
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消费者那边出现连接或者 channel 关闭,那么消息就丢失了;如果消费者这边由于接收太多还来不及处理的消息,导致消费者进程内存耗尽,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
手动应答
保证rabbitmq发过来的消息不丢失 手动消息应答的方法
- Channel.basicAck(用于肯定确认):RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
- Channel.basicNack(用于否定确认)
- Channel.basicReject(用于否定确认):不处理该消息了直接拒绝
手动应答原理:若某消费者没有完成消费,则无法发送ack应答,此时已经发送的消息不会丢失,消息会重新入队,由其他消费者进行消费 手动应答只需少量改动以上消费者代码,其余代码不变
package com.rainhey.demo1;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer0 {
public static final String QueueName="hello1";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
DeliverCallback deliverCallback=(String var1, Delivery var2)->{
channel.basicAck(var2.getEnvelope().getDeliveryTag(), false);
System.out.println("收到消息:"+new String(var2.getBody()));
};
CancelCallback cancelCallback=(String var1)->{
System.out.println("消费中断!!");
};
boolean autoAck = false;
channel.basicConsume(QueueName, autoAck, deliverCallback, cancelCallback);
}
}
生产者已经发送3条消息 第四条消息本该第二个消费者消费,此时关闭第二个消费者,发送第四条消息,该消息最终被第一个消费者消费
RabbitMQ 持久化
保证当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失,确保消息不会丢失需要做两件事,需要将队列和消息都标记为持久化。
队列持久化
queueDeclare第二个参数该为true,即使重启 rabbitmq 队列也依然存在
消息持久化
第三个参数改为MessageProperties.PERSISTENT_TEXT_PLAIN,告诉 RabbitMQ 将消息保存到磁盘
不公平分发
设置参数channel.basicQos(1)指该消费者在接收到队列里的消息但没有返回确认结果之前,队列不会将新的消息分发给该消费者
交换机
概念
生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们,这就的由交换机的类型来决定
临时队列
一旦我们断开了消费者的连接,队列将被自动删除 String queueName = channel.queueDeclare().getQueue();
绑定
绑定交换机和队列
Fanout模式
- 创建交换机
- 创建三个队列
- 绑定交换机和队列
- fanout-exchange交换机发布消息
-三个队列都受到消息 代码
package com.rainhey.demo2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQUtils {
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.100");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
package com.rainhey.demo2;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class SendLogs {
public static final String ExchangeName="logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(ExchangeName, "fanout");
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String next = scanner.next();
channel.basicPublish(ExchangeName, "", null, next.getBytes());
}
}
}
package com.rainhey.demo2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ReceiveLogs01 {
public static final String ExchangeName="logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(ExchangeName, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, ExchangeName, "");
System.out.println("等待接收消息,消息打印在屏幕上!!");
DeliverCallback deliverCallback=(var1,var2)->{
System.out.println("控制台接收的消息"+new String(var2.getBody(),"UTF-8"));
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag->{});
}
}
开一个生产者,两个消费者,结果如下
路由模式
package com.rainhey.demo3;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQUtils {
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.100");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
package com.rainhey.demo3;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import javax.swing.event.ChangeEvent;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Producer {
public static final String ExchangeName="directLogs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.DIRECT);
Map<String,String> keyMap=new HashMap<>();
keyMap.put("info", "info消息");
keyMap.put("error", "error消息");
keyMap.put("warning", "warning消息");
keyMap.put("debug", "debug消息");
for(Map.Entry<String,String> keyEntry: keyMap.entrySet()){
channel.basicPublish(ExchangeName, keyEntry.getKey(), null, keyEntry.getValue().getBytes());
System.out.println("生产者发出消息:"+keyEntry.getValue());
}
}
}
package com.rainhey.demo3;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import javax.swing.event.ChangeEvent;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerDirect1 {
public static final String ExchangeName="directLogs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.DIRECT);
String queueName="queueD1";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, ExchangeName, "error");
System.out.println("等待接收消息!!");
DeliverCallback deliverCallback=(var1,var2)->{
System.out.println(new String(var2.getBody(),"UTF-8"));
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag->{});
}
}
package com.rainhey.demo3;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerDirect2 {
public static final String ExchangeName="directLogs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.DIRECT);
String queueName="queueD2";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, ExchangeName, "info");
channel.queueBind(queueName, ExchangeName, "warning");
System.out.println("等待接收消息!!");
DeliverCallback deliverCallback=(var1, var2)->{
System.out.println(new String(var2.getBody(),"UTF-8"));
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag->{});
}
}
主题模式
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”,"quick.orange.rabbit"这种类型的。 在这个规则列表中,其中有两个替换符是大家需要注意的 *(星号)可以代替一个单词 #(井号)可以替代零个或多个单词
package com.rainhey.demo4;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQUtils {
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.1.100");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
package com.rainhey.demo4;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Producer {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()) {
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, bindingKey,
null,
message.getBytes("UTF-8"));
System.out.println("生产者发出消息:" + message);
}
}
}
package com.rainhey.demo4;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer01 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName="Q1";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
package com.rainhey.demo4;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer02 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName="Q2";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}
死信队列
概念
某些时候由于特定的原因导致 queue 中的某些消息无法被消费,当消息消费发生异常时,将消息投入死信队列中,比如用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信来源
- 消息TTL过期
- 队列达到最大长度(队列满了,无法再添加数据到MQ)
- 消息被拒绝
消息TTL过期
package com.rainhey.demo5;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static final String NormalExchange="normalExchange";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(NormalExchange, BuiltinExchangeType.DIRECT);
AMQP.BasicProperties properties=new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 0; i < 11; i++) {
String message="info"+i;
channel.basicPublish(NormalExchange, "zhangsan", properties, message.getBytes());
System.out.println("生产者发送消息:"+message);
}
}
}
package com.rainhey.demo5;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static final String normalExchange="normalExchange";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(normalExchange, BuiltinExchangeType.DIRECT);
Map<String,Object> params=new HashMap<>();
params.put("x-dead-letter-exchange", "deadExchange");
params.put("x-dead-letter-routing-key","lisi");
channel.queueDeclare("normalQueue", false, false, false, params);
channel.queueBind("normalQueue", normalExchange, "zhangsan");
System.out.println("等待接收消息。。。。");
DeliverCallback deliverCallback=(var1,var2)->{
String message = new String(var2.getBody(), "UTF-8");
System.out.println(message);
};
channel.basicConsume("normalQueue", true, deliverCallback, (var)->{});
}
}
package com.rainhey.demo5;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer2 {
public static final String DeadExchange="deadExchange";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(DeadExchange, BuiltinExchangeType.DIRECT);
channel.queueDeclare("deadQueue", false, false, false, null);
channel.queueBind("deadQueue", DeadExchange, "lisi");
System.out.println("等待接受死信队列里的消息。。。");
DeliverCallback deliverCallback=(var1,var2)->{
String message = new String(var2.getBody(), "UTF-8");
System.out.println(message);
};
channel.basicConsume("deadQueue", true, deliverCallback, (var)->{});
}
}
启动consumer1然后关闭,启动producer和consumer2,consumer2收到消息
队列达到最大长度
在以上代码中改变如下:
- 消息生产者代码去掉 TTL 属性
- consumer1队列声明参数中添加params.put(“x-max-length”, 6);
- consumer2代码不变
装不下的消息送入到死信队列
消息被拒
- 生产者代码不变
- sonsumer1代码如下
package com.rainhey.demo5;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
public static final String normalExchange="normalExchange";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(normalExchange, BuiltinExchangeType.DIRECT);
Map<String,Object> params=new HashMap<>();
params.put("x-dead-letter-exchange", "deadExchange");
params.put("x-dead-letter-routing-key","lisi");
params.put("x-max-length", 6);
channel.queueDeclare("normalQueue", false, false, false, params);
channel.queueBind("normalQueue", normalExchange, "zhangsan");
System.out.println("等待接收消息。。。。");
DeliverCallback deliverCallback=(var1,var2)->{
String message = new String(var2.getBody(), "UTF-8");
if(message.equals("info5")){
System.out.println("Consumer1 接收到消息" + message + "并拒绝签收该消息");
channel.basicReject(var2.getEnvelope().getDeliveryTag(), false);
}else {
System.out.println("Consumer1 接收到消息"+message);
channel.basicAck(var2.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume("normalQueue", false, deliverCallback, (var)->{});
}
}
- consumer2代码不变
延迟队列
延时队列就是用来存放需要在指定时间被处理的元素的队列。
使用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
- 用户注册成功后,如果三天内没有登陆则进行短信提醒
RabbitMQ 中的 TTL
TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有 消息的最大存活时间,单位是毫秒;如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信" 消息设置TTL和队列设置TTL区别: 如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中);如果设置了消息TTL,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的, RabbitMQ 只会检查第一个消息是否过期,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间 如果不设置 TTL,表示消息永远不会过期;如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃
设置队列TTL
- 创建工程
- 导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
</dependencies>
- 修改配置文件
spring.rabbitmq.host=192.168.1.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
- 添加swagger配置类
package com.example.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket webApiConfig(){
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}
private ApiInfo webApiInfo(){
return new ApiInfoBuilder()
.title("rabbitmq 接口文档")
.description("本文档描述了 rabbitmq 微服务接口定义")
.version("1.0")
.build();
}
}
- 代码架构图
- 配置文件类代码
package com.example.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TtlQueueConfig {
public static final String xExchange="xExchange";
public static final String aQueue="aQueue";
public static final String bQueue="bQueue";
public static final String yExchange="yExchange";
public static final String dQueue="dQueue";
@Bean
public DirectExchange xExchange(){
return new DirectExchange(xExchange);
}
@Bean
public DirectExchange yExchange(){
return new DirectExchange(yExchange);
}
@Bean
public Queue aQueue(){
Map<String,Object> args=new HashMap<>();
args.put("x-dead-letter-exchange", yExchange);
args.put("x-dead-letter-routing-key","YD");
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(aQueue).withArguments(args).build();
}
@Bean
public Binding queueBindXA(@Qualifier("aQueue") Queue aQueue, @Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(aQueue).to(xExchange).with("XA");
}
@Bean
public Queue bQueue(){
Map<String,Object> args=new HashMap<>();
args.put("x-dead-letter-exchange", yExchange);
args.put("x-dead-letter-routing-key", "YD");
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(bQueue).withArguments(args).build();
}
@Bean
public Binding queueBindXB(@Qualifier("bQueue") Queue bQueue, @Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(bQueue).to(xExchange).with("XB");
}
@Bean
public Queue dQueue(){
return new Queue(dQueue);
}
@Bean
public Binding deadQueueBindY(@Qualifier("dQueue") Queue dQueue,@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(dQueue).to(yExchange).with("YD");
}
}
- 生产者
package com.example.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
@Controller
@RequestMapping("/ttl")
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
System.out.println("发送一条消息给两个队列");
rabbitTemplate.convertAndSend("xExchange", "XA", "来自ttl为10s的队列"+message);
rabbitTemplate.convertAndSend("xExchange", "XB", "来自ttl为40的队列"+message);
}
}
- 消费者
package com.example.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class DeadQueueConsumer {
@RabbitListener(queues="dQueue")
public void receiveMsg(Message message, Channel channel) throws IOException {
String s = new String(message.getBody());
System.out.println(s);
}
}
- 发送请求 http://localhost:8080/ttl/sendMsg/hello
缺点:每增加一个新的时间需求,就要新增一个队列
设置消息TTL
增加一个队列QC
- 配置类
package com.example.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TtlQueueConfig {
public static final String xExchange="xExchange";
public static final String yExchange="yExchange";
public static final String dQueue="dQueue";
public static final String cQueue="cQueue";
@Bean
public DirectExchange xExchange(){
return new DirectExchange(xExchange);
}
@Bean
public DirectExchange yExchange(){
return new DirectExchange(yExchange);
}
@Bean
public Queue cQueue(){
Map<String, Object> args=new HashMap<>();
args.put("x-dead-letter-exchange", yExchange);
args.put("x-dead-letter-routing-key", "YD");
return QueueBuilder.durable(cQueue).withArguments(args).build();
}
@Bean
public Binding bindingXC(@Qualifier("cQueue") Queue cQueue,@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(cQueue).to(xExchange).with("XC");
}
@Bean
public Queue dQueue(){
return new Queue(dQueue);
}
@Bean
public Binding deadQueueBindY(@Qualifier("dQueue") Queue dQueue,@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(dQueue).to(yExchange).with("YD");
}
}
- 生产者
package com.example.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
@Controller
@RequestMapping("/ttl")
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime){
System.out.println("发送一条消息给队列");
rabbitTemplate.convertAndSend("xExchange", "XC", message, CorrelationData->{
CorrelationData.getMessageProperties().setExpiration(ttlTime);
return CorrelationData;
});
}
}
- 消费者
package com.example.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class DeadQueueConsumer {
@RabbitListener(queues="dQueue")
public void receiveMsg(Message message, Channel channel) throws IOException {
String s = new String(message.getBody());
System.out.println(s);
}
}
这种方式消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行
安装延时队列插件
- 去链接下载延时插件,上传到
/usr/local/software - 复制插件到指定目录
cp -r ./rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/plugins/ - 查看插件
rabbitmq-plugins list - 启动rabbitmq
systemctl start rabbitmq-server (重要,顺序不能乱) - 启动插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange - 配置文件
package com.example.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedQueueConfig {
public static final String exchangeName="delayed.exchange";
public static final String queueName="delayed.queue";
public static final String delayedRoutingKey="delayed.routingKey";
@Bean
public Queue delayedQueue(){
return new Queue(queueName);
}
@Bean
public CustomExchange delayedExchange(){
Map<String, Object> args=new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(exchangeName, "x-delayed-message", true, false, args);
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedExchange") CustomExchange delayedExchange,@Qualifier("delayedQueue") Queue delayedQueue){
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(delayedRoutingKey).noargs();
}
}
- 生产者
package com.example.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
@Controller
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendDelayedMsg/{message}/{delayedTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayedTime){
message+=delayedTime;
System.out.println("发送一条消息给队列");
rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey", message, CorrelationData->{
CorrelationData.getMessageProperties().setDelay(delayedTime);
return CorrelationData;
});
}
}
- 消费者
package com.example.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class DeadQueueConsumer {
@RabbitListener(queues="delayed.queue")
public void receiveMsg(Message message, Channel channel) throws IOException {
String s = new String(message.getBody());
System.out.println(s);
}
}
- 测试
访问http://localhost:8080/sendDelayedMsg/hello/20000 访问http://localhost:8080/sendDelayedMsg/hello/10000 第二个消息被先消费,符合预期
优先级队列
- 发送消息代码添加优先级,默认消息优先级为最低0
public class Producer {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel();) {
AMQP.BasicProperties properties=new AMQP.BasicProperties().builder().priority(5).build();
for (int i = 1; i <11; i++) {
String message = "info"+i;
if(i==5){
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
}else{
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
System.out.println("发送消息完成:" + message);
}
}
}
}
- 声明队列代码添加优先级
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);
3.注意 队列实现优先级:队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费因为,这样才有机会对消息进行排序
惰性队列
惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅占用 1.5MB 队列具备两种模式:default 和 lazy,lazy模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
RabbitMQ集群
搭建集群
- 克隆虚拟机 克隆1和克隆2
- 修改 3 台机器的主机名称,分别改为node1,node2,node3
vim /etc/hostname
- 配置各个节点的 hosts 文件,让各个节点都能互相识别对方
- 以确保各个节点的 cookie 文件使用的是同一个值在 node1 上执行远程操作命令
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
- 启动 RabbitMQ 服务,顺带启动 Erlang 虚拟机和 RbbitMQ 应用服务(在三台节点上分别执行以下命令)
rabbitmq-server -detached
- 在节点 2 执行
rabbitmqctl stop_app
(rabbitmqctl stop 会将 Erlang 虚拟机关闭,rabbitmqctl stop_app 只关闭 RabbitMQ 服务)
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app(只启动应用服务)
- 在节点 3 执行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2
rabbitmqctl start_app
- 查看集群状态
rabbitmqctl cluster_status
- 需要重新设置用户
创建账号
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
- 查看后台
- 解除集群节点(node2 和 node3 机器分别执行)
(node1 机器上执行)
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
rabbitmqctl forget_cluster_node rabbit@node2
镜像队列
- 启动三台集群节点
- 随便找一个节点添加 policy
消息随着队列备份而备份,镜像队列自动分布或转移在启动的节点中
|