IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spring Boot基于zookeeper原生方式实现分布式锁 -> 正文阅读

[大数据]Spring Boot基于zookeeper原生方式实现分布式锁

一、背景

??我在之前的文章SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理详细介绍了它的使用及其原理,现在我们也根据这个思路,用zookeeper原生的方式来实现一个分布式锁,加深对分布式锁的理解。本文中Spring Boot的版本是2.5.2zookeeper的版本是3.6.3

??我们大致的大致的流程图如下图,可作为我们查看代码的一个思路,不然看的头大。(当然本图是没有包含可重入锁的流程判断在里面的
在这里插入图片描述

二、maven依赖

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.alian</groupId>
    <artifactId>zklock</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>zklock</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.5.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.6.3</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>
		
		<!--主要用于Maps.newConcurrentMap()-->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.1-jre</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.14</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

三、配置

3.1、application.yml配置

application.yml

server:
  port: 8082
  servlet:
    context-path: /zklock

app:
  zookeeper:
    server: 10.130.3.16:2181
    session-timeout: 15000
    #这里配置的路径没有用"/"结尾
    root-lock-path: /root/alian

3.2、属性配置类

??此配置类不懂的可以参考我另一篇文章:Spring Boot读取配置文件常用方式

AppProperties.java

package com.alian.zklock.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "app.zookeeper")
public class AppProperties {

    /**
     * zookeeper服务地址
     */
    private String server;

    /**
     * session超时时间
     */
    private int sessionTimeout;

    /**
     * 分布式锁路径
     */
    private String rootLockPath;

}

3.3、ZookeeperConfig配置件

ZookeeperConfig.java

package com.alian.zklock.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.CountDownLatch;

@Slf4j
@Configuration
public class ZookeeperConfig {

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    @Autowired
    private AppProperties appProperties;

    @Bean
    public ZooKeeper zookeeper() throws Exception {
        ZooKeeper zookeeper = new ZooKeeper(appProperties.getServer(), appProperties.getSessionTimeout(), event -> {
            log.info("Receive watched event: {}", event.getState());
            //获取事件的状态
            KeeperState keeperState = event.getState();
            //获取时间类型
            EventType eventType = event.getType();
            //如果是建立连接
            if (KeeperState.SyncConnected == keeperState) {
                if (EventType.None == eventType) {
                    //如果建立连接成功,则发送信号量,让后续阻塞程序向下执行
                    countDownLatch.countDown();
                    log.info("zookeeper建立连接");
                }
            }
        });
        //进行阻塞,当执行countDownLatch.countDown();后续代码才会进行
        countDownLatch.await();
        return zookeeper;
    }

}

??这里主要是对ZooKeeper 进行连接配置,关于CountDownLatch的使用,本文最后有相关的介绍。

四、实战

??定义了两个方法:加锁和释放锁。

4.1、接口

ILockService.java

package com.alian.zklock.service;

import java.util.concurrent.TimeUnit;

public interface ILockService {

    /**
     * 加锁
     *
     * @param lockPath
     * @param time
     * @param unit
     * @return
     */
    boolean lock(String lockPath, long time, TimeUnit unit);

    /**
     * 释放锁
     *
     * @return
     */
    void release();

}

4.2、接口核心实现

??这个实现类的注释,我想已经很详细了。可以细细阅读,可以加深你对zookeeper分布式锁实现原理的理解。

ZookeeperLockService.java

package com.alian.zklock.service.impl;

import com.alian.zklock.service.ILockService;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Service
public class ZookeeperLockService implements ILockService {

	//依赖需要导入:<groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>30.1-jre</version>
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

    @Autowired
    private ZooKeeper zooKeeper;

    //好的思想直接拿来用
    private static class LockData {
        final Thread owningThread;
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);
		//构造方法
        private LockData(Thread owningThread, String lockPath) {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }

