Java锁(三)CountDownLatch共享锁分析

在开始解读 AQS 的共享功能前,我们再重温一下 CountDownLatch,CountDownLatch 为 java.util.concurrent 包下的计数器工具类,常被用在多线程环境下,它在初始时需要指定一个计数器的大小,然后可被多个线程并发的实现减 1 操作,并在计数器为 0 后调用 await 方法的线程被唤醒,从而实现多线程间的协作。

1、闭锁使用

class Driver2 {
    void main() throws InterruptedException {
        CountDownLatch doneSignal = new CountDownLatch(N);
        Executor e = ...

        for (int i = 0; i <N; ++i) // create and start threads
        e.execute(new WorkerRunnable(doneSignal, i));
        doneSignal.await(); // wait for all to finish
    }
}

class WorkerRunnable implements Runnable {
  private final CountDownLatch doneSignal;
  private final int i;
  WorkerRunnable(CountDownLatch doneSignal, int i) {
    this.doneSignal = doneSignal;
    this.i = i;
  }
  public void run() {
    try {
      doWork(i);
      doneSignal.countDown();
    } catch (InterruptedException ex) {} // return;
  }

  void doWork() { ... }
}

可以看到 CountDownLatch 的作用类似于一个“栏栅”,在 CountDownLatch 的计数为 0 前,调用 await 方法的线程将一直阻塞,直到 CountDownLatch 计数为 0,await 方法才会返回,而 CountDownLatch 的 countDown()方法则一般由各个线程调用,实现 CountDownLatch 计数的减 1。
首先,看下 CountDownLatch 的构造方法:

public CountDownLatch(int count) {
    if (count <0) throw new IllegalArgumentException("count <0");
    this.sync = new Sync(count);
}

和 ReentrantLock 类似,CountDownLatch 内部也有一个叫做 Sync 的内部类,同样也是用它继承了 AQS,子类需要实现 AQS 的 5 个保护方法。

2、state 状态位

对于共享锁,需要实现 tryAcquireShared,tryReleaseShared 这 2 个方法。
image.png
setState 方法设定的 state 是 AQS 的一个“状态位”,在不同的场景下,代表不同的含义,比如在 ReentrantLock 中,表示加锁的次数,在 CountDownLatch 中,则表示 CountDownLatch 的计数器的初始大小,任务分为 N 个子线程去执行,同步状态 state 也初始化为 N。
image.png

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

设置完计数器大小后 CountDownLatch 的构造方法返回,下面我们再看下 CountDownLatch 的 await()方法。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

调用了 Sync 的 acquireSharedInterruptibly 方法,因为 Sync 是 AQS 子类的原因,这里其实是直接调用了 AQS 的 acquireSharedInterruptibly 方法。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) <0)
        doAcquireSharedInterruptibly(arg);
}

这个方法的调用是响应线程的打断的,所以在前两行会检查下线程是否被打断。接着,尝试着获取共享锁,小于 0,表示获取失败,AQS 在获取锁的思路是,先尝试直接获取锁,如果失败会将当前线程放在队列中,按照 FIFO 的原则等待锁。而对于共享锁也是这个思路,如果和独占锁一致,这里的 tryAcquireShared 应该是个空方法,留给子类去判断。

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

如果 state 变成 0 了,则返回 1,表示获取成功,否则返回-1 则表示获取失败。

3、获取锁

看到这里,读者可能会发现,await 方法的获取方式更像是在获取一个独占锁,那为什么这里还会用 tryAcquireShared 呢?
回想下 CountDownLatch 的 await 方法是不是只能在主线程中调用?答案是否定的,CountDownLatch 的 await 方法可以在多个线程中调用,当 CountDownLatch 的计数器为 0 后,调用 await 的方法都会依次返回。 也就是说可以多个线程同时在等待 await 方法返回,所以它被设计成了实现 tryAcquireShared 方法,获取的是一个共享锁,锁在所有调用 await 方法的线程间共享,所以叫共享锁。
如果获取共享锁失败(返回了-1,说明 state 不为 0,也就是 CountDownLatch 的计数器还不为 0),进入调用 doAcquireSharedInterruptibly 方法中,按照我们上述的猜想,应该是要将当前线程放入到队列中去。
在这之前,我们再回顾一下 AQS 队列的数据结构:AQS 是一个双向链表,通过节点中的 next,pre 变量分别指向当前节点后一个节点和前一个节点。其中,每个节点中都包含了一个线程和一个类型变量:表示当前节点是独占节点还是共享节点,头节点中的线程为正在占有锁的线程,而后的所有节点的线程表示为正在等待获取锁的线程。
黄色节点为头节点,表示正在获取锁的节点,剩下的蓝色节点(Node1、Node2、Node3)为正在等待锁的节点,他们通过各自的 next、pre 变量分别指向前后节点,形成了 AQS 中的双向链表。每个线程被加上类型(共享还是独占)后便是一个 Node, 也就是本文中说的节点。
回到 acquireSharedInterruptibly 方法:

