什么是RabbitMq
RabbitMQ是一个基于AMQP协议的开源的消息代理和队列服务器。 优点:
- 采用Erlang语言进行开发作为底层语言实现:Erlang有着和原生Socket一样的延迟,所以性能非常高
- 开源、性能优秀,稳定性保障
- 提供可靠性消息投递模式(confirm)、返回模式(return)
- 与SpringAMQP完美整合,API丰富
- 集群模式比较丰富,表达式配置,HA模式,镜像队列模型
- 保证数据不丢失的前提做到高可靠性,可用性
AMOP 专有名词:
- Server:又称为 Broker
- Connection:连接,应用程序和broker之间的网络连接
- Channel:网络信道,一个网络会话的任务
- Message:消息
- Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个Virtual host里面可以有若干个Exchage和Queue,但同一个Virtual host里面不能有相同名称的Exchage和Queue
- Exchange:交换机,接收消息,根据路由键转发消息到绑定队列
- Binding:Exchage和Queue之间的虚拟连接,binding中可以包含routing key
- Routing key:一个路由规则
- Queue:保存具体的消息的容器
RabbitMQ 消息的流转过程: 
应用场景/作用
思考
- 生产端的可靠性投递;
- 如果消息和钱有关,这个消息一定不能丢失
- 需要做到生产端100%投递,就需要和业务数据保证原子性
- 消费端的幂等;
- 生产端如果要做到可靠性投递,可能会有重复投递
- 消费端消费了两次或多次这个数据可能会不一致
- 所以消费端一定要做到同一个请求消费多次得到的结果一样
- MQ 本身需要考虑
- HA:高可用
- 低延迟
- 可靠性:确保数据是完整的
- 堆积能力:这是MQ能扛下你的业务量级的保证
- 扩展性:是否能够天然支持横向扩展无感知扩容
RabbitMq 集群架构原理解析
1)主备模式 master-slave结构,可以理解为热备份,master负责读写,master宕机后切换到slave
2)镜像模式 业界主流使用比较多的模式; RabbitMQ集群非常经典的就是镜像模式,保证数据100%不丢失; 高可用、数据同步低延迟、奇数个节点。
缺点: 镜像队列集群的缺陷是无法进行很好的横向扩容,因为每个节点都是一个完整的互相复制的节点,并且镜像节点过多也会增加MQ的负担,一个数据写入就要复制到多个节点,吞吐量也会降低 
RabbitMq 的安装和使用
RabbitMq-3.8.19安装详解 前面我已经介绍了一篇,再此省略。
修改用户登录和连接心跳
- 将 loopback_users.guest = false,前面的注释去掉
- 将 {heartbeat, 60} 修改为 {heartbeat, 10}
查看MQ端口是否启用:yum -y install lsof
启动插件:
- rabbitmq-plugins enable rabbitmq_management
查看管理后台是否启动
常用命令
systemctl start rabbitmq-server
rabbitmq-server -detached
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl stop_app
rabbitmqctl start_app
rabbitmqctl status
rabbitmqctl add_user username password
rabbitmqctl change_password username password
rabbitmqctl list_users
rabbitmqctl delete_user username
rabbitmqctl list_user_permissions username
rabbitmqctl clear_permissions -p vhostpath username
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
rabbitmqctl set_permissions -p / gavin ".*" ".*" ".*"
rabbitmqctl list_vhosts
rabbitmqctl add_vhost vhostpath
rabbitmqctl list_permissions -p vhostpath
rabbitmqctl delete_vhost vhostpath
rabbitmqctl list_queues
rabbitmqctl -p vhostpath purge_queue queueName
rabbitmqctl reset
springboot 整合 rabbitmq
发送
搭建一个 SpringBoot 工程及准备工作,我在另一篇博客写了,这里不再重复。SpringBoot全局异常处理、集成Swagger和参数必填校验 ,我们现在需要的准备工作,都在这篇准备了。
新增 application.yaml
server:
port: 8088
spring:
rabbitmq:
host: 192.168.150.130
username: guest
password: guest
virtual-host: /
connection-timeout: 10000
消息实体
package cn.com.springboot.vo;
import lombok.Data;
import java.io.Serializable;
@Data
public class OrderInfo implements Serializable {
private String id;
private String orderName;
private String messageId;
}
发送类
package cn.com.springboot.web;
import cn.com.springboot.vo.OrderInfo;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class OrderSender {
@Autowired
private RabbitTemplate rabbitTemplate;
private static final String ORDER_EXCHANGE = "order_exchange";
private static final String ORDER_ROUTING_KEY = "order_r_key";
public void sendOrder(OrderInfo orderInfo){
CorrelationData correlationData = new CorrelationData();
correlationData.setId(orderInfo.getMessageId());
rabbitTemplate.convertAndSend(ORDER_EXCHANGE, ORDER_ROUTING_KEY, orderInfo, correlationData);
}
}
rabbitmq 准备工作
创建 exchange  Exchange的相关属性说明
-
Name:exchange的名称 -
Type类型说明
-
direct:exchange在和queue进行binding时会设置routingkey,只有routingkey完全相同,exchange才会将消息转发到对应的queue上,相当于点对点 -
fanout:直接将消息路由到所有绑定的队列中,无需对消息的routingkey进行匹配操作,因为不绑定routingkey,所有也是消息转发最快的(广播方式) -
topic:此类型的exchange和direct差不多,但direct类型要求routingkey完全相同,而topic可以使用通配符:‘*’,‘#’ 其中‘*’表示匹配一个单词,‘#’则表示匹配没有或者多个单词 -
header:路由规则是根据header来判断 -
总结:一般direct和topic用来具体的路由信息,如果用广播就使用fanout,header类型用的比较少 -
Durability:Durable是持久化到磁盘的意思 -
Auto Delete:如果设置为yes,那么最后一个绑定到exchange上的队列被删除后,exchange也会自动删除 -
Internal:如果为yes则表明该exchange是rabbitmq内部使用,不提供给外部系统应用,一般是在自己编写erlang语言做定制化扩展时使用 -
Arguments这个是扩展AMQP协议时自定义使用的内容
创建Queue  Exchange和Queue通过Binding关联,由routingkey进行路由    
测试发送
package cn.com.springboot.web;
import cn.com.springboot.vo.OrderInfo;
import cn.com.springboot.vo.ResultVo;
import io.swagger.annotations.Api;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@Api("发送消息")
@RestController
public class RabbitMqController {
@Autowired
private OrderSender orderSender;
@PostMapping("/sendOrder")
public ResultVo sender(@RequestBody OrderInfo orderInfo){
orderSender.sendOrder(orderInfo);
return ResultVo.success();
}
}
用 swagger 测试  消息成功发送到队列  注意:
- 一个exchange可以绑定多个queue,只要routingkey一样,一个消息就会发送到多个queue上
- exchange绑定一个queue,无论binding多少个routingkey,只要符合这个routingkey规则的消息都会发送到这个队列中,接收的时候无论从哪个routingkey过来的消息,连接这个队列的消费端都会消费掉,相当于多个消息规则对应一个队列
消息接收
创建另一个工程:consumer-and-producer,搭建步骤和上面的基本一样。 application.yaml
server:
port: 8080
spring:
rabbitmq:
host: 192.168.150.130
username: guest
password: guest
virtual-host: /
connection-timeout: 10000
listener:
simple:
concurrency: 5
max-concurrency: 10
auto-startup: true
prefetch: 1
acknowledge-mode: manual
添加消费类
package cn.com.springboot.web;
import cn.com.springboot.vo.OrderInfo;
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Log4j2
@Component
public class OrderReceiver {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order_queue", durable = "true"),
exchange = @Exchange(value = "order_exchange", type = "topic"),
key = "order_r_key"
))
@RabbitHandler
public void receiveOrderInfo(@Payload OrderInfo orderInfo,
@Headers Map<String, Object> headers,
Channel channel) throws IOException {
log.info("开始消费");
log.info("orderName:{}, messageId:{}", orderInfo.getOrderName(), orderInfo.getMessageId());
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
}
启动测试 
暂时分享到这,欢迎指正!
|