# 前言

AQS 是一个同步框架的抽象类,实现了锁相关的操作和一个先进先出的阻塞队列,可以通过集成它来实现同步锁操作。它提供的主要方法有 acquire 和 release,获取和释放锁。

# 原理概览

AQS 获取锁 的核心思想是,如果当前没有其他线程竞争锁,就通过 CAS 获取锁,如果有其他线程竞争,就进入等待队列,等待队列是一个基于链表的双向队列。进入等待队列之后,找到有效的前驱节点,等前驱节点释放锁后,唤醒它自己。在释放锁时,唤醒 head 后面的节点。

# 等待队列

等待队列是一个双向链表,头节点已经持有锁,所以其实不属于等待队列中的元素,等待队列是从第二个元素开始的。

AQS 等待队列

# 源码分析

# Node

# 常量

Node 节点是等待队列的节点,分为共享模式和独占模式。节点状态初始为 0,被取消等待是 1,释放锁或者取消等待后需要唤醒后继节点是 -1,在条件队列中是 -2,共享模式下是 -3。

// 共享模式
static final Node SHARED = new Node();
// 独占模式,只是做个标记
static final Node EXCLUSIVE = null;
// 被取消
static final int CANCELLED =  1;
// 需要唤醒后继节点
static final int SIGNAL    = -1;
// 在条件队列中
static final int CONDITION = -2;
// 共享模式下
static final int PROPAGATE = -3;

# 变量

节点需要一个状态标识,以及前驱节点和后继节点。

// 等待状态
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 持有当前节点的线程
volatile Thread thread;
// 条件对列中的下一个节点,共享模式下是个特殊值
Node nextWaiter;

# 构造函数

/**
 * 给 head 或者 SHARED 使用的
 */ 
Node() {}
/**
 * 共享模式下,nextWaiter 始终是 SHARED,即一个固定的 Node 对象
 * 独占模式下,nextWaiter 初始是 EXCLUSIVE,即 null
 */
Node(Node nextWaiter) {
    this.nextWaiter = nextWaiter;
    THREAD.set(this, Thread.currentThread());
}
/** 
 * 添加下一个条件等待者使用
 */
Node(int waitStatus) {
    WAITSTATUS.set(this, waitStatus);
    THREAD.set(this, Thread.currentThread());
}

# 关键变量

// 等待对列的头结点,头结点可以拿到锁,它不能被取消
private transient volatile Node head;
// 等待队列的尾结点
private transient volatile Node tail;
// 锁的同步状态,表示锁的重入次数,通过 CAS 进行修改
private volatile int state;

# 主要方法

# 获取独占锁

获取锁流程

# acquire
/**
 * 该方法使获取独占锁
 */
public final void acquire(int arg) {
    // 先尝试 CAS 获取锁,如果获取到了直接拿到锁,如果没拿到,说明存在竞争,则进入等待队列
    //tryAcquire 由子类实现,通过 CAS 尝试获取锁,万一拿到了就不用进入等待队列了
    //addWaiter 会不断自旋,直到进入等待队列
    //acquireQueued 也会不断自旋,直到拿到锁或被中断 
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
# addWaiter
private Node addWaiter(Node mode) {
    // 先初始化一个 Node
    Node node = new Node(mode);
    // 自旋
    for (;;) {
        Node oldTail = tail;
        // 尾结点有值,说明等待队列不为空,通过 CAS 将该节点放在队尾
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            // 尾结点为空,说明等待队列为空,进行初始化
            initializeSyncQueue();
        }
    }
}
# initializeSyncQueue
private final void initializeSyncQueue() {
    Node h;
    // 将头节点和尾结点都设置为一个空节点
    if (HEAD.compareAndSet(this, null, (h = new Node())))
        tail = h;
}
# acquireQueued
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        // 自旋
        for (;;) {
            // 前驱节点
            final Node p = node.predecessor();
            // 前驱节点是头结点并且通过 CAS 尝试获取锁成功,这里是 head 节点的线程已经释放掉锁
            if (p == head && tryAcquire(arg)) {
                // 将 node 设置为头结点
                setHead(node);
                // 帮助 GC
                p.next = null; 
                // 返回中断状态,外部会进行中断
                return interrupted;
            }
            // 请求锁失败后是否需要暂停,如果没有进入上面的 if,即没有成为头结点,shouldParkAfterFailedAcquire 最终会返回 true,然后进行并检查阻塞状态
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        // 发生了异常,取消获取锁,如果有中断标志则进行中断
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}
# shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前置结点等待状态
    int ws = pred.waitStatus;
    // 如果已经是 SIGNAL 状态,它释放锁之后就会唤醒当前节点,可以直接暂停
    if (ws == Node.SIGNAL)
        return true;
    // 大于 0 是被取消状态,向前遍历找到能够唤醒当前节点的节点
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 小于 0 说明是 0 或者 PROGAGATE,把前置节点设置为 SIGNAL 模式,以便唤醒自己
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    // 不可中断的,acquireQueued 中自旋再次执行相同逻辑,下次再进入此方法时,就会直接返回 true
    return false;
}
# parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
    // 暂停
    LockSupport.park(this);
    // 检查并清除中断标志
    return Thread.interrupted();
}
# cancelAcquire
private void cancelAcquire(Node node) {
    //node 为空忽略掉
    if (node == null)
        return;
    // 清空节点对应的线程
    node.thread = null;
    // 前置节点
    Node pred = node.prev;
    // 前置节点也是取消状态
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // 前置节点的后置节点,就是为了 CAS 设置新的后置节点
    Node predNext = pred.next;
    // 节点状态设置为 CANCELLED
    node.waitStatus = Node.CANCELLED;
    // 如果是尾结点,设置前置节点为新的尾结点,并将前置节点的 next 节点设置为 null
    if (node == tail && compareAndSetTail(node, pred)) {
        pred.compareAndSetNext(predNext, null);
    } else {
        // 不是尾结点,或者有其他线程新加入节点
        int ws;
        // 前置节点不是头结点
        if (pred != head &&
            // 前置节点是 SIGNAL 状态
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             // 前置节点不是 SIGNAL 状态,也不是 CANCELLED 状态,且被设置为 SIGNAL 状态
             (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
            // 前置节点的线程不为空
            pred.thread != null) {
            // 后置节点
            Node next = node.next;
            // 后置节点有值,且没有被取消,去掉当前节点,即将前置节点指向后置节点
            if (next != null && next.waitStatus <= 0)
                pred.compareAndSetNext(predNext, next);
        } else {
            // 前置节点是头节点,或是 CANCELLED 状态,或前置节点的线程为空,唤醒第一个有效的后置节点
            unparkSuccessor(node);
        }
        // 帮助 GC
        node.next = node; 
    }
}
# unparkSuccessor
private void unparkSuccessor(Node node) {
    // 节点的等待状态
    int ws = node.waitStatus;
    // 如果小于 0,设置为 0
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);
    // 后置节点
    Node s = node.next;
    // 后置节点为空或者等待状态是 CANCELLED
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从尾结点向前遍历,找到 node 或 null 后面第一个等待状态不是 CANCELLED 的节点
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    // 唤醒 node 后面第一个有效的节点
    if (s != null)
        LockSupport.unpark(s.thread);
}

