IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> ZooKeeper-B站黑马程序员多多三连 -> 正文阅读

[大数据]ZooKeeper-B站黑马程序员多多三连

1.zookeeper主要功能

1.1 配置管理

分布式项目的配置总管理处

?

1.2 分布式锁

对于分布式项目修改共享数据时加入锁管理(同一时间只能有一个服务对数据进更改)

1.3 集群管理

最常见的功能,作为注册中心使用.

?

2.zookeeper命令操作

2.1 zookeeper数据模型

  • Zookeeper是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构

  • 这里面的每一个结点都被称为ZNode,每个节点都会保存自己的数据和节点信息

  • 节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下

  • 节点可以分为四大类

    • PERSISTENT 持久化节点

    • EPHEMRAL 临时节点 : -e

    • PERSISTENT_SENQUENTIAL 持久化顺序节点 : -s

    • EPHEMRAL_SEQUENTIAL 临时顺序节点 : -es

?

2.2 zookeeper 服务端常用命令

在安装目录的bin目录下

  • 启动Zookeeper服务 : ./zkServer.sh start

  • 查看Zookeeper服务状态 : ./zkServer.sh statu

  • 停止Zookeeper服务 : ./zkServer.sh stop

  • 重启Zookeeper服务 : ./zkServer.sh restart

2.3 zookeeper 客户端常用命令

  • 连接ZooKeeper服务端

    • ./zkCli.sh -server ip:port

  • 断开客户端连接

    • quit

  • 设置节点的值

    • set /节点path value

  • 查看帮助命令

    • help

  • 删除单个节点

    • delete /节点path

  • 显示指定目录下的节点

    • ls 目录

  • 删除带有子节点的节点

    • deleteall /节点path

  • 创建节点

    • create /节点path value

  • 获取节点的值

    • get /节点path

  • 创建临时节点

    • create -e /节点path value

  • 创建顺序节点

    • create -s /节点path value

  • 查询节点详细信息

    • ls -s /节点path

  • 节点详细信息

    • czxid : 节点被创建的事务ID

    • dataversion : 数据版本号

    • ctime : 创建时间

    • aclversion : 权限版本号

    • mzxid : 最后一次被更新的事务ID

    • ephemeralOwner : 用于临时节点 ,代表临时节点的事务ID,如果为持久节点则为0

    • pzxid : 子节点列表最后一次被更新的事务ID

    • dataLength : 节点存储的数据长度

    • cversion : 子节点的版本号

    • numChildren : 当前节点的子节点数

3.Java API-Cuator

  • Curator是Apache ZooKeeper的Java客户端库

4.Java API 常用操作

  • 建立连接

  • 添加节点

  • 删除节点

  • 修改节点

  • 查询节点

  • Watch时间监听

  • 分布式锁实现

4.1 建立连接

4.1.1 方式一

        /**
         * Create a new client
         *
         * @param connectString       list of servers to connect to "127.0.0.1:2181'127.0.0.2:8182"
         * @param sessionTimeoutMs    session timeout
         * @param connectionTimeoutMs connection timeout
         * @param retryPolicy         retry policy to use
         * @return client
         */
        //重试策略1 每间隔x一共重试x次  每隔3秒连接一次一共连接10次
        RetryPolicy r = new ExponentialBackoffRetry(3000, 10);
        //第一种方式
        CuratorFramework zookeeperClient = CuratorFrameworkFactory.newClient("116.62.71.234:2181",
                60 * 1000,
                15 * 1000,
                r);
        //开启连接
        zookeeperClient.start();

4.1.2 方式二 : 链式编程

    //第二种连接方式:链式编程
        CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
                connectString("116.62.71.234:2181").
                sessionTimeoutMs(60 * 1000).
                connectionTimeoutMs(15 * 1000).
                retryPolicy(new ExponentialBackoffRetry(3000, 10)).
                namespace("itheima").build();

        //开启连接
        zookeeperClient.start();
  • namespace : 创建根节点itheima(为了方便,不用之后每次进行客户端操作都写/根节点path)

4.2 添加节点

