ReentrantReadWriteLock 是一种读写锁,也是基于 AQS 实现的,同时依赖于 AQS 的共享锁和独占锁操作,支持公平和非公平模式。
# 使用示例
public static void main(String[] args) { | |
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); | |
ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); | |
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); | |
new Thread(() -> { | |
try { | |
readLock.lock(); | |
System.out.println(Thread.currentThread().getName() + " get read lock."); | |
Thread.sleep(5000L); | |
} catch (Exception e) { | |
// ... | |
} finally { | |
readLock.unlock(); | |
System.out.println(Thread.currentThread().getName() + " release read lock."); | |
} | |
}, "readThread1").start(); | |
new Thread(() -> { | |
try { | |
readLock.lock(); | |
System.out.println(Thread.currentThread().getName() + " get read lock."); | |
Thread.sleep(10000L); | |
} catch (Exception e) { | |
// ... | |
} finally { | |
readLock.unlock(); | |
System.out.println(Thread.currentThread().getName() + " release read lock."); | |
} | |
}, "readThread2").start(); | |
new Thread(() -> { | |
try { | |
writeLock.lock(); | |
System.out.println(Thread.currentThread().getName() + " get write lock."); | |
} catch (Exception e) { | |
// ... | |
} finally { | |
writeLock.unlock(); | |
System.out.println(Thread.currentThread().getName() + " release write lock."); | |
} | |
}, "writeThread1").start(); | |
} |
运行结果:
readThread1 get read lock. | |
readThread2 get read lock. | |
readThread1 release read lock. | |
readThread2 release read lock. | |
writeThread1 get write lock. | |
writeThread1 release write lock. |
可以看出,只有其他线程释放读锁后,才能获取到写锁。
# 源码分析
ReentrantReadWriteLock 实现了 ReadWriteLock 接口,它定义了两个方法:readLock () 和 writeLock (),分别获取一个读锁和写锁。
Lock readLock(); | |
Lock writeLock(); |
公平和非公平模式,通过构造函数参数指定,默认非公平模式。
public ReentrantReadWriteLock() { | |
this(false); | |
} | |
public ReentrantReadWriteLock(boolean fair) { | |
// 先构造同步器,再构造读写锁 | |
sync = fair ? new FairSync() : new NonfairSync(); | |
readerLock = new ReadLock(this); | |
writerLock = new WriteLock(this); | |
} |
锁内部通过 ThreadLocal 的方式记录了每个线程重入读锁的次数,并且再单独记录下最后一个获取读锁的线程重入的次数,在下一个要释放的线程是最后一个获取的线程的情况下,节省 ThreadLocal 查找。
private transient ThreadLocalHoldCounter readHolds; | |
private transient HoldCounter cachedHoldCounter; | |
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { | |
public HoldCounter initialValue() { | |
return new HoldCounter(); | |
} | |
} | |
static final class HoldCounter { | |
int count = 0; | |
// 这里使用线程 ID,而不是引用线程,是为了避免影响线程的垃圾回收 | |
final long tid = getThreadId(Thread.currentThread()); | |
} |
同时也记录了第一个获取读锁的线程以及它的重入次数。
private transient Thread firstReader = null; | |
private transient int firstReaderHoldCount; |
# 锁实现
Sync 继承 AQS,实现了一个基础锁操作。
AQS 只有一个 state 用来保存锁的状态,读写锁通过控制 int 的前 16 位和后 16 位来保存状态,这样当低位是 0 时,就可以获取写锁和读锁。下来看下代码,后面具体是如何维护锁的状态的。
# tryRelease
这个方法是释放写锁的。
protected final boolean tryRelease(int releases) { | |
// 只有锁的持有者才能释放锁 | |
if (!isHeldExclusively()) | |
throw new IllegalMonitorStateException(); | |
// 释放锁后的数量 | |
int nextc = getState() - releases; | |
// 是否释放了写锁 | |
boolean free = exclusiveCount(nextc) == 0; | |
// 释放写锁时将持有者置空 | |
if (free) | |
setExclusiveOwnerThread(null); | |
// 更新 state | |
setState(nextc); | |
return free; | |
} |
# tryAcquire
这个方法是获取写锁的。
protected final boolean tryAcquire(int acquires) { | |
Thread current = Thread.currentThread(); | |
int c = getState(); | |
// 写锁数量 | |
int w = exclusiveCount(c); | |
//state 数量不为 0 | |
if (c != 0) { | |
// 写锁数量为 0,说明当前读锁数量不为 0,不能获取写锁 | |
// 写锁数量不为 0,且当前线程并没有持有写锁,也不能获取写锁 | |
if (w == 0 || current != getExclusiveOwnerThread()) | |
return false; | |
// 超出写锁数量 | |
if (w + exclusiveCount(acquires) > MAX_COUNT) | |
throw new Error("Maximum lock count exceeded"); | |
// 当前线程原来就持有写锁,更新写锁数量 | |
setState(c + acquires); | |
return true; | |
} | |
//state 数量是 0,writerShouldBlock () 方法是为了处理公平性的,如果是非公平模式,可以直接抢占锁,如果是公平模式,需要判断队列里是否有等待的线程 | |
if (writerShouldBlock() || | |
!compareAndSetState(c, c + acquires)) | |
return false; | |
// 已成功获取到锁,更新锁的当前持有者 | |
setExclusiveOwnerThread(current); | |
return true; | |
} |
# tryReleaseShared
这个方法是释放读锁的,读锁的释放为什么不需要指定数量。
protected final boolean tryReleaseShared(int unused) { | |
Thread current = Thread.currentThread(); | |
// 当前线程是第一个获取读锁的线程 | |
if (firstReader == current) { | |
// 只重入过 1 次,将 firstReader 置为空 | |
if (firstReaderHoldCount == 1) | |
firstReader = null; | |
// 重入过多次,将重入次数减一 | |
else | |
firstReaderHoldCount--; | |
} else { | |
// 当前线程不是第一个获取读锁的线程,先从缓存找下有没有 | |
HoldCounter rh = cachedHoldCounter; | |
// 缓存的不是当前线程,从 ThreadLocal 中查找 | |
if (rh == null || rh.tid != getThreadId(current)) | |
rh = readHolds.get(); | |
int count = rh.count; | |
// 只重入过 1 次,删除 ThreadLocal 中的计数 | |
if (count <= 1) { | |
readHolds.remove(); | |
if (count <= 0) | |
throw unmatchedUnlockException(); | |
} | |
// 重入过多次,减一 | |
--rh.count; | |
} | |
// 自旋 | |
for (;;) { | |
int c = getState(); | |
// SHARED_UNIT = 1 << 16,相当于读锁数量 - 1 | |
int nextc = c - SHARED_UNIT; | |
if (compareAndSetState(c, nextc)) | |
// 返回是否完全释放掉锁,即读锁和写锁是否都已完全释放,这个时候可以唤醒等待写锁的线程了 | |
return nextc == 0; | |
} | |
} |
# tryAcquireShared
protected final int tryAcquireShared(int unused) { | |
Thread current = Thread.currentThread(); | |
int c = getState(); | |
// 已有其他线程持有写锁,不能获取到锁 | |
if (exclusiveCount(c) != 0 && | |
getExclusiveOwnerThread() != current) | |
return -1; | |
// 读锁被持有的数量 | |
int r = sharedCount(c); | |
//readerShouldBlock () 会是处理公平性的,在公平模式下,要排在等待队列中的线程之后,在非公平模式下,会判断等待队列的第一个等待线程是否获取的是写锁,如果是的话,也会进行等待。 | |
if (!readerShouldBlock() && | |
r < MAX_COUNT && | |
compareAndSetState(c, c + SHARED_UNIT)) { | |
// 读锁之前没有被持有,记录 firstReader | |
if (r == 0) { | |
firstReader = current; | |
firstReaderHoldCount = 1; | |
} else if (firstReader == current) { | |
// 读锁之前被持有过,且第一次持有读锁的线程是自己,更新持有数量 | |
firstReaderHoldCount++; | |
} else { | |
// 读锁之前被持有过,但不是第一次持有的,先看有没有在缓存里,然后再去 ThreadLocal 里找,更新持有数量 | |
HoldCounter rh = cachedHoldCounter; | |
if (rh == null || rh.tid != getThreadId(current)) | |
cachedHoldCounter = rh = readHolds.get(); | |
else if (rh.count == 0) | |
readHolds.set(rh); | |
rh.count++; | |
} | |
return 1; | |
} | |
// 获取锁失败,继续更复杂的方式获取 | |
return fullTryAcquireShared(current); | |
} |
final int fullTryAcquireShared(Thread current) { | |
HoldCounter rh = null; | |
// 自旋 | |
for (;;) { | |
int c = getState(); | |
// 写锁已被持有 | |
if (exclusiveCount(c) != 0) { | |
// 不是当前线程持有的写锁,获取失败 | |
if (getExclusiveOwnerThread() != current) | |
return -1; | |
} else if (readerShouldBlock()) { | |
// 需要阻塞的话,如果当前线程之前没有获取过读锁,就直接获取失败了 | |
// 如果是第一个获取到锁的线程,说明之前获取到过读锁 | |
if (firstReader == current) { | |
} else { | |
if (rh == null) { | |
rh = cachedHoldCounter; | |
if (rh == null || rh.tid != getThreadId(current)) { | |
rh = readHolds.get(); | |
if (rh.count == 0) | |
readHolds.remove(); | |
} | |
} | |
if (rh.count == 0) | |
return -1; | |
} | |
} | |
// 检查读锁数量 | |
if (sharedCount(c) == MAX_COUNT) | |
throw new Error("Maximum lock count exceeded"); | |
// 读锁数量加一 | |
if (compareAndSetState(c, c + SHARED_UNIT)) { | |
// 更新相关记录 | |
if (sharedCount(c) == 0) { | |
firstReader = current; | |
firstReaderHoldCount = 1; | |
} else if (firstReader == current) { | |
firstReaderHoldCount++; | |
} else { | |
if (rh == null) | |
rh = cachedHoldCounter; | |
if (rh == null || rh.tid != getThreadId(current)) | |
rh = readHolds.get(); | |
else if (rh.count == 0) | |
readHolds.set(rh); | |
rh.count++; | |
cachedHoldCounter = rh; | |
} | |
return 1; | |
} | |
} | |
} |
上面两个方法看起来逻辑类似,区别在于,第一个获取锁的方式简单,只是简单的试下,第二个是在 CAS 尝试失败后的处理,其实大致逻辑和第一个相同,只是在需要阻塞时,将不是重入的踢出去,让它排队去,并且在获取锁数量超出上限时,抛出异常。
# NonfairSync & FairSync
NonfairSync 和 FairSync 继承 Sync,主要是实现了获取读锁和写锁时是否需要阻塞,以控制公平性。
static final class NonfairSync extends Sync { | |
final boolean writerShouldBlock() { | |
// 非公平模式下,读锁不需要阻塞,直接获取锁 | |
return false; | |
} | |
final boolean readerShouldBlock() { | |
// 等待队列的第一个线程是否是要获取写锁,为了避免饿死获取写锁的线程,如果即将轮到它,就不让新来的获取读锁的线程欺负它了 | |
return apparentlyFirstQueuedIsExclusive(); | |
} | |
} | |
static final class FairSync extends Sync { | |
final boolean writerShouldBlock() { | |
// 公平模式下都需要按照先来后到的顺序排队 | |
return hasQueuedPredecessors(); | |
} | |
final boolean readerShouldBlock() { | |
return hasQueuedPredecessors(); | |
} | |
} |
####ReadLock & WriteLock
ReadLock 和 WriteLock 实现 Lock 接口,依赖 Sync 等类包装了获取锁和释放锁等方法。
# 总结
ReentrantReadWriteLock 提供一个读锁和一个写锁,锁内部依赖 Sync 实现的获取锁和释放锁的操作,同时支持公平和非公平模式。
写锁和读锁的状态合在一起构成 state,前 16 位是读锁,后 16 位是写锁,这样不管是获取读锁,还是获取写锁,直接判断低位是否为 0 就可以。
获取写锁时要看 state 是否 0 或写锁是否被自己持有,获取读锁时要看低位是否为 0 或写锁是否被自己持有;释放写锁时由于不会有其他线程同时更新 state,不需要要 CAS 更新,直接进行更新即可,释放读锁时因为可能有其他读锁线程同时更新 state,所以需要自旋 CAS 更新。
写锁由于同时只能有一个线程持有,所以释放锁时是否完全释放,看 state 低位是否为 0 即可,读锁同时可以有多个线程,所以还需要额外记录每个线程重入的次数。
读锁使用 ThreadLocal 记录每个线程重入的次数,其实查找效率很低的。在同时只有一个线程获取读锁的情况下,增加了一个 firstReader 和 firstReaderHoldCount 来避免使用低效的 map 查找。在读锁的很多场景下,要释放锁的线程其实是最后一个获取锁的线程,所以用一个 cacheHoldCounter 来缓存最后一个线程获取读锁的次数,减少 map 查询。