ConditionObject 是 AQS 的内部类,是线程进行等待的条件对象,实现了 Condition 接口。也可以借助 Condition 来进行线程间的通信。

# 使用示例

这里使用 ReentrantLock 来做示例。

public static void main(String[] args) {
  ReentrantLock lock = new ReentrantLock();
  Condition condition = lock.newCondition();
  new Thread(() -> {
    try {
      lock.lock();
      System.out.println(Thread.currentThread().getName() + " get lock.");
      condition.await();
      System.out.println(Thread.currentThread().getName() + " get signal.");
    } catch (Exception e) {
      // ...
    } finally {
      lock.unlock();
      System.out.println(Thread.currentThread().getName() + " release lock.");
    }
  }, "thread1").start();
  new Thread(() -> {
    try {
      lock.lock();
      System.out.println(Thread.currentThread().getName() + " get lock.");
      condition.signal();
      System.out.println(Thread.currentThread().getName() + " signal another.");
    } catch (Exception e) {
      // ...
    } finally {
      lock.unlock();
      System.out.println(Thread.currentThread().getName() + " release lock.");
    }
  }, "thread2").start();
}

输出:

thread1 get lock.
thread2 get lock.
thread2 signal another.
thread2 release lock.
thread1 get signal.
thread1 release lock.

可以看出,Condition 的 await ()/signal () 和 Object 的 wait ()/notify () 类似,都是先暂时释放锁等待其他线程唤醒,这个时候进入等待状态,等其他线程唤醒他之后,就进入了就绪状态,重新竞争 CPU 和锁。那 Condition 和 Object 有什么区别呢?后面再来分析。

# 源码分析

# Condition

Condition 的主要方法有:

  • await () - 等待,响应中断抛出异常
  • awaitUninterruptibly () - 等待,不响应中断
  • awaitNanos (long nanosTimeout) - 超时等待,响应中断
  • await (long time, TimeUnit unit) - 超时等待,响应中断
  • awaitUntil (Date deadling) - 等待至某个时间点,响应中断
  • signal () - 随机唤醒一个等待的线程
  • signalAll () - 唤醒所有等待的线程

# ConditionObject

然后再来看下 ConditionObject 是如何实现的。

ConditionObject 记录了条件队列的头结点和尾结点。这里,我们用同步队列来表示 AQS 中需要排队获取锁的队列,用条件队列来表示需要等待其他线程唤醒的队列。

private transient Node firstWaiter;
private transient Node lastWaiter;

# await

public final void await() throws InterruptedException {
  // 响应中断,抛出异常
  if (Thread.interrupted())
    throw new InterruptedException();
  // 加入条件队列
  Node node = addConditionWaiter();
  // 暂时放弃锁,记录之前的锁状态
  int savedState = fullyRelease(node);
  int interruptMode = 0;
  // 没有在同步队列,就利用 LockSupport 来暂停当前线程
  while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    // 0 代表没有中断,如果被中断了,就跳出循环继续执行下面的逻辑
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      break;
  }
  // 在同步队列或者被中断或被唤醒,就再次请求锁,这里拿不到锁会一直等待
  if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
  // 清理掉条件队列中的该节点
  if (node.nextWaiter != null)
    unlinkCancelledWaiters();
  // 响应中断
  if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);
}

# signal

public final void signal() {
  // 检查当前线程是否持有锁
  if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
  // 唤醒第一个等待的线程
  Node first = firstWaiter;
  if (first != null)
    doSignal(first);
}
private void doSignal(Node first) {
  do {
    // 将条件队列的第一个线程转移到同步队列
    if ( (firstWaiter = first.nextWaiter) == null)
      lastWaiter = null;
    first.nextWaiter = null;
  } while (!transferForSignal(first) &&
           (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
  // 如果已不在条件队列,不进行转移
  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;
  // 放入到同步队列,并进行唤醒
  Node p = enq(node);
  int ws = p.waitStatus;
  if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    LockSupport.unpark(node.thread);
  return true;
}

# 总结

在 AQS 整体的基础上,再来总结下 AQS 的流程。

首先是获取锁,如果尝试获取锁失败,会进入同步队列进行等待,直到被中断或被队列中前面的线程唤醒,会再次尝试获取锁,成功之后会响应中断或执行拿到锁之后的逻辑。对于共享锁,获取锁成功之后,还要唤醒队列中其他等待的线程。

释放锁时,会唤醒同步队列中第一个等待的线程。对于共享锁,会唤醒所有等待的线程。

获取到锁的线程,可能需要等待其他线程完成某件事情才能继续,这个时候需要进入条件队列进行等待。其他线程完成这件事情之后,会将他移动到同步队列的尾部并进行唤醒,然后它再次竞争锁。