ConditionObject 是 AQS 的内部类,是线程进行等待的条件对象,实现了 Condition 接口。也可以借助 Condition 来进行线程间的通信。
# 使用示例
这里使用 ReentrantLock 来做示例。
public static void main(String[] args) { | |
ReentrantLock lock = new ReentrantLock(); | |
Condition condition = lock.newCondition(); | |
new Thread(() -> { | |
try { | |
lock.lock(); | |
System.out.println(Thread.currentThread().getName() + " get lock."); | |
condition.await(); | |
System.out.println(Thread.currentThread().getName() + " get signal."); | |
} catch (Exception e) { | |
// ... | |
} finally { | |
lock.unlock(); | |
System.out.println(Thread.currentThread().getName() + " release lock."); | |
} | |
}, "thread1").start(); | |
new Thread(() -> { | |
try { | |
lock.lock(); | |
System.out.println(Thread.currentThread().getName() + " get lock."); | |
condition.signal(); | |
System.out.println(Thread.currentThread().getName() + " signal another."); | |
} catch (Exception e) { | |
// ... | |
} finally { | |
lock.unlock(); | |
System.out.println(Thread.currentThread().getName() + " release lock."); | |
} | |
}, "thread2").start(); | |
} |
输出:
thread1 get lock. | |
thread2 get lock. | |
thread2 signal another. | |
thread2 release lock. | |
thread1 get signal. | |
thread1 release lock. |
可以看出,Condition 的 await ()/signal () 和 Object 的 wait ()/notify () 类似,都是先暂时释放锁等待其他线程唤醒,这个时候进入等待状态,等其他线程唤醒他之后,就进入了就绪状态,重新竞争 CPU 和锁。那 Condition 和 Object 有什么区别呢?后面再来分析。
# 源码分析
# Condition
Condition 的主要方法有:
- await () - 等待,响应中断抛出异常
- awaitUninterruptibly () - 等待,不响应中断
- awaitNanos (long nanosTimeout) - 超时等待,响应中断
- await (long time, TimeUnit unit) - 超时等待,响应中断
- awaitUntil (Date deadling) - 等待至某个时间点,响应中断
- signal () - 随机唤醒一个等待的线程
- signalAll () - 唤醒所有等待的线程
# ConditionObject
然后再来看下 ConditionObject 是如何实现的。
ConditionObject 记录了条件队列的头结点和尾结点。这里,我们用同步队列来表示 AQS 中需要排队获取锁的队列,用条件队列来表示需要等待其他线程唤醒的队列。
private transient Node firstWaiter; | |
private transient Node lastWaiter; |
# await
public final void await() throws InterruptedException { | |
// 响应中断,抛出异常 | |
if (Thread.interrupted()) | |
throw new InterruptedException(); | |
// 加入条件队列 | |
Node node = addConditionWaiter(); | |
// 暂时放弃锁,记录之前的锁状态 | |
int savedState = fullyRelease(node); | |
int interruptMode = 0; | |
// 没有在同步队列,就利用 LockSupport 来暂停当前线程 | |
while (!isOnSyncQueue(node)) { | |
LockSupport.park(this); | |
// 0 代表没有中断,如果被中断了,就跳出循环继续执行下面的逻辑 | |
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) | |
break; | |
} | |
// 在同步队列或者被中断或被唤醒,就再次请求锁,这里拿不到锁会一直等待 | |
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) | |
interruptMode = REINTERRUPT; | |
// 清理掉条件队列中的该节点 | |
if (node.nextWaiter != null) | |
unlinkCancelledWaiters(); | |
// 响应中断 | |
if (interruptMode != 0) | |
reportInterruptAfterWait(interruptMode); | |
} |
# signal
public final void signal() { | |
// 检查当前线程是否持有锁 | |
if (!isHeldExclusively()) | |
throw new IllegalMonitorStateException(); | |
// 唤醒第一个等待的线程 | |
Node first = firstWaiter; | |
if (first != null) | |
doSignal(first); | |
} | |
private void doSignal(Node first) { | |
do { | |
// 将条件队列的第一个线程转移到同步队列 | |
if ( (firstWaiter = first.nextWaiter) == null) | |
lastWaiter = null; | |
first.nextWaiter = null; | |
} while (!transferForSignal(first) && | |
(first = firstWaiter) != null); | |
} | |
final boolean transferForSignal(Node node) { | |
// 如果已不在条件队列,不进行转移 | |
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) | |
return false; | |
// 放入到同步队列,并进行唤醒 | |
Node p = enq(node); | |
int ws = p.waitStatus; | |
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) | |
LockSupport.unpark(node.thread); | |
return true; | |
} |
# 总结
在 AQS 整体的基础上,再来总结下 AQS 的流程。
首先是获取锁,如果尝试获取锁失败,会进入同步队列进行等待,直到被中断或被队列中前面的线程唤醒,会再次尝试获取锁,成功之后会响应中断或执行拿到锁之后的逻辑。对于共享锁,获取锁成功之后,还要唤醒队列中其他等待的线程。
释放锁时,会唤醒同步队列中第一个等待的线程。对于共享锁,会唤醒所有等待的线程。
获取到锁的线程,可能需要等待其他线程完成某件事情才能继续,这个时候需要进入条件队列进行等待。其他线程完成这件事情之后,会将他移动到同步队列的尾部并进行唤醒,然后它再次竞争锁。