IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Zookeeper -> 正文阅读

[大数据]Zookeeper

目录

1、Zookeeper 概念

?2、Zookeeper命令操作

1、Zookeeper 数据模型

?2、Zookeeper 服务端常用命令

3、Zookeeper 客户端常用命令

?3、JavaAPI操作

1、Curator API?常用操作

? 2、Watch事件监听

3、分布式锁

?分布式锁案例 – 模拟12306售票

?4、搭建集群


???????

1、Zookeeper 概念

Zookeeper 是 Apache Hadoop 项目下的一个子项目,是一个树形目录服务。

Zookeeper 翻译过来就是 动物园管理员,他是用来管 Hadoop(大象)、Hive(蜜蜂)Pig(小 猪)的管理员。简称zk

Zookeeper 是一个分布式的、开源的分布式应用程序的协调服务。

Zookeeper 提供的主要功能包括:

  • 配置管理
  • 分布式
  • 集群管理

?2、Zookeeper命令操作

1、Zookeeper 数据模型

  • ZooKeeper 一个树形目录服务,其数据模型Unix的文件系统目录树很类似,拥有一个层次化结构。
  • 这里面的每一个节点都被称为: ZNode,每个节点上都会保存自己的数据和节点信息
  • 节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。
  • 节点可以分为四大类:
PERSISTENT 持久化节点
EPHEMERAL 临时节点 -e(会话断开,自动删除)
PERSISTENT_SEQUENTIAL 持久化顺序节点 -s
EPHEMERAL_SEQUENTIAL 临时顺序节点 ?: - es

?2、Zookeeper 服务端常用命令

?

3、Zookeeper 客户端常用命令

?或

?

?

?

?

?

?

?3、JavaAPI操作

1、Curator API?常用操作

  • Curator Apache ZooKeeper Java客户端库
  • 常见ZooKeeper Java API
? 原生 Java API
? ZkClient
? Curator
  • Curator 项目的目标是简化 ZooKeeper 客户端的使用
  • Curator 最初是 Netfix 研发,后来捐献了 Apache 基金会,目前是 Apache 顶级项目。
  • 网:http://curator.apache.org/

?

package com.zk.test;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

public class CuratorTest {

    CuratorFramework client;
    //建立连接
    @Before
    public void testConnect(){
        //第一种方式
        /*
        * zk地址和端口
        * 会话超时时间 默认 60*1000
        * 连接超时时间 默认 15*1000
        * 重试策略
        * */
        RetryPolicy rp = new ExponentialBackoffRetry(3000,10);
//        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.126.129:2181", rp);
//        //开启连接
//        client.start();

        //第二种方式
        client = CuratorFrameworkFactory.builder()
                .connectString("192.168.126.129:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)//namespace会让app1作为根目录
                .retryPolicy(rp).namespace("app1").build();

        //开启连接
        client.start();
    }
    /*=================创建结点=======================*/
    @Test
    public void testCreate() throws Exception {
        //如果没有指定数据,客户端ip会作为数据
        String s = client.create().forPath("/app2","abc".getBytes());
        System.out.println(s);
    }

    @Test
    public void testCreate2() throws Exception {
        //指定类型 默认:持久化
        String s = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3","abc".getBytes());
        System.out.println(s);

        //延迟会话执行结束
        int i=0;
        while(i<20){
            Thread.sleep(1000);
            i++;
            System.out.println(i);
        }
    }

    @Test
    public void testCreate3() throws Exception {
        //创建多级结点
        //creatingParentsIfNeeded():如果父节点不存在,则创建父节点
        String s = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
        System.out.println(s);
    }
/*=================查询结点=======================*/

    //查询数据
    @Test
    public void get() throws Exception {
        byte[] bytes = client.getData().forPath("/app2");
        System.out.println(new String(bytes));
    }

    //查询结点
    @Test
    public void get2() throws Exception {
        List<String> paths = client.getChildren().forPath("/app4");
        System.out.println(paths);
    }

    //状态信息
    @Test
    public void get3() throws Exception {
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath("/app4");
        System.out.println(stat);
    }

    /*=================修改结点=======================*/
    //修改数据
    @Test
    public void set() throws Exception {
        client.setData().forPath("/app2","aaa".getBytes());
    }

    //根据版本修改数据
    @Test
    public void set2() throws Exception {
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath("/app2");

        int version = stat.getVersion();
        System.out.println(version);
        client.setData().withVersion(version).forPath("/app2","bbb".getBytes());
    }

    /*=================删除结点=======================*/
    //删除单个结点
    @Test
    public void delete() throws Exception {
        client.delete().forPath("/app2");
    }

    //删除带子结点的结点
    @Test
    public void delete2() throws Exception {
        client.delete().deletingChildrenIfNeeded().forPath("/app4");
    }

    //必须删除
    @Test
    public void delete3() throws Exception {
        client.delete().guaranteed().forPath("/app4");
    }

    //回调
    @Test
    public void delete4() throws Exception {
        client.delete().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                System.out.println("已删除");
                System.out.println(curatorEvent);
            }
        }).forPath("/app3");
    }

    @After
    public void after(){
        if(client != null){
            client = null;
        }
    }
}