    /**
     * 加锁
     *
     * @param lockPath
     * @return
     * @throws Exception
     */
    public boolean lock(String lockPath, long time, TimeUnit unit) {
        //可重入,确保同一线程,可以重复加锁
        Thread currentThread = Thread.currentThread();
        //根据线程号获取线程锁数据
        LockData lockData = threadData.get(currentThread);
        if (lockData != null) {
            // 说明该线程已加锁过,直接放行
            lockData.lockCount.incrementAndGet();
            return true;
        }
        String currentLockPath = attemptLock(lockPath, time, unit);
        //如果不为空则表示获取到了锁
        if (StringUtils.isNotBlank(currentLockPath)) {
            //把数据缓存起来
            LockData newLockData = new LockData(currentThread, currentLockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }
        return false;
    }

    /**
     * 尝试获取锁,获取成功返回锁路径
     *
     * @param lockPath
     * @param time
     * @param unit
     * @return
     */
    public String attemptLock(String lockPath, long time, TimeUnit unit) {
        //创建临时有序节点,传入的lockPath没有"/"
        try {
            String currentLockPath = zooKeeper.create(lockPath + "/", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            log.info("线程:【{}】->【{}】尝试竞争锁", Thread.currentThread().getName(), currentLockPath);
            //创建临时节点失败
            if (StringUtils.isBlank(currentLockPath)) {
                throw new Exception("生成临时节点异常");
            }
            //检查当前节点是否获取到了锁
            boolean hasLock = checkLocked(lockPath, currentLockPath, time, unit);
            //获取到了锁则返回锁节点路径
            return hasLock ? currentLockPath : null;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 检查是否获取到锁
     *
     * @param lockPath
     * @param currentLockPath
     * @param time
     * @param unit
     * @return
     * @throws Exception
     */
    public boolean checkLocked(String lockPath, String currentLockPath, long time, TimeUnit unit) {
        boolean hasLock = false;
        boolean toDelete = false;
        try {
            while (!hasLock) {
                //检查是否获取到了锁,没有获取到则返回前一个节点
                Pair<Boolean, String> pair = getsTheLock(lockPath, currentLockPath);
                //当前节点是否获取到了锁
                boolean currentLock = pair.getLeft();
                //获取前一个节点
                String preSequencePath = pair.getRight();
                if (currentLock) {
                    //获取到了锁
                    hasLock = true;
                } else {
                    //等待
                    final CountDownLatch latch = new CountDownLatch(1);
                    //订阅比自己次小顺序节点的删除事件
                    Watcher watcher = watchedEvent -> {
                        log.info("监听到的变化【】 watchedEvent = {}", watchedEvent);
                        latch.countDown();
                    };
                    Stat stat = zooKeeper.exists(preSequencePath, watcher);
                    if (stat != null) {
                        log.info("线程:【{}】等待锁【{}】释放", Thread.currentThread().getName(), preSequencePath);
                        boolean await = latch.await(time, unit);
                        if (!await) {
                            //说明超时了
                            log.info("获取锁超时");
                            toDelete = true;
                            break;
                        }
                    }
                    //检查锁
                    Pair<Boolean, String> checkPair = getsTheLock(lockPath, currentLockPath);
                    if (checkPair.getLeft()) {
                        hasLock = true;
                    }
                }
            }
        } catch (Exception e) {
            log.error("检查是否获取到锁异常", e);
            if (e instanceof InterruptedException) {
                toDelete = true;
            }
        } finally {
            if (toDelete) {
                deleteCurrentPath(currentLockPath);
            }
        }
        return hasLock;
    }

    /**
     * 检测是否已经获取到了锁,没有获取到则返回前一个节点
     *
     * @param lockPath
     * @param currentLock
     * @return
     * @throws Exception
     */
    private Pair<Boolean, String> getsTheLock(String lockPath, String currentLock) throws Exception {
        //获取根节点下所有子节点,不能用/结尾
        List<String> childrenList = zooKeeper.getChildren(lockPath, false);
        //节点按照编号,升序排列
        Collections.sort(childrenList);
        //如果是第一个,代表自己已经获得了锁
        String currentLockNode = currentLock.substring(currentLock.lastIndexOf("/") + 1);
        if (currentLockNode.equals(childrenList.get(0))) {
            log.info("节点【{}】成功的获取分布式锁", currentLock);
            return Pair.of(true, "");
        }
        //判断自己排第几个,返回的是对象所在列表的序号
        int index = Collections.binarySearch(childrenList, currentLockNode);
        if (index < 0) { // 网络抖动,获取到的子节点列表里可能已经没有自己了
            throw new Exception("节点没有找到: " + currentLockNode);
        }
        //如果没有获得锁,则要监听前一个节点
        String preSequencePath = lockPath + "/" + childrenList.get(index - 1);
        //返回监听的前一个节点
        return Pair.of(false, preSequencePath);
    }

    /**
     * 删除当前获取锁的节点
     *
     * @param currentLockPath
     */
    private void deleteCurrentPath(String currentLockPath) {
        try {
            //判断路径是否存在
            Stat stat = zooKeeper.exists(currentLockPath, false);
            if (stat != null) {
                //存在则删除
                zooKeeper.delete(currentLockPath, -1);
            }
        } catch (InterruptedException | KeeperException e) {
            log.error("删除节点异常");
        }
    }

    @Override
    public void release() {
        //获取当前线程
        Thread currentThread = Thread.currentThread();
        //获取当前线程的数据
        LockData lockData = threadData.get(currentThread);
        if (lockData == null) {
            throw new IllegalMonitorStateException("You do not own the lock: ");
        }
        //锁计数器减1
        int newLockCount = lockData.lockCount.decrementAndGet();
        if (newLockCount > 0) {
            //可重入锁,暂时不擅长节点
            return;
        }
        if (newLockCount < 0) {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: ");
        }
        try {
            //删除节点
            zooKeeper.delete(lockData.lockPath, -1);
            log.info("线程:【{}】释放锁【{}】", Thread.currentThread().getName(), lockData.lockPath);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        } finally {
            threadData.remove(currentThread);
        }
    }

}

4.3、测试类

??我们为了方便检验我们的分布式锁,初始化库存为100,就使用3个线程进行并发,每个线程减55个库存,我这里也不使用测试工具jmeter了,就相当于单机测试了。(如果是要进行分布式部署测试,那么库存值不能像我这样直接在程序写死 ,可以放redis或者数据库,然后通过负载均衡、压力测试工具jmeter去完成,具体使用可以参考:windows下Nginx配置及负载均衡使用),我们主要目的是:为了验证我们写的分布式锁,加深对分布式锁的理解

TestLockService.java

package com.alian.zklock.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Service
public class TestLockService {

    @Autowired
    private ILockService lockService;

    AtomicInteger stock = new AtomicInteger(100);

    @PostConstruct
    public void testLock() {
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    //使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。如果当前计数为零,则此方法立刻返回true值
                    countDownLatch.await();
                    //获得锁
                    boolean lock = lockService.lock("/root/alian", 10, TimeUnit.SECONDS);
                    if (lock) {
                        //业务处理
                        Thread.sleep(100);
                        //库存减1
                        decrement();
                        //释放锁
                        lockService.release();
                        log.info("线程【{}】扣减完,剩余库存:{}", Thread.currentThread().getName(), stock.get());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {

                }
            }, "Thread" + i).start();
            //递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少.
            countDownLatch.countDown();
        }
    }

    private void decrement() {
        for (int i = 0; i < 5; i++) {
            stock.decrementAndGet();
        }
    }

}

4.4、结果

运行结果图:
在这里插入图片描述
从我们的结果图可以看出来(为了方便,节点前面的变化文章里就省略了,实际是存在的):

  • 同时三个线程(Thread0、Thread1、Thread2)创建了节点(180、179,181)去抢占资源
  • Thread1创建的179号节点是最小的,获取到了锁,这时候,Thread0监听179节点,Thread2监听180节点
  • Thread1扣减库存5次,然后释放锁,也就是删除了节点179,触发监听
  • 因为Thread0监听179节点,所以Thread0继续执行抢占到了锁,同样扣减库存后,删除180节点
  • 然后Thread2监听的是180节点,同样的Thread2抢占到了锁,扣减库存,删除181节点
  • 最后得到库存85

超时的验证则可以在业务执行的时候设置一个休眠时间,可重入锁也是支持的,直接使用curator里面的,优秀的东西就直接拿来用了

4.5、关于CountDownLatch

也许有很多小伙伴,不知道CountDownLatch是怎么用的,我这里就简单介绍下,主要有两个方法:

  • public void countDown()

递减锁存器的计数,如果计数到达零,则释放所有等待的线程,如果当前计数大于零,则将计数减少。

  • public boolean await(long timeout,TimeUnit unit) throws InterruptedException

??使当前线程在锁存器倒计数至0之前一直等待,除非线程被中断或超出了指定的等待时间。如果计数到达零,则返回true;如果在计数到达零之前超过了等待时间,则返回false,以下三种情况之一前,该线程将一直出于休眠状态:

  • 如果计数到达零,则该方法返回true值
  • 如果超出了指定的等待时间,则返回值为false。如果该时间小于等于零,则该方法根本不会等待
  • 如果当前线程,在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断,则抛出InterruptedException,并且清除当前线程的已中断状态

??类似本文中的测试方法,就相当于并发,当三个线程都创建完,都走到countDownLatch.await()这里就不执行了,直到执行countDownLatch.countDown()后面才会走。

    public void race() {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    countDownLatch.await();
                    Thread.sleep(100);
                    log.info(Thread.currentThread().getName()+"开始跑步");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "Thread" + i).start();
        }
        countDownLatch.countDown();
        log.info("主线程执行完");
    }

