1 简介
2 基本结构
-
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。 -
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。 -
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。 -
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。 -
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
3 消息发布接收流程
3.1 发送消息
- 生产者和Broker建立TCP连接。
- 生产者和Broker建立通道。
- 生产者通过通道消息发送给Broker,由Exchange将消息进行转发。**
- Exchange将消息转发到指定的Queue(队列)
3.2 接收消息
- 消费者和Broker建立TCP连接
- 消费者和Broker建立通道
- 消费者监听指定的Queue(队列)**
- 当有消息到达Queue时Broker默认将消息推送给消费者。**
- 消费者接收到消息。
4 工作模式(6种)
前四种工作模式存在包含关系:Work queues ∈ Publish/subscribe ∈ Routing ∈ Topics
4.1 Work queues 工作队列
- 多个消费者监听一个消息队列
- 消费者获取消息的方式为轮询:即将消息轮流、均匀地发给不同的消费者
- 一条消息只会被一个消费者接收
- 某消费者在处理完某条消息后,才会收到下一条消息
4.2 Publish/subscribe 发布订阅模式 (exchange type = fanout)
- 不同消费者监听不同队列(当然也可以,此时就是工作队列模式)
- 工作队列模式是发布订阅模式包含的一种情况(不同消费者监听一个队列)
- 需要绑定各消息队列和交换机(不需要routing key)
4.3 Routing 路由模式(exchange type = direct)
- 交换机与消息队列进行绑定时需要设置routing key
- 交换机发送生产者信息时,根据其routing key将消息发送给不同的消息队列,从而给不同的消费者
- 发布订阅模式是路由模式包含的一种情况(即不同的消息队列绑定同一个routing key)
4.4 Topics 通配符模式(exchange type = topic)
4.5 Header模式
header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配 队列。
Map headers_email = new Hashtable();
headers_email.put("inform_type", "email");
4.6 RPC 客户端远程调用服务端模式(exchange type = direct)
- 使用MQ可以实现RPC的异步调用
- 客户端通过 RPC请求队列 发送消息,同时监听 RPC响应队列,接受响应信息
- 服务端通过 RPC响应队列 返回响应信息,同时监听 RPC请求队列 ,接收请求信息
- 两者皆即是生产者、也是消费者
5 Spring整合RabbitMQ的步骤
-
添加依赖:在生产者和消费者依赖文件里,均添加以下依赖 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐logging</artifactId>
</dependency>
-
配置RabbitMQ的参数:在生产者和消费者src文件里,均添加application.yml配置文件 server:
port: 44000
spring:
application:
name: test‐rabbitmq‐producer
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtualHost: /
-
声明Exchange、Queue,绑定交换机:在生产者和消费者src文件里,均添加RabbitConfig类配置文件
@Configuration
public class RabbitmqConfig {
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
public static final String ROUTINGKEY_EMAIL = "inform.#.email.#";
public static final String ROUTINGKEY_SMS = "inform.#.sms.#";
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange EXCHANGE_TOPICS_INFORM() {
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
@Bean(QUEUE_INFORM_EMAIL)
public Queue QUEUE_INFORM_EMAIL() {
return new Queue(QUEUE_INFORM_EMAIL);
}
@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS() {
return new Queue(QUEUE_INFORM_SMS);
}
@Bean
public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
}
@Bean
public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
}
- 发送消息:生产端使用
RabbitTemplate 发送消息 - 监听消息:消费端使用
@RabbitListener 注解监听队列。
|