4.2.1 基本创建

 public static void main(String[] args) throws Exception {
        //第二种连接方式:链式编程
        CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
                connectString("116.62.71.235:2181").
                sessionTimeoutMs(60 * 1000).
                connectionTimeoutMs(15 * 1000).
                retryPolicy(new ExponentialBackoffRetry(3000, 10)).
                namespace("itheima").build();

        //开启连接
        zookeeperClient.start();

        /**
         * 创建节点
         * 1.基本创建 create().forPath("")
         * 2.创建节点,带有数据 create.forPath("",data.getBytes())
         * 3.设置节点的类型  create.withMode(CreateMode.ENUM).forPath("")
         * 4.创建多级节点 create().creatingParentsIfNeeded().forPath("/app4/p1","hehe".getBytes());
         */
        /**
         * 基本创建
         * 如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
         */
        String path = zookeeperClient.create().forPath("/app1");
        System.out.println(path);
        //关闭连接
        zookeeperClient.close();
    }

4.2.2 创建节点带有数据

  • 如果没有节点信息,则信息默认为客户端ip

public static void main(String[] args) throws Exception {
        //第二种连接方式:链式编程
        CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
                connectString("116.62.71.235:2181").
                sessionTimeoutMs(60 * 1000).
                connectionTimeoutMs(15 * 1000).
                retryPolicy(new ExponentialBackoffRetry(3000, 10)).
                namespace("itheima").build();
        //开启连接
        zookeeperClient.start();
        /**
         * 创建节点
         * 1.基本创建 create().forPath("")
         * 2.创建节点,带有数据 create.forPath("",data.getBytes())
         * 3.设置节点的类型  create.withMode(CreateMode.ENUM).forPath("")
         * 4.创建多级节点 create().creatingParentsIfNeeded().forPath("/app4/p1","hehe".getBytes());
         */
        /**
         * 基本创建,带有数据
         * 如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
         */
        String path = zookeeperClient.create().forPath("/app2","hehe".getBytes());
        System.out.println(path);
        //关闭连接
        zookeeperClient.close();
    }

4.2.3 创建节点,设置节点类型

  • 默认为持久化类型

public static void main(String[] args) throws Exception {
        //第二种连接方式:链式编程
        CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
                connectString("116.62.71.235:2181").
                sessionTimeoutMs(60 * 1000).
                connectionTimeoutMs(15 * 1000).
                retryPolicy(new ExponentialBackoffRetry(3000, 10)).
                namespace("itheima").build();

        //开启连接
        zookeeperClient.start();

        /**
         * 创建节点
         * 1.基本创建 create().forPath("")
         * 2.创建节点,带有数据 create.forPath("",data.getBytes())
         * 3.设置节点的类型  create.withMode(CreateMode.ENUM).forPath("")
         * 4.创建多级节点 create().creatingParentsIfNeeded().forPath("/app4/p1","hehe".getBytes());
         */
        /**
         * 创建节点带有类型
         * 默认类型:持久化
         */
        String path = zookeeperClient.create().withMode(CreateMode.EPHEMERAL).forPath("/app3","hehe".getBytes());
        System.out.println(path);


        //关闭连接  app3就会被释放
        zookeeperClient.close();
    }

4.2.4 创建多级节点

  • ZooKeeper如果父节点不存在无法创建子节点

  public static void main(String[] args) throws Exception {
        //第二种连接方式:链式编程
        CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
                connectString("116.62.71.235:2181").
                sessionTimeoutMs(60 * 1000).
                connectionTimeoutMs(15 * 1000).
                retryPolicy(new ExponentialBackoffRetry(3000, 10)).
                namespace("itheima").build();

        //开启连接
        zookeeperClient.start();

        /**
         * 创建节点
         * 1.基本创建 create().forPath("")
         * 2.创建节点,带有数据 create.forPath("",data.getBytes())
         * 3.设置节点的类型  create.withMode(CreateMode.ENUM).forPath("")
         * 4.创建多级节点 create().creatingParentsIfNeeded().forPath("/app4/p1","hehe".getBytes());
         */
        /**
         * 创建多级节点
         * 父节点不存在无法直接创建子节点
         * creatingParentsIfNeeded
         */
        String path = zookeeperClient.create().creatingParentsIfNeeded().forPath("/app4/p1","hehe".getBytes());
        System.out.println(path);


        //关闭连接  app3就会被释放
        zookeeperClient.close();
    }

