交换机(Exchange)
交换机的属性
Name:交换机名称
Type:交换机类型,可选 direct、topic、fanout、headers
Durability:是否需要持久化;如果设置为持久化,则RabbitMQ重启后,交换机仍然存在
Auto Delete:当最后一个绑定到该Exchange的队列被删除后,自动删除该Exchange
Internal:当前Exchange是否仅用于RabbitMQ内部使用,默认为False
Arguments:扩展参数,用于扩展AMQP协议的定制化使用
一、直连交换机
1、生产者
①、application.yml
# Provider application settings (Spring Boot application.yml).
server:
  port: 8081              # HTTP port of the provider application
spring:
  application:
    name: provider
  rabbitmq:               # connection settings for the RabbitMQ broker
    host: 192.168.211.128
    password: 123456
    port: 5672
    username: springboot
    virtual-host: my_vhost
②、DirectConfig:
package com.example.provider.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the queues, the exchange and the bindings used by the direct-exchange demo.
 *
 * <p>Three durable queues are bound to the exchange {@code directExchange}, each with its own
 * binding key ({@code AA}, {@code BB} and {@code CC}).
 */
@Configuration
public class DirectConfig {

    /** Name of the direct exchange that all three queues are bound to. */
    private static final String EXCHANGE_NAME = "directExchange";

    /**
     * Declares the durable queue {@code directQueueA}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue directQueueA() {
        return new Queue("directQueueA", true);
    }

    /**
     * Declares the durable queue {@code directQueueB}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue directQueueB() {
        return new Queue("directQueueB", true);
    }

    /**
     * Declares the durable queue {@code directQueueC}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue directQueueC() {
        return new Queue("directQueueC", true);
    }

    /**
     * Declares the direct exchange.
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    // The binding methods keep their original (capitalised) names because the
    // Spring bean name is derived from the method name.

    /**
     * Binds {@code directQueueA} to the direct exchange with binding key {@code AA}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding BindingA() {
        return BindingBuilder.bind(directQueueA()).to(directExchange()).with("AA");
    }

    /**
     * Binds {@code directQueueB} to the direct exchange with binding key {@code BB}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding BindingB() {
        return BindingBuilder.bind(directQueueB()).to(directExchange()).with("BB");
    }

    /**
     * Binds {@code directQueueC} to the direct exchange with binding key {@code CC}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding BindingC() {
        return BindingBuilder.bind(directQueueC()).to(directExchange()).with("CC");
    }
}
③、ProviderController
package com.example.provider;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoints that publish demo messages to RabbitMQ.
 */
@RestController
public class ProviderController {

    /** Template used to publish messages; injected once and never reassigned. */
    private final RabbitTemplate template;

    /**
     * Creates the controller with its mandatory dependency.
     *
     * @param template the template used to publish messages
     */
    @Autowired
    public ProviderController(RabbitTemplate template) {
        this.template = template;
    }

    /**
     * Publishes the text {@code hello world} to the exchange {@code directExchange}
     * using the supplied routing key.
     *
     * @param routingKey routing key for the message, taken from the request.
     *     NOTE(review): the value is not validated here, so a request without this
     *     parameter presumably passes {@code null} to the template — confirm the
     *     desired handling.
     * @return the literal {@code "yes"} once the send call has returned
     */
    @RequestMapping("/directSend")
    public String directSend(String routingKey) {
        template.convertAndSend("directExchange", routingKey, "hello world");
        return "yes";
    }
}
④、消费者application.yml文件
# Consumer application settings (Spring Boot application.yml).
server:
  port: 8082              # HTTP port of the consumer application
spring:
  application:
    name: consumer
  rabbitmq:               # connection settings for the RabbitMQ broker
    host: 192.168.211.128
    password: 123456
    port: 5672
    username: springboot
    virtual-host: my_vhost
⑤、运行测试
此时RabbitMQ中就有了三个队列(directQueueA、directQueueB、directQueueC)
2、消费者
①、DirectReceiverA
package com.example.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listens on the queue {@code directQueueA} and logs every text message it receives.
 */
@Component
@RabbitListener(queues = "directQueueA")
@Slf4j
public class DirectReceiverA {

