初识zookeeper
zookeeper的基本概念:
zookeeper的安装与配置
参见zookeeper的安装与配置(转黑马官方)_chaofengdev的博客-CSDN博客
zookeeper命令操作
数据模型
客户端常用命令
服务端常用命令
zookeeper java api操作
curator介绍
curator常用api
package com.itheima;
?
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
?
import java.nio.charset.StandardCharsets;
import java.util.List;
?
public class curatorTest {
? ?//提升作用域
? ?CuratorFramework client;
//==============================================create=========================================================================
? ?/*
? ?建立连接
? ? */
? ?@Test
? ?public void testConnect1(){
? ? ? ?//获取zookeeper client对象的两种方式。
? ? ? ?//1.第一种方式
? ? ? ?/*
? ? ? ?connectString – list of servers to connect to 连接字符串,zkserver的地址端口 “192.168.200.154:2181”
? ? ? ?sessionTimeoutMs – session timeout 会话超时时间
? ? ? ?connectionTimeoutMs – connection timeout 连接超时时间
? ? ? ?retryPolicy – retry policy to use 重试策略
? ? ? ? */
? ? ? ?//重试策略对象
? ? ? ?RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
? ? ? ?//获取client对象(CuratorFramework)
? ? ? ?client = CuratorFrameworkFactory.newClient("192.168.200.154:2181", 60 * 1000, 15 * 1000, retryPolicy);
? ? ? ?//开启连接
? ? ? ?client.start();
? }
?
? ?@Before
? ?public void testConnect2(){
? ? ? ?//2.第二种方式
? ? ? ?//链式编程
? ? ? ?RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
? ? ? ?client = CuratorFrameworkFactory.builder().connectString("192.168.200.154:2181")
? ? ? ? ? ? ? .sessionTimeoutMs(60 * 1000)
? ? ? ? ? ? ? .connectionTimeoutMs(15 * 1000)
? ? ? ? ? ? ? .retryPolicy(retryPolicy).namespace("itheima").build();//使用名称空间
? ? ? ?client.start();
? }
?
? ?//创建结点:持久、临时、顺序、数据
? ?//1.基本创建
? ?//2.创建结点,带有数据
? ?//3.设置结点类型
? ?//4.创建多级结点
? ?@Test
? ?public void testCreate1() throws Exception {
? ? ? ?//1.基本创建
? ? ? ?//创建结点时没有指定结点的数据,会默认存储客户端的ip地址
? ? ? ?String path = client.create().forPath("/app22");
? ? ? ?System.out.println(path);
? }
?
? ?@Test
? ?public void testCreate2() throws Exception {
? ? ? ?//2.创建结点,带有数据
? ? ? ?String path = client.create().forPath("/app2", "zifushuzu".getBytes(StandardCharsets.UTF_8));
? ? ? ?System.out.println(path);
? }
?
? ?@Test
? ?public void testCreate3() throws Exception {
? ? ? ?//3.设置结点类型
? ? ? ?String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3", "hello".getBytes(StandardCharsets.UTF_8));
? ? ? ?System.out.println(path);
? }
?
? ?@Test
? ?public void testCreate4() throws Exception {
? ? ? ?//4.创建多级结点
? ? ? ?String path = client.create().creatingParentContainersIfNeeded().forPath("/app4/p1","nihaoya".getBytes(StandardCharsets.UTF_8));
? ? ? ?System.out.println(path);
? }
?
? ?@After
? ?public void close(){
? ? ? ?if(client!=null){
? ? ? ? ? ?client.close();
? ? ? }
? }
?
? ?//==================================================get================================================================
?
? ?/**
? ? * 查询结点:
? ? * 1.查询结点数据 get
? ? * 2.查询结点信息 ls
? ? * 3.查询结点状态信息 ls -s
? ? */
? ?@Test
? ?public void testGet1() throws Exception {
? ? ? ?//1.查询结点数据 get
? ? ? ?byte[] data = client.getData().forPath("/app4/p1");
? ? ? ?System.out.println(new String(data));
? }
?
? ?@Test
? ?public void testGet2() throws Exception{
? ? ? ?//2.查询结点信息 ls
? ? ? ?List<String> child = client.getChildren().forPath("/");
? ? ? ?System.out.println(child);
? }
?
? ?@Test
? ?public void testGet3() throws Exception{
? ? ? ?//3.查询结点状态信息 ls -s
? ? ? ?Stat stat = new Stat();
? ? ? ?System.out.println(stat);//查询前为空
? ? ? ?client.getData().storingStatIn(stat).forPath("/app4");
? ? ? ?System.out.println(stat);//查询结点的状态信息
? }
? ?//===============================================set===================================================================
?
? ?/**
? ? * 修改数据
? ? * 1.修改数据
? ? * 2.根据版本修改
? ? * ? ? version是查询出来的,是为了实现原子性操作(锁的机制)
? ? * ? ? Each time a znode's data changes, the version number increases. For instance, whenever a client retrieves data it also receives the version of the data.
? ? * ? ? 每次 znode 的数据发生更改时,版本号都会增加。例如,每当客户端检索数据时,它也会收到数据的版本。
? ? * @throws Exception
? ? */
? ?@Test
? ?public void testSet() throws Exception {
? ? ? ?Stat stat = client.setData().forPath("/app1", "itcast".getBytes(StandardCharsets.UTF_8));
? ? ? ?System.out.println(stat);
? }
?
? ?@Test
? ?public void testSetForVersion() throws Exception {
? ? ? ?//查询结点相关信息
? ? ? ?Stat stat = new Stat();
? ? ? ?client.getData().storingStatIn(stat).forPath("/app1");
? ? ? ?//查询接地那信息中的版本信息version(本质上是锁的机制)
? ? ? ?int version = stat.getVersion();
? ? ? ?Stat stat1 = client.setData().withVersion(version).forPath("/app1","haha".getBytes());
? }
?
? ?//==========================================delete=====================================================================
?
? ?/**
? ? * 删除结点
? ? * 1.删除单个结点
? ? * 2.删除带有子节点的结点
? ? * 3.必须成功删除
? ? * 4.回调
? ? */
? ?@Test
? ?public void testDelete() throws Exception {
? ? ? ?//1.删除单个结点
? ? ? ?client.delete().forPath("/app1");
? }
?
? ?@Test
? ?public void testDelete2() throws Exception {
? ? ? ?//2.删除带有子节点的结点
? ? ? ?client.delete().deletingChildrenIfNeeded().forPath("/app4");
? }
?
? ?@Test
? ?public void testDelete3() throws Exception{
? ? ? ?//3.必须成功删除(为了防止网络抖动,本质就是重试几次)
? ? ? ?client.delete().guaranteed().forPath("/app2");
? }
?
? ?@Test
? ?public void testDelete4() throws Exception {
? ? ? ?//4.回调(一知半解)
? ? ? ?client.delete().guaranteed().inBackground(new BackgroundCallback() {
? ? ? ? ? ?@Override
? ? ? ? ? ?public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
? ? ? ? ? ? ? ?System.out.println("delete");//提示删除完成
? ? ? ? ? ? ? ?System.out.println(event);//相关信息
? ? ? ? ? }
? ? ? }).forPath("app4");
? }
}
?
package com.itheima;
?
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
?
import java.nio.charset.StandardCharsets;
import java.util.List;
?
public class curatorWatcherTest {
? ?//提升作用域
? ?CuratorFramework client;
//==============================================create=========================================================================
? ?/*
? ?建立连接
? ? */
? ?@Test
? ?public void testConnect1(){
? ? ? ?//获取zookeeper client对象的两种方式。
? ? ? ?//1.第一种方式
? ? ? ?/*
? ? ? ?connectString – list of servers to connect to 连接字符串,zkserver的地址端口 “192.168.200.154:2181”
? ? ? ?sessionTimeoutMs – session timeout 会话超时时间
? ? ? ?connectionTimeoutMs – connection timeout 连接超时时间
? ? ? ?retryPolicy – retry policy to use 重试策略
? ? ? ? */
? ? ? ?//重试策略对象
? ? ? ?RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
? ? ? ?//获取client对象(CuratorFramework)
? ? ? ?client = CuratorFrameworkFactory.newClient("192.168.200.154:2181", 60 * 1000, 15 * 1000, retryPolicy);
? ? ? ?//开启连接
? ? ? ?client.start();
? }
?
? ?@Before
? ?public void testConnect2(){
? ? ? ?//2.第二种方式
? ? ? ?//链式编程
? ? ? ?RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
? ? ? ?client = CuratorFrameworkFactory.builder().connectString("192.168.200.154:2181")
? ? ? ? ? ? ? .sessionTimeoutMs(60 * 1000)
? ? ? ? ? ? ? .connectionTimeoutMs(15 * 1000)
? ? ? ? ? ? ? .retryPolicy(retryPolicy).namespace("itheima").build();//使用名称空间
? ? ? ?client.start();
? }
?
? ?//创建结点:持久、临时、顺序、数据
? ?//1.基本创建
? ?//2.创建结点,带有数据
? ?//3.设置结点类型
? ?//4.创建多级结点
? ?@Test
? ?public void testCreate1() throws Exception {
? ? ? ?//1.基本创建
? ? ? ?//创建结点时没有指定结点的数据,会默认存储客户端的ip地址
? ? ? ?String path = client.create().forPath("/app22");
? ? ? ?System.out.println(path);
? }
?
? ?@Test
? ?public void testCreate2() throws Exception {
? ? ? ?//2.创建结点,带有数据
? ? ? ?String path = client.create().forPath("/app2", "zifushuzu".getBytes(StandardCharsets.UTF_8));
? ? ? ?System.out.println(path);
? }
?
? ?@Test
? ?public void testCreate3() throws Exception {
? ? ? ?//3.设置结点类型
? ? ? ?String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3", "hello".getBytes(StandardCharsets.UTF_8));
? ? ? ?System.out.println(path);
? }
?
? ?@Test
? ?public void testCreate4() throws Exception {
? ? ? ?//4.创建多级结点
? ? ? ?String path = client.create().creatingParentContainersIfNeeded().forPath("/app4/p1","nihaoya".getBytes(StandardCharsets.UTF_8));
? ? ? ?System.out.println(path);
? }
?
? ?@After
? ?public void close(){
? ? ? ?if(client!=null){
? ? ? ? ? ?client.close();
? ? ? }
? }
?
? ?//==================================================get================================================================
?
? ?/**
? ? * 查询结点:
? ? * 1.查询结点数据 get
? ? * 2.查询结点信息 ls
? ? * 3.查询结点状态信息 ls -s
? ? */
? ?@Test
? ?public void testGet1() throws Exception {
? ? ? ?//1.查询结点数据 get
? ? ? ?byte[] data = client.getData().forPath("/app4/p1");
? ? ? ?System.out.println(new String(data));
? }
?
? ?@Test
? ?public void testGet2() throws Exception{
? ? ? ?//2.查询结点信息 ls
? ? ? ?List<String> child = client.getChildren().forPath("/");
? ? ? ?System.out.println(child);
? }
?
? ?@Test
? ?public void testGet3() throws Exception{
? ? ? ?//3.查询结点状态信息 ls -s
? ? ? ?Stat stat = new Stat();
? ? ? ?System.out.println(stat);//查询前为空
? ? ? ?client.getData().storingStatIn(stat).forPath("/app4");
? ? ? ?System.out.println(stat);//查询结点的状态信息
? }
? ?//===============================================set===================================================================
?
? ?/**
? ? * 修改数据
? ? * 1.修改数据
? ? * 2.根据版本修改
? ? * ? ? version是查询出来的,是为了实现原子性操作(锁的机制)
? ? * ? ? Each time a znode's data changes, the version number increases. For instance, whenever a client retrieves data it also receives the version of the data.
? ? * ? ? 每次 znode 的数据发生更改时,版本号都会增加。例如,每当客户端检索数据时,它也会收到数据的版本。
? ? * @throws Exception
? ? */
? ?@Test
? ?public void testSet() throws Exception {
? ? ? ?Stat stat = client.setData().forPath("/app1", "itcast".getBytes(StandardCharsets.UTF_8));
? ? ? ?System.out.println(stat);
? }
?
? ?@Test
? ?public void testSetForVersion() throws Exception {
? ? ? ?//查询结点相关信息
? ? ? ?Stat stat = new Stat();
? ? ? ?client.getData().storingStatIn(stat).forPath("/app1");
? ? ? ?//查询接地那信息中的版本信息version(本质上是锁的机制)
? ? ? ?int version = stat.getVersion();
? ? ? ?Stat stat1 = client.setData().withVersion(version).forPath("/app1","haha".getBytes());
? }
?
? ?//==========================================delete=====================================================================
?
? ?/**
? ? * 删除结点
? ? * 1.删除单个结点
? ? * 2.删除带有子节点的结点
? ? * 3.必须成功删除
? ? * 4.回调
? ? */
? ?@Test
? ?public void testDelete() throws Exception {
? ? ? ?//1.删除单个结点
? ? ? ?client.delete().forPath("/app1");
? }
?
? ?@Test
? ?public void testDelete2() throws Exception {
? ? ? ?//2.删除带有子节点的结点
? ? ? ?client.delete().deletingChildrenIfNeeded().forPath("/app4");
? }
?
? ?@Test
? ?public void testDelete3() throws Exception{
? ? ? ?//3.必须成功删除(为了防止网络抖动,本质就是重试几次)
? ? ? ?client.delete().guaranteed().forPath("/app2");
? }
?
? ?@Test
? ?public void testDelete4() throws Exception {
? ? ? ?//4.回调(一知半解)
? ? ? ?client.delete().guaranteed().inBackground(new BackgroundCallback() {
? ? ? ? ? ?@Override
? ? ? ? ? ?public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
? ? ? ? ? ? ? ?System.out.println("delete");//提示删除完成
? ? ? ? ? ? ? ?System.out.println(event);//相关信息
? ? ? ? ? }
? ? ? }).forPath("app4");
? }
?
?
? ?//======================================watch========================================================================
? ?/**
? ? * 给指定一个结点的*子节点*注册监听器
? ? *
? ? */
? ?@Test
? ?public void testPathChildrenCache() throws Exception{
? ? ? ?//1.创建监听对象
? ? ? ?PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);
?
? ? ? ?//2.绑定监听器
? ? ? ?pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
? ? ? ? ? ?@Override
? ? ? ? ? ?public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
? ? ? ? ? ? ? ?System.out.println("child node has changed...");
? ? ? ? ? ? ? ?System.out.println(event);
? ? ? ? ? ? ? ?//监听子节点的数据变更,并且拿到变更后的数据
? ? ? ? ? ? ? ?//1.获取数据类型
? ? ? ? ? ? ? ?PathChildrenCacheEvent.Type type = event.getType();
? ? ? ? ? ? ? ?//2.判断类型是否是update
? ? ? ? ? ? ? ?if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
? ? ? ? ? ? ? ? ? ?//PathChildrenCacheEvent{type=CHILD_UPDATED, data=ChildData{path='/app2/p2', stat=192,209,1643545922168,1643546628992,2,0,0,0,3,0,192
? ? ? ? ? ? ? ? ? ?//, data=[49, 49, 49]}}
? ? ? ? ? ? ? ? ? ?System.out.println("child node update...");
? ? ? ? ? ? ? ? ? ?byte[] data = event.getData().getData();//根据event的输出格式来理解此处代码
? ? ? ? ? ? ? ? ? ?System.out.println(new String(data));
? ? ? ? ? ? ? }
? ? ? ? ? }
? ? ? });
?
? ? ? ?//3.开启监听器
? ? ? ?pathChildrenCache.start();
? ? ? ?while(true){
? ? ? ? ? ?//死循环
? ? ? }
? }
?
? ?@Test
? ?public void testNodeCache() throws Exception{
? ? ? ?//1.创建NodeCache对象
? ? ? ?NodeCache nodeCache = new NodeCache(client, "/app1");
? ? ? ?//2.注册监听-理解
? ? ? ?nodeCache.getListenable().addListener(new NodeCacheListener() {
? ? ? ? ? ?@Override
? ? ? ? ? ?public void nodeChanged() throws Exception {
? ? ? ? ? ? ? ?System.out.println("node has changed...");
? ? ? ? ? ? ? ?//获取修改结点后的数据
? ? ? ? ? ? ? ?byte[] data = nodeCache.getCurrentData().getData();
? ? ? ? ? ? ? ?System.out.println(new String(data));
? ? ? ? ? }
? ? ? });
? ? ? ?//3.开启监听,如果设置为true,开启监听时加载缓存数据。
? ? ? ?nodeCache.start(true);
? ? ? ?//死循环用于制造客户端一直连接并监听相关结点的情况
? ? ? ?//这里一直循环,表示客户端一直存活,服务端有响应会返回;
? ? ? ?//否则单元测试结束,表示客户端结束,服务端有结果也无法返回。
? ? ? ?while(true){
? ? ? ? ? ?//死循环
? ? ? }
? }
?
? ?//=========================================treeCache==================================================================
? ?@Test
? ?public void testTreeCache() throws Exception {
? ? ? ?//1.创建监听器
? ? ? ?TreeCache treeCache = new TreeCache(client, "/app2");
? ? ? ?//2.注册监听
? ? ? ?treeCache.getListenable().addListener(new TreeCacheListener() {
? ? ? ? ? ?@Override
? ? ? ? ? ?public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
? ? ? ? ? ? ? ?System.out.println("treeCache changed...");
? ? ? ? ? ? ? ?System.out.println(event);
? ? ? ? ? }
? ? ? });
? ? ? ?//3.开启监听
? ? ? ?treeCache.start();
? ? ? ?while (true) {
? ? ? }
? }
?
}
?
分布式锁
模拟12306
package com.itheima;
?
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
?
import java.util.concurrent.TimeUnit;
?
/**
* 基于zookeeper实现分布式锁。
* 基本原理参见:zookeeper分布式锁原理
* 核心思想:当客户端要获取锁,创建结点;使用完锁,则删除结点;
* 客户端获取锁时,在lock结点下创建*临时顺序结点*
* 其他过程暂略。
*/
public class Ticket12306 implements Runnable{
? ?private int ticket = 100000000;//票数
? ?private InterProcessMutex interProcessMutex;//互斥锁
? ?CuratorFramework client;//客户端对象
? ?public Ticket12306() {
? ? ? ?RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
? ? ? ?client = CuratorFrameworkFactory.builder().connectString("192.168.200.154:2181")
? ? ? ? ? ? ? .sessionTimeoutMs(60 * 1000)
? ? ? ? ? ? ? .connectionTimeoutMs(15 * 1000)
? ? ? ? ? ? ? .retryPolicy(retryPolicy).namespace("itheima").build();//使用名称空间
? ? ? ?client.start();
? ? ? ?interProcessMutex = new InterProcessMutex(client,"/lock");
? }
?
? ?@Override
? ?public void run() {
? ? ? ?while(true){
? ? ? ? ? ?//加锁
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?interProcessMutex.acquire(3, TimeUnit.SECONDS);
? ? ? ? ? ? ? ?if (ticket > 0) {
? ? ? ? ? ? ? ? ? ?System.out.println(Thread.currentThread().getName()+":"+ticket);
? ? ? ? ? ? ? ? ? ?ticket--;
? ? ? ? ? ? ? }
? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? }finally {
? ? ? ? ? ? ? ?//释放锁
? ? ? ? ? ? ? ?try {
? ? ? ? ? ? ? ? ? ?interProcessMutex.release();//突然理解异常的用处了。
? ? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? ? ? }
? ? ? ? ? }
?
? ? ? }
? }
}
package com.itheima;
?
/**
* 测试基于zookeeper实现的分布式锁
*/
public class lockTest {
? ?public static void main(String[] args) {
? ? ? ?Ticket12306 ticket12306 = new Ticket12306();
? ? ? ?//创建客户端
? ? ? ?Thread xiecheng = new Thread(ticket12306, "xiecheng");
? ? ? ?Thread feizhu = new Thread(ticket12306, "feizhu");
? ? ? ?//开启两个线程来运行买票服务
? ? ? ?xiecheng.start();
? ? ? ?feizhu.start();
? }
}
?
zookeeper集群搭建
参见:搭建Zookeeper集群(转黑马官方,补充了一点可能遇到的小坑)_chaofengdev的博客-CSDN博客
zookeeper核心理论
|