# 释放独占锁

# release
public final boolean release(int arg) {
  // 尝试释放锁,通常由子类实现
  if (tryRelease(arg)) {
    Node h = head;
    // 唤醒下一个节点
    if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);
    return true;
  }
  return false;
}

# 获取共享锁

获取共享锁流程

# acquireShared
public final void acquireShared(int arg) {
  // 尝试获取共享锁失败,通常由子类实现
  if (tryAcquireShared(arg) < 0)
    // 获取共享锁
    doAcquireShared(arg);
}
# doAcquireShared
private void doAcquireShared(int arg) {
  // 自旋添加到等待队列
  final Node node = addWaiter(Node.SHARED);
  boolean interrupted = false;
  try {
    // 自旋
    for (;;) {
      // 前置节点
      final Node p = node.predecessor();
      // 前置节点已获取到锁
      if (p == head) {
        // 尝试获取下锁,万一前置节点已释放呢
        int r = tryAcquireShared(arg);
        // 拿到了锁,设置为 head,唤醒后面的节点
        if (r >= 0) {
          setHeadAndPropagate(node, r);
          // 帮助 GC
          p.next = null; 
          return;
        }
      }
      // 是否可以进行等待
      if (shouldParkAfterFailedAcquire(p, node))
        // 等待,等待结束需要知道是否是中断
        interrupted |= parkAndCheckInterrupt();
    }
  } catch (Throwable t) {
    // 取消排队
    cancelAcquire(node);
    throw t;
  } finally {
    // 进行中断
    if (interrupted)
      selfInterrupt();
  }
}
# setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) {
  Node h = head; 
  // 将当前节点设置为 head
  setHead(node);
  
  if (propagate > 0 || h == null || h.waitStatus < 0 ||
      (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    // 下一个没了或者是共享,释放共享锁,唤醒后面的节点
    if (s == null || s.isShared())
      doReleaseShared();
  }
}
# doReleaseShared
private void doReleaseShared() {
  // 自旋
  for (;;) {
    Node h = head;
    // 队列中有元素
    if (h != null && h != tail) {
      int ws = h.waitStatus;
      // 将 SIGNAL 设置为 0
      if (ws == Node.SIGNAL) {
        // CAS 设置直到成功
        if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
          continue;   
        // 唤醒后面的节点
        unparkSuccessor(h);
      }
      // 
      else if (ws == 0 &&
               // CAS 设置直到成功
               !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
        continue;                
    }
    //head 还是原来的线程,说明没有被上面 unparkSuccessor 唤醒,直接退出循环
    // 如果其他线程已经占领了 lead,则继续帮忙唤醒后面的元素
    if (h == head)                   
      break;
  }
}

# 释放共享锁

# releaseShared
public final boolean releaseShared(int arg) {
  // 尝试释放共享锁,由子类实现
  if (tryReleaseShared(arg)) {
		// 释放锁
    doReleaseShared();
    return true;
  }
  return false;
}

# 总结

AQS 提供了获取锁和释放锁的基础方法,子类只需要实现 tryAcquire、tryRelease、tryAcquireShared 和 tryReleaseShared 方法,就能实现锁的相关操作,也就是说,AQS 屏蔽了底层实现,上层业务实现留给子类。