01 RabbitMQ的使用场景
1 解耦、削峰、异步
同步异步的问题(串行)
串行方式:将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
并行方式 异步线程池
并行方式:将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间。 代码示例
public void makeOrder(){
orderService.saveOrder();
relationMessage();
}
public void relationMessage(){
theadpool.submit(new Callable<Object>{
public Object call(){
messageService.sendSMS("order");
}
})
theadpool.submit(new Callable<Object>{
public Object call(){
emailService.sendEmail("order");
}
})
theadpool.submit(new Callable<Object>{
public Object call(){
appService.sendApp("order");
}
})
theadpool.submit(new Callable<Object>{
public Object call(){
appService.sendApp("order");
}
})
}
存在问题: 1:耦合度高 2:需要自己写线程池自己维护成本太高 3:出现了消息可能会丢失,需要你自己做消息补偿 4:如何保证消息的可靠性你自己写 5:如果服务器承载不了,你需要自己去写高可用
异步消息队列的方式
好处 1:完全解耦,用MQ建立桥接 2:有独立的线程池和运行模型 3:出现了消息可能会丢失,MQ有持久化功能 4:如何保证消息的可靠性,死信队列和消息转移的等 5:如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用。 按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍
2 高内聚、低耦合
04、分布式事务的可靠消费和可靠生产 05、索引、缓存、静态化处理的数据同步 06、流量监控 07、日志监控(ELK) 08、下单、订单分发、抢票
2 SpringBoot整合RabbitMQ实现fanout模式
使用springboot完成rabbitmq的消费模式-Fanout
新建SpringBoot项目 在application.yml中配置连接rabbitmq:
# 服务端口
server:
port: 8080
# 配置rabbitmq服务
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: XXXXXX
port: 5672
生产者订单业务OrderService
package com.sl.rabbitmq.springbootorderrabbitmqproducer.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void makeOrder(int userId, int productId, int num) {
String orderId = UUID.randomUUID().toString();
System.out.println("订单生产成功" + orderId);
String exchangeName = "fanout_order_exchange";
String routingKey = "";
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
}
}
在配置类中完成交换机和队列的绑定关系
package com.sl.rabbitmq.springbootorderrabbitmqproducer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfiguration {
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout_order_exchange", true, false);
}
@Bean
public Queue smsQueue(){
return new Queue("sms.fanout.queue", true);
}
@Bean
public Queue emailQueue(){
return new Queue("email.fanout.queue", true);
}
@Bean
public Queue duanxinQueue(){
return new Queue("duanxin.fanout.queue", true);
}
@Bean
public Binding smsBinding(){
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}
@Bean
public Binding emailBinding(){
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
@Bean
public Binding duanxinBinding(){
return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
}
}
测试
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {
@Autowired
private OrderService orderService;
@Test
void contextLoads() {
orderService.makeOrder(1, 1, 12);
}
}
结果: 绑定关系创建完成
消费者部分
新建一个module 同样配置application.yml
# 服务端口
server:
port: 8081
# 配置rabbitmq服务
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: XXXXXX
port: 5672
创建三个消费者类
package com.sl.rabbitmq.springbootorderrabbitmqconsumer.service.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@Service
@RabbitListener(queues = {"sms.fanout.queue"})
public class SMSConsumer {
@RabbitHandler
public void receiveMessage(String message){
System.out.println("sms fanout--接收到了订单信息:" + message);
}
}
两个注解 @RabbitListener指定类或方法消费某交换机或队列的消息 @RabbitHandler为消息的落脚点,在这里会作为message参数传入注解的方法中
启动生产者模块 再次启动生产者测试分发消息 结果:
3 Springboot的Direct模式实现
配置绑定 DirectRabbiitMqConfiguration类
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectRabbitMqConfiguration {
@Bean
public DirectExchange directExchange(){
return new DirectExchange("direct_order_exchange", true, false);
}
@Bean
public Queue directsmsQueue(){
return new Queue("sms.direct.queue", true);
}
@Bean
public Queue directemailQueue(){
return new Queue("email.direct.queue", true);
}
@Bean
public Queue directduanxinQueue(){
return new Queue("duanxin.direct.queue", true);
}
@Bean
public Binding smsBinding(){
return BindingBuilder.bind(directsmsQueue()).to(directExchange()).with("sms");
}
@Bean
public Binding emailBinding(){
return BindingBuilder.bind(directemailQueue()).to(directExchange()).with("email");
}
@Bean
public Binding duanxinBinding(){
return BindingBuilder.bind(directduanxinQueue()).to(directExchange()).with("duanxin");
}
}
生产者方法
public void makeOrderDirect(int userId, int productId, int num) {
String orderId = UUID.randomUUID().toString();
System.out.println("订单生产成功" + orderId);
String exchangeName = "direct_order_exchange";
rabbitTemplate.convertAndSend(exchangeName, "email", orderId);
rabbitTemplate.convertAndSend(exchangeName, "duanxin", orderId);
}
消费者
@Service
@RabbitListener(queues = {"duanxin.direct.queue"})
public class DirectDuanxinConsumer {
@RabbitHandler
public void receiveMessage(String message){
System.out.println("duanxin direct--接收到了订单信息:" + message);
}
}
运行消费者服务,运行生产者测试 结果:
|