    /**
     * Logs a message taken from the queue.
     *
     * @param message the message payload as text
     */
    @RabbitHandler
    public void process(String message) {
        // Parameterized form: the final text is only built when INFO is enabled.
        log.info("A接到{}", message);
    }
}
②、DirectReceiverB
package com.example.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listens on the queue {@code directQueueB} and logs every text message it receives.
 */
@Component
@RabbitListener(queues = "directQueueB")
@Slf4j
public class DirectReceiverB {

    /**
     * Logs a message taken from the queue.
     *
     * @param message the message payload as text
     */
    @RabbitHandler
    public void process(String message) {
        // Parameterized form: the final text is only built when INFO is enabled.
        log.info("B接到{}", message);
    }
}
③、DirectReceiverC
package com.example.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listens on the queue {@code directQueueC} and logs every text message it receives.
 */
@Component
@RabbitListener(queues = "directQueueC")
@Slf4j
public class DirectReceiverC {

    /**
     * Logs a message taken from the queue.
     *
     * @param message the message payload as text
     */
    @RabbitHandler
    public void process(String message) {
        // Parameterized form: the final text is only built when INFO is enabled.
        log.info("C接到{}", message);
    }
}
运行测试
二、主题交换机
1、生产者
①、TopicConfig
package com.example.provider.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the queues, the exchange and the bindings used by the topic-exchange demo.
 *
 * <p>Three durable queues are bound to the exchange {@code topicExchange}, each with a
 * wildcard binding pattern. In a topic pattern {@code *} stands for exactly one word and
 * {@code #} for zero or more words, where words are separated by dots.
 */
@Configuration
public class TopicConfig {

    /** Binding pattern of {@code topicQueueA}: three words with {@code orange} in the middle. */
    public static final String KEY_A = "*.orange.*";

    /** Binding pattern of {@code topicQueueB}: three words ending in {@code rabbit}. */
    public static final String KEY_B = "*.*.rabbit";

    /** Binding pattern of {@code topicQueueC}: any key whose first word is {@code lazy}. */
    public static final String KEY_C = "lazy.#";

    /** Name of the topic exchange that all three queues are bound to. */
    private static final String EXCHANGE_NAME = "topicExchange";

    /**
     * Declares the durable queue {@code topicQueueA}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue topicQueueA() {
        return new Queue("topicQueueA", true);
    }

    /**
     * Declares the durable queue {@code topicQueueB}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue topicQueueB() {
        return new Queue("topicQueueB", true);
    }

    /**
     * Declares the durable queue {@code topicQueueC}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue topicQueueC() {
        return new Queue("topicQueueC", true);
    }

    /**
     * Declares the topic exchange.
     *
     * @return the exchange definition
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    /**
     * Binds {@code topicQueueA} to the topic exchange with the pattern {@link #KEY_A}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding topicBindingA() {
        return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(KEY_A);
    }

    /**
     * Binds {@code topicQueueB} to the topic exchange with the pattern {@link #KEY_B}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding topicBindingB() {
        return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(KEY_B);
    }

    /**
     * Binds {@code topicQueueC} to the topic exchange with the pattern {@link #KEY_C}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding topicBindingC() {
        return BindingBuilder.bind(topicQueueC()).to(topicExchange()).with(KEY_C);
    }
}
②、ProviderController
/**
 * Publishes the text {@code hello world} to the exchange {@code topicExchange}
 * using the supplied routing key.
 *
 * @param routingKey routing key for the message, taken from the request
 * @return the literal {@code "yes"} once the send call has returned
 */
@RequestMapping("/topicSend")
public String toipcSend(String routingKey) {
    final String exchangeName = "topicExchange";
    final String body = "hello world";
    template.convertAndSend(exchangeName, routingKey, body);
    return "yes";
}
③、运行测试
路由键(routingKey)符合TopicConfig中绑定规则的消息,会被投递到对应的队列中
2、消费者
①、TopicReceiverA
package com.example.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listens on the queue {@code topicQueueA} and logs every text message it receives.
 */
@Component
@RabbitListener(queues = "topicQueueA")
@Slf4j
public class TopicReceiverA {

    /**
     * Logs a message taken from the queue.
     *
     * @param message the message payload as text
     */
    @RabbitHandler
    public void process(String message) {
        // NOTE(review): logged at WARN although receiving a message looks like a routine
        // event; presumably chosen to make the demo output stand out — confirm.
        log.warn("A接到{}", message);
    }
}
②、TopicReceiverB
package com.example.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listens on the queue {@code topicQueueB} and logs every text message it receives.
 */
@Component
@RabbitListener(queues = "topicQueueB")
@Slf4j
public class TopicReceiverB {

    /**
     * Logs a message taken from the queue.
     *
     * @param message the message payload as text
     */
    @RabbitHandler
    public void process(String message) {
        // NOTE(review): logged at WARN although receiving a message looks like a routine
        // event; presumably chosen to make the demo output stand out — confirm.
        log.warn("B接到{}", message);
    }
}
③、TopicReceiverC
package com.example.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listens on the queue {@code topicQueueC} and logs every text message it receives.
 */
@Component
@RabbitListener(queues = "topicQueueC")
@Slf4j
public class TopicReceiverC {

    /**
     * Logs a message taken from the queue.
     *
     * @param message the message payload as text
     */
    @RabbitHandler
    public void process(String message) {
        // NOTE(review): logged at WARN although receiving a message looks like a routine
        // event; presumably chosen to make the demo output stand out — confirm.
        log.warn("C接到{}", message);
    }
}
三、扇形交换机
1、生产者
①、FanoutConfig
package com.example.provider.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the queues, the exchange and the bindings used by the fanout-exchange demo.
 *
 * <p>Three durable queues are bound to the exchange {@code fanoutExchange}. The bindings
 * carry no binding key.
 */
@Configuration
public class FanoutConfig {

    /** Name of the fanout exchange that all three queues are bound to. */
    private static final String EXCHANGE_NAME = "fanoutExchange";

    /**
     * Declares the durable queue {@code fanoutQueueA}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue fanoutQueueA() {
        return new Queue("fanoutQueueA", true);
    }

    /**
     * Declares the durable queue {@code fanoutQueueB}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue fanoutQueueB() {
        return new Queue("fanoutQueueB", true);
    }

    /**
     * Declares the durable queue {@code fanoutQueueC}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue fanoutQueueC() {
        return new Queue("fanoutQueueC", true);
    }

    /**
     * Declares the fanout exchange.
     *
     * @return the exchange definition
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_NAME);
    }

    /**
     * Binds {@code fanoutQueueA} to the fanout exchange (no binding key).
     *
     * @return the binding definition
     */
    @Bean
    public Binding fanoutBindingA() {
        return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
    }

    /**
     * Binds {@code fanoutQueueB} to the fanout exchange (no binding key).
     *
     * @return the binding definition
     */
    @Bean
    public Binding fanoutBindingB() {
        return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
    }

    /**
     * Binds {@code fanoutQueueC} to the fanout exchange (no binding key).
     *
     * @return the binding definition
     */
    @Bean
    public Binding fanoutBindingC() {
        return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());
    }
}
②、ProviderController
/**
 * Publishes the text {@code hello world} to the exchange {@code fanoutExchange}.
 *
 * <p>No routing key is supplied; {@code null} is handed to the template in its place.
 *
 * @return the literal {@code "yes"} once the send call has returned
 */
@RequestMapping("/fanoutSend")
public String fanoutSend() {
    final String exchangeName = "fanoutExchange";
    final String noRoutingKey = null;
    final String body = "hello world";
    template.convertAndSend(exchangeName, noRoutingKey, body);
    return "yes";
}
2、消费者
①、FanoutReceiverA
package com.example.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@SuppressWarnings("all")
@RabbitListener(queues="fanoutQueueA")
@Slf4j
public class FanoutReceiverA {
@RabbitHandler
public void process(String message){
log.error("A接到"+message);
}
}
②、FanoutReceiverB
package com.example.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@SuppressWarnings("all")
@RabbitListener(queues="fanoutQueueB")
@Slf4j
public class FanoutReceiverB {
@RabbitHandler
public void process(String message){
log.error("B接到"+message);
}
}
③、FanoutReceiverC
package com.example.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@SuppressWarnings("all")
@RabbitListener(queues="fanoutQueueC")
@Slf4j
public class FanoutReceiverC {
@RabbitHandler
public void process(String message){
log.error("C接到"+message);
}
}
运行测试
本篇博客中有大量重复的代码,大家在测试时要注意细节,比如方法名重复或者调用了错误的同名方法。这些重复代码只是为了方便演示,希望能帮到你们
bye~