IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 13 Zookeeper原理知识,paxos、zab、角色功能、API开发基础 -> 正文阅读

[大数据]13 Zookeeper原理知识,paxos、zab、角色功能、API开发基础

ZooKeeper 分布式协调,扩展、可靠性、时序性
在这里插入图片描述

paxos小故事

以下内容摘录自《Zookeeper全解析——Paxos作为灵魂》一文,文章地址:
https://www.douban.com/note/208430424/

Paxos描述了这样一个场景,有一个叫做Paxos的小岛(Island)上面住了一批居民,岛上面所有的事情由一些特殊的人决定,他们叫做议员(Senator)。议员的总数(Senator Count)是确定的,不能更改。岛上每次环境事务的变更都需要通过一个提议(Proposal),每个提议都有一个编号(PID),这个编号是一直增长的,不能倒退。每个提议都需要超过半数((Senator Count)/2 +1)的议员同意才能生效。每个议员只会同意大于当前编号的提议,包括已生效的和未生效的。如果议员收到小于等于当前编号的提议,他会拒绝,并告知对方:你的提议已经有人提过了。这里的当前编号是每个议员在自己记事本上面记录的编号,他不断更新这个编号。整个议会不能保证所有议员记事本上的编号总是相同的。现在议会有一个目标:保证所有的议员对于提议都能达成一致的看法。

好,现在议会开始运作,所有议员一开始记事本上面记录的编号都是0。有一个议员发了一个提议:将电费设定为1元/度。他首先看了一下记事本,嗯,当前提议编号是0,那么我的这个提议的编号就是1,于是他给所有议员发消息:1号提议,设定电费1元/度。其他议员收到消息以后查了一下记事本,哦,当前提议编号是0,这个提议可接受,于是他记录下这个提议并回复:我接受你的1号提议,同时他在记事本上记录:当前提议编号为1。发起提议的议员收到了超过半数的回复,立即给所有人发通知:1号提议生效!收到的议员会修改他的记事本,将1好提议由记录改成正式的法令,当有人问他电费为多少时,他会查看法令并告诉对方:1元/度。

现在看冲突的解决:假设总共有三个议员S1-S3,S1和S2同时发起了一个提议:1号提议,设定电费。S1想设为1元/度, S2想设为2元/度。结果S3先收到了S1的提议,于是他做了和前面同样的操作。紧接着他又收到了S2的提议,结果他一查记事本,咦,这个提议的编号小于等于我的当前编号1,于是他拒绝了这个提议:对不起,这个提议先前提过了。于是S2的提议被拒绝,S1正式发布了提议: 1号提议生效。S2向S1或者S3打听并更新了1号法令的内容,然后他可以选择继续发起2号提议。

好,我觉得Paxos的精华就这么多内容。现在让我们来对号入座,看看在ZK Server里面Paxos是如何得以贯彻实施的。

小岛(Island)——ZK Server Cluster

议员(Senator)——ZK Server

提议(Proposal)——ZNode Change(Create/Delete/SetData…)

提议编号(PID)——Zxid(ZooKeeper Transaction Id)

正式法令——所有ZNode及其数据

貌似关键的概念都能一一对应上,但是等一下,Paxos岛上的议员应该是人人平等的吧,而ZK Server好像有一个Leader的概念。没错,其实Leader的概念也应该属于Paxos范畴的。如果议员人人平等,在某种情况下会由于提议的冲突而产生一个“活锁”(所谓活锁我的理解是大家都没有死,都在动,但是一直解决不了冲突问题)。Paxos的作者Lamport在他的文章”The Part-Time Parliament“中阐述了这个问题并给出了解决方案——在所有议员中设立一个总统,只有总统有权发出提议,如果议员有自己的提议,必须发给总统并由总统来提出。好,我们又多了一个角色:总统。

总统——ZK Server Leader

又一个问题产生了,总统怎么选出来的?oh, my god! It’s a long story. 在淘宝核心系统团队的Blog上面有一篇文章是介绍如何选出总统的,有兴趣的可以去看看:http://rdc.taobao.com/blog/cs/?p=162

现在我们假设总统已经选好了,下面看看ZK Server是怎么实施的。

情况一:

屁民甲(Client)到某个议员(ZK Server)那里询问(Get)某条法令的情况(ZNode的数据),议员毫不犹豫的拿出他的记事本(local storage),查阅法令并告诉他结果,同时声明:我的数据不一定是最新的。你想要最新的数据?没问题,等着,等我找总统Sync一下再告诉你。

情况二:

屁民乙(Client)到某个议员(ZK Server)那里要求政府归还欠他的一万元钱,议员让他在办公室等着,自己将问题反映给了总统,总统询问所有议员的意见,多数议员表示欠屁民的钱一定要还,于是总统发表声明,从国库中拿出一万元还债,国库总资产由100万变成99万。屁民乙拿到钱回去了(Client函数返回)。

