一、 故事起源
很久很久以前,大概也就是昨天,肠胃作祟,急需排泄,提裤奔跑,进入厕所,
左右扫视,没有坑位,灵光一闪,换了楼层,找到坑位, 拉完粑粑接着回去写bug。
二、不安现状
身为程序员的我,想着改变世界,但遗憾暂未实现。改变一个厕所总可以吧,我不配吗?
所以我打算写一个程序来模拟厕所剩余坑位(当然了,业务上可能不会这样写,我之所以写这种代码是为了学习Semaphore,你应该懂我的良苦用心,还有就是不要给我找多线程的问题,这个示例禁不起大佬review,因为我过得浑浑噩噩。)
1. 代码先上
wc.java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class WC {
private Semaphore s = new Semaphore(2);
public int surplus(){
return s.availablePermits();
}
public void enter(String name){
boolean b = s.tryAcquire();
if (b) {
System.out.println(name+"来了,有坑位,开始占用");
sleepRandom();
s.release();
System.out.println("爽呆呆,"+name+"释放坑位");
} else {
System.out.println(name+"来了,木有空位,伤心离去");
}
}
private void sleepRandom(){
try {
int random=(int)(Math.random()*10+1);
Thread.sleep(TimeUnit.SECONDS.toMillis(random));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Test.java
public class Test {
public static void main(String[] args) {
WC wc = new WC();
new Thread(()-> wc.enter("小强")).start();
new Thread(()-> wc.enter("小月月")).start();
new Thread(()-> wc.enter("小月鸟")).start();
new Thread(()->{
int sp = wc.surplus();
if (sp > 0) {
wc.enter("小明");
} else {
System.out.println("小明知道没坑,先不去");
}
}).start();
}
}
输出结果:
2. 理论跟上
咦? 好神奇,一个Semaphore就能实现。对,就是这么牛.
这时候就有人问了,他是怎么实现这么高级的功能的,少年你是问出这个问题的时候我就知道你没有看我前边的文章(这时候可以关注:木子的昼夜编程 发现更多精彩)。
他之所以这么厉害,是因为他有一个好友名字叫做:AbstractQueuedSynchronizer 用我蹩脚的英语翻译:抽象队列同步器,不重要,每个人都有小名,你可以叫他小名:AQS 。
AOS人品极好,他有很多朋友,有你熟悉的ReentrantLock、CountDownLatch 等
这里再简单介绍一下我们的兄弟AQS吧。 他的主要构成有两个:state , queue
state: 记录可用凭证数(剩余坑位)
queue: 记录等待获取凭证的线程(厕所外排队拉粑粑的人)
获取坑的流程是这样的:
看着是不是很简单,判断是否有空位,有就进入,没有就排队,等待有空位再进入。
越简单的事情其实越复杂,比如为什么 1+1 = 2 。
3. 聊聊我们的 Semaphore方法
import java.util.concurrent.Semaphore;
public class TestMethod {
public static void main(String[] args) throws InterruptedException {
Semaphore s = new Semaphore(2);
s.acquire();
s.acquire(2);
s.acquireUninterruptibly();
s.acquireUninterruptibly(2);
boolean b = s.tryAcquire();
boolean nb = s.tryAcquire(10);
s.availablePermits();
s.release();
s.release(2);
int count = s.drainPermits();
boolean fair = s.isFair();
int queueLength = s.getQueueLength();
boolean b1 = s.hasQueuedThreads();
}
}
4. AQS 对朋友的标准是什么
AQS说了,我有我的原则,你们只要遵循我的原则,我们就能成为好朋友。
原则是什么呢?AQS定义了很多方法,但是都是空实现,他的朋友们需要实现这些方法。
这不就是“模板方法”模式。
包括但不限于以下方法:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
我们拿tryAcquire 举例调用链是这样的:
AQS定义了流程,朋友在遵循流程的基础上进行自我发挥就好了。
我们来看一下代码简单流程:
5.Semaphore的属性sync
sync是Semaphore属性,他可能是FairSync 也可能是NonfairSync
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current)
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current)
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
什么是公平,什么是非公平:
我先讲个例子看大家能不能理解了。
一堆人在厕所门口排队,小强也来排队了,一看好多人排队,就乖乖的站在最后,等前边人蹲完坑了自己再去蹲,过了会儿小月鸟来了,一看厕所门口没有人排队,他就直接进去看看有没有坑,有的话就蹲,没有就算了,过了会儿小月月来了,他直接去厕所看有没有可用坑位,有的话他就占用,没有他再去后边排队。
请问他们三个人谁是公平,谁是不公平。
答案:小强、小月鸟是公平的,小月月是不公平的 。
怎么区分出来的,就是他们来了有没有先看看是否有排队,如果有排队就站后边,没有人排队才能直接进去找坑位。
6. 排队是什么操作:AQS的方法之addWaiter
一直在说排队排队,是怎么排的呢,直接上代码
final Node node = addWaiter(Node.SHARED);
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
7. 队列中的我们是怎么知道前边没人了呢
现实中我们可能会自己盯着,前边没人了我们自己就知道了,还有一种情况不知道大家遇到过没?
在你上学的时候,每次考完试,班主任会找那些成绩波动大的人谈话,老师谈完一个后,就会让这个人通知下一个人出去跟老师谈话,你在那惶恐不安,等着被叫。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
三、还有谁
大概用到的几个点都写了写,要再细的我也没有很了解了。
更多精彩关注:木子的昼夜编程
|