IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ-集群搭建、负载均衡 -> 正文阅读

[大数据]RabbitMQ-集群搭建、负载均衡

为何要搭建RabbitMQ集群?

如果只是为了学习RabbitMQ或者验证业务工程的正确性那么在本地环境或者测试环境上使用其单实例部署就可以了,但是出于MQ中间件本身的可靠性、并发性、吞吐量和消息堆积能力等问题的考虑,在生产环境上一般都会考虑使用RabbitMQ的集群方案。

集群方案原理

RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。

在这里插入图片描述
设置多个RabbitMQ的节点,节点之间使用镜像队列来同步数据,并对外使用HAProxy反向代理,那么HAProxy就作为 对外访问的公共方式,不论是生产者还是消费者都访问HAProxy,再通过HAProxy访问RabbitMQ的节点,若其中的某些节点挂了也没关系,只要有一个存活就好,挂掉的节点修复好后数据又会被同步好。

单机多实例方式搭建集群

在一台虚拟机上启动2个RabbitMQ的节点,多节点之间用端口区分(实际搭建集群式用ip区分)。用云服务器的,记得去开启对应端口。

参考官方文档:https://www.rabbitmq.com/clustering.html
步骤:

1.先确保RabbitMQ运行没有问题
命令:rabbitmqctl status
在这里插入图片描述

2.停止rabbitmq服务
命令:service rabbitmq-server stop
在这里插入图片描述
3.启动第一个节点:
命令:
RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start
在这里插入图片描述
4.启动完第一个节点后,克隆会话用于启动第二条节点
web管理插件端口占用,所以还要指定其web插件占用的端口号。

命令: RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start
在这里插入图片描述
再复制一个会话进行节点重置
5.rabbit1操作作为主节点:
依次执行以下命令:
rabbitmqctl -n rabbit1 stop_app
rabbitmqctl -n rabbit1 reset
rabbitmqctl -n rabbit1 start_app
在这里插入图片描述
6.rabbit2操作为从节点:
依次执行以下命令:
rabbitmqctl -n rabbit2 stop_app
rabbitmqctl -n rabbit2 reset
rabbitmqctl -n rabbit2 join_cluster rabbit1@‘super’ ###’'内是主机名换成自己的
rabbitmqctl -n rabbit2 start_app
在这里插入图片描述
7.查看集群状态
rabbitmqctl cluster_status -n rabbit1

至此,rabbit1和rabbit2组成了一个集群,但是还不完整,有许多问题(如数据不同步)

主节点:
可以看创建了的两个队列
在这里插入图片描述

从节点:
可以看创建了的两个队列
在这里插入图片描述

给从节点上的rabbit2队列发一条消息
在这里插入图片描述
在这里插入图片描述

再看主节点:
也可以看到rabbit2队列有一条消息
在这里插入图片描述
在正常运行的时候好像莫得什么问题,这时手动让rabbit2从节点挂掉
在这里插入图片描述
再从主节点看:
直接拿不到数据了
在这里插入图片描述
刚发的消息是给从节点,从节点没挂之前主节点可以拿到数据,从节点挂了后直接就?,这说明消息实际存在于自己的节点上,也就是数据没同步。重启了刚挂掉的从节点,消息也没了。。。
在这里插入图片描述

集群管理

rabbitmqctl join_cluster {cluster_node} [–ram]
将节点加入指定集群中。在这个命令执行前需要停止RabbitMQ应用并重置节点。

rabbitmqctl cluster_status
显示集群的状态。

rabbitmqctl change_cluster_node_type {disc|ram}
修改集群节点的类型。在这个命令执行前需要停止RabbitMQ应用。

rabbitmqctl forget_cluster_node [–offline]
将节点从集群中删除,允许离线执行。

rabbitmqctl update_cluster_nodes {clusternode}

在集群中的节点应用启动前咨询clusternode节点的最新信息,并更新相应的集群信息。这个和join_cluster不同,它不加入集群。考虑这样一种情况,节点A和节点B都在集群中,当节点A离线了,节点C又和节点B组成了一个集群,然后节点B又离开了集群,当A醒来的时候,它会尝试联系节点B,但是这样会失败,因为节点B已经不在集群中了。

rabbitmqctl cancel_sync_queue [-p vhost] {queue}
取消队列queue同步镜像的操作。

镜像集群配置

上面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制。虽然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列。

镜像队列是基于普通的集群模式的,然后再添加一些策略,所以你还是得先配置普通集群,然后才能设置镜像队列,我们就以上面的集群接着做。

设置的镜像队列可以通过开启的网页的管理端Admin->Policies,也可以通过命令。

命令的方式: rabbitmqctl set_policy my_ha “^” ‘{“ha-mode”:“all”}’

通过开启的网页的管理端:
在这里插入图片描述

  • Name:策略名称
  • Pattern:匹配的规则,如果是匹配所有的队列,是^.
  • Definition:使用ha-mode模式中的all,也就是同步所有匹配的队列。问号链接帮助文档。

