IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Java并发编程(多线程) -- 第四部分(JUC - 2) -> 正文阅读

[Java知识库]Java并发编程(多线程) -- 第四部分(JUC - 2)

十六、JUC下常用类(包含源码) - 第二部分

1. StampedLock

该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
加解读锁

	long stamp = lock.readLock();
	lock.unlockRead(stamp);

加解写锁

	long stamp = lock.writeLock();
	lock.unlockWrite(stamp);

乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

	long stamp = lock.tryOptimisticRead();
	// 验戳
	if(!lock.validate(stamp)){
		// 锁升级(乐观读锁升级为读锁)
	}

例子:

	public int read(int readTime) {
		long stamp = lock.tryOptimisticRead();
		log.debug("optimistic read locking...{}", stamp);
		sleep(readTime);
		if (lock.validate(stamp)) {
			log.debug("read finish...{}, data:{}", stamp, data);
			return data;
		}
		// 锁升级 - 读锁
		log.debug("updating to read lock... {}", stamp);
		try {
			stamp = lock.readLock();
			log.debug("read lock {}", stamp);
			sleep(readTime);
			log.debug("read finish...{}, data:{}", stamp, data);
			return data;
		} finally {
			log.debug("read unlock {}", stamp);
			lock.unlockRead(stamp);
		}
	}
	
	public void write(int newData) {
		long stamp = lock.writeLock();
		log.debug("write lock {}", stamp);
		try {
			sleep(2);
			this.data = newData;
		} finally {
			log.debug("write unlock {}", stamp);
			lock.unlockWrite(stamp);
		}
	}

注意:

  • StampedLock 不支持条件变量
  • StampedLock 不支持可重入

2. Semaphore

Semaphore 信号量,用来限制能同时访问共享资源的线程上限。

举例

	public class TestSemaphore {
	    public static void main(String[] args) {
	        // 1. 创建 semaphore 对象,并规定大小为 3
	        Semaphore semaphore = new Semaphore(3);
	
	        // 创建10个线程同时运行
	        for (int i = 0; i < 10; i++) {
	            new Thread(()->{
	                // 加锁
	                try {
	                    semaphore.acquire();
	                } catch (InterruptedException e) {
	                    e.printStackTrace();
	                }
	                try {
	                    System.out.println("running..." + new Date());
	                    try {
	                        Thread.sleep(1000);
	                    } catch (InterruptedException e) {
	                        e.printStackTrace();
	                    }
	                    System.out.println("end..." + new Date());
	                } finally {
	                    semaphore.release();
	                }
	            }).start();
	        }
	    }
	}

在这里插入图片描述

加锁解锁流程

Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一

刚开始,permits(state)为 3,这时 5 个线程来获取资源
在这里插入图片描述
假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞
在这里插入图片描述
这时 Thread-4 释放了 permits,状态如下
在这里插入图片描述
接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
在这里插入图片描述

1. 构造方法,设置信号量原理

	public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

	// 调用到,传给父类
	NonfairSync(int permits) {
        super(permits);
    }
abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits); // 发现就是将传入的信号量赋值给了state变量
        }

		......
}

2. acquire() 原理

	public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
	public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

进入Semaphore内部非公平的实现

	protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
	
	// 这里会将信号量(state一直减一,直到减为0,就不能在有更多的线程获取锁了)
	final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;  // state一直减一
            // 当state为0,remaining为-1;会跳过CAS操作,不允许更多线程加入
            if (remaining < 0 ||   
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
	private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
               	 	// tryAcquireShared()内部也就是调用nonfairTryAcquireShared()
                    int r = tryAcquireShared(arg); 
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
               	// 获取不到锁,到AQS队列中park
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

被唤醒之后,并且获取到了锁,进入setHeadAndPropagate方法,修改头结点,unpark紧跟在后面的一个共享Node。

	private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
	private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

3. release()原理

	public void release() {
        sync.releaseShared(1);
    }
	public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

进入Semaphore内部非公平的实现

	protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;  // state加1
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))  // CAS修改state
                return true;
        }
    }
	private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                	// 将Node状态从-1 CAS 修改为0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 成功 unpark 头结点后的节点,该节点线程再次尝试获取锁
                    unparkSuccessor(h);  
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
	private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-11-28 11:07:59  更:2021-11-28 11:10:02 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 3:29:29-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码