RabbitMQ 整合 PD 商城实战项目流程总结
一、订单流量削峰(解耦)
简单模式,若多添加几个消费者则用工作模式。
导入商城项目
将 step5 课前资料里面 /elasticsearch/pd商城.zip 解压。
第二层目录下 pd-web 文件夹解压到 rabbitmq 工程目录下面。
修改 pom.xml 文件中的 SpringBoot 版本为 2.3.2.RELEASE(可拖拽至 idea 内更改)。
在 rabbitmq 工程中导入模块(若之前在 idea 内更改则可以直接右键 Add As Maven 导入)。 或者打开工程结构 Project Structures 手动添加也可。
使用 sqlyog 导入项目目录中的 pd.sql:
右键复制脚本文件的路径,在 sqlyog 里面右键连接,从 SQL 转储文件导入数据库。
如果导入失败,可以增大 mysql 缓存区
set global max_allowed_packet=100000000;
set global net_buffer_length=100000;
SET GLOBAL interactive_timeout=28800000;
SET GLOBAL wait_timeout=28800000
因为大环境变化,MySQL 5.5 以下版本的用起来会很费劲,请至少更新至5.7。MariaDB 要 10.4 以上。
确认工程 JDK 的配置(吐槽:坑爹的涛哥这里准备的项目是 GBK 编码的,属实难顶)。
为了本项目的运行,首先将数据库 pd_store 的 user、order、order_item 表清空:
delete from pd_user
delete from pd_order
delete from pd_order_item
找到主启动类 RunPdApp ,运行,然后更改启动配置中的工作目录为本项目,改完重新启动。
打开 localhost 进入模拟拼多商城,注册一个新用户(随意填写,注意格式即可)。
注册成功以后,点击地址管理,添加一个收货地址(只填必填项即可)。
返回起始页,选择一个商品,加入购物车并下订单(无需支付,订单会添加到数据库内)。
查看数据库是否正确存储数据。
生产者 - 发送订单
修改订单,把订单发送到 rabbitmq
返回 pd-web 项目,打开 service 下的 OrderServiceImpl 实现类,注释掉 64-89 行关于数据库操作的代码。这里是为了不直接操作数据库,而是发送到 rabbitmq 以后再继续操作。
pom.xml 添加 rabbitmq 依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml 添加 rabbitmq 连接。
spring:
rabbitmq:
host: 192.168.64.140
port: 5672
username: admin
password: admin
启动类(或者新建自动配置类)
-
提供使用的队列参数: orderQueue,持久队列(订单不能丢),不独占(并行处理效率高),不自动删除。 -
创建 spring 的 Queue 对象,用来封装队列的参数。RabbitAutoConfiguration 自动配置类,会自动发现 Queue 实例,使用这里的参数,连接服务器在服务器上创建队列。 1)通过方法创建一个 Queue 队列,注意引包要用 amqp 核心包。 import org.springframework.amqp.core.Queue 2)直接返回一个 Queue 对象,参数为上述参数(队列名,true,false,false)。 3)最后将方法交给 Spring 容器管理。
package com.pd;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
@MapperScan("com.pd.mapper")
public class RunPdAPP{
public static void main(String[] args) {
SpringApplication.run(RunPdAPP.class, args);
}
@Bean
public Queue orderQueue(){
return new Queue("orderQueue",true,false,false);
}
}
修改 OrderServiceImpl,向 Rabbitmq 发送订单。
-
saveOrder 方法上添加自动注入 AmqpTemplate 对象,该对象为 Spring 提供的用来发送消息的工具对象。 -
在注释的方法里,要求将 pdOrder 发送到 orderQueue 队列: t.convertAndSend("orderQueue",pdOrder); -
这里的 convertAndSend 方法是转换并发送,先转成 byte[],再发送,订单对象会被自动序列化成 byte[]。
重启启动类,重新下订单进行测试:
- 下订单以后,会发现数据库里没有新订单。
此时查看 Rabbitmq 服务器,在 orderQueue 队列会发现有一条就绪的消息。 点开 orderQueue 队列,查看下方的 Get messages 会有很长的一串代码(消息):
有消息即可。
订单 - 消费者
1.找到项目目录,复制一份 pd-web 起名为 pd-web-consumer 作为消费者端。
2.修改 pom.xml 文件:
- 将 artifactId 改为 pd-web-consumer(可直接拖拽 pom 文件至 idea)。
3.修改 application.yml 文件:
- 因为之前的生产者端已经占用了 80 端口,所以这里将 server.port 改为 81。
4.添加依赖、添加连接、队列参数:前面做过了所以省略。
5.新建消费者类(OrderConsumer),从 orderQueue 接收消息:
-
在 pd 包内创建 OrderConsumer 类。 -
添加 @Component 注解自动创建实例,添加 @RabbitListener 注解指定接收消息的队列。 -
在接收订单时,需要调用 OrderService 对象来接收,因此添加自动注入 OrderService 对象。 -
创建 receive 方法,参数为 PdOrder 对象,并添加 @RabbitHandler 注解。 @RabbitHandler 注解是用来配合 @RabbitListener 注解的。 因此指定处理消息的方法的类中,只能有一个 @RabbitHandler 注解。
6.调用原来的业务代码,存储订单
- 通过 OrderService 对象调用 saveOrder 方法接收订单,最后输出提示 “订单已存储” 。
- Spring 会自动创建实例,自动启动消费者,自动开始从队列获取消息,自动执行处理消息的方法。
package com.pd;
import com.pd.pojo.PdOrder;
import com.pd.service.OrderService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "orderQueue")
public class OrderConsumer {
@Autowired
private OrderService orderService;
@RabbitHandler
public void receive(PdOrder pdOrder) throws Exception {
orderService.saveOrder(pdOrder);
System.out.println("-----------------------------------订单已存储!wow!");
}
}
7.修改 OrderServiceImpl,还原成原来的数据库操作的代码即可。
-
将注入的 AmqpTemplate 对象删掉,saveOrder 前面自动序列化的代码也删掉,数据库注释部分还原。 -
因为之前的是随机生成的 orderId,现在因为前面已经获取了 PdOrder 的对象。 所以直接添加一行代码:获取 pdOrder 的 orderId。 String orderId = pdOrder.getOrderId();
8.消息传递测试:
- 由于之前发送的订单对象需要序列化,所以这里需要重新下一个订单来测试传递。
- 重新下完订单以后,启动 81 端口的 RunPdApp 服务,消息会自动传递到消费者这里。
- 控制台显示 “订单已存储” 即可。
9.合理分发相关:
-
ack – spring 封装的 rabbitmq,已经是手动 ack 模式了,spring 会自动发送回执。 -
qos=1 – spring 默认设置的 qos 是250,yml 中 可以添加 prefetch = 1 把 qos 设置成 1。
spring:
rabbitmq:
listener:
simple:
prefetch: 1
10.持久化相关:
队列持久化参数设定为 true;
生产者发送的消息默认是持久消息(即生产者利用 AmqpTemplate 调用 convertAndSend 的方法)。
二、SpringBoot 整合 RabbitMQ
创建项目
-
在 rabbitmq 目录下创建 SpringBoot 工程模块:rabbitmq-spring,只添加 rabbitmq 依赖即可。 -
修改 pom.xml 文件:Spring Boot 版本改为 2.3.2.RELEASE -
修改 application.yml 文件:添加 rabbitmq 连接 spring:
rabbitmq:
host: 192.168.64.140
port: 5672
username: admin
password: admin
项目结构
- 创建结构和之前类似,虽然项目为我们自动创建了一个启动类(RabbitmqSpringApplication),但是我们想同时启动多个模式下的生产者和消费者时会起冲突,因此为了方便测试运行我们每个包内单独创建一个启动类(Main)来运行该包下的测试程序。
- 分别创建 m1/m2/m3/m4/m5 包,目录下创建 Main 启动类、Producer 生产者类和 Consumer 消费者类。
M1 项目测试
M1 项目为默认模式的测试,即只有一个消费者使用一个队列接收一个生产者的模式。
Producer 类
首先我们完成生产者类,生产者类负责向某个队列发送消息,通过对象转换并发送自己想发送的信息。
-
添加 @Component 注解自动创建实例。 -
自动注入 AmqpTemplate 对象。 -
自定义一个发送方法(send),用 AmqpTemplate 对象调用 convertAndSend 方法将信息发送。 方法参数为:( 队列名,具体信息内容 )这里该方法会自动转换字符串内容。 -
这里同意规定测试队列名为 ”helloworld“,发送信息随意。
package cn.tedu.rabbitmqspring.m1;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Producer {
@Autowired
private AmqpTemplate t;
public void send(){
t.convertAndSend("helloworld","Hello-THE-FKING-World");
}
}
Consumer 类
-
添加 @Component 注解自动创建实例。 -
通常我们在消费者端写接收方法时会在类上添加 @RabbitListener 注解,并在接收方法上添加 @RabbitHandler 注解来配合。 这里我们选择直接在方法上添加 @RabbitListener 注解来表示接收。 每个 @RabbitListener 注解,都会注册启动一个消费者。 直接加在方法上面表示该队列只有这一个方法用来接收。 这样做的好处是接收代码可以集中,维护起来更加方便。 -
自定义一个接收方法(receive),上面通过 @RabbitListener 注解限定接收的队列。 方法参数为 String 对象(发送过来的数据),并输出 “收到: ” + 内容。
package cn.tedu.rabbitmqspring.m1;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
@RabbitListener(queues = "helloworld")
public void receive(String str){
System.out.println("收到: "+str);
}
}
Main 启动类
-
完成 Spring 启动类的基本配置。 -
创建一个 Queue 实例,封装队列参数: 定义一个方法(helloWorldQueue),返回 Queue 队列,该队列设定为非持久队列,并交给容器管理。 -
添加测试代码,调用生产者发送消息: 为了调用生产者,所以进行 Producer 的注入。 定义测试方法(test),利用 Producer 对象调用 send 方法发送信息。 这里可以添加 @PostConstruct 注解,该注解用于初始化资源,可以事先将这些文件加载到内存里,需要时直接从内存调用。 -
spring 的执行流程: 包扫描创建所有实例 --> 完成所有依赖的注入 --> @PostConstruct --> 后续流程 -
这里执行代码时,自动配置类有可能还没有创建队列,消费者可能也还没有启动第一次,执行可能丢失信息。
package cn.tedu.rabbitmqspring.m1;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.annotation.PostConstruct;
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class,args);
}
@Bean
public Queue helloWorldQueue(){
return new Queue("helloworld",false);
}
@Autowired
private Producer p;
@PostConstruct
public void test(){
p.send();
}
}
启动测试:测试有收到消息即可。
M2 项目测试
M2 项目测试为轮询模式(工作模式),即有多台消费者用一个队列接收生产者的模式。
基本项目结构和 M1 类似,我们可以直接复制 M1 项目的三个类到 M2 里。
Producer 类
-
本项目的生产者采用工作模式,向 task_queue 队列发送消息。 -
主要更改自定义发送方法的方法体,工作模式需要不断地向消费者发送信息,所以采用 while-true 死循环来模拟频繁的发送。增加输入消息提示并获取信息。 -
最后还是调用 convertAndSend 方法发送,参数为( 队列名,消息内容,消息预处理器) 此处的消息预处理器,可以重新设置消息的属性,本次测试不采用所以可以不写。
package cn.tedu.rabbitmqspring.m2;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Scanner;
@Component
public class Producer {
@Autowired
private AmqpTemplate t;
public void send(){
while (true) {
System.out.println("输入消息: ");
String str = new Scanner(System.in).nextLine();
t.convertAndSend("task_queue",str);
}
}
}
Consumer 类
- 工作模式下,通常是多个消费者共用一个队列来接收消费者的消息,我们模拟两个消费者来测试。
- 在 M1 的基础上,将队列名改为 task_queue,并复制出另一个发送方法模拟两个同时启动。
- 相应的方法名和输出提示也要更改。
package cn.tedu.rabbitmqspring.m2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
@RabbitListener(queues = "task_queue")
public void receive1(String str){
System.out.println("Consumer 1 收到: "+str);
}
@RabbitListener(queues = "task_queue")
public void receive2(String str){
System.out.println("Consumer 2 收到: "+str);
}
}
Main 启动类
-
在工作模式中,队列为共用的,所以要创建一个持久型队列,更改队列方法名和队列参数。 -
由于前面发送方法为死循环,send 方法不会停止跳出, 所以PostConstruct也不会继续。 这样就会导致后续流程不会进行,虽然可以一直输出消息,但是消费者端却不会收到消息。 -
所以我们要把 send 方法放到一个新线程里面去执行,不会占用主线程。方法可以用 lambda 表达式省略写。
package cn.tedu.rabbitmqspring.m2;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import javax.annotation.PostConstruct;
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class,args);
}
@Bean
public Queue taskQueue(){
return new Queue("task_queue");
}
@Autowired
private Producer p;
@PostConstruct
public void test(){
new Thread(()->p.send()).start();
}
}
启动测试:有消息输出即可。
M3 项目测试
M3 项目测试为发布订阅模式(广播模式),即生产者向 fanout 交换机发送消息。
只有与之绑定队列的消费者才能收到信息。
基本项目结构和 M2 类似,我们可以直接复制 M2 项目的三个类到 M3 里。
Producer 类
- 因为本项目是订阅模式测试,所以并不是向队列发送消息,而是向 fanout 交换机发送消息。
- 只需要将发送方法(send)里面调用 convertAndSend 方法更改即可。
- 方法参数为( 交换机名,路由键,发送消息),订阅模式不设置路由键(空串即可)。
- 交换机名设定为 “logs”(其他的也可以,但注意后面的名字需要与其一致)。
package cn.tedu.rabbitmqspring.m3;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Scanner;
@Component
public class Producer {
@Autowired
private AmqpTemplate t;
public void send(){
while (true) {
System.out.println("输入消息: ");
String str = new Scanner(System.in).nextLine();
t.convertAndSend("logs","",str);
}
}
}
Consumer 类
-
订阅模式下,Consumer 需要创建随机队列,然后和交换机绑定。 -
具体更改的地方只有 @RabbitListener 注解的参数部分: M1-M2 测试中都是与队列进行绑定参数,这里我们要使用绑定参数(bindings)。 @RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name = "logs", declare = "false")
))
其中: 由于群发没有关键词,所以 @QueueBinding 处可以直接绑定。 value = @Queue 当后面没有配置参数时就是默认:随机命名,非持久,独占,自动删除的队列。 交换机参数里面的 declare = “false” 代表:不重新定义,不新创建交换机。 ( 吐槽:这注解里面套注解,是真的不做人 … ) -
最后将配置复制到第二个方法上即可。
package cn.tedu.rabbitmqspring.m3;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name = "logs",declare = "false")
))
public void receive1(String str){
System.out.println("Consumer 1 收到: "+str);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(name = "logs",declare = "false")
))
public void receive2(String str){
System.out.println("Consumer 2 收到: "+str);
}
}
Main 启动类
-
创建队列方法改为创建交换机,并封装交换机参数: 定义一个创建交换机方法(logsExchange),返回 Fanout 交换机(FanoutExchange)。 交换机参数为( 交换机名,持久化参数,自动删除参数)。 我们这里设置交换机为非持久,非自动删除的交换机,交换机名应于上面一致。
package cn.tedu.rabbitmqspring.m3;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import javax.annotation.PostConstruct;
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class,args);
}
@Bean
public FanoutExchange logsExchange(){
return new FanoutExchange("logs",false,false);
}
@Autowired
private Producer p;
@PostConstruct
public void test(){
new Thread(()->p.send()).start();
}
}
启动测试:测试两个消费者均收到消息即为成功。
Day01结束
|