4.3 查询节点

4.3.1 查询节点数据

public static void main(String[] args) throws Exception {
            //第二种连接方式:链式编程
            CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
                    connectString("116.62.71.235:2181").
                    sessionTimeoutMs(60 * 1000).
                    connectionTimeoutMs(15 * 1000).
                    retryPolicy(new ExponentialBackoffRetry(3000, 10)).
                    namespace("itheima").build();

            //开启连接
            zookeeperClient.start();

            /**
             * 查询节点
             * 1.查询数据 get : getData().forPath()
             * 2.查询子节点 ls getChildren.forPath();
             * 3.查询节点状态信息 ; ls -s getData().storingStatIn(status).forPath("/app1")
             */
            byte[] bytes = zookeeperClient.getData().forPath("/app1");
            System.out.println(new String(bytes));
            //关闭连接
            zookeeperClient.close();
        }

4.3.2 查询节点子节点

public static void main(String[] args) throws Exception {
        //第二种连接方式:链式编程
        CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
                connectString("116.62.71.235:2181").
                sessionTimeoutMs(60 * 1000).
                connectionTimeoutMs(15 * 1000).
                retryPolicy(new ExponentialBackoffRetry(3000, 10)).
                namespace("itheima").build();

        //开启连接
        zookeeperClient.start();

        /**
         * 查询节点
         * 1.查询数据 get : getData().forPath()
         * 2.查询子节点 ls getChildren.forPath();
         * 3.查询节点状态信息 ; ls -s getData().storingStatIn(status).forPath("/app1")
         */
        List<String> strings = zookeeperClient.getChildren().forPath("/app1");
        //关闭连接
        zookeeperClient.close();
    }

4.3.3 查询节点详细信息 2.3详见详细信息

  • 返回的结果需要封装到一个JavaBean中

 public static void main(String[] args) throws Exception {
        //第二种连接方式:链式编程
        CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
                connectString("116.62.71.235:2181").
                sessionTimeoutMs(60 * 1000).
                connectionTimeoutMs(15 * 1000).
                retryPolicy(new ExponentialBackoffRetry(3000, 10)).
                namespace("itheima").build();

        //开启连接
        zookeeperClient.start();

        /**
         * 查询节点
         * 1.查询数据 get : getData().forPath()
         * 2.查询子节点 ls getChildren.forPath();
         * 3.查询节点状态信息 ; ls -s getData().storingStatIn(status).forPath("/app1")
         */
        /**
         * Stat是一个Bean将返回的节点的详细信息封装到bean中
         */
        Stat status = new Stat();
        zookeeperClient.getData().storingStatIn(status).forPath("/app1");
        //关闭连接
        zookeeperClient.close();
    }

4.4 修改节点信息

4.4.1 基本修改

  public static void main(String[] args) throws Exception {
        //第二种连接方式:链式编程
        CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
                connectString("116.62.71.235:2181").
                sessionTimeoutMs(60 * 1000).
                connectionTimeoutMs(15 * 1000).
                retryPolicy(new ExponentialBackoffRetry(3000, 10)).
                namespace("itheima").build();

        //开启连接
        zookeeperClient.start();

        /**
         * 修改节点数据
         * 1.修改数据
         * 2.根据版本修改
         */
        zookeeperClient.setData().forPath("/app1","itcast".getBytes());
        //关闭连接
        zookeeperClient.close();
    }

4.4.2 根据版本修改节点信息(推荐)

 public static void main(String[] args) throws Exception {
        //第二种连接方式:链式编程
        CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
                connectString("116.62.71.234:2181").
                sessionTimeoutMs(60 * 1000).
                connectionTimeoutMs(15 * 1000).
                retryPolicy(new ExponentialBackoffRetry(3000, 10)).
                namespace("itheima").build();

        //开启连接
        zookeeperClient.start();

        /**
         * 修改节点数据
         * 1.修改数据
         * 2.根据版本修改
         */
        /**
         * 根据版本修改前需要先查出版本,查询出的版本和传入的版本号需要一致才能修改成功,目的就是让其他客户端或者其他线程不干扰我.
         */
        Stat status = new Stat();
        zookeeperClient.getData().storingStatIn(status).forPath("/app1");
        //查询出版本
        int version = status.getVersion();
        System.out.println(version);
        zookeeperClient.setData().withVersion(3).forPath("/app1","itcast".getBytes());
        //关闭连接
        zookeeperClient.close();
    }

