在开始解读 AQS 的共享功能前,我们再重温一下 CountDownLatch,CountDownLatch 为 java.util.concurrent 包下的计数器工具类,常被用在多线程环境下,它在初始时需要指定一个计数器的大小,然后可被多个线程并发的实现减 1 操作,并在计数器为 0 后调用 await 方法的线程被唤醒,从而实现多线程间的协作。
1、闭锁使用
class Driver2 {
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...
for (int i = 0; i <N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
doneSignal.await(); // wait for all to finish
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
可以看到 CountDownLatch 的作用类似于一个“栏栅”,在 CountDownLatch 的计数为 0 前,调用 await 方法的线程将一直阻塞,直到 CountDownLatch 计数为 0,await 方法才会返回,而 CountDownLatch 的 countDown()方法则一般由各个线程调用,实现 CountDownLatch 计数的减 1。
首先,看下 CountDownLatch 的构造方法:
public CountDownLatch(int count) {
if (count <0) throw new IllegalArgumentException("count <0");
this.sync = new Sync(count);
}
和 ReentrantLock 类似,CountDownLatch 内部也有一个叫做 Sync 的内部类,同样也是用它继承了 AQS,子类需要实现 AQS 的 5 个保护方法。
2、state 状态位
对于共享锁,需要实现 tryAcquireShared,tryReleaseShared 这 2 个方法。
setState 方法设定的 state 是 AQS 的一个“状态位”,在不同的场景下,代表不同的含义,比如在 ReentrantLock 中,表示加锁的次数,在 CountDownLatch 中,则表示 CountDownLatch 的计数器的初始大小,任务分为 N 个子线程去执行,同步状态 state 也初始化为 N。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
设置完计数器大小后 CountDownLatch 的构造方法返回,下面我们再看下 CountDownLatch 的 await()方法。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
调用了 Sync 的 acquireSharedInterruptibly 方法,因为 Sync 是 AQS 子类的原因,这里其实是直接调用了 AQS 的 acquireSharedInterruptibly 方法。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) <0)
doAcquireSharedInterruptibly(arg);
}
这个方法的调用是响应线程的打断的,所以在前两行会检查下线程是否被打断。接着,尝试着获取共享锁,小于 0,表示获取失败,AQS 在获取锁的思路是,先尝试直接获取锁,如果失败会将当前线程放在队列中,按照 FIFO 的原则等待锁。而对于共享锁也是这个思路,如果和独占锁一致,这里的 tryAcquireShared 应该是个空方法,留给子类去判断。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
如果 state 变成 0 了,则返回 1,表示获取成功,否则返回-1 则表示获取失败。
3、获取锁
看到这里,读者可能会发现,await 方法的获取方式更像是在获取一个独占锁,那为什么这里还会用 tryAcquireShared 呢?
回想下 CountDownLatch 的 await 方法是不是只能在主线程中调用?答案是否定的,CountDownLatch 的 await 方法可以在多个线程中调用,当 CountDownLatch 的计数器为 0 后,调用 await 的方法都会依次返回。 也就是说可以多个线程同时在等待 await 方法返回,所以它被设计成了实现 tryAcquireShared 方法,获取的是一个共享锁,锁在所有调用 await 方法的线程间共享,所以叫共享锁。
如果获取共享锁失败(返回了-1,说明 state 不为 0,也就是 CountDownLatch 的计数器还不为 0),进入调用 doAcquireSharedInterruptibly 方法中,按照我们上述的猜想,应该是要将当前线程放入到队列中去。
在这之前,我们再回顾一下 AQS 队列的数据结构:AQS 是一个双向链表,通过节点中的 next,pre 变量分别指向当前节点后一个节点和前一个节点。其中,每个节点中都包含了一个线程和一个类型变量:表示当前节点是独占节点还是共享节点,头节点中的线程为正在占有锁的线程,而后的所有节点的线程表示为正在等待获取锁的线程。
黄色节点为头节点,表示正在获取锁的节点,剩下的蓝色节点(Node1、Node2、Node3)为正在等待锁的节点,他们通过各自的 next、pre 变量分别指向前后节点,形成了 AQS 中的双向链表。每个线程被加上类型(共享还是独占)后便是一个 Node, 也就是本文中说的节点。
回到 acquireSharedInterruptibly 方法:
/**
* 在中断模式下获取共享锁
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
/* 类型为Node.SHARED,标示为共享节点。*/
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
/* 头节点获取共享锁 */
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
/* 阻塞并判断是否打断,其实这个判断才是自旋锁真正的猥琐点,
* 意思是如果你的前继节点不是head,
* 而且当你的前继节点状态是Node.SIGNAL时,
* 你这个线程将被park(),
* 直到另外的线程release时,发现head.next是你这个node时,才unpark,
* 才能继续循环并获取锁
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
使用了 CAS 更换了头节点,然后,将当前节点的下一个节点取出来,如果同样是“shared”类型的,再做一个”releaseShared”操作
/**
* 设置队列head节点,检查后继节点是否在共享模式下等待,
* 如果propagate > 0 或者 节点PROPAGATE状态被设置,状态传播,
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 记录老的头节点
setHead(node);
/*
* 如果传播propagate被调用者caller标示,或者被前一次操作记录
* 并且下一个节点在共享模式等待,或者为null,
* 尝试信号通知队列下一个节点
*/
if (propagate > 0 || h == null || h.waitStatus <0 ||
(h = head) == null || h.waitStatus <0) {
Node s = node.next;
if (s == null || s.isShared())
/* 共享模式下的释放动作,信号通知后继节点,保证状态传递 */
doReleaseShared();
}
}
4、释放锁
看完 await 方法,我们再来看下 countDown()方法:
public void countDown() {
sync.releaseShared(1);
}
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link =tryReleaseShared} returns true.
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
/**
* 共享模式下的释放动作,信号通知后继节点,保证状态传递
*/
private void doReleaseShared() {
/*
* 确保释放状态的传播,即使有其他在进行中的acquires/releases操作的情况下。
* 如果节点需要等待信号,用常用的方式,
* 尝试unparkSuccessor将head节点的后继unpark
* 否则状态被设置成PROPAGATE,来保证在释放的时候,传播能够继续。
* 另外,当执行这个操作的时候,必须循环,防止新的节点被增加,
* 此外,不像其他使用unparkSuccessor,我们需要知道CAS是否重置状态失败,
* 如果失败重新检查。
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
/* 如果当前节点是SIGNAL意味着,它正在等待一个信号,
* 或者说,它在等待被唤醒,因此做两件事,
* 1是重置waitStatus标志位,2是重置成功后,唤醒下一个节点。
*/
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
/* 如果本身头节点的waitStatus是出于重置状态(waitStatus==0)的,
* 将其设置为“传播”状态。
* 意味着需要将状态向后一个节点传播。
*/
continue; // loop on failed CAS }
if (h == head) // loop if head changed
break;
}
}
闭锁 CountdownLatch 总结
- 与 AQS 的独占功能一样,共享锁是否可以被获取的判断为空方法,交由子类去实现。
- 与 AQS 的独占功能不同,当共享锁被头节点获取后,独占功能是只有头节点获取锁,其余节点的线程继续沉睡,等待锁被释放后,才会唤醒下一个节点的线程,而共享功能是只要头节点获取锁成功,就在唤醒自身节点对应的线程的同时,继续唤醒 AQS 队列中的下一个节点的线程,每个节点在唤醒自身的同时还会唤醒下一个节点对应的线程,以实现共享状态的“向后传播”,从而实现共享功能。