一、场景介绍
可用于解耦、削峰、异步
1.1 串行方式
串行方式:将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
public void makeOrder(){
orderService.saveOrder();
messageService.sendSMS("order");
emailService.sendEmail("order");
appService.sendApp("order");
}
存在问题: 1:每完成一个订单,耗时时间长 2:若3、4步执行失败,造成事务回滚导致下单失败,得不偿失 3:非核心业务代码过多造成冗余
1.2 并行方式
并行方式:将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
public void makeOrder(){
orderService.saveOrder();
relationMessage();
}
public void relationMessage(){
theadpool.submit(new Callable<Object>{
public Object call(){
messageService.sendSMS("order");
}
})
theadpool.submit(new Callable<Object>{
public Object call(){
emailService.sendEmail("order");
}
})
theadpool.submit(new Callable<Object>{
public Object call(){
appService.sendApp("order");
}
})
theadpool.submit(new Callable<Object>{
public Object call(){
appService.sendApp("order");
}
})
}
存在问题: 1:耦合度高 2:需要自己写线程池自己维护成本太高 3:出现了消息可能会丢失,需要你自己做消息补偿 4:如何保证消息的可靠性你自己写 5:如果服务器承载不了,你需要自己去写高可用
1.3 异步消息队列的方式
public void makeOrder(){
orderService.saveOrder();
rabbitTemplate.convertSend("ex","2","消息内容");
}
好处 1:完全解耦,用MQ建立桥接 2:有独立的线程池和运行模型 3:出现了消息可能会丢失,MQ有持久化功能 4:如何保证消息的可靠性,死信队列和消息转移的等 5:如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用。 按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍
1.4 小结
使用消息中间件可以做到: 1、高内聚,低耦合 2、流量的削峰 3、分布式事务的可靠消费和可靠生产 4、索引、缓存、静态化处理的数据同步 5、流量监控 6、日志监控(ELK) 7、下单、订单分发、抢票
二、fanout模式案例演示
2.1 fanout模式图解
2.2 目标
2.3 生产者具体实现
2.3.1 创建生产者工程
2.3.2 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>springboot-rabbitmq-fanout-producer</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.3.2.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
2.3.3 application.yml
server:
port: 8080
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 192.168.229.128
port: 5672
2.3.4 定义订单的生产者
package com.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void makeOrderFanout(String userId, String productId, int num) {
String orderNumer = UUID.randomUUID().toString();
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
String exchangeName = "fanout_order_exchange";
String routeKey = "";
rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
}
}
2.3.5 配置类-绑定关系
package com.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutRabbitConfig {
@Bean
public Queue fanoutEmailQueue() {
return new Queue("email.fanout.queue", true);
}
@Bean
public Queue fanoutSmsQueue() {
return new Queue("sms.fanout.queue", true);
}
@Bean
public Queue fanoutWeixinQueue() {
return new Queue("weixin.fanout.queue", true);
}
@Bean
public FanoutExchange fanoutOrderExchange() {
return new FanoutExchange("fanout_order_exchange", true, false);
}
@Bean
public Binding fanoutEmailBinding() {
return BindingBuilder.bind(fanoutEmailQueue()).to(fanoutOrderExchange());
}
@Bean
public Binding fanoutSmsBinding() {
return BindingBuilder.bind(fanoutSmsQueue()).to(fanoutOrderExchange());
}
@Bean
public Binding fanoutWeixinBinding() {
return BindingBuilder.bind(fanoutWeixinQueue()).to(fanoutOrderExchange());
}
}
2.3.5 创建生产者测试类
package com.test;
import com.service.OrderService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class FanoutProducerApplicationTests {
@Autowired
OrderService orderService;
@Test
public void contextLoads() throws Exception {
orderService.makeOrderFanout("1", "1", 12);
}
}
2.4 消费者具体实现
2.4.1 创建消费者工程
2.4.2 引入同样pom的依赖
2.4.3 application.yml
server:
port: 8081
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 192.168.229.128
port: 5672
2.4.4 创建三个消费者
FanoutEmailService:
package com.service.fanout;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@RabbitListener(queues = {"email.fanout.queue"})
@Component
public class FanoutEmailService {
@RabbitHandler
public void messagerevice(String message){
System.out.println("email-------------->" + message);
}
}
FanoutSMSService:
package com.service.fanout;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@RabbitListener(queues = {"sms.fanout.queue"})
@Component
public class FanoutSMSService {
@RabbitHandler
public void messagerevice(String message){
System.out.println("sms-------------->" + message);
}
}
FanoutWeixinService:
package com.service.fanout;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@RabbitListener(queues = {"weixin.fanout.queue"})
@Component
public class FanoutWeixinService {
@RabbitHandler
public void messagerevice(String message){
System.out.println("weixin-------------->" + message);
}
}
2.4.5 消费者的主启动类
package com;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class FanoutConsumer {
public static void main(String[] args) {
SpringApplication.run(FanoutConsumer.class, args);
}
}
2.5 测试流程
1、先启动生产者(因为配置类定义在生产者一端,所以若先启动消费者会出现找不到队列的报错信息;解决方案:可以将配置类复制一份到消费者端) 2、启动消费者 3、查看消息是否成功消费
三、Direct模式案例演示
3.1 Direct模式图解
3.2 生产者具体实现
这里只介绍关键步骤,其他步骤参考fanout模式
3.2.1 创建Direct的配置类
package com.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectRabbitConfig {
@Bean
public Queue directEmailQueue() {
return new Queue("email.direct.queue", true);
}
@Bean
public Queue directSmsQueue() {
return new Queue("sms.direct.queue", true);
}
@Bean
public Queue directWeixinQueue() {
return new Queue("weixin.direct.queue", true);
}
@Bean
public DirectExchange directOrderExchange() {
return new DirectExchange("direct_order_exchange", true, false);
}
@Bean
public Binding directEmailBinding() {
return BindingBuilder.bind(directEmailQueue()).to(directOrderExchange()).with("email");
}
@Bean
public Binding directSmsBinding() {
return BindingBuilder.bind(directSmsQueue()).to(directOrderExchange()).with("sms");
}
@Bean
public Binding directWeixinBinding() {
return BindingBuilder.bind(directWeixinQueue()).to(directOrderExchange()).with("weixin");
}
}
3.2.2 订单生产者
package com.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void makeOrderFanout(String userId, String productId, int num) {
String orderNumer = UUID.randomUUID().toString();
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
String exchangeName = "fanout_order_exchange";
String routeKey = "";
rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
}
public void makeOrderDirect(String userId, String productId, int num) {
String orderNumer = UUID.randomUUID().toString();
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
String exchangeName = "direct_order_exchange";
rabbitTemplate.convertAndSend(exchangeName, "email", orderNumer);
rabbitTemplate.convertAndSend(exchangeName, "sms", orderNumer);
}
}
3.2.3 生产者测试类
package com.test;
import com.service.OrderService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class FanoutProducerApplicationTests {
@Autowired
OrderService orderService;
@Test
public void contextLoads() throws Exception {
orderService.makeOrderFanout("1", "1", 12);
}
@Test
public void contextLoads2() throws Exception {
orderService.makeOrderDirect("1", "1", 12);
}
}
3.3 消费者具体实现
3.3.1 创建三个消费者
DirectEmailService:
package com.service.direct;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@RabbitListener(queues = {"email.direct.queue"})
@Component
public class DirectEmailService {
@RabbitHandler
public void messagerevice(String message){
System.out.println("email-------------->" + message);
}
}
DirectSMSService:
package com.service.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@RabbitListener(queues = {"sms.direct.queue"})
@Component
public class DirectSMSService {
@RabbitHandler
public void messagerevice(String message){
System.out.println("sms-------------->" + message);
}
}
DirectWeixinService:
package com.service.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@RabbitListener(queues = {"weixin.direct.queue"})
@Component
public class DirectWeixinService {
@RabbitHandler
public void messagerevice(String message){
System.out.println("weixin-------------->" + message);
}
}
四、topic模式案例演示
4.1 topic模式图解
4.2 特别说明
这个说明下,前面两个案例都是采用配置类的方式实现交换机和队列的创建和绑定,因此特地用此案例来演示一下注解方式实现。
4.3 生产者具体实现
由于采用注解方式,不需要创建配置类,因此只需要创建订单生产者和测试类即可
4.3.1 订单生产者
package com.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void makeOrderFanout(String userId, String productId, int num) {
String orderNumer = UUID.randomUUID().toString();
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
String exchangeName = "fanout_order_exchange";
String routeKey = "";
rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
}
public void makeOrderDirect(String userId, String productId, int num) {
String orderNumer = UUID.randomUUID().toString();
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
String exchangeName = "direct_order_exchange";
rabbitTemplate.convertAndSend(exchangeName, "email", orderNumer);
rabbitTemplate.convertAndSend(exchangeName, "sms", orderNumer);
}
public void makeOrderTopic(String userId, String productId, int num) {
String orderNumer = UUID.randomUUID().toString();
System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
String exchangeName = "topic_order_exchange";
String routingKey = "sms.email";
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderNumer);
}
}
4.3.2 生产者测试类
package com.test;
import com.service.OrderService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class FanoutProducerApplicationTests {
@Autowired
OrderService orderService;
@Test
public void contextLoads() throws Exception {
orderService.makeOrderFanout("1", "1", 12);
}
@Test
public void contextLoads2() throws Exception {
orderService.makeOrderDirect("1", "1", 12);
}
@Test
public void contextLoads3() throws Exception {
orderService.makeOrderTopic("1", "1", 12);
}
}
4.4 消费者具体实现
4.4.1 创建三个消费者
TopicEmailService:
package com.service.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@RabbitListener(bindings =@QueueBinding(
value = @Queue(value = "email.topic.queue",autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key = "#.email.#"
))
@Component
public class TopicEmailService {
@RabbitHandler
public void messagerevice(String message){
System.out.println("emailtopic-------------->" + message);
}
}
TopicSMSService:
package com.service.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@RabbitListener(bindings =@QueueBinding(
value = @Queue(value = "sms.topic.queue",autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key = "#.sms.*"
))
@Component
public class TopicSMSService {
@RabbitHandler
public void messagerevice(String message){
System.out.println("smstopic-------------->" + message);
}
}
TopicWeixinService:
package com.service.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@RabbitListener(bindings =@QueueBinding(
value = @Queue(value = "weixin.topic.queue",autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key = "weixin.#"
))
@Component
public class TopicWeixinService {
@RabbitHandler
public void messagerevice(String message){
System.out.println("weixintopic-------------->" + message);
}
}
4.4.2 简单介绍消费者
由上述代码可以见,交换机和队列的创建和绑定都在消费者这一端实现了,通过注解的方式。
五、总结
1、交换机和队列的创建和绑定配置类,应该定义在消费者还是生产者那边? 2、使用配置类方式还是使用注解方式好?
本人意见: 1:可以在两端都添加好配置,生产者一端可随意,但消费者一端必须添加,因为大多数情况,我们都是先启动消费者的。但还是视情况而定。 2:推荐使用配置类方式,因为配置类方式逻辑比较清晰,代码内聚方便管理,而且后续对消息和队列设置TTL属性时也较为方便和直观。
|