依赖spring-boot-starter-amqp快速整合消息队列,实现高效异步处理任务和解耦合
本文参考3W学习方法来叙述内容。
一、What
1、RabbitMQ是什么?
消息队列中间件,同类产品有ActiveMQ、 Kafka、ZeroMQ、 RocketMQ 等。
2、消息队列是什么?
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
二、Why
1、RabbitMQ这类消息队列中间件能做什么?应用场景?
异步处理那些不需要及时处理的任务,随着业务升级、初期业务逻辑中的部分同步操作可以拆分出能异步执行的子任务,比如接发发短信、订单处理操作、红包方法等,通过消息队列实现高可用、解耦合。
三、How
1、springboot工程如何整合RabbitMQ?
1.1 引入maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.5.4</version>
</dependency>
1.2 application.yml配置ip、端口、账号等基本信息
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtualHost: /
listener:
simple:
####开启手动ack
acknowledge-mode: manual
retry:
####开启消费者异常进行重试
enabled: true
####最大重试次数
max-attempts: 5
####重试间隔次数
initial-interval: 3000
1.3 创建配置类,配置Exchange和Queue
package com.example.springbootrabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Breaker-93
* @date 2021/8/25 10:37
*/
@Configuration
public class RabbitmqConfig {
public static final String QUEUE_TEST = "queue_test1";
public static final String EXCHANGE_TEST = "exchange_test1";
public static final String ROUTE_TEST = "test1.#";
/**
* 声明交换机
*
* @return
*/
@Bean(EXCHANGE_TEST)
public Exchange EXCHANGE_TEST() {
// durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_TEST).durable(true).build();
}
/**
* 声明队列
*
* @return
*/
@Bean(QUEUE_TEST)
public Queue QUEUE_TEST() {
return new Queue(QUEUE_TEST);
}
/**
* 队列绑定交换机
*/
@Bean
public Binding ROUTE_TEST(@Qualifier(QUEUE_TEST) Queue queue, @Qualifier(EXCHANGE_TEST) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTE_TEST).noargs();
}
}
1.4 生产者发送消息
package com.example.springbootrabbitmq;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import com.example.springbootrabbitmq.config.RabbitmqConfig;
/**
* @author Breaker-93
* @date 2021/8/25 12:41
*/
@SpringBootTest
public class ProducerTest {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void Producer_topics_springbootTest() {
// 使用rabbitTemplate发送消息
String message = "Hello, rabbit mq";
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TEST, "test1.xyj", message);
}
}
1.5 消费者监听队列,处理消息
package com.example.springbootrabbitmq;
import java.io.IOException;
import java.util.Map;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import com.example.springbootrabbitmq.config.RabbitmqConfig;
import com.rabbitmq.client.Channel;
/**
* @author Breaker-93
* @date 2021/8/25 13:39
*/
@Component
public class ReceiveHandler {
/**
* 队列监听
*
* @param msg
* @param message
* @param channel
*/
@RabbitListener(queues = {RabbitmqConfig.QUEUE_TEST})
public void receive(@Headers Map<String, Object> headers, Message message, Channel channel) throws IOException {
System.out.println("QUEUE_TEST msg: " + message);
// 手动ack
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
// 手动签收
channel.basicAck(deliveryTag, false);
}
}
|