前言
交换机 (Exchange)
生产者将消息发送到Exchange,由Exchange再路由到一个或多个队列中
路由键 (RoutingKey)
生产者将消息发送给交换机时,会同时指定一个RoutingKey,用来告诉交换机这条消息的路由规则
绑定键 (BindingKey)
通过绑定键将交换机和队列关联起来,这样RabbitMQ就知道如何正确地将消息路由到队列
关系小结
生产者发送消息时指定Exchange和RoutingKey;Exchange与队列之间通过BindingKey进行绑定;Exchange根据消息的RoutingKey与各个BindingKey的匹配结果,决定把消息路由到哪个(或哪些)队列
交换机类型
直连交换机:Direct exchange
主题交换机:Topic exchange
扇形交换机:Fanout exchange
首部交换机:Headers exchange
默认交换机:Default exchange
Dead Letter Exchange (死信交换机)
交换机的属性
一.直连交换机
1.新建DirectConfig类
创建队列
创建交换机
进行交换机和队列的绑定:设置BindingKey
package com.lj.provider.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration for the direct-exchange example.
 *
 * <p>Declares three durable queues, one direct exchange named
 * {@code directExchange}, and one binding per queue. Each queue is bound
 * with its own binding key ({@code aa}, {@code bb}, {@code cc}), so a message
 * published with one of those routing keys is delivered to exactly the
 * matching queue.
 *
 * <p>The queue names keep their existing spelling ({@code direcr...}) on
 * purpose: listeners refer to the queues by these exact names, so renaming
 * them here alone would break the consumers.
 */
@Configuration
@SuppressWarnings("all")
public class DirectConfig {

    /**
     * Declares the durable queue {@code direcrQueueA}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue direcrQueueA() {
        return new Queue("direcrQueueA", true);
    }

    /**
     * Declares the durable queue {@code direcrQueueB}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue direcrQueueB() {
        return new Queue("direcrQueueB", true);
    }

    /**
     * Declares the durable queue {@code direcrQueueC}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue direcrQueueC() {
        return new Queue("direcrQueueC", true);
    }

    /**
     * Declares the direct exchange the example publishes to.
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }

    /**
     * Binds {@code direcrQueueA} to the direct exchange with binding key {@code aa}.
     *
     * <p>The method must be a {@code @Bean}; otherwise the binding is never
     * placed in the application context and is not declared on the broker.
     *
     * @return the binding definition
     */
    @Bean
    public Binding BindingA() {
        return BindingBuilder.bind(direcrQueueA()).to(directExchange()).with("aa");
    }

    /**
     * Binds {@code direcrQueueB} to the direct exchange with binding key {@code bb}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding BindingB() {
        return BindingBuilder.bind(direcrQueueB()).to(directExchange()).with("bb");
    }

    /**
     * Binds {@code direcrQueueC} to the direct exchange with binding key {@code cc}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding BindingC() {
        return BindingBuilder.bind(direcrQueueC()).to(directExchange()).with("cc");
    }
}
2.建一个ProviderController发送信息
package com.lj.provider;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller on the producer side that publishes test messages to RabbitMQ.
 */
@RestController
@SuppressWarnings("all")
public class ProviderController {

    /** Template used to publish messages; injected by Spring. */
    @Autowired
    private RabbitTemplate template;

    /**
     * Publishes the text {@code Hello World} to the exchange named
     * {@code directExchange} using the routing key supplied by the caller.
     *
     * @param routingKey routing key taken from the request parameter
     * @return the fixed string {@code yes}
     */
    @RequestMapping("/directSend")
    public String DirectSend(String routingKey) {
        final String exchangeName = "directExchange";
        final String payload = "Hello World";
        template.convertAndSend(exchangeName, routingKey, payload);
        return "yes";
    }
}
这里只指定了交换机和RoutingKey,是没有Queue的
运行成功
3.建立3个DirectReceiver类接收消息
package com.lj.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer-side listener attached to the queue {@code direcrQueueA}.
 */
@Slf4j
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "direcrQueueA")
public class DirectReceiverA {

    /**
     * Handles a text message taken from the queue by logging it at WARN level.
     *
     * @param msg the message body
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "A接到" + msg;
        log.warn(line);
    }
}
二.主题交换机
1.在生产者新建TopicConfig
package com.lj.provider.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration for the topic-exchange example.
 *
 * <p>Declares three durable queues, one topic exchange named
 * {@code topicExchange}, and one binding per queue, each using one of the
 * binding-key patterns defined below.
 */
@Configuration
@SuppressWarnings("all")
public class TopicConfig {

    /*
     * Binding-key patterns. In a topic exchange "*" stands for exactly one
     * dot-separated word and "#" stands for zero or more words.
     */

    /** Binding key for queue A. */
    public static final String KEY_A = "*.orange.*";

