导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
<!-- swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
配置文件
# rabbitMQ ip地址
spring.rabbitmq.host=192.168.49.128
# rabbitMQ 端口号
spring.rabbitmq.port=5672
# rabbitMQ 用户名
spring.rabbitmq.username=root
# rabbitMQ 密码
spring.rabbitmq.password=root
# 开启发布确认:none(默认值)表示禁用发布确认;correlated 表示消息成功发布到交换机后触发回调方法;simple 类似于单个确认,即发一条确认一条
spring.rabbitmq.publisher-confirm-type=correlated
# 消息由交换器发送到队列失败后退回
spring.rabbitmq.publisher-returns=true
Swagger配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
/**
 * Springfox Swagger 2 configuration.
 *
 * <p>Registers a single {@link Docket} bean for the documentation group
 * {@code webApi}, carrying the title, description, version and contact
 * details assembled by {@link #describeApi()}.
 */
@Configuration
@EnableSwagger2
public class SwaggerConfig {

    /**
     * Creates the Swagger 2 docket for the {@code webApi} group.
     *
     * <p>No API or path selectors are configured between {@code select()}
     * and {@code build()}.
     *
     * @return the configured docket
     */
    @Bean
    public Docket webApiConfig() {
        ApiInfo info = describeApi();
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(info)
                .select()
                .build();
    }

    /**
     * Assembles the descriptive metadata attached to the docket.
     *
     * @return the API title, description, version and contact
     */
    private ApiInfo describeApi() {
        Contact maintainer = new Contact("xi", "http://atguigu.com", "1374216232@qq.com");
        ApiInfoBuilder builder = new ApiInfoBuilder();
        builder.title("rabbitmq接口文档");
        builder.description("文本档描述了rabbitmq微服务接口定义");
        builder.version("1.0");
        builder.contact(maintainer);
        return builder.build();
    }
}
队列及交换机配置类
package com.xi.rabbitmq.springbootrabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * Exchange, queue and binding declarations for the TTL / dead-letter demo.
 *
 * <p>Queues {@code QA}, {@code QB} and {@code QC} are bound to the normal
 * direct exchange {@code X}. Each of them names the dead-letter exchange
 * {@code Y} with routing key {@code YD}, and queue {@code QD} is bound to
 * {@code Y} under that same key. {@code QA} and {@code QB} carry a
 * queue-level message TTL (in milliseconds); {@code QC} carries none.
 */
@Configuration
public class TtlQueueConfig {

    /** Name of the normal exchange. */
    public static final String X_EXCHANGE = "X";

    /**
     * Name of the dead-letter exchange.
     *
     * <p>This used to be {@code "X"}, identical to {@link #X_EXCHANGE}, so the
     * two exchange beans declared the same broker exchange and dead letters
     * were published back into the normal exchange.
     *
     * <p>NOTE(review): queues already declared on a broker with the old
     * {@code x-dead-letter-exchange=X} argument will presumably need to be
     * deleted before redeclaration with the new value succeeds; confirm
     * against the target broker.
     */
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";

    /** Names of the normal queues with a fixed TTL. */
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";

    /** Name of the dead-letter queue. */
    public static final String DEAD_LETTER_QUEUE = "QD";

    /** Name of the normal queue that has no queue-level TTL. */
    public static final String QUEUE_C = "QC";

    /** Routing key used when a message is dead-lettered, and by the QD binding. */
    private static final String DEAD_LETTER_ROUTING_KEY = "YD";

    /** Routing keys binding the normal queues to the normal exchange. */
    private static final String QUEUE_A_ROUTING_KEY = "XA";
    private static final String QUEUE_B_ROUTING_KEY = "XB";
    private static final String QUEUE_C_ROUTING_KEY = "XC";

    /** Queue argument key for the message TTL, expressed in milliseconds. */
    private static final String ARG_MESSAGE_TTL = "x-message-ttl";

    /**
     * Declares the normal direct exchange; the bean name is {@code xExchange}.
     *
     * @return the normal exchange
     */
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    /**
     * Declares the dead-letter exchange as a direct exchange.
     *
     * <p>Other exchange types available in Spring AMQP are
     * {@code TopicExchange}, {@code FanoutExchange}, {@code HeadersExchange}
     * and {@code CustomExchange}.
     *
     * @return the dead-letter exchange
     */
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    /**
     * Declares durable queue {@code QA} with a 10 second message TTL.
     *
     * @return the queue definition
     */
    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> arguments = deadLetterArguments();
        // TTL is expressed in milliseconds.
        arguments.put(ARG_MESSAGE_TTL, 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }

