# 前言
HashMap 虽然很常用,但是线程不安全,具体哪里会线程不安全呢?可以看看 ConcurrentHash 是如何处理线程安全的,它就是对 HashMap 线程安全的改造。HashTable 也是线程安全的,但是它采用了在方法同步的方式,效率很低。
注:本文仅针对 JDK8。
# 数据结构
数据结构采用 Node 数组 + 链表 + 红黑树。
# Node
static class Node<K,V> implements Map.Entry<K,V> { | |
final int hash; | |
final K key; | |
volatile V val; | |
volatile Node<K,V> next; | |
} |
# ForwardingNode
扩容时使用到的结构,标志数组当前位置的元素已经全部迁移完成。
static final class ForwardingNode<K,V> extends Node<K,V> { | |
final Node<K,V>[] nextTable; | |
// 已经迁移完成 | |
ForwardingNode(Node<K,V>[] tab) { | |
super(MOVED, null, null, null); | |
this.nextTable = tab; | |
} | |
} |
# TreeNode
红黑树节点。
static final class TreeNode<K,V> extends Node<K,V> { | |
TreeNode<K,V> parent; | |
TreeNode<K,V> left; | |
TreeNode<K,V> right; | |
TreeNode<K,V> prev; | |
boolean red; | |
} |
# TreeBin
红黑树结构。
static final class TreeBin<K,V> extends Node<K,V> { | |
TreeNode<K,V> root; | |
volatile TreeNode<K,V> first; | |
// 等待线程 | |
volatile Thread waiter; | |
// 锁状态 | |
volatile int lockState; | |
// 写锁 | |
static final int WRITER = 1; | |
// 等待锁 | |
static final int WAITER = 2; | |
// 读锁 | |
static final int READER = 4; | |
} |
# 关键变量
# 常量
// 最大容量 | |
private static final int MAXIMUM_CAPACITY = 1 << 30; | |
// 默认容量 | |
private static final int DEFAULT_CAPACITY = 16; | |
// 链表变为红黑树的阈值 | |
static final int TREEIFY_THRESHOLD = 8; | |
// 红黑树变为链表的阈值 | |
static final int UNTREEIFY_THRESHOLD = 6; | |
// 当这个哈希表容量大于该值时,才允许转为红黑树,否则直接扩容 | |
static final int MIN_TREEIFY_CAPACITY = 64; | |
// 表示数组正在扩容,且当前位置扩容完成 | |
static final int MOVED = -1; | |
// 表示已转为红黑树 | |
static final int TREEBIN = -2; | |
// 该标记的类在 compute 和 computeIf 方法中使用 | |
static final int RESERVED = -3; |
# table
transient volatile Node<K,V>[] table; |
存储数据的数组,大小是 2 的倍数,延迟初始化。
# nextTable
private transient volatile Node<K,V>[] nextTable; |
扩容用到的数组,table 中的元素都会移动到 nextTable 中。
# baseCount
private transient volatile long baseCount; |
基本计数器,主要在没有并发时使用,通过 CAS 更新。
# sizeCtl
private transient volatile int sizeCtl; |
如果为负,表示数组正在初始化或调整大小,-1 表示正在初始化,-n 表示有 n - 1 个线程正在进行扩容;如果 table 没有进行初始化,表示 table 初始化大小,如果 table 已经初始化,表示 table 下次扩容的容量。
# transferIndex
private transient volatile int transferIndex; |
扩容时的标记,扩容后数组尾部开始,向头部移动,表示最后一个扩容的元素下标。
# 原子操作
@SuppressWarnings("unchecked") | |
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { | |
return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE); | |
} | |
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, | |
Node<K,V> c, Node<K,V> v) { | |
return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v); | |
} | |
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { | |
U.putObjectRelease(tab, ((long)i << ASHIFT) + ABASE, v); | |
} |
使用了 Unsafe 中提供的原子方法,Unsafe 参考 Java 魔法类:Unsafe 应用解析。
# 主要方法
# initTable
private final Node<K,V>[] initTable() { | |
Node<K,V>[] tab; int sc; | |
while ((tab = table) == null || tab.length == 0) { | |
//sizeCtl 小于 0 说明别的线程在初始化,让出 CPU 执行权 | |
if ((sc = sizeCtl) < 0) | |
Thread.yield(); | |
// CAS 设置 sizeCtl 的值为 -1,设置成功则进行初始化 | |
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { | |
try { | |
if ((tab = table) == null || tab.length == 0) { | |
int n = (sc > 0) ? sc : DEFAULT_CAPACITY; | |
@SuppressWarnings("unchecked") | |
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; | |
table = tab = nt; | |
//sizeCtl 设置为 table 容量的 0.75 倍 | |
sc = n - (n >>> 2); | |
} | |
} finally { | |
sizeCtl = sc; | |
} | |
break; | |
} | |
} | |
return tab; | |
} |
# put
public V put(K key, V value) { | |
return putVal(key, value, false); | |
} | |
final V putVal(K key, V value, boolean onlyIfAbsent) { | |
// ConcurrentHashMap key 和 value 都不支持空值 | |
if (key == null || value == null) throw new NullPointerException(); | |
// 计算哈希 | |
int hash = spread(key.hashCode()); | |
// 记录链表元素个数 | |
int binCount = 0; | |
for (Node<K,V>[] tab = table;;) { | |
Node<K,V> f; int n, i, fh; K fk; V fv; | |
// 1. table 为空,进行初始化,然后继续循环 | |
if (tab == null || (n = tab.length) == 0) | |
tab = initTable(); | |
// 2. table 中目标位置为空,直接进行赋值,然后跳出循环 | |
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { | |
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value))) | |
break; | |
} | |
// 3. 在扩容,帮助扩容,然后继续循环,在扩容后的位置进行插入和替换 | |
else if ((fh = f.hash) == MOVED) | |
tab = helpTransfer(tab, f); | |
// 4. 这种情况下,不需要遍历且没有对元素做改变,不需要加锁,提前判断下,直接返回原值 | |
else if (onlyIfAbsent | |
&& fh == hash | |
&& ((fk = f.key) == key || (fk != null && key.equals(fk))) | |
&& (fv = f.val) != null) | |
return fv; | |
// 5. 给头结点加锁,并遍历链表或红黑树,进行插入或替换 | |
else { | |
V oldVal = null; | |
synchronized (f) { | |
if (tabAt(tab, i) == f) { | |
if (fh >= 0) { | |
binCount = 1; | |
for (Node<K,V> e = f;; ++binCount) { | |
K ek; | |
if (e.hash == hash && | |
((ek = e.key) == key || | |
(ek != null && key.equals(ek)))) { | |
oldVal = e.val; | |
if (!onlyIfAbsent) | |
e.val = value; | |
break; | |
} | |
Node<K,V> pred = e; | |
if ((e = e.next) == null) { | |
pred.next = new Node<K,V>(hash, key, value); | |
break; | |
} | |
} | |
} | |
else if (f instanceof TreeBin) { | |
Node<K,V> p; | |
binCount = 2; | |
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, | |
value)) != null) { | |
oldVal = p.val; | |
if (!onlyIfAbsent) | |
p.val = value; | |
} | |
} | |
else if (f instanceof ReservationNode) | |
throw new IllegalStateException("Recursive update"); | |
} | |
} | |
if (binCount != 0) { | |
if (binCount >= TREEIFY_THRESHOLD) | |
treeifyBin(tab, i); | |
if (oldVal != null) | |
return oldVal; | |
break; | |
} | |
} | |
} | |
// 给元素个数加 1 | |
addCount(1L, binCount); | |
return null; | |
} |
put 操作大致流程是:
- 数组为空先初始化数组
- 数组元素为空直接赋值
- 在扩容的话帮助扩容,然后插入到扩容之后的位置
- 有元素也没有在扩容的话,锁住元素,然后进行替换或插入,如果是链表且长度大于阈值了就变成红黑树
- 最后调整 count(具体细节暂时不太清楚)
# 流程图
有颜色的地方做了并发处理,找到数组元素使用 tabAt 方法,赋值使用 casTabAt 方法,锁住数组元素使用 synchronized 锁。标红部分均在方法内部使用了 Unsafe 的方法来保证原子性。
# get
public V get(Object key) { | |
//tab 等于 table | |
//e 是 key 在 table 中元素; | |
//p 是 e 链表中找到的 key 相等的元素 | |
//n 是 table 的数组长度 | |
//eh 是 e 的哈希值 | |
//ek 是循环链表 e 时,每个元素的 key | |
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; | |
int h = spread(key.hashCode()); | |
// 头结点有值 | |
if ((tab = table) != null && (n = tab.length) > 0 && | |
(e = tabAt(tab, (n - 1) & h)) != null) { | |
//eh 和 e 哈希值相等,可能就是目标值 | |
if ((eh = e.hash) == h) { | |
if ((ek = e.key) == key || (ek != null && key.equals(ek))) | |
return e.val; | |
} | |
// 头结点哈希值小于 0,说明正在扩容或者是红黑树 | |
else if (eh < 0) | |
return (p = e.find(h, key)) != null ? p.val : null; | |
// 遍历链表 | |
while ((e = e.next) != null) { | |
if (e.hash == h && | |
((ek = e.key) == key || (ek != null && key.equals(ek)))) | |
return e.val; | |
} | |
} | |
return null; | |
} |
get 方法在获取结点元素值时,也使用原子方法来保证读到的值没有被其他线程修改。
# remove
public V remove(Object key) { | |
return replaceNode(key, null, null); | |
} | |
final V replaceNode(Object key, V value, Object cv) { | |
int hash = spread(key.hashCode()); | |
for (Node<K,V>[] tab = table;;) { | |
Node<K,V> f; int n, i, fh; | |
//table 为空或头结点为空,说明没有要替换的 | |
if (tab == null || (n = tab.length) == 0 || | |
(f = tabAt(tab, i = (n - 1) & hash)) == null) | |
break; | |
// 如果在扩容,先帮助扩容,然后再扩容后的位置进行替换 | |
else if ((fh = f.hash) == MOVED) | |
tab = helpTransfer(tab, f); | |
// 锁住头结点,遍历链表或红黑树进行替换 | |
else { | |
V oldVal = null; | |
boolean validated = false; | |
synchronized (f) { | |
if (tabAt(tab, i) == f) { | |
// 链表 | |
if (fh >= 0) { | |
validated = true; | |
for (Node<K,V> e = f, pred = null;;) { | |
K ek; | |
// 找到目标位置 | |
if (e.hash == hash && | |
((ek = e.key) == key || | |
(ek != null && key.equals(ek)))) { | |
V ev = e.val; | |
// 当 cv 为空或目标值等于 cv 时才进行替换 | |
if (cv == null || cv == ev || | |
(ev != null && cv.equals(ev))) { | |
oldVal = ev; | |
//value 不为空进行替换 | |
if (value != null) | |
e.val = value; | |
//value 为空且不是头结点进行移除 | |
else if (pred != null) | |
pred.next = e.next; | |
// 是头结点设置数组元素为链表下一个节点 | |
else | |
setTabAt(tab, i, e.next); | |
} | |
break; | |
} | |
pred = e; | |
// 未找到目标位置,直接退出 | |
if ((e = e.next) == null) | |
break; | |
} | |
} | |
// 红黑树 | |
else if (f instanceof TreeBin) { | |
validated = true; | |
TreeBin<K,V> t = (TreeBin<K,V>)f; | |
TreeNode<K,V> r, p; | |
// 找到目标位置 | |
if ((r = t.root) != null && | |
(p = r.findTreeNode(hash, key, null)) != null) { | |
V pv = p.val; | |
// 当 cv 为空或目标值等于 cv 时才进行替换 | |
if (cv == null || cv == pv || | |
(pv != null && cv.equals(pv))) { | |
oldVal = pv; | |
//value 不为空进行替换 | |
if (value != null) | |
p.val = value; | |
//value 为空且不是头结点进行移除,并将数组元素指向根节点 | |
else if (t.removeTreeNode(p)) | |
setTabAt(tab, i, untreeify(t.first)); | |
} | |
} | |
} | |
else if (f instanceof ReservationNode) | |
throw new IllegalStateException("Recursive update"); | |
} | |
} | |
if (validated) { | |
// 如果替换或移除了某个节点,返回旧节点的值 | |
if (oldVal != null) { | |
// 移除节点进行 count 减一 | |
if (value == null) | |
addCount(-1L, -1); | |
return oldVal; | |
} | |
break; | |
} | |
} | |
} | |
return null; | |
} |
# transfer
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { | |
int n = tab.length, stride; | |
// 扩容由多个线程完成,根据当前 CPU 核数,确认步长 | |
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) | |
stride = MIN_TRANSFER_STRIDE; | |
// 初始化要扩容的数组,容量为原来的 2 倍 | |
if (nextTab == null) { | |
try { | |
@SuppressWarnings("unchecked") | |
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; | |
nextTab = nt; | |
} catch (Throwable ex) { | |
sizeCtl = Integer.MAX_VALUE; | |
return; | |
} | |
nextTable = nextTab; | |
// 扩容起始位置是数组尾部 | |
transferIndex = n; | |
} | |
int nextn = nextTab.length; | |
// 标志类 | |
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); | |
// 扩容是否向前推进 | |
boolean advance = true; | |
// 是否所有线程都全部迁移完成 | |
boolean finishing = false; | |
//i 代表当前线程正在迁移的下标,bound 代表本次可以迁移的范围下限 | |
for (int i = 0, bound = 0;;) { | |
Node<K,V> f; int fh; | |
while (advance) { | |
int nextIndex, nextBound; | |
if (--i >= bound || finishing) | |
advance = false; | |
else if ((nextIndex = transferIndex) <= 0) { | |
i = -1; | |
advance = false; | |
} | |
// 上次步长范围内的数据迁移完毕,进行下一个步长 | |
else if (U.compareAndSetInt | |
(this, TRANSFERINDEX, nextIndex, | |
nextBound = (nextIndex > stride ? | |
nextIndex - stride : 0))) { | |
bound = nextBound; | |
i = nextIndex - 1; | |
advance = false; | |
} | |
} | |
if (i < 0 || i >= n || i + n >= nextn) { | |
int sc; | |
// 迁移完成,更新 table 为 nextTab | |
if (finishing) { | |
nextTable = null; | |
table = nextTab; | |
// 扩容预置改为新数组长度的 0.75 倍 | |
sizeCtl = (n << 1) - (n >>> 1); | |
return; | |
} | |
// 当前线程退出扩容,将 sizeCtl - 1 | |
if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { | |
// 校验当前 sc 是否和出初始值相等,相等的话说明全部线程迁移完成 | |
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) | |
return; | |
finishing = advance = true; | |
// 从后往前再检查一遍是否全部被迁移完成 | |
i = n; | |
} | |
} | |
// 如果 i 的位置为空,说明一迁移完成,把头节点设置为 fwd 标志 | |
else if ((f = tabAt(tab, i)) == null) | |
advance = casTabAt(tab, i, null, fwd); | |
// 若当前头节点是 fwd,说明迁移完成,向前推进 | |
else if ((fh = f.hash) == MOVED) | |
advance = true; | |
// 处理当前桶的迁移 | |
else { | |
synchronized (f) { | |
if (tabAt(tab, i) == f) { | |
Node<K,V> ln, hn; | |
// 链表 | |
if (fh >= 0) { | |
int runBit = fh & n; | |
Node<K,V> lastRun = f; | |
for (Node<K,V> p = f.next; p != null; p = p.next) { | |
int b = p.hash & n; | |
if (b != runBit) { | |
runBit = b; | |
lastRun = p; | |
} | |
} | |
if (runBit == 0) { | |
ln = lastRun; | |
hn = null; | |
} | |
else { | |
hn = lastRun; | |
ln = null; | |
} | |
for (Node<K,V> p = f; p != lastRun; p = p.next) { | |
int ph = p.hash; K pk = p.key; V pv = p.val; | |
if ((ph & n) == 0) | |
ln = new Node<K,V>(ph, pk, pv, ln); | |
else | |
hn = new Node<K,V>(ph, pk, pv, hn); | |
} | |
// 扩容后两条链表分别放在 i 和 i + n 的位置 | |
setTabAt(nextTab, i, ln); | |
setTabAt(nextTab, i + n, hn); | |
// 原数组的位置 i 标记迁移完毕 | |
setTabAt(tab, i, fwd); | |
advance = true; | |
} | |
// 红黑树 | |
else if (f instanceof TreeBin) { | |
TreeBin<K,V> t = (TreeBin<K,V>)f; | |
TreeNode<K,V> lo = null, loTail = null; | |
TreeNode<K,V> hi = null, hiTail = null; | |
int lc = 0, hc = 0; | |
for (Node<K,V> e = t.first; e != null; e = e.next) { | |
int h = e.hash; | |
TreeNode<K,V> p = new TreeNode<K,V> | |
(h, e.key, e.val, null, null); | |
if ((h & n) == 0) { | |
if ((p.prev = loTail) == null) | |
lo = p; | |
else | |
loTail.next = p; | |
loTail = p; | |
++lc; | |
} | |
else { | |
if ((p.prev = hiTail) == null) | |
hi = p; | |
else | |
hiTail.next = p; | |
hiTail = p; | |
++hc; | |
} | |
} | |
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : | |
(hc != 0) ? new TreeBin<K,V>(lo) : t; | |
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : | |
(lc != 0) ? new TreeBin<K,V>(hi) : t; | |
setTabAt(nextTab, i, ln); | |
setTabAt(nextTab, i + n, hn); | |
setTabAt(tab, i, fwd); | |
advance = true; | |
} | |
} | |
} | |
} | |
} | |
} |
# helpTransfer
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { | |
Node<K,V>[] nextTab; int sc; | |
// 帮助扩容的前提是,扩容后的数据已经初始化,当前位置已经扩容完成 | |
if (tab != null && (f instanceof ForwardingNode) && | |
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { | |
int rs = resizeStamp(tab.length); | |
while (nextTab == nextTable && table == tab && | |
(sc = sizeCtl) < 0) { | |
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || | |
sc == rs + MAX_RESIZERS || transferIndex <= 0) | |
break; | |
// CAS 设置 sizeCtl,成功的话参与扩容 | |
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { | |
transfer(tab, nextTab); | |
break; | |
} | |
} | |
return nextTab; | |
} | |
return table; | |
} |
# addCount
private final void addCount(long x, int check) { | |
CounterCell[] as; long b, s; | |
// 先尝试 CAS 更新整个 count,失败的话,再利用 CAS 局部更新,依旧失败的话,使用一种比较复杂的方式进行更新 | |
if ((as = counterCells) != null || | |
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { | |
CounterCell a; long v; int m; | |
boolean uncontended = true; | |
if (as == null || (m = as.length - 1) < 0 || | |
(a = as[ThreadLocalRandom.getProbe() & m]) == null || | |
!(uncontended = | |
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { | |
fullAddCount(x, uncontended); | |
return; | |
} | |
if (check <= 1) | |
return; | |
s = sumCount(); | |
} | |
// 需要检查是否扩容 | |
if (check >= 0) { | |
Node<K,V>[] tab, nt; int n, sc; | |
while (s >= (long)(sc = sizeCtl) && (tab = table) != null && | |
(n = tab.length) < MAXIMUM_CAPACITY) { | |
int rs = resizeStamp(n); | |
// 已经有其他线程在扩容 | |
if (sc < 0) { | |
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || | |
sc == rs + MAX_RESIZERS || (nt = nextTable) == null || | |
transferIndex <= 0) | |
break; | |
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) | |
transfer(tab, nt); | |
} | |
// 首次开始扩容 | |
else if (U.compareAndSwapInt(this, SIZECTL, sc, | |
(rs << RESIZE_STAMP_SHIFT) + 2)) | |
transfer(tab, null); | |
s = sumCount(); | |
} | |
} | |
} |
# 并发处理
首先,在变量上,用 volatile 保证可见性;其次,任何涉及数据修改和获取的操作,采用 CAS + Synchronized 的方式,在尽量减少冲突的前提下,保证线程安全性。
下面列举几种处理方式:
方法 | 操作 | 方式 |
---|---|---|
get | 获取数组中的某个元素 | CAS(tabAt) |
put | 获取数组中的某个元素 | CAS(tabAt) |
put | 数组元素初始化赋值 | CAS(casTabAt) |
put | 遍历数组元素所处的链表或红黑树进行插入或覆盖 | Synchronized |
replaceNode | 获取数组中的某个元素 | CAS(tabAt) |
replaceNode | 遍历数组元素所处的链表或红黑树进行替换或删除 | Synchronized |
replaceNode | 数组元素被移除,设置链表的新头节点或树的新根为数组元素 | CAS(setTabAt) |
transfer | 更新 transferIndex 和 sizeCtl | CAS(compareAndSwapInt) |
transfer | 获取数组中的某个元素 | CAS(tabAt) |
transfer | 当旧数组元素为空时设置旧数组元素为 fwd | CAS(casTabAt) |
transfer | 设置新旧数组元素 | CAS(setTabAt) |