RabbitMQ
项目源码git共享在 RabbitMQ项目源码git地址
RabbitMQ介绍
2007年,Rabbit技术公司基于AMQP标准开发的RabbitMQ1.0 发布。RabbitMQ采用Erlang语言开发。Erlang 语言由Ericson设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
rabbitmq基础架构图
Connection 连接(由连接工厂创建,连接工厂需要设置参数)
Channel 管道,用来通信
VirtualHost 虚拟主机
Exchange 交换机
Queue队列
RabbitMQ工作模式
最基本的单生产者单消费者队列模式
一个生产者,一个消费者,项目示范:单生产者单消费者项目使用示范
Work queues工作队列模式
一个生产者,若干消费者,项目示范:单生产者多消费者项目使用示范
Pub/Sub订阅模式
一个生产者,多个队列,多消费者,项目示范:Pub/Sub模式项目使用示范
在订阅模型中,多了一个Exchange角色,而且过程略有变化:
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接收者,会一直等待消息到来
- Queue:消息队列,接收消息、缓存消息
- Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列 Direct:定向,把消息交给符合指定routing key 的队列 Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
Routing路由模式
-
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key) -
消息的发送方在向Exchange 发送消息时,也必须指定消息的 RoutingKey -
Exchange 不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey 与消息的 Routing key完全—致,才会接收到消息 -
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key -
X: Exchange(交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列 -
C1:消费者,其所在队列指定了需要routing key为error的消息 -
C2:消费者,其所在队列指定了需要routing key为info、error、warning 的消息
Topic通配符模式
同一交换机下,只要发的消息的routingkey能和绑定的队列的通配符能匹配上这个消息就会被送进队列,项目示范:Topic模式项目使用示范
Topic主题模式可以实现 Pub/Sub发布与订阅模式和Routing 路由模式的功能,只是Topic在配置routing key的时候可以使用通配符,显得更加灵活。
下载和安装RabbitMQ
安装docker
为了屏蔽环境,操作系统不同,我使用docker,没有docker直接去菜鸟教程的docker安装教程看,有自动安装的命令,这里我摘取centos和ubuntu的。
curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun
启动和将docker加入开机启动的服务
systemctl start docker
systemctl enable docker
拉取镜像并启动容器
docker pull rabbitmq:3.6.5-management
docker run -dit --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 5672:5672 -p 15672:15672 -p 4369:4369 -p 25672:25672 rabbitmq:3.6.5-management
访问管理后台
我的linux是个虚拟机放,IP是192.168.0.105,你自己的就用127.0.0.1就可以了,rabbitmq服务的端口是5672,后台的端口是15672。
单生产者单消费者项目使用示范
创建生产者
创建的是maven项目
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rabbitmq-demo</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>producer</artifactId>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Producer.java
package com.example.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.105");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("first_queue",true,false,false,null);
String msg = "hello rabbitmq,this is my first message.";
channel.basicPublish("","first_queue",null,msg.getBytes());
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
运行结果
去控制面板查看
点开first_queue,再点Get messages里的Get Message(s)
创建消费者
也是maven项目
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rabbitmq-demo</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>consumer</artifactId>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
ConsumerDemo.java
package com.example.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerDemo {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.105");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag: "+consumerTag);
System.out.println("env: "+envelope.toString());
System.out.println("消费者收到消息:");
System.out.println(new String(body));
}
};
channel.basicConsume("first_queue",true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
运行结果
单生产者多消费者项目使用示范
创建生产者
给之前的代码加个for循环执行,并加一个sleep即可
SingleLoopProducer.java
package com.example.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;
public class SingleLoopProducer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.105");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("first_queue",true,false,false,null);
for (;;){
try {
String msg = "hello rabbitmq,this is my first message. "+new Date().toString();
channel.basicPublish("","first_queue",null,msg.getBytes());
Thread.sleep(1500);
}catch (Exception e){
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
创建消费者
SingleLoopConsumerDemo1.java 消费者2号
package com.example.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class SingleLoopConsumerDemo1 {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.105");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1号收到消息:");
System.out.println("consumerTag: "+consumerTag);
System.out.println("env: "+envelope.toString());
System.out.println("消息主体:");
System.out.println(new String(body));
}
};
channel.basicConsume("first_queue",true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
SingleLoopConsumerDemo2.java 消费者2号
package com.example.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class SingleLoopConsumerDemo2 {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.105");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2号收到消息:");
System.out.println("consumerTag: "+consumerTag);
System.out.println("env: "+envelope.toString());
System.out.println("消息主体:");
System.out.println(new String(body));
}
};
channel.basicConsume("first_queue",true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
运行结果
启动两个消费者,再启动生产者
可以看到两个消费者不断的在消费队列里的消息,并且不会重复消费。
Pub/Sub模式项目使用示范
创建pub生产者端
PubProducer.java
package com.example.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PubProducer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.105");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "pub_demo_exchange";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.FANOUT,true,true,false,null);
String queue1Name = "pub_demo_queue1";
String queue2Name = "pub_demo_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
channel.queueBind(queue1Name,exchangeName,"",null);
channel.queueBind(queue2Name,exchangeName,"",null);
String msg = "hello ,send to exchange "+exchangeName;
channel.basicPublish(exchangeName,"",null,msg.getBytes());
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
运行结果
创建队列1的消费者
PubSubConsumerDemo1.java
package com.example.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PubSubConsumerDemo1 {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.105");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
String queueName = "pub_demo_queue1";
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1号从队列"+queueName+"收到消息:");
System.out.println("consumerTag: "+consumerTag);
System.out.println("env: "+envelope.toString());
System.out.println("消息主体:");
System.out.println(new String(body));
}
};
channel.basicConsume(queueName,true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
运行结果
创建队列2的消费者
PubSubConsumerDemo2.java
package com.example.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PubSubConsumerDemo2 {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.105");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
String queueName = "pub_demo_queue2";
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2号从队列"+queueName+"收到消息:");
System.out.println("consumerTag: "+consumerTag);
System.out.println("env: "+envelope.toString());
System.out.println("消息主体:");
System.out.println(new String(body));
}
};
channel.basicConsume(queueName,true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
运行结果
Routing模式项目使用示范
创建生产者
RoutingProducer.java
package com.example.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RoutingProducer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.105");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "routing_demo_exchange";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,true,false,null);
String queue1Name = "routing_demo_queue1";
String queue2Name = "routing_demo_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
channel.queueBind(queue1Name,exchangeName,"error",null);
channel.queueBind(queue2Name,exchangeName,"debug",null);
channel.queueBind(queue2Name,exchangeName,"info",null);
channel.queueBind(queue2Name,exchangeName,"warning",null);
String msg = "hello ,send to exchange "+exchangeName+" level: error";
channel.basicPublish(exchangeName,"error",null,msg.getBytes());
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
运行结果
创建消费者1消费routingkey=“error”
RoutingConsumerDemo1.java
package com.example.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RoutingConsumerDemo1 {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.105");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
String queueName = "routing_demo_queue1";
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1号从队列"+queueName+"收到消息:");
System.out.println("consumerTag: "+consumerTag);
System.out.println("env: "+envelope.toString());
System.out.println("消息主体:");
System.out.println(new String(body));
}
};
channel.basicConsume(queueName,true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
运行结果
创建消费者2消费routingkey为debug info error
RoutingConsumerDemo2.java
package com.example.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RoutingConsumerDemo2 {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.105");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
String queueName = "routing_demo_queue2";
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2号从队列"+queueName+"收到消息:");
System.out.println("consumerTag: "+consumerTag);
System.out.println("env: "+envelope.toString());
System.out.println("消息主体:");
System.out.println(new String(body));
}
};
channel.basicConsume(queueName,true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
运行结果
Topic模式项目使用示范
创建生产者
TopicProducer.java
package com.example.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TopicProducer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.105");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "topic_demo_exchange";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.TOPIC,true,true,false,null);
String queue1Name = "topic_demo_queue1";
String queue2Name = "topic_demo_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
channel.queueBind(queue1Name,exchangeName,"*.log",null);
channel.queueBind(queue2Name,exchangeName,"order.#",null);
String msg = "hello ,this is an order data,send to exchange "+exchangeName;
channel.basicPublish(exchangeName,"order.log",null,msg.getBytes());
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
运行结果
创建日志消费者
TopicConsumerDemo1.java
package com.example.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TopicConsumerDemo1 {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.105");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
String queueName = "topic_demo_queue1";
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1号从队列"+queueName+"收到消息:");
System.out.println("consumerTag: "+consumerTag);
System.out.println("env: "+envelope.toString());
System.out.println("消息主体:");
System.out.println(new String(body));
}
};
channel.basicConsume(queueName,true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
运行结果
创建订单消费者
TopicConsumerDemo2.java
package com.example.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TopicConsumerDemo2 {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.105");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
String queueName = "topic_demo_queue2";
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1号从队列"+queueName+"收到消息:");
System.out.println("consumerTag: "+consumerTag);
System.out.println("env: "+envelope.toString());
System.out.println("消息主体:");
System.out.println(new String(body));
}
};
channel.basicConsume(queueName,true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
运行结果
Springboot 整合RabbitMQ
创建生产者
项目结构图
创建一个springboot-rabbitmq-producer模块,是一个springboot项目
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>springboot-rabbitmq-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbitmq-producer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.7.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.7.RELEASE</version>
<configuration>
<mainClass>com.example.springbootrabbitmqproducer.SpringbootRabbitmqProducerApplication</mainClass>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
application.yml
spring:
application:
name: springboot-rabbitmq-producer
rabbitmq:
host: 192.168.0.105
port: 5672
virtual-host: /
username: admin
password: admin
server:
port: 8001
rabbitmq配置类
package com.example.springbootrabbitmqproducer.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
public static final String EXCHANGE_NAME = "springboot_topic_exchange";
public static final String QUEUE_NAME = "springboot_queue";
@Bean(EXCHANGE_NAME)
public Exchange exchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean(QUEUE_NAME)
public Queue queue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
@Bean
public Binding binding(@Qualifier(QUEUE_NAME) Queue queue,@Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("springboot.#").noargs();
}
}
测试运行
package com.example.springbootrabbitmqproducer;
import com.example.springbootrabbitmqproducer.config.RabbitmqConfig;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
class SpringbootRabbitmqProducerApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend(){
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"springboot.hello","hello,springboot rabbitmq!");
}
}
运行结果
创建消费者
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>springboot-rabbitmq-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbitmq-consumer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.7.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.7.RELEASE</version>
<configuration>
<mainClass>com.example.springbootrabbitmqconsumer.SpringbootRabbitmqConsumerApplication</mainClass>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
application.yml
spring:
application:
name: springboot-rabbitmq-consumer
rabbitmq:
host: 192.168.0.105
port: 5672
virtual-host: /
username: admin
password: admin
server:
port: 8001
监听队列类
package com.example.springbootrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitmqListener {
public static final String QUEUE_NAME = "springboot_queue";
@RabbitListener(queues = QUEUE_NAME)
public void listenerQueue(Message message){
System.out.println("监听方法收到了消息:");
System.out.println(message);
}
}
测试运行
监听方法收到了消息:
(Body:'hello,springboot rabbitmq!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=springboot_topic_exchange, receivedRoutingKey=springboot.hello, deliveryTag=1, consumerTag=amq.ctag-rwLRHnLKMiETHNnk1ZAwDw, consumerQueue=springboot_queue])
RabbitMQ高级特性
消息可靠性投递
在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性模式。
rabbitmq整个消息投递的路径为:
producer—>rabbitmq broker—>exchange- -->queue—>consumer
消息从producer 到 exchange 则会返回一个confirmCallback .
消息从exchange–>queue投递失败则会返回一个returnCallback 。我们将利用这两个callback 控制消息的可靠性投递
-
设置ConnectionFactory的publisher-confirms="true”开启确认模式。 -
使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。 -
设置ConnectionFactory的publisher-returns="true"开启退回模式。 -
使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage. -
在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。使用channel下列方法,完成事务控制:
- txSelect(),用于将当前channel设置成transaction模式
- txCommit(),用于提交事务
- txRollback(),用于回滚事务
confirm确认模式
# 旧版配置
publisher-confirms: true
# 新版配置
publisher-confirm-type: correlated
创建一个子项目springboot-rabbitmq-producer-confirm,复制一份之前项目的应用配置
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>springboot-rabbitmq-producer-confirm</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbitmq-producer-confirm</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.7.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.7.RELEASE</version>
<configuration>
<mainClass>
com.example.springbootrabbitmqproducerconfirm.SpringbootRabbitmqProducerConfirmApplication
</mainClass>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
application.yml
spring:
application:
name: springboot-rabbitmq-producer
rabbitmq:
host: 192.168.0.105
port: 5672
virtual-host: /
username: admin
password: admin
# 启用消息确认
publisher-confirm-type: correlated
测试类
package com.example.springbootrabbitmqproducerconfirm;
import com.example.springbootrabbitmqproducerconfirm.config.RabbitmqConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootRabbitmqProducerConfirmApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testConfirmSend(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
System.out.println("收到投递反馈");
System.out.println(correlationData);
if (ack){
System.out.println("消息投递成功");
}else {
System.out.println("消息投递失败,错误原因:");
System.out.println(s);
}
}
});
String msgId = "1";
String msg = "hello,this message is a test for sending message efficiently,i expect receive a confirm from exchange.";
rabbitTemplate.convertAndSend(RabbitmqConfig.QUEUE_NAME,msg.getBytes(),new CorrelationData(msgId));
}
}
运行结果
return退回模式
跟上面一样再次创建一个测试用的项目
application.yml
spring:
application:
name: springboot-rabbitmq-producer
rabbitmq:
host: 192.168.0.105
port: 5672
virtual-host: /
username: admin
password: admin
# 启用消息投递失败回退
publisher-returns: true
测试类
package com.example.springbootrabbitmqproducerreturn;
import com.example.springbootrabbitmqproducerreturn.config.RabbitmqConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootRabbitmqProducerReturnApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testReturnSend() {
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息投递失败,调用了回退函数 replyCode:" + replyText + " replyText:" + replyText + " exchange:" + exchange + " routingKey:" + routingKey);
System.out.println(message);
}
});
String msg = "hello,this message is a test for sending message efficiently,if message send failed,i will receive a return callback.";
rabbitTemplate.convertAndSend(RabbitmqConfig.QUEUE_NAME + "not", msg);
}
}
运行结果
消费者Ack消费可靠性
自动确认(默认)
手动确认
根据异常情况确认(不常用)
ack指Acknowledge,确认。表示消费端收到消息后的确认方式。有三种确认方式:
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck),手动签收,
如果出现异常,则调用channel.basicNack)方法,让其自动重新发送消息。
创建项目springboot-rabbitmq-consumer-ack
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>springboot-rabbitmq-consumer-ack</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbitmq-consumer-ack</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.7.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.7.RELEASE</version>
<configuration>
<mainClass>com.example.springbootrabbitmqconsumerack.SpringbootRabbitmqConsumerAckApplication
</mainClass>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
application.yml
spring:
application:
name: springboot-rabbitmq-producer
rabbitmq:
host: 192.168.0.105
port: 5672
virtual-host: /
username: admin
password: admin
server:
port: 8080
RabbitmqListener.java
package com.example.springbootrabbitmqconsumerack.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitmqListener implements ChannelAwareMessageListener {
public static final String QUEUE_NAME = "springboot_queue";
@RabbitListener(queues = RabbitmqListener.QUEUE_NAME, ackMode = "MANUAL")
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("收到消息");
System.out.println(message);
boolean dealSuccess = true;
try {
Thread.sleep(1500);
int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
dealSuccess = false;
}
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if (dealSuccess) {
channel.basicAck(deliveryTag, false);
System.out.println("手动签收");
} else {
channel.basicNack(deliveryTag, false, true);
System.out.println("拒绝签收");
}
}
}
往这个队列里放一条消息,然后我们看我们的代码故意报错拒绝签收,让队列重新入队。
运行后我们可以看到一直在收到这个消息然后因为报错,拒收了消息,重新入队
我们注释掉错误试下,成功签收
去rabbitmq控制台手动获取下消息看看,发现消息的确被消费了
消费端限流
步骤:
- 设置每次最多拉取多少条消息
- 让消费端ack方式为手动确认方式(manual)
我们往刚才队列里填充10个消息(1-10,手动在控制台添加的),然后对刚才项目,注释掉故意设置的报错,注释掉睡眠,然后启动我们看rabbitmq控制台,我们可以看到消息速率是2/s,同时可以点开consumers看看有哪些消费者在消费,也可以看到我们设置的每次抓取消息数目限制为2
listener:
simple:
prefetch: 2
application.yml
spring:
application:
name: springboot-rabbitmq-producer
rabbitmq:
host: 192.168.0.105
port: 5672
virtual-host: /
username: admin
password: admin
listener:
simple:
prefetch: 2
server:
port: 8080
队列长度限制MaxLength
队列创建的时候设置参数"x-max-length"即可设置队列的最大长度
@Bean(QUEUE_NAME)
public Queue queue(){
return QueueBuilder.durable(QUEUE_NAME).maxLength(1000).build();
}
消息设置TTL(time to live)过期时间
队列创建的时候设置参数"x-message-ttl"即可设置消息的存活时间,也可以发送消息的时候单独设置过期时间,如果两个都设置了,取最小值。
@Bean("queue_name")
public Queue queue(){
return QueueBuilder.durable(QUEUE_NAME).ttl(10000).build();
}
@SpringBootTest
class SpringbootRabbitmqProducerApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend(){
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");
return message;
}
};
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"springboot.hello","hello,springboot rabbitmq!",messagePostProcessor);
}
}
我测试每隔一两秒时间发一个消息进队列,然后停下来不发了。看到的监控图如下,是生效的。
死信队列
消息成为死信的三种情况:
- 队列消息长度到达限制;
- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
- 原队列存在消息过期设置,消息到达超时时间未被消费;
给队列绑定死信交换机:
给队列设置参数:x-dead-letter-exchange和x-dead-letter-routing-key
创建交换机1,队列1,队列1绑定到交换机1,并将队列1的死信队列绑定到交换机2去
创建交换机2,队列2,队列2绑定到交换机2
给交换机1发消息送入队列1,由一个消费者消费队列1的消息,并拒绝掉消息,这样这个消息就被送到了死信队列里去了,其实就是按照配置好的routing key 送到了交换机2,从而送到了队列2当中去了
创建项目springboot-rabbitmq-demo-dead-letter-exchange
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>springboot-rabbitmq-demo-dead-letter-exchange</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbitmq-demo-dead-letter-exchange</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.7.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.7.RELEASE</version>
<configuration>
<mainClass>
com.example.springbootrabbitmqdemodeadletterexchange.SpringbootRabbitmqDemoDeadLetterExchangeApplication
</mainClass>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
队列配置类
package com.example.springbootrabbitmqdemodeadletterexchange.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqConfig {
public static final String EXCHANGE1_NAME = "springboot_dead_letter_exchange1";
public static final String QUEUE1_NAME = "springboot_dead_letter_queue1";
public static final String EXCHANGE2_NAME = "springboot_dead_letter_exchange2";
public static final String QUEUE2_NAME = "springboot_dead_letter_queue2";
@Bean(EXCHANGE1_NAME)
public Exchange exchange(){
return ExchangeBuilder.topicExchange(EXCHANGE1_NAME).durable(true).build();
}
@Bean(QUEUE1_NAME)
public Queue queue(){
return QueueBuilder.durable(QUEUE1_NAME).withArgument("x-dead-letter-exchange",EXCHANGE2_NAME).withArgument("x-dead-letter-routing-key","springboot.dead.letter").build();
}
@Bean
public Binding binding(@Qualifier(QUEUE1_NAME) Queue queue,@Qualifier(EXCHANGE1_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("springboot.#").noargs();
}
@Bean(EXCHANGE2_NAME)
public Exchange deadLetterExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE2_NAME).durable(true).build();
}
@Bean(QUEUE2_NAME)
public Queue deadLetterQueue(){
return QueueBuilder.durable(QUEUE2_NAME).build();
}
@Bean
public Binding deadLetterQueueBinding(@Qualifier(QUEUE2_NAME) Queue queue,@Qualifier(EXCHANGE2_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("springboot.#").noargs();
}
}
队列1的消费类
package com.example.springbootrabbitmqdemodeadletterexchange.listener;
import com.example.springbootrabbitmqdemodeadletterexchange.config.RabbitmqConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class ConsumerListener {
@RabbitListener(queues = RabbitmqConfig.QUEUE1_NAME,ackMode = "MANUAL")
public void consumeQueue1(Message message, Channel channel){
System.out.println("队列"+RabbitmqConfig.QUEUE1_NAME+" 收到消息:"+new String(message.getBody()));
System.out.println(message.toString());
try {
System.out.println("手动拒绝消息");
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
首先运行这个项目,让交换机和队列被定义好,然后关闭项目(队列1监听器开启了,我们先关掉项目)
我们给队列1投递一个消息(控制台publish message)
然后我们启动项目,把消息消费,并拒绝掉消息,拒绝的时候requeue=false,然后我们看运行结果
我们进入控制台查看队列2当中的消息,我们可以看到已经有了,而队列2页是可以绑定消费者进行消费的
延迟队列
TTL队列+死信队列组合实现延迟队列即可
创建项目springboot-rabbitmq-delay-queue,配置文件跟之前一样,就只有应用名不一样
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>springboot-rabbitmq-delay-queue</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbitmq-delay-queue</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.7.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.7.RELEASE</version>
<configuration>
<mainClass>com.example.springbootrabbitmqdelayqueue.SpringbootRabbitmqDelayQueueApplication
</mainClass>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
配置类
DLXConfig.java
package com.example.springbootrabbitmqdelayqueue.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DLXConfig {
public static final String EXCHANGE_NAME = "springboot_delay_queue_demo_dlx_exchange";
public static final String QUEUE_NAME = "springboot_delay_queue_demo_dlx_queue";
@Bean(EXCHANGE_NAME)
public Exchange exchange(){
return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean(QUEUE_NAME)
public Queue queue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
@Bean("DLXConfigBinding")
public Binding binding(@Qualifier(EXCHANGE_NAME) Exchange exchange,@Qualifier(QUEUE_NAME) Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
}
TTLQueueConfig.java
package com.example.springbootrabbitmqdelayqueue.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.TimeUnit;
@Configuration
public class TTLQueueConfig {
public static final String EXCHANGE_NAME = "springboot_delay_queue_demo_ttl_exchange";
public static final String QUEUE_NAME = "springboot_delay_queue_demo_ttl_queue";
public static final int EXPIRE = 10000;
@Bean(EXCHANGE_NAME)
public Exchange exchange(){
return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean(QUEUE_NAME)
public Queue queue(){
return QueueBuilder.durable(QUEUE_NAME).ttl(EXPIRE).withArgument("x-dead-letter-exchange",DLXConfig.EXCHANGE_NAME).withArgument("x-dead-letter-routing-key","").build();
}
@Bean("TTLQueueConfigBinding")
public Binding binding(@Qualifier(EXCHANGE_NAME) Exchange exchange, @Qualifier(QUEUE_NAME) Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
}
为了能够让配置生效,我们在启动类中调用下。
经过测试,如果我不调用rabbitTemplate去操作那么不会建立连接并执行配置,这是一个懒加载的机制
package com.example.springbootrabbitmqdelayqueue;
import com.example.springbootrabbitmqdelayqueue.config.TTLQueueConfig;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringbootRabbitmqDelayQueueApplication implements ApplicationRunner {
@Autowired
private RabbitTemplate rabbitTemplate;
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqDelayQueueApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
System.out.println("启动成功");
rabbitTemplate.convertAndSend(TTLQueueConfig.EXCHANGE_NAME,"","hello".getBytes());
}
}
启动后,运行结果
消息进了ttl队列,然后没有被消费,过期了,因死信队列策略,会被送往死信队列中去
前往死信队列中查看结果,那么我们可以看到消息被投递到这来了,如果我们定义消费者绑定到这个队列即可实现整个延迟队列
那我们现在写一个消费者来绑定到延迟后的死信队列去
package com.example.springbootrabbitmqdelayqueue.listener;
import com.example.springbootrabbitmqdelayqueue.config.DLXConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class DelayQueueListener {
@RabbitListener(queues = DLXConfig.QUEUE_NAME)
public void onMessage(Message message){
System.out.println("延迟队列收到消息:"+new Date().toString());
System.out.println(new String(message.getBody()));
System.out.println(message.toString());
}
}
修改启动类,加上时间戳打印
package com.example.springbootrabbitmqdelayqueue;
import com.example.springbootrabbitmqdelayqueue.config.TTLQueueConfig;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.Date;
@SpringBootApplication
public class SpringbootRabbitmqDelayQueueApplication implements ApplicationRunner {
@Autowired
private RabbitTemplate rabbitTemplate;
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqDelayQueueApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
System.out.println("启动成功,发送消息到ttl队列去"+new Date().toString());
rabbitTemplate.convertAndSend(TTLQueueConfig.EXCHANGE_NAME,"","hello,delay queue".getBytes());
}
}
启动项目,可以看到先发送了消息,十秒后收到了消息,延迟队列实现成功
日志与监控
- 查看队列
rabbitmqctl list_queues - 查看exchanges
rabbitmqctl list_exchanges - 查看用户
rabbitmqctl list_users - 查看连接
rabbitmqctl list_connections - 查看消费者信息
rabbitmqctl list_consumers - 查看环境变量
rabbitmqctl environment - 查看未被确认的队列
rabbitmqctl list_queues name messages_unacknowledged - 查看单个队列的内存使用
rabbitmqctl list_queues name memory - 查看准备就绪的队列
rabbitmqctl list_queues name messages_ready
消息追踪
在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。
开启后把每一步记录输出到日志或者队列了,需要的时候自行百度,不做示范了。
RabbitMQ应用问题
消息可靠性保障–消息补偿
消息幂等性保障–乐观锁机制
RabbitMQ集群搭建+HAProxy代理集群节点
在虚拟机将当前虚拟机克隆一份完整的,这样就有了两个节点。称节点1,节点2.
操作步骤
rabbitmqctl stop_app
docker cp ./.erlang.cookie rabbitmq:/var/lib/rabbitmq
rabbitmqctl reset
docker exec -it rabbitmq /bin/bash
root@907dd77fa8ea:/
Starting node rabbit@907dd77fa8ea ...
echo '192.168.0.105 907dd77fa8ea' >> /etc/hosts
root@95ccb7d105c8:/
Clustering node rabbit@95ccb7d105c8 with rabbit@907dd77fa8ea ...
rabbitmqctl start_app
操作完成后进控制台看,两个节点都有了
现在集群已经搭建好了,接下来要实现高可用还需要配置镜像队列,也就是将主节点的队列数据同步到从节点去
我们打开主节点的控制台,进入Admin,点右边的Policies,添加一个策略
表达式规则地址:https://developer.mozilla.org/en-US/docs/Web/JavaScript/Guide/Regular_Expressions
我们添加一个队列test_ha_queue来测试是不是会同步,在这个队列里投递一个消息,我们再去节点2控制台看,可以看到队列和消息都已经同步了的
而我们连接的时候是选择集群中任何一个IP都可以进行连接使用的,但是如果这个IP挂了,那么就连不上了。因此我们需要使用HAProxy来对IP做一个负载均衡
HAProxy
HAProxy提供高可用性、负载均衡以及基于TCP和HTTP应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括Twitter,Reddit,StackOverflow,GitHub在内的多家知名互联网公司在使用。HAProxy实现了一种事件驱动、单一进程模型,此模型支持非常大的并发连接数。
下载haproxy.1.6.5解压放入/usr/local
下载地址:https://src.fedoraproject.org/repo/pkgs/haproxy/haproxy-1.6.5.tar.gz
tar -xvf haproxy-1.6.5.tar.gz
mv -r ./haproxy-1.6.5 /usr/local/
cd /usr/local/haproxy-1.6.5
make TARGET=linux31 PREFIX=/usr/local/haproxy
make install PREFIX=/usr/local/haproxy
vi /etc/profile
source /etc/profile
mkdir /etc/haproxy
groupadd -r -g 149 haproxy
useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
vi /etc/haproxy/haproxy.cfg
haproxy -f /etc/haproxy/haproxy.cfg
http://192.168.0.105:8100/rabbitmq-stats
配置文件内容
global
log 127.0.0.1 local0 info
maxconn 5120
chroot /usr/local/haproxy
uid 99
gid 99
daemon
quiet
nbproc 20
pidfile /var/run/haproxy.pid
defaults
log global
mode tcp
option tcplog
option dontlognull
retries 3
option redispatch
maxconn 2000
contimeout 5s
clitimeout 60s
srvtimeout 15s
listen rabbit_cluster
# 绑定到5673端口
bind 0.0.0.0:5673
mode tcp
# 负载均衡的节点
balance roundrobin
server node1 192.168.0.105:5672 check inter 5000 rise 2 fall 2
server node2 192.168.0.102:5672 check inter 5000 rise 2 fall 2
listen stats
# 绑定状态查看后台的地址
bind 192.168.0.105:8100
mode http
option httplog
stats enable
# 状态后台的uri
stats uri /rabbitmq-stats
stats refresh 5s
停止haproxy命令是 killall haproxy
|