ReentrantReadWriteLock 是一种读写锁,也是基于 AQS 实现的,同时依赖于 AQS 的共享锁和独占锁操作,支持公平和非公平模式。

# 使用示例

public static void main(String[] args) {
  ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
  ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
  new Thread(() -> {
    try {
      readLock.lock();
      System.out.println(Thread.currentThread().getName() + " get read lock.");
      Thread.sleep(5000L);
    } catch (Exception e) {
      // ...
    } finally {
      readLock.unlock();
      System.out.println(Thread.currentThread().getName() + " release read lock.");
    }
  }, "readThread1").start();
  new Thread(() -> {
    try {
      readLock.lock();
      System.out.println(Thread.currentThread().getName() + " get read lock.");
      Thread.sleep(10000L);
    } catch (Exception e) {
      // ...
    } finally {
      readLock.unlock();
      System.out.println(Thread.currentThread().getName() + " release read lock.");
    }
  }, "readThread2").start();
  new Thread(() -> {
    try {
      writeLock.lock();
      System.out.println(Thread.currentThread().getName() + " get write lock.");
    } catch (Exception e) {
      // ...
    } finally {
      writeLock.unlock();
      System.out.println(Thread.currentThread().getName() + " release write lock.");
    }
  }, "writeThread1").start();
}

运行结果:

readThread1 get read lock.
readThread2 get read lock.
readThread1 release read lock.
readThread2 release read lock.
writeThread1 get write lock.
writeThread1 release write lock.

可以看出,只有其他线程释放读锁后,才能获取到写锁。

# 源码分析

ReentrantReadWriteLock 实现了 ReadWriteLock 接口,它定义了两个方法:readLock () 和 writeLock (),分别获取一个读锁和写锁。

Lock readLock();
Lock writeLock();

公平和非公平模式,通过构造函数参数指定,默认非公平模式。

public ReentrantReadWriteLock() {
  this(false);
}
public ReentrantReadWriteLock(boolean fair) {
  // 先构造同步器,再构造读写锁
  sync = fair ? new FairSync() : new NonfairSync();
  readerLock = new ReadLock(this);
  writerLock = new WriteLock(this);
}

锁内部通过 ThreadLocal 的方式记录了每个线程重入读锁的次数,并且再单独记录下最后一个获取读锁的线程重入的次数,在下一个要释放的线程是最后一个获取的线程的情况下,节省 ThreadLocal 查找。

private transient ThreadLocalHoldCounter readHolds;
private transient HoldCounter cachedHoldCounter;
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
  public HoldCounter initialValue() {
    return new HoldCounter();
  }
}
static final class HoldCounter {
  int count = 0;
  // 这里使用线程 ID,而不是引用线程,是为了避免影响线程的垃圾回收
  final long tid = getThreadId(Thread.currentThread());
}

同时也记录了第一个获取读锁的线程以及它的重入次数。

private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

# 锁实现

Sync 继承 AQS,实现了一个基础锁操作。

AQS 只有一个 state 用来保存锁的状态,读写锁通过控制 int 的前 16 位和后 16 位来保存状态,这样当低位是 0 时,就可以获取写锁和读锁。下来看下代码,后面具体是如何维护锁的状态的。

# tryRelease

这个方法是释放写锁的。

protected final boolean tryRelease(int releases) {
  // 只有锁的持有者才能释放锁
  if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
  // 释放锁后的数量
  int nextc = getState() - releases;
  // 是否释放了写锁
  boolean free = exclusiveCount(nextc) == 0;
  // 释放写锁时将持有者置空
  if (free)
    setExclusiveOwnerThread(null);
  // 更新 state
  setState(nextc);
  return free;
}

# tryAcquire

这个方法是获取写锁的。