4.5 删除节点

4.5.1 基本删除

 public static void main(String[] args) throws Exception {
        //第二种连接方式:链式编程
        CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
                connectString("116.62.71.234:2181").
                sessionTimeoutMs(60 * 1000).
                connectionTimeoutMs(15 * 1000).
                retryPolicy(new ExponentialBackoffRetry(3000, 10)).
                namespace("itheima").build();

        //开启连接
        zookeeperClient.start();

        /**
         * 删除节点 : delete deleteall
         * 1.删除单个节点 delete().forPath();
         * 2.删除带有子节点的节点 delete().deletingChildrenIfNeeded().forPath("/app1");
         * 3.必须成功的删除 delete().guaranteed().forPath("/app1"); 为了防止网络抖动,本质就是重试.
         * 4.删除回调  zookeeperClient.delete().guaranteed().inBackground(
         */
        zookeeperClient.delete().forPath("/app1");
        
        //关闭连接
        zookeeperClient.close();
    }

4.5.2 带有子节点的递归删除

 public static void main(String[] args) throws Exception {
        //第二种连接方式:链式编程
        CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
                connectString("116.62.71.234:2181").
                sessionTimeoutMs(60 * 1000).
                connectionTimeoutMs(15 * 1000).
                retryPolicy(new ExponentialBackoffRetry(3000, 10)).
                namespace("itheima").build();

        //开启连接
        zookeeperClient.start();
        /**
         * 删除节点 : delete deleteall
         * 1.删除单个节点 delete().forPath();
         * 2.删除带有子节点的节点 delete().deletingChildrenIfNeeded().forPath("/app1");
         * 3.必须成功的删除 delete().guaranteed().forPath("/app1"); 为了防止网络抖动,本质就是重试.
         * 4.删除回调  zookeeperClient.delete().guaranteed().inBackground(
         */

        zookeeperClient.delete().deletingChildrenIfNeeded().forPath("/app1");

        //关闭连接
        zookeeperClient.close();
    }

4.5.3 必须删除成功

  • guaranteed();

public static void main(String[] args) throws Exception {
        //第二种连接方式:链式编程
        CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
                connectString("116.62.71.234:2181").
                sessionTimeoutMs(60 * 1000).
                connectionTimeoutMs(15 * 1000).
                retryPolicy(new ExponentialBackoffRetry(3000, 10)).
                namespace("itheima").build();

        //开启连接
        zookeeperClient.start();
        /**
         * 删除节点 : delete deleteall
         * 1.删除单个节点 delete().forPath();
         * 2.删除带有子节点的节点 delete().deletingChildrenIfNeeded().forPath("/app1");
         * 3.必须成功的删除 delete().guaranteed().forPath("/app1"); 为了防止网络抖动,本质就是重试.
         * 4.删除回调  zookeeperClient.delete().guaranteed().inBackground(
         */
        zookeeperClient.delete().guaranteed().forPath("/app1");

        //关闭连接
        zookeeperClient.close();
    }

4.5.4 删除回调

  • inBackground(()->{})

public static void main(String[] args) throws Exception {
        //第二种连接方式:链式编程
        CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
                connectString("116.62.71.234:2181").
                sessionTimeoutMs(60 * 1000).
                connectionTimeoutMs(15 * 1000).
                retryPolicy(new ExponentialBackoffRetry(3000, 10)).
                namespace("itheima").build();

        //开启连接
        zookeeperClient.start();
        /**
         * 删除节点 : delete deleteall
         * 1.删除单个节点 delete().forPath();
         * 2.删除带有子节点的节点 delete().deletingChildrenIfNeeded().forPath("/app1");
         * 3.必须成功的删除 delete().guaranteed().forPath("/app1"); 为了防止网络抖动,本质就是重试.
         * 4.删除回调  zookeeperClient.delete().guaranteed().inBackground(
         */
        zookeeperClient.delete().guaranteed().inBackground(new BackgroundCallback(){
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("我被删除了");
                System.out.println(event);
            }
        }).forPath("/app1");

        //关闭连接
        zookeeperClient.close();
    }