情况三:

总统突然挂了,议员接二连三的发现联系不上总统,于是各自发表声明,推选新的总统,总统大选期间政府停业,拒绝屁民的请求。

ZAB 原子广播协议

在Leader存在的状态下
在这里插入图片描述

ZK选举过程

1、3888造成两两通信
2、只要任何人投票,都会触发那个准Leader发起自己的投票
3、推选制:先比较Zxid,如果Zxid相同,再比较myid

watch

在这里插入图片描述

API

创建项目study-03,选择一个模板
在这里插入图片描述
填写项目坐标
在这里插入图片描述
填写maven信息
在这里插入图片描述
新建的项目
在这里插入图片描述
maven仓库查找ZooKeeper,客户端版本号要与安装的zk集群版本号一致
在这里插入图片描述
项目中引入ZooKeeper客户端,并下载源码和文档
在这里插入图片描述

callback > reactive 响应式编程

更充分压榨OS、HW资源、性能

package org.garen.study;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * Hello world!
 */
public class App 
{
    public static void main( String[] args ) throws IOException, InterruptedException, KeeperException {

        System.out.println("Hello world!");

        /**
         * 知识点 - 1 : 连接字符串的写法
         */
        String connectString = "192.168.174.62:2181,192.168.174.63:2181,192.168.174.64:2181,192.168.174.62:2181";

        /**
         * 知识点 - 3 : 异步,立即执行完,但是zk还是连接中CONNECTING状态,如果想等连接成功CONNECTED后再继续执行,就要加上阻塞
         *
         * 创建阻塞对象
         */
        final CountDownLatch cd = new CountDownLatch(1);

        /**
         * 知识点 - 2 : 连接zk集群,创建zk对象
         *
         * zk是有session概念的,没有连接池的概念
         * zk连接的节点挂掉了,会重新建立连接,sessionId不变
         */
        final ZooKeeper zk = new ZooKeeper(connectString, 3000,
                /**
                 * 知识点 - 4 : watch 观察,回调
                 * watch1:new zk时候,传入的watch,这个watch,session级别的,跟path、node没有关系。
                 */
                new Watcher() {
            // watch的回调方法
            @Override
            public void process(WatchedEvent event) {
                Event.KeeperState state = event.getState();
                Event.EventType type = event.getType();
                String path = event.getPath();
                System.out.println("new zk watch: " + event.toString());

                switch (state) {
                    case Unknown:
                        break;
                    case Disconnected:
                        break;
                    case NoSyncConnected:
                        break;
                    case SyncConnected:
                        System.out.println("connected");
                        /**
                         * 知识点 - 3 : 异步,立即执行完,但是zk还是连接中CONNECTING状态,如果想等连接成功CONNECTED后再继续执行,就要加上阻塞
                         *
                         * 连接成功,减一,结束阻塞
                         */
                        cd.countDown();
                        break;
                    case AuthFailed:
                        break;
                    case ConnectedReadOnly:
                        break;
                    case SaslAuthenticated:
                        break;
                    case Expired:
                        break;
                    case Closed:
                        break;
                }

                switch (type) {
                    case None:
                        break;
                    case NodeCreated:
                        break;
                    case NodeDeleted:
                        break;
                    case NodeDataChanged:
                        break;
                    case NodeChildrenChanged:
                        break;
                    case DataWatchRemoved:
                        break;
                    case ChildWatchRemoved:
                        break;
                }

            }
        });

        /**
         * 知识点 - 3 : 异步,立即执行完,但是zk还是连接中CONNECTING状态,如果想等连接成功CONNECTED后再继续执行,就要加上阻塞
         *
         * 线程阻塞
         */
        cd.await();

        ZooKeeper.States state = zk.getState();
        switch (state) {
            case CONNECTING:
                System.out.println("ing......");
                break;
            case ASSOCIATING:
                break;
            case CONNECTED:
                System.out.println("ed......");
                break;
            case CONNECTEDREADONLY:
                break;
            case CLOSED:
                break;
            case AUTH_FAILED:
                break;
            case NOT_CONNECTED:
                break;
        }

        /**
         * 知识点 - 5 : 创建一个临时节点
         */
        String pathName = zk.create("/ooxx", "olddata".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("created node name : " + pathName);

        final Stat stat = new Stat();
        /**
         * 知识点 - 6 : 同步查询
         */
        byte[] node = zk.getData("/ooxx",
                /**
                 * 知识点 - 4 : watch 观察,回调
                 * watch2:watch的注册值发生在 读类型调用,get exites。
                 */
                new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("getData watch: " + event.toString());
                try {
                    /**
                     * 知识点 - 4 : watch 观察,回调
                     * watch3:true -> watch1, 是创建zk的watch
                     */
//                    zk.getData("/ooxx", true, stat);
                    /**
                     * 知识点 - 4 : watch 观察,回调
                     * watch4:this, 是watch2的watch
                     */
                    zk.getData("/ooxx", this, stat);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, stat);
        System.out.println(new String(node));

        // 触发回调 watch2
        Stat stat1 = zk.setData("/ooxx", "newdata".getBytes(), 0);
        // 触发回调 watch4的this -> watch2
        Stat stat2 = zk.setData("/ooxx", "newdata01".getBytes(), stat1.getVersion());

        /**
         * 知识点 - 7 : 异步查询
         */
        System.out.println("----- async start -----");
        zk.getData("/ooxx", false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                System.out.println("----- async callback -----");
                System.out.println(ctx.toString());
                System.out.println(new String(data));
            }
        }, "abcdefg");
        System.out.println("----- async over -----");

        Thread.sleep(1000000000);
    }
}

