MQ的作用和应用场景
解耦
多个系统之间复杂的调用通过MQ解耦开来, 消息的生产者将消息发送到MQ中时,就不再关心消息的状况; 消费者只需要从MQ中获取消息,而不用关心消息的来源
异步
消息一发送到MQ中,就会直接返回响应的响应,不需要等待结果的处理后返回 提高系统的运行效率和用户的体验,特别是面对高延迟的接口(等待处理信息完毕需要很长的时间)效果显著
削峰
高峰期突然增加的请求会被MQ压制到系统本身能处理的请求数(进多出少),但是剩下请求会积压在MQ中。
MQ引入的缺点
- 系统可用性降低,MQ一旦down机,就会导致整个系统挂掉,即要保证MQ的高可用
- 系统的复杂性增加,需要考虑的事情增多:要保证消息不会丢失,同时还要保证消息的幂等性
- 一致性问题,一旦有一个系统因为某种原因导致请求执行失败,就会与其他系统中的数据产生不同,出现相应的信息差。
各个MQ之间的区别ActiveMQ:
- 缺点:
社区不活跃,偶尔有数据丢失 - 优点:
MQ功能完善且强大,吞吐较高
- RabbitMQ:
缺点: erlang语言开发,使用erlang语言的公司很少 优点: MQ功能相对完善,吞吐量较高 - RocketMQ:
缺点: 如果阿里不维护了,就可能需要自己去维护 优点: 可以支撑大规模的吞吐,源码是基于JAVA的,同时还是分布式的 - KAFKA:
优点: 大数据领域广泛使用,吞吐量超高,易于扩展 缺点: 实现基本Jms功能
ActiveMQ保证高可用性
Zookeeper+LevelDB
搭建Zookeeper集群
参考文档:Zookeeper集群搭建. 1、安装JDK 2、下载Zookeeper的安装包,并解压到/opt/zoocluster目录下 3、复制;创建zoo.cfg文件,在zookeeper安装目录中的data目录下添加myid 文件,并写入节点id 4、在zoo.cfg文件中:修改各自的监听端口号
#添加以下的内容:
#server(固定).1(单个zookeeper的id)=alichoto(主机地址【可以在/etc/hosts中配置主机别名】2888(交换数据的端口):3888(选举端口)
server.1=alichoto:2888:3888
server.2=alichoto:2888:3888
server.3=alichoto:2888:3888
搭建ActiveMQ集群
参考文档:activeMQ官网
修改控制台端口号(conf/jetty.xml) 修改/etc/hosts中主机和名字的映射(模拟多机的情况)实现ip到域名地址的映射 三个节点的BrokerName一致(conf/activemq.xml) 修改消息端口 添加ReplicatedLevelDB的持久化配置
测试
通过任意一台zookeeper的客户端来查看activemq的连接情况 在客户端中输入
ls /activemq/level-stores
成功之后会发现三个active节点 查看主节点 通过get命令打印出各个节点的信息 在java中需要的修改是url改成带故障迁移的
failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false
消息幂等性
1、数据库插入操作设置唯一索引,防止出现脏数据 2、防止重复提交,消费过一次,更新token,存入redis缓存或本地缓存,重复提交时token不存在 3、在并发不高的情况下,消费消息前查看是否redis有消费的记录(用全局id表示),没有则执行业务,有则返回 4、使用分布式锁,分布式环境下使用,通过redis或zookeeper提供分布式锁,业务执行更新或插入的时候,先获取锁,执行业务完后,在释放锁
消息丢失
消息丢失发生地方: 1、生产者发送消息到mq时,出现网络波动等问题,导致消息丢失 2、mq接受到消息,mq宕机(mq集群,消息持久化) 3、消费接受到消息,还没来得及处理,消费者服务器就挂掉了 同步事务以及接收回调机制 同步事务:每次发送消息,只有等待消息的确认之后,才会发送下一组消息 接收回调:异步处理,到时会在生产者端注册一个回调函数确认消费者是否接收到数据,最后告诉生产者是成功还是失败
消息顺序性
消息的发送端:不能异步发送,必须保证消息被接受才能发送下一个消息 消息的存储端:
- 消息不能分区:一个topic只能有一个队列(在activemq中是queue,一个queue只能被一个消费这消费)
- mq宕机的时候,数据复制可能错乱,必须同步复制
消息的接收端:不能多个消费者消费同一个队列,即不能并行消费 所以在保证消息顺序性的同时,会导致吞吐量下降,大部分情况下不关注顺序的业务大量存在
参考资料
《阿里面试技术手册》
|