    /** Binding key for queue B. */
    public static final String KEY_B = "*.*.rabbit";

    /** Binding key for queue C. */
    public static final String KEY_C = "lazy.#";

    /**
     * Declares the durable queue {@code topicQueueA}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue topicQueueA() {
        final boolean durable = true;
        return new Queue("topicQueueA", durable);
    }

    /**
     * Declares the durable queue {@code topicQueueB}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue topicQueueB() {
        final boolean durable = true;
        return new Queue("topicQueueB", durable);
    }

    /**
     * Declares the durable queue {@code topicQueueC}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue topicQueueC() {
        final boolean durable = true;
        return new Queue("topicQueueC", durable);
    }

    /**
     * Declares the topic exchange the example publishes to.
     *
     * @return the exchange definition
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }

    /**
     * Binds {@code topicQueueA} to the topic exchange using {@link #KEY_A}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding topicBindingA() {
        final Queue queue = topicQueueA();
        final TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with(KEY_A);
    }

    /**
     * Binds {@code topicQueueB} to the topic exchange using {@link #KEY_B}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding topicBindingB() {
        final Queue queue = topicQueueB();
        final TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with(KEY_B);
    }

    /**
     * Binds {@code topicQueueC} to the topic exchange using {@link #KEY_C}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding topicBindingC() {
        final Queue queue = topicQueueC();
        final TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with(KEY_C);
    }
}
2.在 Controller 里面加一个方法
/**
 * Publishes the text {@code Hello World} to the exchange named
 * {@code topicExchange} using the routing key supplied by the caller.
 *
 * @param routingKey routing key taken from the request parameter
 * @return the fixed string {@code yes}
 */
@RequestMapping("/topicSend")
public String topicSend(String routingKey) {
    final String exchangeName = "topicExchange";
    final String payload = "Hello World";
    template.convertAndSend(exchangeName, routingKey, payload);
    return "yes";
}
3.在消费者创建3个接收者TopicReceiver
package com.lj.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer-side listener attached to the queue {@code topicQueueA}.
 */
@Slf4j
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "topicQueueA")
public class TopicReceiverA {

    /**
     * Handles a text message taken from the queue by logging it at WARN level.
     *
     * @param msg the message body
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "A接到" + msg;
        log.warn(line);
    }
}
三.扇形交换机
1.建立FanoutConfig
package com.lj.provider.mq;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration for the fanout-exchange example.
 *
 * <p>Declares three durable queues, one fanout exchange named
 * {@code fanoutExchange}, and one binding per queue. The bindings are created
 * without any binding key.
 */
@Configuration
@SuppressWarnings("all")
public class FanoutConfig {

    /**
     * Declares the durable queue {@code fanoutQueueA}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue fanoutQueueA() {
        final boolean durable = true;
        return new Queue("fanoutQueueA", durable);
    }

    /**
     * Declares the durable queue {@code fanoutQueueB}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue fanoutQueueB() {
        final boolean durable = true;
        return new Queue("fanoutQueueB", durable);
    }

    /**
     * Declares the durable queue {@code fanoutQueueC}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue fanoutQueueC() {
        final boolean durable = true;
        return new Queue("fanoutQueueC", durable);
    }

    /**
     * Declares the fanout exchange the example publishes to.
     *
     * @return the exchange definition
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    /**
     * Binds {@code fanoutQueueA} to the fanout exchange (no binding key).
     *
     * @return the binding definition
     */
    @Bean
    public Binding fanoutBindingA() {
        final Queue queue = fanoutQueueA();
        final FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }

    /**
     * Binds {@code fanoutQueueB} to the fanout exchange (no binding key).
     *
     * @return the binding definition
     */
    @Bean
    public Binding fanoutBindingB() {
        final Queue queue = fanoutQueueB();
        final FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }

    /**
     * Binds {@code fanoutQueueC} to the fanout exchange (no binding key).
     *
     * @return the binding definition
     */
    @Bean
    public Binding fanoutBindingC() {
        final Queue queue = fanoutQueueC();
        final FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }
}
2.在controller加方法
/**
 * Publishes the text {@code Hello World} to the exchange named
 * {@code fanoutExchange}, passing {@code null} as the routing key.
 *
 * @return the fixed string {@code yes}
 */
@RequestMapping("/fanoutSend")
public String fanoutSend() {
    final String exchangeName = "fanoutExchange";
    final String payload = "Hello World";
    template.convertAndSend(exchangeName, null, payload);
    return "yes";
}
3.在消费者创建3个接收者FanoutReceiver
package com.lj.consumer.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "fanoutQueueA")
@Slf4j
public class FanoutReceiverA {
@RabbitHandler
public void process(String msg){
log.warn("A接到"+msg);
}
}
?
|