RabbitMQ高级
基础部分点击 RabbitMQ基础 演示代码下载 本文使用spring编写程序
1. 使用spring整合RabbitMQ
<rabbit:connection-factory ></rabbit:connection-factory> 定义连接对象
<rabbit:admin connection-factory=" "></rabbit:admin> 定义管理交换机 队列
<rabbit:queue></rabbit:queue> 定义队列
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue_1"/>
<rabbit:binding queue="spring_fanout_queue_2"/>
</rabbit:bindings>
</rabbit:fanout-exchange> 定义某种类型的交换机并绑定队列
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
<rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
</rabbit:listener-container> 消费端 监听队列 并绑定继承MessageListener类的子类进行业务操作
举例
生产端
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
<rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue_1"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<rabbit:queue id="spring_topic_queue_star" name="spring_topic_queue_star" auto-declare="true"/>
<rabbit:queue id="spring_topic_queue_well" name="spring_topic_queue_well" auto-declare="true"/>
<rabbit:queue id="spring_topic_queue_well2" name="spring_topic_queue_well2" auto-declare="true"/>
<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="lxs.*" queue="spring_topic_queue_star"/>
<rabbit:binding pattern="lxs.#" queue="spring_topic_queue_well"/>
<rabbit:binding pattern="xzk.#" queue="spring_topic_queue_well2"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
消费端
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
<rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>
<rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>
<rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>
<rabbit:listener ref="topicListenerStar" queue-names="spring_topic_queue_star"/>
<rabbit:listener ref="topicListenerWell" queue-names="spring_topic_queue_well"/>
<rabbit:listener ref="topicListenerWell2" queue-names="spring_topic_queue_well2"/>
</rabbit:listener-container>
在代码中
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testQueue() {
rabbitTemplate.convertAndSend("routingKey", "rabbitTemplate发送消息");
}
convertAndSend方法参数
)]
2.消费端限流
<rabbit:listener-container connection-factory="connectionFactory"
</rabbit:listener-container>
3.TTL
Time To Live,消息过期时间设置 可以通过ttl属性来进行消息的定时失效 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。
- 队列过期后,会将队列所有消息全部移除。
- 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除掉)
- 通过xml配置整个队列的ttl
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
2 通过MessagePostProcessor给单条消息设置过期时间
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");
return message;
}
};
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.hehe", "message ttl....", messagePostProcessor);
4.死信队列
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX
消息成为死信的三种情况:
- 队列消息长度到达限制;
- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队
列,requeue=false; - 原队列存在消息过期设置,消息到达超时时间未被消费;
?
队列绑定死信交换机:
? 给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="exchange_dlx" />
<entry key="x-dead-letter-routing-key" value="dlx.hehe" />
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
<entry key="x-max-length" value="10" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
这样一来 当消息过期后就会被转发到死信交换机 进行处理(其实还是正常的交换机相当于对过期的消息绑定一个交换机 过期了就去绑定的这个交换机 )
5. 延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。 需求:
- 下单后,30分钟未支付,取消订单,回滚库存。
- 新用户注册成功7天后,发送短信问候
TTL+死信队列 组合实现延迟队列的效果
<rabbit:queue id="order_queue" name="order_queue">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
<entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="order_exchange">
<rabbit:bindings>
<rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="order_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
_dlx"></rabbit:queue>
<rabbit:topic-exchange name="order_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
|