IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Zookeeper学习笔记 -> 正文阅读

[大数据]Zookeeper学习笔记

视频资源:

  1. 【尚硅谷】2021最新版Zookeeper 3.5.7版本教程
  2. 【编程不良人】2021最新ZooKeeper教程

0. Dubbo与Zookeeper的关系

参考:zookeeper、dubbo、kafka简单了解

Dubbo建议使用Zookeeper作为服务的注册中心。

Dbbo是一个框架,用于服务间的调度,服务程序编写使用dubbo做接口,dubbo实现了服务与服务之间还有zookeeper之间的通讯。

Zookeeper的作用:

zookeeper用来注册服务进行负载均衡,哪一个服务由哪一个机器来提供必需让调用者知道,简单来说就是ip地址和服务名称的对应关系。当然也可以 通过硬编码的方式把这种对应关系放到调用方业务代码中实现,但是如果提供服务的机器宕机,调用者将无法知晓,此时如果不更改代码会继续请求挂掉的机器提供服务。 zookeeper通过心跳机制可以检测挂掉的机器并将挂掉机器的ip和服务对应关系从列表中删除。至于支持高并发,简单来说就是横向扩展,在不更改代码的情况下通过添加机器来提高运算能力。通过添加新的机器向zookeeper注册服务,服务的提供者多了能服务的客户就多了,于是便能支持更大的并发量。

dubbo的作用:

管理中间层的工具,在业务层到数据仓库间有非常多服务的接入和服务提供者需要调度,dubbo提供一个框架解决这个问题。注意这里的dubbo只是一个框架,至于你架子上放什么是完全取决于你的,就像一个汽车骨架,你需要配你的轮子引擎。这个框架中要完成调度必须要有一个分布式的注册中心,储存所有服务的元数据,你可以用zk,也可以用别的,只是大家都用zk。

1. Zookeeper基础

  • Dubbo框架、SpringClou的框架,zk是作为注册中心
  • Hadoop、HBase组件,集群架构,zk是作为集群管理者
  • zk实现分布式锁
  • 是一个分布式应用程序的协调服务,很少做业务实现

1. 1 概述

Zookeeper的工作机制:

Zookeeper从设计模式的角度理解:是一个基于观察者模式设计的分布式服务管理框架。它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出响应的反应。

Zookeeper = 文件系统 + 通知机制

文件系统:服务端启动时去注册信息(创建的都是临时节点),Zookeeper集群存储的是各种服务器的上线信息

通知机制:客户端获取到当前服务器列表,并且注册监听;Zookeeper集群通知客户端(服务端上下线事件通知)

1.2 Zookeeper特点

