AQS概述
- AbstractQueuedSynchronizer(抽象队列式同步器)
- 多线程访问共享资源的同步器框架
- AQS是实现锁的关键
锁面向使用者,AQS面向锁实现者
AQS底层结构
1.state
- AQS内部用volatile修饰的int类型的成员变量state来控制同步状态
state=0:表示没有线程正在独占共享资源的锁
state=1:表示有线程正在共享资源的锁
1.占没占
独占锁state两个状态01表示占没占
读写锁32位数拆成前后16位分别表示两把锁
2.占多少
state数字量
getState():获取当前的同步状态
setState():设置当前同步状态,非安全
compareAndSetState():使用CAS设置状态,保证状态设置的原子性
2.CLH队列
- 虚拟双端队列(不存在队列实例,只存在节点间的关联关系)
- AQS内部将请求共享资源的线程包装成CLH锁队列的一个节点Node实现锁分配
3.模板模式
- 工作的流程固定,但具体实现不固定
- 比如:JdbcTemplate、HttpServlet
1.JDBC中获取数据库连接等流程是固定不变的,只是sql语句不同,返回结果包装成什么对象不确定
JdbcTemplate将其固定流程封装,只需用户编写sql以及返回部分
--------------------------------------------------
@Override
public Account findAccount(Integer accountid) {
Account account = this.jdbcTemplate.queryForObject("select * from account where accountid=" + accountid, (rs, rowNum) -> {
Account newAccount = new Account();
newAccount.setAccountid(rs.getInt(1));
newAccount.setBalance(rs.getDouble(2));
return newAccount;
});
return (Account) account;
}
2.HttpServlet的调用流程是 构造->init->service->doGet/doPost..
service中判断具体调用doGet/doPost..哪个方法
而doGet/doPost..需要自己实现
1.独占式——获取
acquire(int arg)
acquireInterruptibly(int arg):响应中断
boolean tryAcquireNanos(int arg, long nanosTimeout):尝试获取,有超时设置
2.独占式——释放
release()
---------------------------------------------
1.共享式——获取
acquireShared(int arg)
acquireSharedInterruptibly(int arg):响应中断
boolean tryAcquireSharedNanos(int arg, long nanosTimeout):尝试获取,有超时设置
2.共享式——释放
releaseShared()
1.独占式
tryAcquire(int):尝试获取资源
成功则返回true,失败则返回false
tryRelease(int):尝试释放资源
成功则返回true,失败则返回false
2.共享式
tryAcquireShared(int):尝试获取资源
负数表示失败
0表示成功,且没有剩余可用资源
正数表示成功,且有剩余资源
tryReleaseShared(int):尝试释放资源
释放后允许唤醒后续等待结点返回true,否则返回false
3.同步器是否处于独占模式
isHeldExclusively()
一般情况,只需实现共享或独占式的一种即可
但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
}
-----------------------------------------
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
-----------------------------------------
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
自定义CountDownLatch(共享锁)
public class MyCountDownLatch {
private static final class MySync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 8055037635125704937L;
public MySync(int count) {
setState(count);
}
public int getCount() {
return getState();
}
@Override
protected int tryAcquireShared(int arg) {
return (getState() == 0) ? 1 : -1;
}
@Override
protected boolean tryReleaseShared(int arg) {
for (; ; ) {
int c = getState();
if (c == 0) {
return false;
}
int nextc = c - 1;
if (compareAndSetState(c, nextc)) {
return nextc == 0;
}
}
}
}
private final MySync mySync;
public MyCountDownLatch(int count) {
this.mySync = new MySync(count);
}
public void await() {
mySync.acquireShared(1);
}
public void countDown() {
mySync.releaseShared(1);
}
public int getCount() {
return mySync.getCount();
}
}
自定义Semaphore(共享锁 非公平锁)
public class MySemaphore {
static abstract class MySync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 8055037635125704937L;
public MySync(int count) {
setState(count);
}
@Override
protected boolean tryReleaseShared(int arg) {
for (; ; ) {
int oldState = getState();
int newState = oldState + arg;
if (compareAndSetState(oldState, newState)) {
return true;
}
}
}
}
static final class NonFairSync extends MySync {
private static final long serialVersionUID = -2352767803293687627L;
public NonFairSync(int count) {
super(count);
}
@Override
protected int tryAcquireShared(int arg) {
for (; ; ) {
int oldState = getState();
int newState = oldState - arg;
if (newState < 0 || compareAndSetState(oldState, newState)) {
return newState;
}
}
}
}
private final MySync mySync;
public MySemaphore(int count) {
this.mySync = new NonFairSync(count);
}
public void acquire() {
mySync.acquireShared(1);
}
public void release() {
mySync.releaseShared(1);
}
}
自定义ReentrantLock(独占锁)
public class MyReentrantLock implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6712919921457609668L;
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) {
throw new UnsupportedOperationException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
Condition newCondition() {
return new ConditionObject();
}
}
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
三元共享同步工具类
public class TrinityLock implements Lock {
private final Sync sync = new Sync(3);
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5919240659191346108L;
Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large than zero.");
}
setState(count);
}
@Override
public int tryAcquireShared(int reduceCount) {
for (; ; ) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
@Override
public boolean tryReleaseShared(int returnCount) {
for (; ; ) {
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
final ConditionObject newCondition() {
return new ConditionObject();
}
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) >= 0;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
|