结果:

2021-10-26 20:43:06 458 [main] INFO:主线程执行完
2021-10-26 20:43:06 561 [Thread2] INFO:Thread2开始跑步
2021-10-26 20:43:06 561 [Thread0] INFO:Thread0开始跑步
2021-10-26 20:43:06 561 [Thread1] INFO:Thread1开始跑步

??我们也可以反过来,使主线程阻塞,这个时候就是线程执行到countDownLatch.await()后,主线程后面的不执行,直到前面的子线程都执行完,主线程才往后执行。

    public void multitasking() throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                log.info(Thread.currentThread().getName()+"执行完");
                countDownLatch.countDown();
            }, "Thread" + i).start();
        }
        countDownLatch.await();
        log.info("主线程执行完");
    }

结果:

2021-10-26 20:45:21 053 [Thread0] INFO:Thread0执行完
2021-10-26 20:45:21 053 [Thread1] INFO:Thread1执行完
2021-10-26 20:45:21 053 [Thread2] INFO:Thread2执行完
2021-10-26 20:45:21 053 [main] INFO:主线程执行完

结语

??也许本文的写的分布式还有些许的瑕疵,但我们主要目的是:为了加深对zookeeper分布式锁实现原理的理解,实际使用中我们还是使用curator是比较方便和稳定,具体可以参考我另外一篇文章:SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-28 12:27:20  更:2021-10-28 12:29:23 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 1:51:24-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码