protected final boolean tryAcquire(int acquires) {
  Thread current = Thread.currentThread();
  int c = getState();
  // 写锁数量
  int w = exclusiveCount(c);
  //state 数量不为 0
  if (c != 0) {
    // 写锁数量为 0,说明当前读锁数量不为 0,不能获取写锁
    // 写锁数量不为 0,且当前线程并没有持有写锁,也不能获取写锁
    if (w == 0 || current != getExclusiveOwnerThread())
      return false;
    // 超出写锁数量
    if (w + exclusiveCount(acquires) > MAX_COUNT)
      throw new Error("Maximum lock count exceeded");
    // 当前线程原来就持有写锁,更新写锁数量
    setState(c + acquires);
    return true;
  }
  //state 数量是 0,writerShouldBlock () 方法是为了处理公平性的,如果是非公平模式,可以直接抢占锁,如果是公平模式,需要判断队列里是否有等待的线程
  if (writerShouldBlock() ||
      !compareAndSetState(c, c + acquires))
    return false;
  // 已成功获取到锁,更新锁的当前持有者
  setExclusiveOwnerThread(current);
  return true;
}

# tryReleaseShared

这个方法是释放读锁的,读锁的释放为什么不需要指定数量。

protected final boolean tryReleaseShared(int unused) {
  Thread current = Thread.currentThread();
  // 当前线程是第一个获取读锁的线程
  if (firstReader == current) {
    // 只重入过 1 次,将 firstReader 置为空
    if (firstReaderHoldCount == 1)
      firstReader = null;
    // 重入过多次,将重入次数减一
    else
      firstReaderHoldCount--;
  } else {
    // 当前线程不是第一个获取读锁的线程,先从缓存找下有没有
    HoldCounter rh = cachedHoldCounter;
    // 缓存的不是当前线程,从 ThreadLocal 中查找
    if (rh == null || rh.tid != getThreadId(current))
      rh = readHolds.get();
    int count = rh.count;
    // 只重入过 1 次,删除 ThreadLocal 中的计数
    if (count <= 1) {
      readHolds.remove();
      if (count <= 0)
        throw unmatchedUnlockException();
    }
    // 重入过多次,减一
    --rh.count;
  }
  // 自旋
  for (;;) {
    int c = getState();
    // SHARED_UNIT = 1 << 16,相当于读锁数量 - 1
    int nextc = c - SHARED_UNIT;
    if (compareAndSetState(c, nextc))
      // 返回是否完全释放掉锁,即读锁和写锁是否都已完全释放,这个时候可以唤醒等待写锁的线程了
      return nextc == 0;
  }
}

# tryAcquireShared

protected final int tryAcquireShared(int unused) {
  Thread current = Thread.currentThread();
  int c = getState();
  // 已有其他线程持有写锁,不能获取到锁
  if (exclusiveCount(c) != 0 &&
      getExclusiveOwnerThread() != current)
    return -1;
  // 读锁被持有的数量
  int r = sharedCount(c);
  //readerShouldBlock () 会是处理公平性的,在公平模式下,要排在等待队列中的线程之后,在非公平模式下,会判断等待队列的第一个等待线程是否获取的是写锁,如果是的话,也会进行等待。
  if (!readerShouldBlock() &&
      r < MAX_COUNT &&
      compareAndSetState(c, c + SHARED_UNIT)) {
    // 读锁之前没有被持有,记录 firstReader
    if (r == 0) {
      firstReader = current;
      firstReaderHoldCount = 1;
    } else if (firstReader == current) {
      // 读锁之前被持有过,且第一次持有读锁的线程是自己,更新持有数量
      firstReaderHoldCount++;
    } else {
      // 读锁之前被持有过,但不是第一次持有的,先看有没有在缓存里,然后再去 ThreadLocal 里找,更新持有数量
      HoldCounter rh = cachedHoldCounter;
      if (rh == null || rh.tid != getThreadId(current))
        cachedHoldCounter = rh = readHolds.get();
      else if (rh.count == 0)
        readHolds.set(rh);
      rh.count++;
    }
    return 1;
  }
  // 获取锁失败,继续更复杂的方式获取
  return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
  HoldCounter rh = null;
  // 自旋
  for (;;) {
    int c = getState();
    // 写锁已被持有
    if (exclusiveCount(c) != 0) {
      // 不是当前线程持有的写锁,获取失败
      if (getExclusiveOwnerThread() != current)
        return -1;
    } else if (readerShouldBlock()) {
      // 需要阻塞的话,如果当前线程之前没有获取过读锁,就直接获取失败了
      // 如果是第一个获取到锁的线程,说明之前获取到过读锁
      if (firstReader == current) {
        
      } else {
        if (rh == null) {
          rh = cachedHoldCounter;
          if (rh == null || rh.tid != getThreadId(current)) {
            rh = readHolds.get();
            if (rh.count == 0)
              readHolds.remove();
          }
        }
        if (rh.count == 0)
          return -1;
      }
    }
    // 检查读锁数量
    if (sharedCount(c) == MAX_COUNT)
      throw new Error("Maximum lock count exceeded");
    // 读锁数量加一
    if (compareAndSetState(c, c + SHARED_UNIT)) {
      // 更新相关记录
      if (sharedCount(c) == 0) {
        firstReader = current;
        firstReaderHoldCount = 1;
      } else if (firstReader == current) {
        firstReaderHoldCount++;
      } else {
        if (rh == null)
          rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
          rh = readHolds.get();
        else if (rh.count == 0)
          readHolds.set(rh);
        rh.count++;
        cachedHoldCounter = rh; 
      }
      return 1;
    }
  }
}