/**
 * 在中断模式下获取共享锁
 * @param arg the acquire argument
 */
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    /* 类型为Node.SHARED,标示为共享节点。*/
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                /* 头节点获取共享锁 */
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            /* 阻塞并判断是否打断,其实这个判断才是自旋锁真正的猥琐点,
             * 意思是如果你的前继节点不是head,
             * 而且当你的前继节点状态是Node.SIGNAL时,
             * 你这个线程将被park(),
             * 直到另外的线程release时,发现head.next是你这个node时,才unpark,
             * 才能继续循环并获取锁
             */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

使用了 CAS 更换了头节点,然后,将当前节点的下一个节点取出来,如果同样是“shared”类型的,再做一个”releaseShared”操作

/**
 * 设置队列head节点,检查后继节点是否在共享模式下等待,
 * 如果propagate > 0 或者 节点PROPAGATE状态被设置,状态传播,
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // 记录老的头节点
    setHead(node);
    /*
     * 如果传播propagate被调用者caller标示,或者被前一次操作记录
     * 并且下一个节点在共享模式等待,或者为null,
     * 尝试信号通知队列下一个节点
     */
    if (propagate > 0 || h == null || h.waitStatus <0 ||
        (h = head) == null || h.waitStatus <0) {
        Node s = node.next;
        if (s == null || s.isShared())
            /* 共享模式下的释放动作,信号通知后继节点,保证状态传递 */
            doReleaseShared();
    }
}

image.png

4、释放锁

看完 await 方法,我们再来看下 countDown()方法:

public void countDown() {
    sync.releaseShared(1);
}

/**
 * Releases in shared mode. Implemented by unblocking one or more
 * threads if {@link =tryReleaseShared} returns true.
*/
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

/**
 * 共享模式下的释放动作,信号通知后继节点,保证状态传递
 */
private void doReleaseShared() {
    /*
     * 确保释放状态的传播,即使有其他在进行中的acquires/releases操作的情况下。
     * 如果节点需要等待信号,用常用的方式,
     * 尝试unparkSuccessor将head节点的后继unpark
     * 否则状态被设置成PROPAGATE,来保证在释放的时候,传播能够继续。
     * 另外,当执行这个操作的时候,必须循环,防止新的节点被增加,
     * 此外,不像其他使用unparkSuccessor,我们需要知道CAS是否重置状态失败,
     * 如果失败重新检查。
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                /* 如果当前节点是SIGNAL意味着,它正在等待一个信号,
                 * 或者说,它在等待被唤醒,因此做两件事,
                 * 1是重置waitStatus标志位,2是重置成功后,唤醒下一个节点。
                 */
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                /* 如果本身头节点的waitStatus是出于重置状态(waitStatus==0)的,
                 * 将其设置为“传播”状态。
                 * 意味着需要将状态向后一个节点传播。
                 */
                continue;                // loop on failed CAS }
        if (h == head)                   // loop if head changed
            break;
    }
}

image.png
闭锁 CountdownLatch 总结

  1. 与 AQS 的独占功能一样,共享锁是否可以被获取的判断为空方法,交由子类去实现。
  2. 与 AQS 的独占功能不同,当共享锁被头节点获取后,独占功能是只有头节点获取锁,其余节点的线程继续沉睡,等待锁被释放后,才会唤醒下一个节点的线程,而共享功能是只要头节点获取锁成功,就在唤醒自身节点对应的线程的同时,继续唤醒 AQS 队列中的下一个节点的线程,每个节点在唤醒自身的同时还会唤醒下一个节点对应的线程,以实现共享状态的“向后传播”,从而实现共享功能。
https://alicharles.oss-cn-hangzhou.aliyuncs.com/static/images/mp_qrcode.jpg
文章目录
  1. 1、闭锁使用
  2. 2、state 状态位
  3. 3、获取锁
  4. 4、释放锁