为何要搭建RabbitMQ集群?
如果只是为了学习RabbitMQ或者验证业务工程的正确性那么在本地环境或者测试环境上使用其单实例部署就可以了,但是出于MQ中间件本身的可靠性、并发性、吞吐量和消息堆积能力等问题的考虑,在生产环境上一般都会考虑使用RabbitMQ的集群方案。
集群方案原理
RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。
设置多个RabbitMQ的节点,节点之间使用镜像队列来同步数据,并对外使用HAProxy反向代理,那么HAProxy就作为 对外访问的公共方式,不论是生产者还是消费者都访问HAProxy,再通过HAProxy访问RabbitMQ的节点,若其中的某些节点挂了也没关系,只要有一个存活就好,挂掉的节点修复好后数据又会被同步好。
单机多实例方式搭建集群
在一台虚拟机上启动2个RabbitMQ的节点,多节点之间用端口区分(实际搭建集群式用ip区分)。用云服务器的,记得去开启对应端口。
参考官方文档:https://www.rabbitmq.com/clustering.html 步骤:
1.先确保RabbitMQ运行没有问题 命令:rabbitmqctl status
2.停止rabbitmq服务 命令:service rabbitmq-server stop 3.启动第一个节点: 命令: RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start 4.启动完第一个节点后,克隆会话用于启动第二条节点 web管理插件端口占用,所以还要指定其web插件占用的端口号。
命令: RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start 再复制一个会话进行节点重置 5.rabbit1操作作为主节点: 依次执行以下命令: rabbitmqctl -n rabbit1 stop_app rabbitmqctl -n rabbit1 reset rabbitmqctl -n rabbit1 start_app 6.rabbit2操作为从节点: 依次执行以下命令: rabbitmqctl -n rabbit2 stop_app rabbitmqctl -n rabbit2 reset rabbitmqctl -n rabbit2 join_cluster rabbit1@‘super’ ###’'内是主机名换成自己的 rabbitmqctl -n rabbit2 start_app 7.查看集群状态 rabbitmqctl cluster_status -n rabbit1
至此,rabbit1和rabbit2组成了一个集群,但是还不完整,有许多问题(如数据不同步)
主节点: 可以看创建了的两个队列
从节点: 可以看创建了的两个队列
给从节点上的rabbit2队列发一条消息
再看主节点: 也可以看到rabbit2队列有一条消息 在正常运行的时候好像莫得什么问题,这时手动让rabbit2从节点挂掉 再从主节点看: 直接拿不到数据了 刚发的消息是给从节点,从节点没挂之前主节点可以拿到数据,从节点挂了后直接就?,这说明消息实际存在于自己的节点上,也就是数据没同步。重启了刚挂掉的从节点,消息也没了。。。
集群管理
rabbitmqctl join_cluster {cluster_node} [–ram] 将节点加入指定集群中。在这个命令执行前需要停止RabbitMQ应用并重置节点。
rabbitmqctl cluster_status 显示集群的状态。
rabbitmqctl change_cluster_node_type {disc|ram} 修改集群节点的类型。在这个命令执行前需要停止RabbitMQ应用。
rabbitmqctl forget_cluster_node [–offline] 将节点从集群中删除,允许离线执行。
rabbitmqctl update_cluster_nodes {clusternode}
在集群中的节点应用启动前咨询clusternode节点的最新信息,并更新相应的集群信息。这个和join_cluster不同,它不加入集群。考虑这样一种情况,节点A和节点B都在集群中,当节点A离线了,节点C又和节点B组成了一个集群,然后节点B又离开了集群,当A醒来的时候,它会尝试联系节点B,但是这样会失败,因为节点B已经不在集群中了。
rabbitmqctl cancel_sync_queue [-p vhost] {queue} 取消队列queue同步镜像的操作。
镜像集群配置
上面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制。虽然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列。
镜像队列是基于普通的集群模式的,然后再添加一些策略,所以你还是得先配置普通集群,然后才能设置镜像队列,我们就以上面的集群接着做。
设置的镜像队列可以通过开启的网页的管理端Admin->Policies,也可以通过命令。
命令的方式: rabbitmqctl set_policy my_ha “^” ‘{“ha-mode”:“all”}’
通过开启的网页的管理端:
- Name:策略名称
- Pattern:匹配的规则,如果是匹配所有的队列,是^.
- Definition:使用ha-mode模式中的all,也就是同步所有匹配的队列。问号链接帮助文档。
在进行镜像集群的配置后: 从主节点看: 将鼠标放在rabbit1上,可以看到提示它同步了rabbit2节点。 那个+几就表示要同步到几个节点上去
在从节点看: 可以看到rabbit2同步着rabbit1。
这样两个节点挂掉一个,另一个也不会丢失数据,挂掉的那个重新启动后也会重新同步。
但又有新的问题出现了,当我们通过代码访问RabbitMQ时,是访问第一个节点呢?还是第二个?访问的那一个挂了咋办?我们期望的是写一个统一的ip和端口号,不管是生产端还是消费端,都通过这个ip和端口来访问RabbitMQ上的随机节点。
负载均衡-HAProxy
安装HAProxy
//下载依赖包 yum install gcc vim wget
//上传haproxy源码包
//解压 tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local
//进入目录、进行编译、安装 cd /usr/local/haproxy-1.6.5 make TARGET=linux31 PREFIX=/usr/local/haproxy make install PREFIX=/usr/local/haproxy //赋权 groupadd -r -g 149 haproxy useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
//创建haproxy配置文件 mkdir /etc/haproxy vim /etc/haproxy/haproxy.cfg
配置HAProxy
配置文件路径:/etc/haproxy/haproxy.cfg
global
log 127.0.0.1 local0 info
maxconn 5120
chroot /usr/local/haproxy
uid 99
gid 99
daemon
quiet
nbproc 20
pidfile /var/run/haproxy.pid
defaults
log global
mode tcp
option tcplog
option dontlognull
retries 3
option redispatch
maxconn 2000
contimeout 5s
clitimeout 60s
srvtimeout 15s
listen rabbitmq_cluster
bind 0.0.0.0:5672
mode tcp
balance roundrobin
server node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2
server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2
listen stats
bind 172.16.98.133:8100
mode http
option httplog
stats enable
stats uri /rabbitmq-stats
stats refresh 5s
启动HAproxy负载
启动命令:/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
//查看haproxy进程状态 ps -ef | grep haproxy
访问如下地址对mq节点进行监控(内网ip换成自己上面配的的) http://172.16.98.133:8100/rabbitmq-stats 代码中访问mq集群地址,则变为访问haproxy地址:5672
代码测试集群
helloworld测试类:
public class ProducerHelloWorldHAProxyTest {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("106.15.50.230");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("queue_haproxy",true,false,false,null);
String body = "hello haproxy~";
channel.basicPublish("","queue_haproxy",null,body.getBytes());
channel.close();
connection.close();
}
}
运行后,查看RabbitMQManagement: 可以看到成功创建了队列,并发送了一条消息
这时我们挂掉一个节点,: 再运行一下上面的代码后: 可以看到正常存活的节点上的队列收到了消息 再重启被挂掉的节点: 去控制台查看: 同步了数据
|