上面两个方法看起来逻辑类似,区别在于,第一个获取锁的方式简单,只是简单的试下,第二个是在 CAS 尝试失败后的处理,其实大致逻辑和第一个相同,只是在需要阻塞时,将不是重入的踢出去,让它排队去,并且在获取锁数量超出上限时,抛出异常。

获取读锁

# NonfairSync & FairSync

NonfairSync 和 FairSync 继承 Sync,主要是实现了获取读锁和写锁时是否需要阻塞,以控制公平性。

static final class NonfairSync extends Sync {
    final boolean writerShouldBlock() {
        // 非公平模式下,读锁不需要阻塞,直接获取锁
        return false; 
    }
    final boolean readerShouldBlock() {
        // 等待队列的第一个线程是否是要获取写锁,为了避免饿死获取写锁的线程,如果即将轮到它,就不让新来的获取读锁的线程欺负它了
        return apparentlyFirstQueuedIsExclusive();
    }
}
static final class FairSync extends Sync {
    final boolean writerShouldBlock() {
        // 公平模式下都需要按照先来后到的顺序排队
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}

####ReadLock & WriteLock

ReadLock 和 WriteLock 实现 Lock 接口,依赖 Sync 等类包装了获取锁和释放锁等方法。

# 总结

ReentrantReadWriteLock 提供一个读锁和一个写锁,锁内部依赖 Sync 实现的获取锁和释放锁的操作,同时支持公平和非公平模式。

写锁和读锁的状态合在一起构成 state,前 16 位是读锁,后 16 位是写锁,这样不管是获取读锁,还是获取写锁,直接判断低位是否为 0 就可以。

获取写锁时要看 state 是否 0 或写锁是否被自己持有,获取读锁时要看低位是否为 0 或写锁是否被自己持有;释放写锁时由于不会有其他线程同时更新 state,不需要要 CAS 更新,直接进行更新即可,释放读锁时因为可能有其他读锁线程同时更新 state,所以需要自旋 CAS 更新。

写锁由于同时只能有一个线程持有,所以释放锁时是否完全释放,看 state 低位是否为 0 即可,读锁同时可以有多个线程,所以还需要额外记录每个线程重入的次数。

读锁使用 ThreadLocal 记录每个线程重入的次数,其实查找效率很低的。在同时只有一个线程获取读锁的情况下,增加了一个 firstReader 和 firstReaderHoldCount 来避免使用低效的 map 查找。在读锁的很多场景下,要释放锁的线程其实是最后一个获取锁的线程,所以用一个 cacheHoldCounter 来缓存最后一个线程获取读锁的次数,减少 map 查询。