服务器动态上下线监听案例
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。 如上图所示,我们希望客户端能够实时监听服务器的节点变化情况。
具体实现
- (1)先在集群上创建
/servers 节点[zk: localhost:2181(CONNECTED) 10] create /servers "servers"
Created /servers
- (2)在 Idea 中创建包名:
com.atguigu.zkcase1 - (3)服务器端向 Zookeeper 注册代码
package com.atguigu.zkcase1;
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
public class DistributeServer {
private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private static int sessionTimeout = 2000;
private ZooKeeper zk = null;
private String parentNode = "/servers";
public void getConnect() throws IOException{
zk = new ZooKeeper(connectString, sessionTimeout, new
Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
public void registServer(String hostname) throws Exception{
String create = zk.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname +" is online "+ create);
}
public void business(String hostname) throws Exception{
System.out.println(hostname + " is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
DistributeServer server = new DistributeServer();
server.getConnect();
server.registServer(args[0]);
server.business(args[0]);
}
}
- 客户端代码,监听zk的节点变化
package com.atguigu.zkcase1;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class DistributeClient {
private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private static int sessionTimeout = 2000;
private ZooKeeper zk = null;
private String parentNode = "/servers";
public void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
getServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
public void getServerList() throws Exception {
List<String> children = zk.getChildren(parentNode, true);
ArrayList<String> servers = new ArrayList<>();
for (String child : children) {
byte[] data = zk.getData(parentNode + "/" + child, false, null);
servers.add(new String(data));
}
System.out.println(servers);
}
public void business() throws Exception{
System.out.println("client is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
DistributeClient client = new DistributeClient();
client.getConnect();
client.getServerList();
client.business();
}
}
测试
1)在 Linux 命令行上操作增加减少服务器
-
(1)启动 DistributeClient 客户端 -
(2)在 hadoop102 上 zk 的客户端/servers 目录上创建临时带序号节点 [zk: localhost:2181(CONNECTED) 1] create -e -s /servers/hadoop102 "hadoop102"
[zk: localhost:2181(CONNECTED) 2] create -e -s /servers/hadoop103 "hadoop103"
-
(3)观察 Idea 控制台变化
[hadoop102, hadoop103]
-
(4)执行删除操作 [zk: localhost:2181(CONNECTED) 8] delete /servers/hadoop1020000000000
-
(5)观察 Idea 控制台变化 [hadoop103]
2)在 Idea 上操作增加减少服务器
- (1)启动
DistributeClient 客户端(如果已经启动过,不需要重启 - (2)启动
DistributeServer 服务
- ①点击 Edit Configurations…
- ②在弹出的窗口中(Program arguments)输入想启动的主机,例如,
hadoop102 - 回到 DistributeServer 的 main 方 法 , 右 键 , 在 弹 出 的 窗 口 中 点 击
Run “DistributeServer.main()” - ④观察 DistributeServer 控制台,提示 hadoop102 is working
- ⑤观察 DistributeClient 控制台,提示 hadoop102 已经上线
ZooKeeper 分布式锁案例
获取锁
- 在 Zookeeper 当中创建一个持久节,当第一个客户端 Client1 想要获得锁时,需要在这个节点下面创建一个临时顺序节点。
- Client1查找持久节点下面所有的临时顺序节点并排序,判断自己所创建的节点是不是顺序最靠前的一个。如果是第一个节点,则成功获得锁。
- 如果再有一个客户端 Client2 前来获取锁,则在持久节点下面再创建一个临时顺序节点Lock2。
- Client2查找持久节点下面所有的临时顺序节点并排序,判断自己所创建的节点Lock2是不是顺序最靠前的一个,结果发现节点Lock2并不是最小的。于是,Client2向排序仅比它靠前的节点Lock1注册Watcher,用于监听Lock1节点是否存在。这意味着Client2抢锁失败,进入了等待状态。
- 如果又有一个客户端Client3前来获取锁,则在持久节点下载再创建一个临时顺序节点Lock3。
- Client3查找持久节点下面所有的临时顺序节点并排序,判断自己所创建的节点Lock3是不是顺序最靠前的一个,结果同样发现节点Lock3并不是最小的。于是,Client3向排序仅比它靠前的节点Lock2注册Watcher,用于监听Lock2节点是否存在。这意味着Client3同样抢锁失败,进入了等待状态。
释放锁
释放锁就比较简单了,因为前面创建的临时顺序节点,所以在出现下面两种情况时,都会自动释放锁:
- 任务完成后,Client 会释放锁。
- 任务没完成,Client 就崩溃了,也会自动释放锁。
代码编写
- 分布式锁的编写
package com.atguigu.lock2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class DistributedLock {
private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private int sessionTimeout = 2000;
private ZooKeeper zk;
private String rootNode = "locks";
private String subNode = "seq-";
private String waitPath;
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
private String currentNode;
public DistributedLock() throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath))
{
waitLatch.countDown();
}
}
});
connectLatch.await();
Stat stat = zk.exists("/" + rootNode, false);
if (stat == null) {
System.out.println("根节点不存在");
zk.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
public void zkLock() {
try {
currentNode = zk.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
Thread.sleep(10);
List<String> childrenNodes = zk.getChildren("/" + rootNode, false);
if (childrenNodes.size() == 1) {
return;
} else {
Collections.sort(childrenNodes);
String thisNode = currentNode.substring(("/" + rootNode + "/").length());
int index = childrenNodes.indexOf(thisNode);
if (index == -1) {
System.out.println("数据异常");
} else if (index == 0) {
return;
} else {
this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
zk.getData(waitPath, true, new Stat());
waitLatch.await();
return;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void zkUnlock() {
try {
zk.delete(this.currentNode, -1);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}
- 测试zookeeper 的分布式锁
package com.atguigu.lock2;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class DistributedLockTest {
public static void main(String[] args) throws
InterruptedException, IOException, KeeperException {
final DistributedLock lock1 = new DistributedLock();
final DistributedLock lock2 = new DistributedLock();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.zkLock();
System.out.println("线程 1 获取锁");
Thread.sleep(5 * 1000);
lock1.zkUnlock();
System.out.println("线程 1 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.zkLock();
System.out.println("线程 2 获取锁");
Thread.sleep(5 * 1000);
lock2.zkUnlock();
System.out.println("线程 2 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
- 观察控制台变化:
线程 1 获取锁
线程 1 释放锁
线程 2 获取锁
线程 2 释放锁
Curator 框架实现分布式锁案例
1)原生的 Java API 开发存在的问题
- (1)会话连接是异步的,需要自己去处理。比如使用 `CountDownLatch
- (2)Watch 需要重复注册,不然就不能生效
- (3)开发的复杂性还是比较高的
- (4)不支持多节点删除和创建。需要自己去递归
2)Curator 是一个专门解决分布式锁的框架
- Curator 解决了原生 JavaAPI 开发分布式遇到的问题
- 详情请查看官方文档:
https://curator.apache.org/index.html
3)Curator 案例实操
- 1、添加依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
- 代码编写
package com.atguigu.lock;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import
org.apache.curator.framework.recipes.locks.InterProcessLock;
import
org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorLockTest {
private String rootNode = "/locks";
private String connectString ="hadoop102:2181,hadoop103:2181,hadoop104:2181";
private int connectionTimeout = 2000;
private int sessionTimeout = 2000;
public static void main(String[] args) {
new CuratorLockTest().test();
}
private void test() {
final InterProcessLock lock1 = new
InterProcessMutex(getCuratorFramework(), rootNode);
final InterProcessLock lock2 = new
InterProcessMutex(getCuratorFramework(), rootNode);
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.acquire();
System.out.println("线程 1 获取锁");
lock1.acquire();
System.out.println("线程 1 再次获取锁");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("线程 1 释放锁");
lock1.release();
System.out.println("线程 1 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.acquire();
System.out.println("线程 2 获取锁");
lock2.acquire();
System.out.println("线程 2 再次获取锁");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("线程 2 释放锁");
lock2.release();
System.out.println("线程 2 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
public CuratorFramework getCuratorFramework () {
RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectString)
.connectionTimeoutMs(connectionTimeout)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(policy).build();
client.start();
System.out.println("zookeeper 初始化完成...");
return client;
}
}
- 观察控制台变化
观察控制台变化:
线程 1 获取锁
线程 1 再次获取锁
线程 1 释放锁
线程 1 再次释放锁
线程 2 获取锁
线程 2 再次获取锁
线程 2 释放锁
线程 2 再次释放锁
|