1、 自定义锁工具类
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class DistributeLock {
private final ZooKeeper zk;
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
private String watchPath;
private String currentNode;
public DistributeLock() throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper("127.0.0.1:2181", 2000, watchedEvent -> {
if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected){
connectLatch.countDown();
}
if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted && watchedEvent.getPath().equals(watchPath)) {
waitLatch.countDown();
}
});
connectLatch.await();
Stat exists = zk.exists("/locks", false);
if (exists == null) {
zk.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
public void zkLock() throws KeeperException, InterruptedException {
currentNode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> children = zk.getChildren("/locks", false);
if (children.size() == 1){
return;
}else {
Collections.sort(children);
String thisNode = currentNode.substring("/locks/".length());
int index = children.indexOf(thisNode);
if (index == -1) {
System.out.println("数据异常");
} else if (index == 0) {
return;
} else {
watchPath = "/locks/"+ children.get(index -1);
zk.getData(watchPath,true,null);
waitLatch.await();
}
}
}
public void unZkLock(){
try {
zk.delete(currentNode,-1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
- 测试自定义
public class DistributeLockTest {
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
final DistributeLock lock1 = new DistributeLock();
final DistributeLock lock2 = new DistributeLock();
new Thread(() -> {
try {
lock1.zkLock();
System.out.println("线程1 启动获取到锁!");
test(1);
Thread.sleep(5*1000);
lock1.unZkLock();
System.out.println("线程1 释放锁!");
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
lock2.zkLock();
System.out.println("线程2 启动获取到锁!");
test(2);
Thread.sleep(5*1000);
lock2.unZkLock();
System.out.println("线程2 释放锁!");
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}).start();
}
public static void test(Integer num){
System.out.println(num);
}
}
- 使用Curator 框架
public class CuratorTest {
public static void main(String[] args) {
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
new Thread(() -> {
try {
lock1.acquire();
System.out.println("lock1 获取锁");
Thread.sleep(5000);
lock1.release();
System.out.println("lock1 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
lock2.acquire();
System.out.println("lock2 获取锁");
Thread.sleep(5000);
lock2.release();
System.out.println("lock2 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
private static CuratorFramework getCuratorFramework() {
ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000, 3);
CuratorFramework build = CuratorFrameworkFactory.builder().connectString("127.0.1.1:2181")
.connectionTimeoutMs(2000)
.retryPolicy(exponentialBackoffRetry)
.build();
build.start();
System.out.println("zk 连接成功");
return build;
}
}
- pom.xml
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.9</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
</dependencies>
- log4sj
### 设置###
log4j.rootLogger = debug,stdout,D,E
### 输出信息到控制抬 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
### 输出DEBUG 级别以上的日志到=E://logs/error.log ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = E://logs/log.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
### 输出ERROR 级别以上的日志到=E://logs/error.log ###
log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
log4j.appender.E.File =E://logs/error.log
log4j.appender.E.Append = true
log4j.appender.E.Threshold = ERROR
log4j.appender.E.layout = org.apache.log4j.PatternLayout
log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
|