# 前言
AQS 是一个同步框架的抽象类,实现了锁相关的操作和一个先进先出的阻塞队列,可以通过集成它来实现同步锁操作。它提供的主要方法有 acquire 和 release,获取和释放锁。
# 原理概览
AQS 获取锁 的核心思想是,如果当前没有其他线程竞争锁,就通过 CAS 获取锁,如果有其他线程竞争,就进入等待队列,等待队列是一个基于链表的双向队列。进入等待队列之后,找到有效的前驱节点,等前驱节点释放锁后,唤醒它自己。在释放锁时,唤醒 head 后面的节点。
# 等待队列
等待队列是一个双向链表,头节点已经持有锁,所以其实不属于等待队列中的元素,等待队列是从第二个元素开始的。
# 源码分析
# Node
# 常量
Node 节点是等待队列的节点,分为共享模式和独占模式。节点状态初始为 0,被取消等待是 1,释放锁或者取消等待后需要唤醒后继节点是 -1,在条件队列中是 -2,共享模式下是 -3。
| |
| static final Node SHARED = new Node(); |
| |
| static final Node EXCLUSIVE = null; |
| |
| static final int CANCELLED = 1; |
| |
| static final int SIGNAL = -1; |
| |
| static final int CONDITION = -2; |
| |
| static final int PROPAGATE = -3; |
# 变量
节点需要一个状态标识,以及前驱节点和后继节点。
| |
| volatile int waitStatus; |
| |
| volatile Node prev; |
| |
| volatile Node next; |
| |
| volatile Thread thread; |
| |
| Node nextWaiter; |
# 构造函数
| |
| * 给 head 或者 SHARED 使用的 |
| */ |
| Node() {} |
| |
| |
| * 共享模式下,nextWaiter 始终是 SHARED,即一个固定的 Node 对象 |
| * 独占模式下,nextWaiter 初始是 EXCLUSIVE,即 null |
| */ |
| Node(Node nextWaiter) { |
| this.nextWaiter = nextWaiter; |
| THREAD.set(this, Thread.currentThread()); |
| } |
| |
| |
| * 添加下一个条件等待者使用 |
| */ |
| Node(int waitStatus) { |
| WAITSTATUS.set(this, waitStatus); |
| THREAD.set(this, Thread.currentThread()); |
| } |
# 关键变量
| |
| private transient volatile Node head; |
| |
| private transient volatile Node tail; |
| |
| private volatile int state; |
# 主要方法
# 获取独占锁
# acquire
| |
| * 该方法使获取独占锁 |
| */ |
| public final void acquire(int arg) { |
| |
| |
| |
| |
| if (!tryAcquire(arg) && |
| acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) |
| selfInterrupt(); |
| } |
# addWaiter
| private Node addWaiter(Node mode) { |
| |
| Node node = new Node(mode); |
| |
| |
| for (;;) { |
| Node oldTail = tail; |
| |
| if (oldTail != null) { |
| node.setPrevRelaxed(oldTail); |
| if (compareAndSetTail(oldTail, node)) { |
| oldTail.next = node; |
| return node; |
| } |
| } else { |
| |
| initializeSyncQueue(); |
| } |
| } |
| } |
# initializeSyncQueue
| private final void initializeSyncQueue() { |
| Node h; |
| |
| if (HEAD.compareAndSet(this, null, (h = new Node()))) |
| tail = h; |
| } |
# acquireQueued
| final boolean acquireQueued(final Node node, int arg) { |
| boolean interrupted = false; |
| try { |
| |
| for (;;) { |
| |
| final Node p = node.predecessor(); |
| |
| if (p == head && tryAcquire(arg)) { |
| |
| setHead(node); |
| |
| p.next = null; |
| |
| return interrupted; |
| } |
| |
| if (shouldParkAfterFailedAcquire(p, node)) |
| interrupted |= parkAndCheckInterrupt(); |
| } |
| } catch (Throwable t) { |
| |
| cancelAcquire(node); |
| if (interrupted) |
| selfInterrupt(); |
| throw t; |
| } |
| } |
# shouldParkAfterFailedAcquire
| private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
| |
| int ws = pred.waitStatus; |
| |
| if (ws == Node.SIGNAL) |
| return true; |
| |
| if (ws > 0) { |
| do { |
| node.prev = pred = pred.prev; |
| } while (pred.waitStatus > 0); |
| pred.next = node; |
| } else { |
| |
| pred.compareAndSetWaitStatus(ws, Node.SIGNAL); |
| } |
| |
| return false; |
| } |
# parkAndCheckInterrupt
| private final boolean parkAndCheckInterrupt() { |
| |
| LockSupport.park(this); |
| |
| return Thread.interrupted(); |
| } |
# cancelAcquire
| private void cancelAcquire(Node node) { |
| |
| if (node == null) |
| return; |
| |
| |
| node.thread = null; |
| |
| |
| Node pred = node.prev; |
| |
| while (pred.waitStatus > 0) |
| node.prev = pred = pred.prev; |
| |
| |
| Node predNext = pred.next; |
| |
| |
| node.waitStatus = Node.CANCELLED; |
| |
| |
| if (node == tail && compareAndSetTail(node, pred)) { |
| pred.compareAndSetNext(predNext, null); |
| } else { |
| |
| int ws; |
| |
| if (pred != head && |
| |
| ((ws = pred.waitStatus) == Node.SIGNAL || |
| |
| (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && |
| |
| pred.thread != null) { |
| |
| Node next = node.next; |
| |
| if (next != null && next.waitStatus <= 0) |
| pred.compareAndSetNext(predNext, next); |
| } else { |
| |
| unparkSuccessor(node); |
| } |
| |
| |
| node.next = node; |
| } |
| } |
# unparkSuccessor
| private void unparkSuccessor(Node node) { |
| |
| int ws = node.waitStatus; |
| |
| if (ws < 0) |
| node.compareAndSetWaitStatus(ws, 0); |
| |
| |
| Node s = node.next; |
| |
| if (s == null || s.waitStatus > 0) { |
| s = null; |
| |
| for (Node p = tail; p != node && p != null; p = p.prev) |
| if (p.waitStatus <= 0) |
| s = p; |
| } |
| |
| if (s != null) |
| LockSupport.unpark(s.thread); |
| } |
# 释放独占锁
# release
| public final boolean release(int arg) { |
| |
| if (tryRelease(arg)) { |
| Node h = head; |
| |
| if (h != null && h.waitStatus != 0) |
| unparkSuccessor(h); |
| return true; |
| } |
| return false; |
| } |
# 获取共享锁
# acquireShared
| public final void acquireShared(int arg) { |
| |
| if (tryAcquireShared(arg) < 0) |
| |
| doAcquireShared(arg); |
| } |
# doAcquireShared
| private void doAcquireShared(int arg) { |
| |
| final Node node = addWaiter(Node.SHARED); |
| boolean interrupted = false; |
| try { |
| |
| for (;;) { |
| |
| final Node p = node.predecessor(); |
| |
| if (p == head) { |
| |
| int r = tryAcquireShared(arg); |
| |
| if (r >= 0) { |
| setHeadAndPropagate(node, r); |
| |
| p.next = null; |
| return; |
| } |
| } |
| |
| if (shouldParkAfterFailedAcquire(p, node)) |
| |
| interrupted |= parkAndCheckInterrupt(); |
| } |
| } catch (Throwable t) { |
| |
| cancelAcquire(node); |
| throw t; |
| } finally { |
| |
| if (interrupted) |
| selfInterrupt(); |
| } |
| } |
# setHeadAndPropagate
| private void setHeadAndPropagate(Node node, int propagate) { |
| Node h = head; |
| |
| setHead(node); |
| |
| if (propagate > 0 || h == null || h.waitStatus < 0 || |
| (h = head) == null || h.waitStatus < 0) { |
| Node s = node.next; |
| |
| if (s == null || s.isShared()) |
| doReleaseShared(); |
| } |
| } |
# doReleaseShared
| private void doReleaseShared() { |
| |
| for (;;) { |
| Node h = head; |
| |
| if (h != null && h != tail) { |
| int ws = h.waitStatus; |
| |
| if (ws == Node.SIGNAL) { |
| |
| if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) |
| continue; |
| |
| unparkSuccessor(h); |
| } |
| |
| else if (ws == 0 && |
| |
| !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) |
| continue; |
| } |
| |
| |
| if (h == head) |
| break; |
| } |
| } |
# 释放共享锁
# releaseShared
| public final boolean releaseShared(int arg) { |
| |
| if (tryReleaseShared(arg)) { |
| |
| doReleaseShared(); |
| return true; |
| } |
| return false; |
| } |
# 总结
AQS 提供了获取锁和释放锁的基础方法,子类只需要实现 tryAcquire、tryRelease、tryAcquireShared 和 tryReleaseShared 方法,就能实现锁的相关操作,也就是说,AQS 屏蔽了底层实现,上层业务实现留给子类。