? 2、Watch事件监听

  • ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制ZooKeeper 实现分布式协调服务的重要特性。
  • ZooKeeper 引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者
  • Curator引入Cache 实现ZooKeeper 服务端事件的监听
  • ZooKeeper提供了三Watcher
? NodeCache : 只是监听某一个特定的 节点
? PathChildrenCache : 监控一个 ZNode 的子节点 .
? TreeCache : 可以监控整个树上的所有节点, 类似于 PathChildrenCache NodeCache 的组合

@Test
    public void testNodeCache() throws Exception {
        //创建NodeCache对象
        final NodeCache nc = new NodeCache(client,"/app1");
        //注册监听
        nc.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("结点变化了。。。");
                //获取修改结点后的数据
                byte[] data = nc.getCurrentData().getData();
                System.out.println(new String(data));
            }
        });
        //开启监听
        nc.start(true);

        while (true){

        }
    }

    @Test
    public void testPathChildrenCache() throws Exception {
        final PathChildrenCache pcc = new PathChildrenCache(client,"/app1",true);

        pcc.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                System.out.println("子结点变化了。。。");
                System.out.println(pathChildrenCacheEvent);

                //监听子结点数据变更
                //获取类型
                PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
                //判断是否为update
                if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                    System.out.println("数据变了");
                    byte[] data = pathChildrenCacheEvent.getData().getData();
                    System.out.println(new String(data));
                }
            }
        });
        pcc.start();

        while (true){

        }
    }

    @Test
    public void test() throws Exception {
        TreeCache tc = new TreeCache(client,"/app");
        tc.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                System.out.println("结点变化了。。。");
                System.out.println(treeCacheEvent);
            }
        });

        tc.start();

        while(true){

        }
    }

3、分布式锁

  • 在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。
  • 当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题
  • 那么就需要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题——这就是分布式锁。

?

原理:

核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除 节点。
  1. 客户端获取锁时,在lock节点下创建临时顺序节点。
  2. 然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
  3. 如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件
  4. 如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。

?

Curator实现分布式锁API?

Curator 中有五种锁 方案:
  • InterProcessSemaphoreMutex:分布式排它锁(非可重入锁
  • InterProcessMutex分布式可重入排它锁
  • InterProcessReadWriteLock分布式读写锁
  • InterProcessMultiLock将多个锁作为单个实体管理的容器
  • InterProcessSemaphoreV2共享信号量

?分布式锁案例 模拟12306售票 ???????

?

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

public class Ticket implements Runnable {

    private int tickets = 100;
    InterProcessMutex lock;

    public Ticket(){
        RetryPolicy rp = new ExponentialBackoffRetry(3000,10);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.126.129:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(rp).build();

        client.start();
        lock = new InterProcessMutex(client,"/lock");
    }

    @Override
    public void run() {
        while(true){
            try {
                //获取锁
                lock.acquire(2, TimeUnit.SECONDS);
                if(tickets>0){
                    System.out.println(Thread.currentThread().getName()+" 卖出第 "+tickets--+" 张票");
                    Thread.sleep(500);
                }else{
                    break;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                //释放锁
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

        }
    }
}

?

?4、搭建集群

Leader选举:

  • Serverid:服务器ID

?比如有三台服务器,编号分别是1,2,3

编号越大在选择算法中的权重越大

  • Zxid:数据ID

? 服务器中存放的最大数据ID.值越大说明数据? 越,在选举算法中数据越新权重越大

  • Leader选举的过程中,如果某台ZooKeeper获得超过半数的选票ZooKeeper就可以成为Leader了。

?

?搭建zookeeper集群:https://blog.csdn.net/weixin_56697114/article/details/119057344

?

ZooKeeper集群服中务中有三个角色

  • Leader 领导者 :? ????????

1. 处理事务请求

2. 集群内部各服务器的调度者

  • Follower 跟随者 :

1. 处理客户端非事务请求,转发事务请求给Leader服务器

2. 参与Leader选举投票

  • Observer 观察者

1. 处理客户端非事务请求,转发事务请求给Leader服务器

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-25 11:45:16  更:2021-07-25 11:46:19 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年4日历 -2024/4/30 21:43:55-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码