cJ3RvZyQLEjNK1q

  1. Zookeeper:一个领导者(leader),多个跟随者(follower)组成的集群
  2. 集群中只要有半数以上节点(半数机制)存活,Zookeeper集群就能正常服务。所以Zookeeper适合安装奇数台服务器?(假如现在有5台服务器,挂了2台,集群依然能正常工作,但如果挂了3台就因为违背“半数以上节点存活”的规则而无法正常工作;假如现在有6台服务器,挂了2台依然能正常工作,但是挂了3台,因为3=(6/2),所以同样也不能正常工作。所以,5台和6台在提升集群健壮性上没有任何帮助,存在资源浪费)
  3. 全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个server,数据都是一致的
  4. 更新请求顺序执行,来自同一个client的更新请求按其发送顺序依次执行
  5. 数据更新原子性,一次数据更新,要么成功要么失败。(比如,每次写操作都有事务id(zxid)
  6. 实时性,在一定时间范围内,client能读到最新数据(比如一个client向一个服务器写入数据,那其他client要拿到/读到这个数据,需要服务器之间同步数据,zookeeper的同步速度非常快)

1.3 内存/数据结构

之前提到过,Zookeeper = 文件系统 + 通知机制,Zookeeper数据模型的结构与Unix文件系统很相似,整体上可以看作是一棵树,每个节点称作是一个ZNode。每一个ZNode默认能够存储1MB的数据,每个ZNode都可以通过其路径唯一标识

bgqiLrzMSRJxYB5

模型特点

  • 每个子目录如/znode1都被称作为一个znode(节点)。这个znode是被它所在的路径唯一标识
  • znode可以有子节点目录,并且每个znode可以存储数据
  • znode是有版本的,每个znode中存储的数据可以有多个版本,也就是一个访问路径中可以存储多分数据(如一个节点中的数据,每次对其修改,可对版本号递增1)
  • znode可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化,一旦变化可以通知设置监控的客户端(如可以使用Java客户端去监控一个节点,如果修改,则zk会通知客户端)

问:能否用Zookeeper存储海量的数据?

答:不行,它只适合存储简单的配置信息。

1.4 节点的分类

1.4.1 持久节点(persistent)P

是指节点创建后,就一直存在,直到有删除操作来删除这个节点——不会因为创建该节点的客户端会话失效而消失。该节点会存储到服务器的磁盘上,无论宕机与否,都不会消失

1.4.2 持久顺序节点(persistent_sequential)PS

这类节点的基本特性和上述节点类型一致。额外的特性是,在ZK中,每个父节点会为它的第一级子节点维护一份时序,会记录每一个子节点的创建的先后顺序。基于这个特性,在创建子节点的时候,可以设置这个属性,那么在创建节点过程中,ZK会自动给节点名字加上一个数字后缀,作为新的节点名,这个数字后缀的范围是整型的最大值

1.4.3 临时节点(ephemeral)E

和持久节点不同,临时节点的生命周期和客户端会话绑定,也就是说,如果客户端会话失效,那么这个临时节点也将失效(被自动清除掉)。注意,这里提到的是会话失效,而非断开连接。另外,临时节点下面不能再创建子节点。

1.4.4. 临时顺序节点(ephemeral_sequential)ES

具备临时节点和顺序节点的特性。

1.5 应用场景

提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡(从软件层面而非硬件层面,硬件非常贵,但是性能好)等

1.5.1 统一命名服务

在分布式环境下,经常需要对应用/服务进行统一命名,便于识别。例如ip地址不容易记住,而域名容易记住

kxQY8hofRza6sD1

当client对一个域名访问的时候,会自动根据负载情况,去指定访问特定服务器,而不用client自己设定。Nginx等很多框架也同样能实现这个事情

1.5.2 统一配置管理

  1. 分布式环境下,配置文件同步非常常见
    1. 一般要求一个集群中,所有节点的配置信息是一致的,比如Kafka集群或Hadoop集群
    2. 对配置文件修改后,希望能快速同步到各个节点上
  2. 配置管理可交给Zookeeper实现
    1. 可将配置信息写入Zookeeper上的一个ZNode
    2. 各个客户端服务监听这个ZNode
    3. 一旦这个ZNode中的数据发生修改,Zookeeper就通知所有监听这个ZNode的客户端

bHT2DkrKeFJzRUc

1.5.3 统一集群管理

  1. 分布式环境中,实时掌控每个节点的状态是必要的
    1. 可根据节点的状态实时做出一些调整
  2. Zookeeper可以实现实时监控节点状态变化
    1. 可将节点信息写入Zookeeper上的一个ZNode
    2. 监听这个ZNode可以获取它的实时状态变化

sSFVtDfm7UL3rje

假如现在有一个Zookeeper集群,在/GroupManager下面有多个/clientX,每个客户端可以在/clientX下注册,/clientX中就可以存放这个客户端的相关运行信息(如服务几点上线、上线时运行状态如何)。而此时,别人(Zookeeper或者其他节点)监听这个节点,就能获取到它的实时状态变化,从而做出相应策略(如运维就可以根据服务器运行好坏做出策略)

1.5.4 服务器动态上下线

客户端能实时洞察到服务器上下线变化

fSQ7Dw9cigmCphn

1.5.5 软负载均衡

在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求

当然其他框架也能做负载均衡,如Nginx

MHhd2bZSyq17nzC

1.6 选举机制(面试重点)

Zookeeper的选举机制较为复杂,分为第一次启动非第一次启动(第一次选举出来的Leader挂了,二次选举…三次选举…)。

第一次启动

ks64oH9NnlwAKWv

假设一个Zookeeper集群中有5台服务器,注意这里一开始就有5台,即想要成为leader必须有超过半数以上,即至少有3票。每一台集群中的服务器都有一个自己的唯一id,myid

  1. 服务器1启动,发起一次选举。每台服务器一开始都默认投给自己,服务器1投自己一票,接着需要判断自己手里的选票是否超过总集群数的一半以上,1票不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING,既不是Leader也不是Follower
  2. 服务器2启动,所有启动的服务器再发起一次选举。服务器1服务器2分别投自己一票并交换选票信息:此时服务器1发现服务器2myid比自己投票推举的(服务器1myid=1)大,更改选票为推举服务器2。此时,服务器1:0票,服务器2:2票,判断后发现仍没有半数以上的结果,选举无法完成,服务器1、2保持LOOKING状态
  3. 服务器3启动,所有启动的服务器再发起一次选举,并且都投自己,然后交换选票信息后都改投服务器3。此次投票结果:服务器1:0票,服务器2:0票,服务器3:3票,判断后发现服务器3的投票已经达到3票,满足条件,服务器3当选Leader。服务器1、2更改状态为FOLLOWING服务器3更改状态为LEADING
  4. 服务器4启动,再发起一次选举。此时服务器1、2、3已经不是LOOKING状态,不会更改选票信息。选举结果:服务器3:3票,服务器4:1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING
  5. 服务器5启动,与服务器4一样,状态改为了FOLLOWING

这里有几个概念:

SID:服务器ID。用来唯一标识一台Zookeeper集群中的机器,每一台机器不能重复,和myid一致

ZXID:事务ID。ZXID是一个事务ID,用来标识一次服务器状态的更新。在某一时刻,集群中的每一台服务器的ZXID值不一定完全一致,这和Zookeeper服务器对于客户端“更新请求”的处理逻辑有关

Epoch:每个Leader任期的代号。没有Leader时,同一轮投票过程中的逻辑时钟值是相同的,每投完一次票这个值会增加

非第一次选举

D7WfQ4sYrdCAx8J

  1. 当Zookeeper集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举

    1. 服务器初始化时(就是前面的启动时选举
    2. 服务器运行期间无法和Leader保持连接(比如服务器5在某一时刻与leader服务器无法保持连接,它可能认为leader服务器挂了,于是会开始进入leader选举,如下)
  2. 当一台机器进入leader选举流程时,当前集群也可能存在以下两种状态:

    1. 集群中本来就有一个leader(只是进入选举的服务器没有连接上)。此时该机器试图去选举Leader时,会被告知当前Leader信息,对于该机器来说,仅仅需要和Leader机器建立连接,并进行状态同步即可

    2. 集群中确实不存在leader。

      假设:Zookeeper中有5台服务器,SID分别为1、2、3、4、5,ZXID分别为8、8、8、7、7,SID为3的服务器为leader,某一时刻,SID为3、5的服务器出现故障,因此开始选举Leader。选举的原则,关注Epoch, ZXID, SID

      M4V8aOszbvyqEQ9

      所以SID为2的服务器会被选举为新的Leader

1.7 Zookeeper安装

1.7.1 本地模式安装

在Apache官网下载,安装,到/conf下将zoo_sample.cfg修改为zoo.cfg并修改其中dataDir

1.7.2 配置参数解读

Zookeeper中的的配置文件zoo.cfg中参数解读如下:

  1. tickTime=2000:通信心跳时间,Zookeeper服务器与客户端心跳时间,单位是毫秒。注意不仅服务器与客户端之间有,服务器与服务器之间也有

    Rqx3lK1QVYOm5dS

  2. initLimit=10:LF初始通信时限("10"代表10个心跳,总时间 = 10 * 2s = 20s)

    vWyK7xzrkuIZPlA

    Leader和Follower初始连接时,能容忍的最多心跳数(tickTime的数量)

  3. syncLimit=5:LF同步通信时限(“5”代表5个心跳,总时间= 5 * 2s = 10s)

    Leader和Follower之间通信时间如果超过syncLimit * tickTime,Leader认为Follower死掉,从服务器列表删除Follower

  4. dataDir:保存Zookeeper中的数据

    注意:默认的/tmp目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录,建议修改至zk包下

  5. clientPort=2181:客户端连接端口,通常不做修改

  6. maxClientCnxns=60:最大客户连接数,即线程池线程数量。

  7. autopurge.snapRetainCount=3:The number of snapshots to retain in dataDir,zk中的树形结构是存储在内存中,这也是为什么zk可以接受大规模的并发。这里快照就是用来做一个持久化的存储

3. Zookeeper集群与操作

3.1 集群(cluster)

定义:集合同一种软件服务的多个节点同时提供服务

解决的问题:

  1. 单节点并发访问的压力大
  2. 单节点故障问题(如硬件老化、自然灾害等)

Zookeeper中的集群:

ZooKeeper

关于为什么客户端可以对任意zk节点进行读写并且zk能保证数据一致,请参考第七章节的ZAB协议(消息广播、崩溃恢复)

3.1.1 搭建集群

  1. MAC VMwave安装多个Linux(centos)虚拟机教程,教程里演示了如何在VMware Fusion中搭建多个centos虚拟机并进行简单的网络配置(可以ping得同),注意其中涉及到clone机器的操作需要Fusion Pro版本。然后可以用SecureCRT来通过ssh连接每一台虚拟机(这里如果是把虚拟机搭建在本地的话必须每次都启动每一台机器再用SecureCRT来ssh)
  2. Linux环境搭建(基础、网络配置、集群、hadoop前置课程)关闭防火墙,时间同步,集群搭建,JDK,ssh免密登录,权限管理…

3.2 客户端命令行操作

3.2.1 监听器原理

监听器原理详解

  1. 首先要有一个main()线程
  2. main()线程中创建Zookeeper客户端,这时就会创建两个线程,一个负责网络连接通信(connect),一个负责监听(listener)
  3. 通过connect线程将注册的监听事件发送给Zookeeper服务端
  4. 在Zookeeper服务端的注册监听器列表中将注册的监听事件添加到列表中
  5. Zookeeper服务端监听到有数据或路径变化,就会将这个消息发送给listener线程
  6. listener线程内部调用process()方法

qlSRZwAiOKoh1Yj

常见的监听

  1. 监听节点数据的变化,注意:数据的变化,注册一次,只能监听一次;想要再次监听,就需要再次注册

    get path [watch]

  2. 监听子节点增删的变化,注意:路径的变化,注册一次,只能监听一次;想要再次监听,就需要再次注册

    ls path [watch]

3.3 客户端API操作

3.3.1 创建maven项目

3.3.2 添加maven依赖

		<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.7</version>
        </dependency>
    </dependencies>

3.3.3 创建客户端

package com.zzw.zk;

import org.apache.zookeeper.*;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;

public class zkClient {

    private String connectString = "192.168.3.11:2181,192.168.3.22:2181,192.168.3.33:2181"; // 注意:这里的不同ip之间不能有空格
    private Integer sessionTimeout = 2000;
    private ZooKeeper zkClient;

    // 初始化(连接一个Server或一个Server集群)
    @Before
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }

    // 创建一个节点
    @Test
    public void create() throws KeeperException, InterruptedException {
        String nodeCreated = zkClient.create("/zzw", "yomi.txt".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
}

3.3.4 获取子节点并监听

package com.zzw.zk;

import org.apache.zookeeper.*;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;

public class zkClient {

    private String connectString = "192.168.3.11:2181,192.168.3.22:2181,192.168.3.33:2181"; // 注意:这里的不同ip之间不能有空格
    private Integer sessionTimeout = 2000;
    private ZooKeeper zkClient;

    // 初始化(连接一个Server或一个Server集群)
    @Before
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("---------------------------");
                List<String> children = null;
                try {
                    children = zkClient.getChildren("/", true);
                    for (String child : children) {
                        System.out.println(child);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    // 创建一个节点
    @Test
    public void create() throws KeeperException, InterruptedException {
        String nodeCreated = zkClient.create("/zzw", "yomi.txt".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    @Test
    public void getChidren() throws KeeperException, InterruptedException {
        // getChildren的第二个参数设置为true,就会使用zkClient在创建的时候传入的watcher监听器
				// 每次发生变化,就会调用监听器的process方法(已经在上面重写)
        List<String> children = zkClient.getChildren("/", true);
        for (String child : children) {
            System.out.println(child);
        }

        Thread.sleep(Long.MAX_VALUE);
    }
}

3.3.5 判断ZNode是否存在

exists(String path, Boolean watcher), return Stat

3.4 客户端向服务端写数据

服务端有两种:1. leader 2. follower

3.4.1 写入请求直接发给Leader节点

d3KUe2Tt5rJhmxq

  1. Client发送写请求给Leader节点
  2. Leader自己进行写操作,同时将这个写请求通知给他的所有follower节点
  3. follower节点写完后会通知Leader节点
  4. 如果超过半数以上的服务器都完成了写请求(与集群的选举一样),就会发送确认通知给Client,告知写请求已完成
  5. 如果还有没有通知的follower的节点,Leader节点会继续将写请求通知给他们
  6. 剩余的follower节点完成后通知Leader节点

这样的设计的好处是效率高,只要有半数的节点完成写请求,就会被当作整个集群完成了写请求

3.4.2 写入请求直接发给Follower节点

ksNbgwo7xLqShfU

  1. Client发送写请求给Follower节点
  2. Follower节点将写请求转发给Leader节点
  3. Leader自己进行写操作,同时将这个写请求通知给他的所有follower节点
  4. follower节点写完后会通知Leader节点
  5. 如果超过半数以上的服务器都完成了写请求,Leader节点会通知一开始接收到写请求的Follower节点
  6. 然后该Follower节点会发送确认通知给Client,告知写请求已完成。(因为一开始的连接是建立在Client和Follower节点服务器之间)
  7. 如果还有没有通知的follower的节点,Leader节点会继续将写请求通知给他们
  8. 剩余的follower节点完成后通知Leader节点

4. 服务器动态上下线监听案例

该章节就是对应1.5中的第四个Zookeeper应用场景

服务器上线的过程对于Zookeeper集群来说就是Zookeeper创建目录节点的过程

4.1 需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线

4.2 需求分析

aMRNVXiEB65Uywr

注意这里有一个概念需要明确:在Zookeeper集群中,服务器和客户端对于Zookeeper来说都是客户端,区别是服务器在Zookeeper中是作一个创建目录的操作(create),而客户端是监听一个节点的操作(get)

4.3 具体实现

  1. 现在集群上创建/servers节点

    [zk: localhost:2181(CONNECTED) 39] create /servers "servers"
    Created /servers
    
  2. 服务器注册

    package com.zzw.zk.case1;
    
    import org.apache.zookeeper.*;
    
    import java.io.IOException;
    
    public class DistributeServer {
    
        private String connectString = "192.168.3.11:2181,192.168.3.22:2181,192.168.3.33:2181"; // 注意:这里的不同ip之间不能有空格
        private Integer sessionTimeout = 2000;
        private ZooKeeper zk;
    
        public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
            DistributeServer server = new DistributeServer();
            // 1. 获取zk连接
            server.getConnection();
    
            // 2. 注册服务器到zk集群(创建目录)
            server.register(args[0]);
    
            // 3. 启动业务逻辑(Thread.sleep)
            server.business();
    
        }
    
        private void business() throws InterruptedException {
            Thread.sleep(Long.MAX_VALUE);
        }
    
        private void register(String hostname) throws KeeperException, InterruptedException {
            zk.create("/servers/" + hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    
            System.out.println(hostname + " is online");
        }
    
        private void getConnection() throws IOException {
            zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
    
                }
            });
        }
    
    }
    
  3. 客户端监听

    package com.zzw.zk.case1;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    public class DistributeClient {
    
        private String connectString = "192.168.3.11:2181,192.168.3.22:2181,192.168.3.33:2181"; // 注意:这里的不同ip之间不能有空格
        private int sessionTimeout = 2000;
        private ZooKeeper zk;
    
        public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
            DistributeClient client = new DistributeClient();
            // 1. 获取zk连接
            client.getConnection();
    
            // 2. 监听/servers下面子节点的增加和删除
            client.getServerList();
    
            // 3. 业务逻辑(睡觉)
            client.business();
    
        }
    
        private void business() throws InterruptedException {
            Thread.sleep(Long.MAX_VALUE);
        }
    
        private void getServerList() throws KeeperException, InterruptedException {
    
            // 注册监听,注意这里只注册监听了一次,需要在监听器中的process方法中重写
            List<String> children = zk.getChildren("/servers", true);
    
            ArrayList<String> servers = new ArrayList<>();
    
            for (String child : children) {
                byte[] data = zk.getData("/servers/" + child, false, null);
                servers.add(new String(data));
            }
    
            System.out.println(servers);
    
        }
    
        private void getConnection() throws IOException {
            zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    // 这里需要再执行getServerList(),否则原方法的监听只注册了一次,只会执行一次
                    try {
                        getServerList();
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    
    }
    
    

4.4 实际应用

在Dubbo框架或SpringCloud框架中,很多时候使用Zookeeper作为服务的注册中心,其原理就是利用了zk的监听机制

z4nwSZQE8cxqFf7

  1. 首先AService在上线时,会通过客户端与zk建立连接,并在zk中创建一个AService临时节点
  2. 然后BService在上线时,会通过客户端与zk建立连接,也在zk中创建一个BService的临时节点,根据业务需要,AService需要提供服务,而BService会消费服务,所以BService需要对AService创建的临时节点进行监听
  3. 此时zk中存在两个永久节点,比如ANodeBNode,这两个节点下各自都存在着临时节点,即自己集群中的所有ip地址
  4. 正常情况下,某一时刻BService需要调用AService:调用的过程中,BService会带着AService服务名去注册中心获取AService服务列表(集群的所有ip地址)到本地,然后根据负载均衡的策略去选择一台机器并调用服务
  5. AService集群中有机器出现宕机情况,AService应当去注册中心删除宕机的机器的对应ip,Zookeeper通过watch机制通知到BService,其监听的ANode下路径或数据发生(zk中的监听分为对路径变化进行监听和对数据变化进行监听)变化,然后BService删除宕机机器的ip地址并不再调用

5. Zookeeper分布式锁案例

Zookeeper分布式锁可以用来分布式系统中的多个服务访问共享变量的情况

5.1 原理

X8cQvmUCOJReAyB

5.2 实现

DistributeLock

package com.zzw.zk.case2;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributeLock {

    private final String connectString = "192.168.3.11:2181,192.168.3.22:2181,192.168.3.33:2181";
    private final int sessionTimeout = 2000;
    private ZooKeeper zk;

    // 补充:CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。
    // 阻塞
    private CountDownLatch connectLatch = new CountDownLatch(1);
    // 阻塞:只有当监听的事件(前一个节点删除)才会释放
    private CountDownLatch waitLatch = new CountDownLatch(1);

    private String waitPath;
    private String currentNode;

    public DistributeLock() throws IOException, InterruptedException, KeeperException {
        // 创建zk连接
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

            // 这里就是监听到只后做的事情,即前一个节点被删除(前一个锁释放,当前节点获取到锁)
            @Override
            public void process(WatchedEvent watchedEvent) {
                // connectLatch 如果连接上zk 可以释放
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }

                // waitLatch 需要释放
                if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });

        // 为增强代码的健壮性,这里添加一个等待zk连接成功的阻塞,只有zk成功连接代码才会继续向下,该锁(并非分布式锁)才会被释放
        connectLatch.await();

        // 判断根节点/locks是否存在
        // 注意这里无需监听,因为这里本身就是构造器,只会检查一次
        Stat stat = zk.exists("/locks", false);

        // 如果不存在,创建根节点
        if (stat == null) {
            zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    // 对zk加锁
    public void zkLock() {
        // 创建对应的临时带序号的节点
        try {
            currentNode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            // wait一小会, 让结果更清晰一些
            Thread.sleep(10);

            // 判断创建的节点是否是带序号的最小节点:如果是,获取到锁;如果不是,监听他序号的前一个节点
            // 1. 获取所有在'/locks'下的子节点
            List<String> children = zk.getChildren("/locks", false);

            // 2. 如果children只有一个值,那就直接获取锁;如果有多个值,那就判断谁最小
            if (children.size() == 1) {
                return; // 加锁成功
            } else if (children.size() > 1) {
                Collections.sort(children); // 排序后children都是有序的

                // 获取创建的节点名称,如"seq-00000001"
                String thisNode = currentNode.substring("/locks/".length());    // 把前缀截掉,只剩后面的
                // 通过前面获取到的节点名称"seq-00000001"获取节点在list中的位置
                int index = children.indexOf(thisNode);

                if (index == -1) {
                    System.out.println("数据异常"); // 通常不可能
                } else if (index == 0) {
                    // 当前是第一个节点
                    return;
                } else {
                    // 需要监听前一个节点
                    waitPath = "/locks/" + children.get(index - 1);
                    zk.getData(waitPath, true, new Stat()); // 这里必须监听

                    // 等待监听完成,实现阻塞
                    waitLatch.await();

                    return;
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    // 对zk解锁
    public void zkUnlock() {
        // 删除节点
        try {
            zk.delete(currentNode, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
}

DistributeLockTest

package com.zzw.zk.case2;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

public class DistributeLockTest {

    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
        DistributeLock lock1 = new DistributeLock();
        DistributeLock lock2 = new DistributeLock();

        new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    lock1.zkLock();
                    System.out.println("线程1启动,获取到锁");
                    Thread.sleep(2 * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                lock1.zkUnlock();
                System.out.println("线程1结束,释放锁");
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {


                try {
                    lock2.zkLock();
                    System.out.println("线程2启动,获取到锁");
                    Thread.sleep(2 * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                lock2.zkUnlock();
                System.out.println("线程2结束,释放锁");
            }
        }).start();
    }
}

运行结果

线程1启动,获取到锁
线程1结束,释放锁
线程2启动,获取到锁
线程2结束,释放锁

5.3 Curator框架实现分布式锁

原生Java API实现分布式锁存在的问题:(在生产环境下,不具备解决实际问题的能力)

  1. 会话连接是异步的,需要自己处理,如使用CountDownLatch

  2. Watch需要重复注册,不然就不能生效

  3. 开发的复杂性还是比较高

  4. 不支持多点删除和创建,需要自己递归

导入依赖

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>4.3.0</version>
        </dependency>

CuratorLockTest

package com.zzw.zk.case3;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockTest {

    public static void main(String[] args) {
        // 创建分布式锁1
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

        // 创建分布式锁2
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");


        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.acquire();
                    System.out.println("线程1启动,获取到锁");
                    lock1.acquire();
                    System.out.println("线程1再次获取锁"); // 可以实现重入
                    Thread.sleep(2 * 1000);
                    lock1.release();
                    System.out.println("线程1结束,释放锁");
                    lock1.release();
                    System.out.println("线程1再次释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.acquire();
                    System.out.println("线程2启动,获取到锁");
                    Thread.sleep(2 * 1000);
                    lock2.release();
                    System.out.println("线程2结束,释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

    }

    public static CuratorFramework getCuratorFramework() {

        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);

        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.3.11:2181,192.168.3.22:2181,192.168.3.33:2181")
                .connectionTimeoutMs(2000)
                .sessionTimeoutMs(2000)
                .retryPolicy(policy).build();

        // 启动客户端
        client.start();

        System.out.println("Zookeeper启动成功");

        return client;
    }
}

运行结果

线程1启动,获取到锁
线程1再次获取锁
线程1结束,释放锁
线程1再次释放锁
线程2启动,获取到锁
线程2结束,释放锁

6. 面试真题

6.1 选举机制

半数机制,超过半数的投票通过,即通过

  1. 第一次启动选举规则:

    投票过半数时,服务器id大的胜出

  2. 第二次启动选举规则:

    1. EPOCH大的直接胜出
    2. EPOCH相同,事务id大的胜出
    3. 事务id相同,服务器id大的胜出

6.2 生产集群安装多少台zk合适?

安装奇数台

生产经验(并非强制,只是最佳):

  • 10台服务器:3台zk
  • 20台服务器:5台zk
  • 100台服务器:11台zk
  • 200台服务器:11台zk

zk服务器台数多,好处:提高可靠性;坏处:提高通信延时(半数机制)

6.3 常用命令

ls get create delete

7. 算法基础

思考:Zookeeper如何保持数据一致性?这也是困扰分布式系统框架的一个难题

7.1 拜占庭将军问题

拜占庭将军问题是一个协议问题,拜占庭帝国军队的将军们必须全体一致的决定是否攻击某一支敌军。问题是这些将军在地理上是分隔开来的,并且将 军中存在叛徒。叛徒可以任意行动以达到以下目标:欺骗某些将军采取进攻行动;促成一个不是所有将军都同意的决定,如当将军们不希望进攻时促成进攻 行动;或者迷惑某些将军,使他们无法做出决定。如果叛徒达到了这些目的之一,则任何攻击行动的结果都是注定要失败的,只有完全达成一致的努力才能 获得胜利。

7.2 Paxos算法

这里只做一个简单记录,该算法可深入研究。

7.2.1 解决什么问题

Paxos算法:一种基于消息传递且具有高度容错性一致性算法

Paxos算法解决的问题:如何快速正确的在一个分布式系统中对某个数据值达成一致,并且保证不论发生任何异常,都不会破坏整个系统的一致性

常见的故障和问题:1. 机器宕机 2. 网络异常(延迟、重复、丢失)

7.2.2 算法描述

  • 在一个Paxos系统中,首先将所有节点划分为Proposer(提议者),Acceptor(接受者),和Leaner(学习者)。注意:每个节点都可以身兼数职
  • r5OKEa8iuh9nMm3
  • JVMLyWUtjrkgY8S

7.2.3 不同情况

下面我们针对上述描述做三种情况的推演举例:为了简化流程,我们这里不设置 Learner。

  1. 9YvOGtygpHjx28f

  2. bBpcHMJgy5ZSeRz

    以上情况,A1提出的议案(10%)会被执行,但是A5提出的议案(20%)也会被执行,但是时间稍后,会覆盖A1的议案

  3. rVIwlh1T7c9fUqd

    Paxos 算法缺陷:在网络复杂的情况下,一个应用 Paxos 算法的分布式系统,可能很久 无法收敛,甚至陷入活锁的情况。造成这种情况的原因是系统中有一个以上的 Proposer,多个 Proposers 相互争夺 Acceptor, 造成迟迟无法达成一致的情况。

    针对这种情况,一种改进的 Paxos 算法被提出:从系统中选 出一个节点作为 Leader,只有 Leader 能够发起提案。这样,一次 Paxos 流程中只有一个 Proposer,不会出现活锁的情况,此时只会出现例子中第一种情况。

7.3 Zab算法

Zab算法借鉴了Paxos算法,是特别为Zookeeper设计的支持崩溃恢复的原子广播协议。基于该协议,Zookeeper设计为只有一台客户端(Leader)负责处理外部的写事务请求,然后Leader客户端将数据同步到其他Follower节点。即Zookeeper只有一个节点可以发起提案。

Zab算法包括两种基本模式:消息广播崩溃恢复

消息广播

p2NjLnWDScXbBrF

崩溃恢复

twz3cjJCR6S1FvZ

Leader选举

Te3Ba24Lp6MdmZo

数据恢复

1aBbrkZoRLIE7qV

7.4 CAP理论

69bKHtdfIovgX58

zk满足C和P

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-24 15:37:49  更:2021-08-24 15:38:22 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 17:59:42-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码