5.Java API 高级操作

5.1 Watch事件监听

5.1.1 基本概念

  • ZooKeeper允许用户在指定节点上注册一些Watchr,并且在一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上去,该机制时ZooKeeper实现分布式协调服务的重要特性.

  • ZooKeeper中引入了Watcher机制来实现了发布/订阅功能.能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化的时候,会通知所有订阅者.

  • ZooKeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便需要开发人员自己反复注册Watcher,比较繁琐.

  • Curator引入了Cache来实现对ZooKeeper服务端事件的监听.

  • ZooKeeper提供了三种Watcher;

    • NodeCache : 只是监听了某一特定节点

    • PathChildrenCache : 监控一个ZNode的子节点

    • TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache组合.

5.1.2 NodeCache

   public static void main(String[] args) throws Exception {
        //第二种连接方式:链式编程
        CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
                connectString("116.62.71.234:2181").
                sessionTimeoutMs(60 * 1000).
                connectionTimeoutMs(15 * 1000).
                retryPolicy(new ExponentialBackoffRetry(3000, 10)).
                namespace("itheima").build();

        //开启连接
        zookeeperClient.start();

        /**
         * NodeCache : 给指定一个节点注册监听器
         */
        //1.创建NodeCache对象
        NodeCache nodeCache = new NodeCache(zookeeperClient, "/app1");
        //2.注册监听
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("节点变化了");
                //获取修改节点后的数据
                byte[] data = nodeCache.getCurrentData().getData();
                System.out.println(new String(data));
            }
        });
        //3.开启监听,如果设置为true,则开启监听时,加载缓存数据
        nodeCache.start();;

        //while (true){} //保证线程不结束 , web环境不需要加,这里是主方法演示
       
       //关闭连接
        zookeeperClient.close();
    }

5.1.3 PathChildrenCache

  • 参数event可以获得修改的详细信息

 public static void main(String[] args) throws Exception {
        //第二种连接方式:链式编程
        CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
                connectString("116.62.71.234:2181").
                sessionTimeoutMs(60 * 1000).
                connectionTimeoutMs(15 * 1000).
                retryPolicy(new ExponentialBackoffRetry(3000, 10)).
                namespace("itheima").build();

        //开启连接
        zookeeperClient.start();


        /**
         * PathChildrenCache : 监听某个节点的字节点
         */
        //1.创建PathChildrenCache对象
        PathChildrenCache p = new PathChildrenCache(zookeeperClient,"/app2",true);
        //2.注册监听
        p.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println("子节点发送变化了");
                System.out.println(event);
                //监听子节点的数据变更,并且拿到变更后的数据
                //1.获取类型
                PathChildrenCacheEvent.Type type = event.getType();
                //2.判断类型是否为update
                if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                    System.out.println("数据变了");
                    byte[] data = event.getData().getData();
                    System.out.println(new String(data));
                }
            }
        });
        //3.开启监听,如果设置为true,则开启监听时,加载缓存数据
        p.start();

        //while (true){} //保证线程不结束 , web环境不需要加,这里是主方法演示
        //关闭连接
        zookeeperClient.close();
    }

5.1.4 TreeCache

public static void main(String[] args) throws Exception {
        //第二种连接方式:链式编程
        CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
                connectString("116.62.71.234:2181").
                sessionTimeoutMs(60 * 1000).
                connectionTimeoutMs(15 * 1000).
                retryPolicy(new ExponentialBackoffRetry(3000, 10)).
                namespace("itheima").build();

        //开启连接
        zookeeperClient.start();


        /**
         * TreeCache : 监听某个节点及其子节点
         */
        //1.创建NodeCache对象
        TreeCache treeCache = new TreeCache(zookeeperClient, "/app1");
        //2.注册监听
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                System.out.println("结点变化了");
                System.out.println(event);
            }
        });
        //3.开启监听,如果设置为true,则开启监听时,加载缓存数据
        treeCache.start();
        //关闭连接
        zookeeperClient.close();

        //while (true){} //保证线程不结束 , web环境不需要加,这里是主方法演示

    }