    /**
     * Declares durable queue {@code QB} with a 40 second message TTL.
     *
     * @return the queue definition
     */
    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> arguments = deadLetterArguments();
        // TTL is expressed in milliseconds.
        arguments.put(ARG_MESSAGE_TTL, 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }

    /**
     * Declares the durable dead-letter queue {@code QD}.
     *
     * @return the queue definition
     */
    @Bean("queueD")
    public Queue queueD() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    /**
     * Declares durable queue {@code QC} with dead-lettering but no
     * queue-level TTL.
     *
     * <p>NOTE(review): presumably the publisher sets a per-message
     * expiration for this queue; confirm against the producer.
     *
     * @return the queue definition
     */
    @Bean("queueC")
    public Queue queueC() {
        return QueueBuilder.durable(QUEUE_C).withArguments(deadLetterArguments()).build();
    }

    /**
     * Binds {@code QA} to the normal exchange with routing key {@code XA}.
     *
     * @param queueA    the {@code queueA} bean
     * @param xExchange the {@code xExchange} bean
     * @return the binding
     */
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with(QUEUE_A_ROUTING_KEY);
    }

    /**
     * Binds {@code QB} to the normal exchange with routing key {@code XB}.
     *
     * @param queueB    the {@code queueB} bean
     * @param xExchange the {@code xExchange} bean
     * @return the binding
     */
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueB).to(xExchange).with(QUEUE_B_ROUTING_KEY);
    }

    /**
     * Binds {@code QD} to the dead-letter exchange with routing key {@code YD}.
     *
     * @param queueD    the {@code queueD} bean
     * @param yExchange the {@code yExchange} bean
     * @return the binding
     */
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
                                  @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with(DEAD_LETTER_ROUTING_KEY);
    }

    /**
     * Binds {@code QC} to the normal exchange with routing key {@code XC}.
     *
     * @param queueC    the {@code queueC} bean
     * @param xExchange the {@code xExchange} bean
     * @return the binding
     */
    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueC).to(xExchange).with(QUEUE_C_ROUTING_KEY);
    }

    /**
     * Builds a fresh, mutable argument map holding the dead-letter exchange
     * and dead-letter routing key shared by the normal queues.
     *
     * @return a new map the caller may extend (for example with a TTL)
     */
    private static Map<String, Object> deadLetterArguments() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        arguments.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
        return arguments;
    }
}
配置消费者
import com.xi.rabbitmq.springbootrabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
 * Message listener for the queue named by
 * {@code ConfirmConfig.CONFIRM_QUEUE_NAME}.
 */
@Slf4j
@Component
public class Consumer {

    /**
     * Decodes the body of a delivered message as UTF-8 text and logs it at
     * INFO level.
     *
     * @param message the delivered AMQP message
     */
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveConfirmMessage(Message message) {
        byte[] payload = message.getBody();
        String text = new String(payload, StandardCharsets.UTF_8);
        log.info("接收到的队列confirm.queue消息:{}", text);
    }
}
controller层
import com.xi.rabbitmq.springbootrabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
 * REST endpoint used to exercise publisher confirms.
 *
 * <p>Each request publishes the path-variable text to the exchange and
 * routing key defined on {@code ConfirmConfig}.
 */
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Publishes {@code message} together with a freshly generated
     * correlation id.
     *
     * <p>The id used to be the constant {@code "1"}, which made every
     * published message carry the same correlation id. A random UUID per
     * call keeps them distinguishable.
     *
     * @param message the text to publish, taken from the request path
     * @return the literal string {@code "ok"} once the send call has returned
     */
    @GetMapping("/sendMessage/{message}")
    public String sendMessage(@PathVariable("message") String message) {
        // Fully qualified so the file's import list does not need to change.
        String correlationId = java.util.UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(correlationId);
        log.info("发送消息为:{}", message);
        rabbitTemplate.convertAndSend(
                ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY,
                message,
                correlationData);
        return "ok";
    }
}
|