在进行镜像集群的配置后:
从主节点看:
在这里插入图片描述
将鼠标放在rabbit1上,可以看到提示它同步了rabbit2节点。
那个+几就表示要同步到几个节点上去

在从节点看:
在这里插入图片描述
可以看到rabbit2同步着rabbit1。

这样两个节点挂掉一个,另一个也不会丢失数据,挂掉的那个重新启动后也会重新同步。

但又有新的问题出现了,当我们通过代码访问RabbitMQ时,是访问第一个节点呢?还是第二个?访问的那一个挂了咋办?我们期望的是写一个统一的ip和端口号,不管是生产端还是消费端,都通过这个ip和端口来访问RabbitMQ上的随机节点。

负载均衡-HAProxy

安装HAProxy

//下载依赖包
yum install gcc vim wget

//上传haproxy源码包
在这里插入图片描述

//解压
tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local

//进入目录、进行编译、安装
cd /usr/local/haproxy-1.6.5
make TARGET=linux31 PREFIX=/usr/local/haproxy
make install PREFIX=/usr/local/haproxy
//赋权
groupadd -r -g 149 haproxy
useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy

//创建haproxy配置文件
mkdir /etc/haproxy
vim /etc/haproxy/haproxy.cfg

配置HAProxy

配置文件路径:/etc/haproxy/haproxy.cfg

#logging options
global
	log 127.0.0.1 local0 info
	maxconn 5120
	chroot /usr/local/haproxy
	uid 99
	gid 99
	daemon
	quiet
	nbproc 20
	pidfile /var/run/haproxy.pid

defaults
	log global
	
	mode tcp

	option tcplog
	option dontlognull
	retries 3
	option redispatch
	maxconn 2000
	contimeout 5s
   
     clitimeout 60s

     srvtimeout 15s	
#front-end IP for consumers and producters

listen rabbitmq_cluster
	bind 0.0.0.0:5672	#对外提供服务的端口
	
	mode tcp
	#balance url_param userid
	#balance url_param session_id check_post 64
	#balance hdr(User-Agent)
	#balance hdr(host)
	#balance hdr(Host) use_domain_only
	#balance rdp-cookie
	#balance leastconn
	#balance source //ip
	
	balance roundrobin
					 #节点的ip
        server node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2
        server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2

listen stats
	bind 172.16.98.133:8100	#HAProxy管理控制台的地址,内网ip记得换
	mode http
	option httplog
	stats enable
	stats uri /rabbitmq-stats
	stats refresh 5s

启动HAproxy负载

启动命令:/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg

//查看haproxy进程状态
ps -ef | grep haproxy

访问如下地址对mq节点进行监控(内网ip换成自己上面配的的)
http://172.16.98.133:8100/rabbitmq-stats
在这里插入图片描述
代码中访问mq集群地址,则变为访问haproxy地址:5672

代码测试集群

helloworld测试类:

public class ProducerHelloWorldHAProxyTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置参数
        factory.setHost("106.15.50.230");   //ip,换成HAProxy对外服务的ip
        factory.setPort(5672);  //端口 换成HAProxy对外服务的端口
        factory.setVirtualHost("/");    //虚拟机 默认/
        factory.setUsername("guest"); //用户名 默认guest
        factory.setPassword("guest"); //密码 默认guest

        //3.创建连接 Connection
        Connection connection = factory.newConnection();

        //4.创建 Channel
        Channel channel = connection.createChannel();

        //5.创建队列
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * 参数:
         * 1.queue:队列名称
         * 2.durable:是否持久化,当目前重启后,还在
         * 3.exclusive:
         *      是否独占,即只有一个消费者监听端口
         *      当Connection关闭时,是否删除队列
         * 4.autoDelete:是否自动删除,在没有Consumer时自动删除队列
         * 5.arguments:参数
         */
        //若没有同名的队列,则会创建,否则不会创建
        channel.queueDeclare("queue_haproxy",true,false,false,null);

        //6.发送消息
        String body = "hello haproxy~";
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * 参数:
         * 1.exchange:交换机名称。简单模式下实参使用""表示使用默认的交换机AMQP
         * 2.routingKey:路由的名称
         * 3.props:配置信息
         * 4.body:发送的消息数据
         */
                            //使用默认的交换机时,routingKey需要和队列的名称一样才能路由到对应的队列
        channel.basicPublish("","queue_haproxy",null,body.getBytes());

        //7.释放资源
        channel.close();
        connection.close();
    }
}

运行后,查看RabbitMQManagement:
可以看到成功创建了队列,并发送了一条消息
在这里插入图片描述
在这里插入图片描述

这时我们挂掉一个节点,:
在这里插入图片描述
再运行一下上面的代码后:
可以看到正常存活的节点上的队列收到了消息
在这里插入图片描述
再重启被挂掉的节点:
在这里插入图片描述
去控制台查看:
同步了数据
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-09 11:50:45  更:2021-09-09 11:51:19 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 14:55:13-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码