Java锁(四)ConditionObject分析

在讲 ConditionObject 之前,先讲解下条件队列。条件队列能够使得一组线程能够通过某种方式来等待特定的条件变成真,条件队列中的成员是一个个正在等待状态的线程。条件队列提供了一种挂起方式,当现场等待的条件非真时,挂起自己并释放锁,一旦等待条件为真,则立即醒来。

条件队列主要功能

1、隐式锁对应的条件队列

对象的内置锁(synchronized 语义对应的同步机制),关联着一个内置的条件队列。Object 的 wait/notify/notifyAll 等方法构成了内部条件队列的 API(即将内部锁与内部条件队列关联的机制)。 内部条件队列是需要内置锁保护的,需要调用对象 X 中的条件队列,必须持有对象 X 上的锁。这是因为状态处于并发环境下,“等待依赖状态的某个条件”与“维护状态的一致性”是绑定在一起的。

2、显式锁对应的条件队列

与内置锁对应的是显式锁,显式锁关联的条件队列是显式条件队列。内置锁的局限是每个内置锁只能关联一个条件队列,当线程需要等待多个条件时,则需要同时获取多个内置锁。 显式锁可以与多个条件队列关联,Condition 是显式锁的条件队列,它是 Object 的 wait/notify/notifyAll 等方法的扩展。提供了在一个对象上设置多个等待集合的功能,即一个对象上设置多个等待条件。

3、condition 使用

Condition 也称为条件队列,与内置锁关联的条件队列类似,它是一种广义的内置条件队列。它提供给线程一种方式使得该线程在调用 wait 方法后执行挂起操作,直到线程的等待的某个条件为真时被唤醒。 条件队列必须跟锁一起使用的,因为对共享状态变量的访问发生在多线程环境下,原理与内部条件队列一样。一个 Condition 的实例必须跟一个 Lock 绑定, Condition 一般作为 Lock 的内部类现。

class BoundedBuffer {
  final Lock lock = new ReentrantLock();
  final Condition notFull  = lock.newCondition();
  final Condition notEmpty = lock.newCondition();

  final Object[] items = new Object[100];
  int putptr, takeptr, count;

  public void put(Object x) throws InterruptedException {
    lock.lock();
    try {
      while (count == items.length)
        notFull.await();
      items[putptr] = x;
      if (++putptr == items.length) putptr = 0;
      ++count;
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
  }

  public Object take() throws InterruptedException {
    lock.lock();
    try {
      while (count == 0)
        notEmpty.await();
      Object x = items[takeptr];
      if (++takeptr == items.length) takeptr = 0;
      --count;
      notFull.signal();
      return x;
    } finally {
      lock.unlock();
    }
  }

}

4、AQS 对条件队列类图

image.png

5、条件队列的节点状态

调用条件队列的等待操作,会设置节点的 waitingStatus 为 Condition,标识当前节点正处于条件队列中。状态转换图如下:
image.png
Node 的各个状态的主要作用,Cancelled 主要是解决线程在持有锁时被外部中断的逻辑,AQS 的可中断锁获取方法 lockInterrutible()是基于该状态实现的,显示锁必须手动释放锁,尤其是有中断的环境中,一个线程被中断可能仍然持有锁,所以必须注意在 finally 中 unlock。Condition 则是条件队列的等待操作,是 Lock 与条件队列关联的基础。Signal 是阻塞后继线程的标识。

6、条件队列的等待和唤醒操作

条件队列上的等待和唤醒操作,本质上是节点在 AQS 线程等待队列和条件队列之间相互转移的过程,当需要等待某个条件时,线程会将当前节点添加到条件队列中,并释放锁;当某个线程执行条件队列的唤醒操作,则会将条件队列的节点转移到 AQS 等待队列。每个 Condition 就是一个条件队列,可以通过 Lock 的 newCondition 创建多个等待条件。操作流程如下:
条件队列等待 await 如下所示

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 将当前线程所在的节点添加到条件等待队列里面
    Node node = addConditionWaiter();
    // 释放并唤醒后续节点
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // node不在同步队列中,挂起当前线程,
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // 检查在等待时是否中断,如果中断,返回中断模式
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 当前节点进入到队列中,并自旋
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 当前节点的下一个等待者,不为空,移除所有取消的
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

/**
 * 添加一个新的等待者到条件等待队列里面
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果lastWaiter不为状态节点,移除
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 将当前线程关联的node添加到条件队列中
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

/**
 * 使用当前的state值调用release方法,返回保存的state值
 * 失败时抛出异常,并将节点设置成在Node.CANCELLED,
 * @param node 等待的状态节点
 * @return 返回上一次同步队列的state状态
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 获取当前AQS的状态值state
        int savedState = getState();
        // 释放,并唤醒后继节点
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            // 释放失败,抛出异常,并且在finally中将节点设置成在Node.CANCELLED
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//unblock,唤醒head的后继节点
        return true;
    }
    return false;
}

await 等待的流程如下图所示
image.png

/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * @throws IllegalMonitorStateException if {@link =isHeldExclusively}
 *         returns {@code false}
 */
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

/**
 * Moves all threads from the wait queue for this condition to
 * the wait queue for the owning lock.
 *
 * @throws IllegalMonitorStateException if {@link =isHeldExclusively}
 *         returns {@code false}
 */
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

/**
 * Removes and transfers nodes until hit non-cancelled one or
 * null. Split out from signal in part to encourage compilers
 * to inline the case of no waiters.
 * @param first (non-null) the first node on condition queue
 */
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

/**
 * Removes and transfers all nodes.
 * @param first (non-null) the first node on condition queue
 */
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

/**
 * 将节点从条件队列转到同步队列中
 * 成功,返回true
 * 否则节点被取消,在收到信号前
 */
final boolean transferForSignal(Node node) {
    /*
     * 节点被取消,不能改变waitStatus
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * 将节点粘到队列中,尝试设置前驱节点的waitStatus,表明线程可能正在等待
     * 如果取消或者尝试设置waitStatus失败,唤醒重新同步
     * 在这种情况下waitStatus会出错,但是是瞬时无害的
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

signal 通知条件等待
image.png
显式条件队列弥补内置条件队列只能关联一个条件的缺陷,同时继承了 Lock 对象的公平性。在 Condition 对象中,与 Object 的 wait/notify/notifyAll 对应的扩展方法是 await/signal/signallAll,同时也具有 Object 的这三个方法,所以使用的时候需要注意使用版本的正确。另外,显式锁必须遵从特定的使用规范,先 lock 在 finally 中 unlock,以确保锁必然会被正确释放。
此外,AQS 的两个队列都是链表队列,关联类的方法的都相当简洁,尤其是节点移除队列操作过程中,都及时释放了所占内存。读源码,可以学习到一种编码的严谨性,锻炼自己关注 GC 的意识。这是我见到过的第三处及时释放 GC 的的代码了。从最初的 ArrayList 的元素 remove 中,然后是 HashMap 的动态扩容数组转移操作,最近看 AQS 的元素唤醒和锁释放操作。关注 GC 的确是最近开始形成的一种编程意识。

https://alicharles.oss-cn-hangzhou.aliyuncs.com/static/images/mp_qrcode.jpg
文章目录
  1. 条件队列主要功能
    1. 1、隐式锁对应的条件队列
    2. 2、显式锁对应的条件队列
    3. 3、condition 使用
    4. 4、AQS 对条件队列类图
    5. 5、条件队列的节点状态
    6. 6、条件队列的等待和唤醒操作