基本概念
了解rabbitmq之前先要了解3个基本概念:生产者 、消费者 、代理(队列 )。 rabbitmq在生产者和代理中间做了一层抽象。这样消息生产者和队列就没有直接联系,在中间加入了一层交换器(Exchange)。这样消息生产者把消息交给交换器,交换器根据路由策略再把消息转发给对应队列。 黄色的圈圈就是我们的消息推送服务,将消息推送到 中间方框里面也就是 rabbitMq的服务器,然后经过服务器里面的交换机、队列等各种关系(后面会详细讲)将数据处理入列后,最终右边的蓝色圈圈消费者获取对应监听的消息。
常用的交换机有以下三种,因为消费者是从队列获取信息的,队列是绑定交换机的(一般),所以对应的消息推送/接收模式也会有以下几种
-
Direct Exchange 直连型交换机 根据消息携带的路由键将消息投递给对应队列。 大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。 然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列 -
Fanout Exchange 扇型交换机 这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列 -
Topic Exchange 主题交换机 这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。 简单地介绍下规则: * (星号) 用来表示一个单词 (必须出现的) # (井号) 用来表示任意数量(零个或多个)单词 通配的绑定键是跟队列进行绑定的,举个小例子 队列Q1 绑定键为 .TT. 队列Q2绑定键为 TT.# 如果一条消息携带的路由键为 A.TT.B ,那么队列Q1将会收到; 如果一条消息携带的路由键为TT.AA.BB ,那么队列Q2将会收到;
集成SpringBoot
- 添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 编写配置文件
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: test
publisher-confirm-type: correlated
publisher-returns: true
- 声明交换机,队列,以及绑定关系
Direct 直流交换机
@Configuration
public class DirectRabbitConfig {
private String routing = "test-direct-routing";
@Bean
public Queue createDirectQueue(){
return new Queue("test-direct-queue",true);
}
@Bean
public DirectExchange createDirectExchange(){
return new DirectExchange("test-direct-exchange",true,false);
}
@Bean
public Binding createDirectBinding(){
return BindingBuilder.bind(createDirectQueue()).to(createDirectExchange()).with(routing);
}
}
注意:因为direct是直连交换机,直连交换机是一对一,如果配置多个监听器到同一个队列,会发现他们是轮询消费,并会重复消费
Fanout: 扇形交换机
@Configuration
public class FanoutRabbitConfig {
@Bean
public Queue createFanoutAQueue(){
return new Queue("fanout.A",true);
}
@Bean
public Queue createFanoutBQueue(){
return new Queue("fanout.B",true);
}
@Bean
public Queue createFanoutCQueue(){
return new Queue("fanout.C",true);
}
@Bean
public FanoutExchange createFanoutExchange(){
return new FanoutExchange("test-fanout-exchange",true,false);
}
@Bean
public Binding createBindingeA(){
return BindingBuilder.bind(createFanoutAQueue()).to(createFanoutExchange());
}
@Bean
public Binding createBindingeB(){
return BindingBuilder.bind(createFanoutBQueue()).to(createFanoutExchange());
}
@Bean
public Binding createBindingeC(){
return BindingBuilder.bind(createFanoutCQueue()).to(createFanoutExchange());
}
}
因为fanout 扇形交换机不需要routing 路由,所以当发送消息到交换机的时候,交换机会将消息发送到所有绑定到他的队列。
Topic: 主题交换机
@Configuration
public class TopicRabbitConfig {
public final static String MAN = "topic.man";
public final static String WO_MAN = "topic.#";
public final static String TEST_MAN_TOPIC_QUEUE = "test-man-topic-queue";
public final static String TEST_WOMAN_TOPIC_QUEUE = "test-woman-topic-queue";
public final static String TEST_WOMAN_TOPIC_EXCHANGE = "test-topic-exchange";
@Bean
public Queue createManTopicQueue(){
return new Queue(TEST_MAN_TOPIC_QUEUE,true);
}
@Bean
public Queue createWomanTopicQueue(){
return new Queue(TEST_WOMAN_TOPIC_QUEUE,true);
}
@Bean
public TopicExchange createTopicExchange(){
return new TopicExchange(TEST_WOMAN_TOPIC_EXCHANGE,true,false);
}
@Bean
public Binding createTopicBinding(){
return BindingBuilder.bind(createManTopicQueue()).to(createTopicExchange()).with(MAN);
}
@Bean
public Binding createTopicBinding2(){
return BindingBuilder.bind(createWomanTopicQueue()).to(createTopicExchange()).with(WO_MAN);
}
}
当发送的绑定键对topic.man 时,两个队列的监听器都可以收到消息
当发送的绑定键时topic.# 时,只有一个队列test-woman-topic-queue的监听器可以收到消息
发送消息
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/direct/sendMsg")
public String directSendMsg(){
String msg = "hello World";
rabbitTemplate.convertAndSend(DirectRabbitConfig.EXCHANGE_NAME,DirectRabbitConfig.ROUTING,msg);
return "ok~";
}
监听队列
@Component
public class TestDirectListener {
@RabbitListener(queues = "test-direct-queue")
public void receiver5(Message msg, Channel channel) throws IOException, InterruptedException {
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
System.out.println("队列1消费消息{}"+message);
}
}
convertSendAndReceive方法与convertAndSend方法的区别
convertSendAndReceive(…) :可以同步消费者。使用此方法,当确认了所有的消费者都接收成功之后,才触发另一个convertSendAndReceive(…),也就是才会接收下一条消息。RPC调用方式。 convertAndSend(…) :使用此方法,交换机会马上把所有的信息都交给所有的消费者,消费者再自行处理,不会因为消费者处理慢而阻塞线程。
|