6.分布式锁

6.1 分布式锁

  • 在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者lock的方式来解决多线程见的代码同步问题,这时多线程的运行都是运行在同一 JVM下,没有任何问题

  • 但当我们的应用时分布式集群工作的情况下,属于JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题

  • 那么久需要一种更加高级的锁机制,来处理跨机器的进程之间的数据同步问题---这就是分布式锁.

6.2 分布式锁原理

  • 核心思想 : 当客户端要获取锁,则创建节点,使用完锁则删除该节点

    • 客户端获取锁时,在lock节点下创建临时顺序节点

    • 然后分别获取lock下面的所有子节点,客户端获取到所有子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁.

    • 如果发现自己创建的节点并非lock所有子节点中最小的,说明还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件.

    • 如果发现比自己小得那个节点被删除,则客户端的Watcher会受到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取比自己小的一个节点并注册监听.

6.3 模拟12306售票案例

  • 在Curator中有五种锁方案

    • InterProcessSemaphoreMutex : 分布式排它锁 (非可重入锁)

    • InterProcessMutex : 分布式可重入排它锁

    • InterProcessReadWriteLock : 分布式读写锁

    • InterProcessMultiLock : 将多个锁作为单个实体管理的容器

    • InterProcessSemaphoreV2 : 共享信号量

?Runnable

public class Ticket12306 implements Runnable {
    private int tickets = 10;


    //第二种连接方式:链式编程
   private static CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
            connectString("116.62.71.235:2181").
            sessionTimeoutMs(60 * 1000).
            connectionTimeoutMs(15 * 1000).
            retryPolicy(new ExponentialBackoffRetry(3000, 10)).
            build();


    private static InterProcessMutex lock = new InterProcessMutex(zookeeperClient,"/lock") ;


    @Override
    public void run() {
        while (true){
            //获取锁
            try {
                lock.acquire(3, TimeUnit.SECONDS);
                if(tickets > 0){
                    System.out.println(Thread.currentThread().getName()+":"+tickets);
                    Thread.sleep(100);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                //释放锁
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }


        }
    }
}

Main

public class Lock12306 {
    public static void main(String[] args) {
        Ticket12306 t = new Ticket12306();
        Thread t1 = new Thread(t,"携程");
        Thread t2 = new Thread(t,"飞猪");
        t1.start();
        t2.start();
    }
}

7 ZooKeeper集群

7.1 集群简介

  • Leader选举 :

    • Serverid ; 服务器ID

      • 比如有三台机器,编号为1,2,3.编号越大在选择算法中权重越大.

    • Zxid : 数据ID

      • 服务器中存放的最大数据ID,值越大说明数据越新,在选举算法中数据越新则权重越大

    • 在Leader选举的过程中,如果某台ZooKeeper获得了超过半数的选票,则此ZooKeeper就可以成为Leader了

7.2 Zookeer集群搭建

  • 与创建单机环境类似

  • 修改conf目录下的zoo_sample.cfg为zoo.cfg

  • 修改data目录为指定目录,每个集群成员分别配置

  • 在data目录下创建myid文件,内容分别为1,2,3

  • 分别在conf目录下的zoo.cfg文件加入如下内容

server.1=192.168.149.135:2881:3881
server.2=192.168.149.135:2882:3882
server.3=192.168.149.135:2883:3883
  • 解释:server.服务器ID=服务器IP地址:服务器之间通信端口:服务器之间投票选举端口,搭建伪集群,端口可以写成127.0.0.1

7.3 Zookeeper集群角色

  • Leader领导者

    • 处理事务请求(增删改)

    • 集群内部各个服务器的调度者

  • Follower跟随者

    • 处理客户端非事务请求(查),转发事务请求给Leader服务器.

    • 参数Leader选举投票

  • Observer观察者

    • 处理客户端非事务请求,转发事务请求给Leader服务器.

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-18 17:27:40  更:2021-10-18 17:29:58 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 6:25:33-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码