ZooKeeper 分布式锁案例
什么叫做分布式锁呢?
比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。
那么我们把这个分布式环境下的这个锁叫作分布式锁
锁代码
public class DistributeLock {
private final String connectString = "hadoop113:2181,hadoop114:2181,hadoop115:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;
private String currentMode;
private String waitPath;
private CountDownLatch connectDownLatch = new CountDownLatch(1);
private CountDownLatch waitDownLatch = new CountDownLatch(1);
public DistributeLock() throws IOException, KeeperException, InterruptedException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
connectDownLatch.countDown();
}
if (watchedEvent.getType() == Event.EventType.NodeDeleted
&& watchedEvent.getPath().equals(waitPath)) {
waitDownLatch.countDown();
}
}
});
connectDownLatch.await();
Stat stat = zkClient.exists("/locks", false);
if (stat == null) {
zkClient.create("/locks", "locks".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
public void zkLock() throws KeeperException, InterruptedException {
currentMode = zkClient.create("/locks/" + "seq-", null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> children = zkClient.getChildren("/locks", false);
if (children.size() == 1) {
return;
} else {
Collections.sort(children);
String thisNode = currentMode.substring("/locks/".length());
int index = children.indexOf(thisNode);
if (index == -1) {
System.out.println("数据异常");
} else if (index == 0) {
return;
} else {
waitPath = "/locks/" + children.get(index - 1);
zkClient.getData(waitPath, true, null);
waitDownLatch.await();
}
}
}
public void unZkLock() throws KeeperException, InterruptedException {
zkClient.delete(currentMode, -1);
}
}
测试代码
public class DistributedLockTest {
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
final DistributeLock lock1 = new DistributeLock();
final DistributeLock lock2 = new DistributeLock();
new Thread(new Runnable() {
public void run() {
try {
lock1.zkLock();
System.out.println("线程1启动,获得锁");
Thread.sleep(5 * 1000);
lock1.unZkLock();
System.out.println("线程1释放锁");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
public void run() {
try {
lock2.zkLock();
System.out.println("线程2启动,获得锁");
Thread.sleep(5 * 1000);
lock2.unZkLock();
System.out.println("线程2释放锁");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}).start();
}
}
Curator 框架实现分布式锁案例
1)原生的 Java API 开发存在的问题
(1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
(2)Watch 需要重复注册,不然就不能生效
(3)开发的复杂性还是比较高的
(4)不支持多节点删除和创建。需要自己去递归
2)Curator 是一个专门解决分布式锁的框架,解决了原生 Java API 开发分布式遇到的问题。
详情请查看官方文档:https://curator.apache.org/index.html
3)Curator 案例实操
添加依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
public class CuratorLockTest {
public static void main(String[] args) {
final InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
final InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
new Thread(new Runnable() {
public void run() {
try {
lock1.acquire();
System.out.println("线程1获得锁");
lock1.acquire();
System.out.println("线程1再次获得锁");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("线程1释放锁");
lock1.release();
System.out.println("线程1再次释放锁");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
public void run() {
try {
lock2.acquire();
System.out.println("线程2获得锁");
lock2.acquire();
System.out.println("线程2再次获得锁");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("线程2释放锁");
lock2.release();
System.out.println("线程2再次释放锁");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
private static CuratorFramework getCuratorFramework() {
ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("hadoop113:2181,hadoop114:2181,hadoop115:2181")
.connectionTimeoutMs(2000)
.sessionTimeoutMs(2000)
.retryPolicy(policy)
.build();
client.start();
System.out.println("zookeeper连接启动成功");
return client;
}
}
|