运行结果

D:\GarenGosling\Java\jdk1.8.0_291\bin\java.exe "-javaagent:D:\GarenGosling\JetBrains\IntelliJ IDEA Educational Edition 2020.2.1\lib\idea_rt.jar=52497:D:\GarenGosling\JetBrains\IntelliJ IDEA Educational Edition 2020.2.1\bin" -Dfile.encoding=UTF-8 -classpath D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\charsets.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\deploy.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\access-bridge-64.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\cldrdata.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\dnsns.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\jaccess.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\jfxrt.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\localedata.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\nashorn.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\sunec.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\sunjce_provider.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\sunmscapi.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\sunpkcs11.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\zipfs.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\javaws.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\jce.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\jfr.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\jfxswt.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\jsse.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\management-agent.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\plugin.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\resources.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\rt.jar;D:\GarenGosling\workspace\idea_study\study-03\target\classes;D:\GarenGosling\apache-maven-3.6.3\repo\org\apache\zookeeper\zookeeper\3.5.9\zookeeper-3.5.9.jar;D:\GarenGosling\apache-maven-3.6.3\repo\org\apache\zookeeper\zookeeper-jute\3.5.9\zookeeper-jute-3.5.9.jar;D:\GarenGosling\apache-maven-3.6.3\repo\org\apache\yetus\audience-annotations\0.5.0\audience-annotations-0.5.0.jar;D:\GarenGosling\apache-maven-3.6.3\repo\io\netty\netty-handler\4.1.50.Final\netty-handler-4.1.50.Final.jar;D:\GarenGosling\apache-maven-3.6.3\repo\io\netty\netty-common\4.1.50.Final\netty-common-4.1.50.Final.jar;D:\GarenGosling\apache-maven-3.6.3\repo\io\netty\netty-resolver\4.1.50.Final\netty-resolver-4.1.50.Final.jar;D:\GarenGosling\apache-maven-3.6.3\repo\io\netty\netty-buffer\4.1.50.Final\netty-buffer-4.1.50.Final.jar;D:\GarenGosling\apache-maven-3.6.3\repo\io\netty\netty-transport\4.1.50.Final\netty-transport-4.1.50.Final.jar;D:\GarenGosling\apache-maven-3.6.3\repo\io\netty\netty-codec\4.1.50.Final\netty-codec-4.1.50.Final.jar;D:\GarenGosling\apache-maven-3.6.3\repo\io\netty\netty-transport-native-epoll\4.1.50.Final\netty-transport-native-epoll-4.1.50.Final.jar;D:\GarenGosling\apache-maven-3.6.3\repo\io\netty\netty-transport-native-unix-common\4.1.50.Final\netty-transport-native-unix-common-4.1.50.Final.jar;D:\GarenGosling\apache-maven-3.6.3\repo\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;D:\GarenGosling\apache-maven-3.6.3\repo\org\slf4j\slf4j-log4j12\1.7.25\slf4j-log4j12-1.7.25.jar;D:\GarenGosling\apache-maven-3.6.3\repo\log4j\log4j\1.2.17\log4j-1.2.17.jar org.garen.study.App
Hello world!
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
new zk watch: WatchedEvent state:SyncConnected type:None path:null
connected
ed......
created node name : /ooxx
olddata
getData watch: WatchedEvent state:SyncConnected type:NodeDataChanged path:/ooxx
getData watch: WatchedEvent state:SyncConnected type:NodeDataChanged path:/ooxx
----- async start -----
----- async over -----
----- async callback -----
abcdefg
newdata01

上一篇《12 Zookeeper介绍、安装、shell cli 使用,基本概念验证》
下一篇 《14 Zookeeper案例:分布式配置注册发现、分布式锁、ractive模式编程》

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-30 12:48:25  更:2021-07-30 12:48:50 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 0:05:41-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码