# 前言

HashMap 虽然很常用,但是线程不安全,具体哪里会线程不安全呢?可以看看 ConcurrentHash 是如何处理线程安全的,它就是对 HashMap 线程安全的改造。HashTable 也是线程安全的,但是它采用了在方法同步的方式,效率很低。

注:本文仅针对 JDK8。

# 数据结构

数据结构采用 Node 数组 + 链表 + 红黑树。

# Node

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
}

# ForwardingNode

扩容时使用到的结构,标志数组当前位置的元素已经全部迁移完成。

static final class ForwardingNode<K,V> extends Node<K,V> {
	final Node<K,V>[] nextTable;
 
	// 已经迁移完成
	ForwardingNode(Node<K,V>[] tab) {
	  super(MOVED, null, null, null);
	  this.nextTable = tab;
	}
}

# TreeNode

红黑树节点。

static final class TreeNode<K,V> extends Node<K,V> {
  TreeNode<K,V> parent;  
  TreeNode<K,V> left;
  TreeNode<K,V> right;
  TreeNode<K,V> prev;
  boolean red;
}

# TreeBin

红黑树结构。

static final class TreeBin<K,V> extends Node<K,V> {
  TreeNode<K,V> root;
  volatile TreeNode<K,V> first;
  // 等待线程
  volatile Thread waiter;
  // 锁状态
  volatile int lockState;
  // 写锁
  static final int WRITER = 1; 
  // 等待锁
  static final int WAITER = 2; 
  // 读锁
  static final int READER = 4; 
}

# 关键变量

# 常量

// 最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认容量
private static final int DEFAULT_CAPACITY = 16;
// 链表变为红黑树的阈值
static final int TREEIFY_THRESHOLD = 8;
// 红黑树变为链表的阈值
static final int UNTREEIFY_THRESHOLD = 6;
// 当这个哈希表容量大于该值时,才允许转为红黑树,否则直接扩容
static final int MIN_TREEIFY_CAPACITY = 64;
// 表示数组正在扩容,且当前位置扩容完成
static final int MOVED     = -1; 
// 表示已转为红黑树
static final int TREEBIN   = -2; 
// 该标记的类在 compute 和 computeIf 方法中使用
static final int RESERVED  = -3;

# table

transient volatile Node<K,V>[] table;

存储数据的数组,大小是 2 的倍数,延迟初始化。

# nextTable

private transient volatile Node<K,V>[] nextTable;

扩容用到的数组,table 中的元素都会移动到 nextTable 中。

# baseCount

private transient volatile long baseCount;

基本计数器,主要在没有并发时使用,通过 CAS 更新。

# sizeCtl

private transient volatile int sizeCtl;

如果为负,表示数组正在初始化或调整大小,-1 表示正在初始化,-n 表示有 n - 1 个线程正在进行扩容;如果 table 没有进行初始化,表示 table 初始化大小,如果 table 已经初始化,表示 table 下次扩容的容量。

# transferIndex

private transient volatile int transferIndex;

扩容时的标记,扩容后数组尾部开始,向头部移动,表示最后一个扩容的元素下标。

# 原子操作

@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectRelease(tab, ((long)i << ASHIFT) + ABASE, v);
}

使用了 Unsafe 中提供的原子方法,Unsafe 参考 Java 魔法类:Unsafe 应用解析

# 主要方法

# initTable

private final Node<K,V>[] initTable() {
  Node<K,V>[] tab; int sc;
  while ((tab = table) == null || tab.length == 0) {
    //sizeCtl 小于 0 说明别的线程在初始化,让出 CPU 执行权
    if ((sc = sizeCtl) < 0)
      Thread.yield(); 
    // CAS 设置 sizeCtl 的值为 -1,设置成功则进行初始化
    else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
      try {
        if ((tab = table) == null || tab.length == 0) {
          int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
          @SuppressWarnings("unchecked")
          Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
          table = tab = nt;
          //sizeCtl 设置为 table 容量的 0.75 倍
          sc = n - (n >>> 2);
        }
      } finally {
        sizeCtl = sc;
      }
      break;
    }
  }
  return tab;
}

# put

public V put(K key, V value) {
    return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // ConcurrentHashMap key 和 value 都不支持空值
    if (key == null || value == null) throw new NullPointerException();
    // 计算哈希
    int hash = spread(key.hashCode());
    // 记录链表元素个数
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh; K fk; V fv;
        // 1. table 为空,进行初始化,然后继续循环
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 2. table 中目标位置为空,直接进行赋值,然后跳出循环
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                break;                   
        }
        // 3. 在扩容,帮助扩容,然后继续循环,在扩容后的位置进行插入和替换
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // 4. 这种情况下,不需要遍历且没有对元素做改变,不需要加锁,提前判断下,直接返回原值
        else if (onlyIfAbsent 
                 && fh == hash
                 && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                 && (fv = f.val) != null)
            return fv;
        // 5. 给头结点加锁,并遍历链表或红黑树,进行插入或替换
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 给元素个数加 1
    addCount(1L, binCount);
    return null;
}

put 操作大致流程是:

  1. 数组为空先初始化数组
  2. 数组元素为空直接赋值
  3. 在扩容的话帮助扩容,然后插入到扩容之后的位置
  4. 有元素也没有在扩容的话,锁住元素,然后进行替换或插入,如果是链表且长度大于阈值了就变成红黑树
  5. 最后调整 count(具体细节暂时不太清楚)

# 流程图

有颜色的地方做了并发处理,找到数组元素使用 tabAt 方法,赋值使用 casTabAt 方法,锁住数组元素使用 synchronized 锁。标红部分均在方法内部使用了 Unsafe 的方法来保证原子性。

# get

public V get(Object key) {
    //tab 等于 table
    //e 是 key 在 table 中元素;
    //p 是 e 链表中找到的 key 相等的元素
    //n 是 table 的数组长度
    //eh 是 e 的哈希值
    //ek 是循环链表 e 时,每个元素的 key
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    // 头结点有值
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        //eh 和 e 哈希值相等,可能就是目标值
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // 头结点哈希值小于 0,说明正在扩容或者是红黑树
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // 遍历链表
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

get 方法在获取结点元素值时,也使用原子方法来保证读到的值没有被其他线程修改。

# remove

public V remove(Object key) {
    return replaceNode(key, null, null);
}
final V replaceNode(Object key, V value, Object cv) {
    int hash = spread(key.hashCode());
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //table 为空或头结点为空,说明没有要替换的
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        // 如果在扩容,先帮助扩容,然后再扩容后的位置进行替换
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // 锁住头结点,遍历链表或红黑树进行替换
        else {
            V oldVal = null;
            boolean validated = false;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    // 链表
                    if (fh >= 0) {
                        validated = true;
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            // 找到目标位置
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                // 当 cv 为空或目标值等于 cv 时才进行替换
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    //value 不为空进行替换
                                    if (value != null)
                                        e.val = value;
                                    //value 为空且不是头结点进行移除
                                    else if (pred != null)
                                        pred.next = e.next;
                                    // 是头结点设置数组元素为链表下一个节点
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            // 未找到目标位置,直接退出
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    // 红黑树
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        // 找到目标位置
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            // 当 cv 为空或目标值等于 cv 时才进行替换
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                //value 不为空进行替换
                                if (value != null)
                                    p.val = value;
                                //value 为空且不是头结点进行移除,并将数组元素指向根节点
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
            if (validated) {
                // 如果替换或移除了某个节点,返回旧节点的值
                if (oldVal != null) {
                    // 移除节点进行 count 减一
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

# transfer

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
  	// 扩容由多个线程完成,根据当前 CPU 核数,确认步长
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; 
    // 初始化要扩容的数组,容量为原来的 2 倍
    if (nextTab == null) {            
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        // 扩容起始位置是数组尾部
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // 标志类
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // 扩容是否向前推进
    boolean advance = true;
    // 是否所有线程都全部迁移完成
    boolean finishing = false; 
    //i 代表当前线程正在迁移的下标,bound 代表本次可以迁移的范围下限
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // 上次步长范围内的数据迁移完毕,进行下一个步长
            else if (U.compareAndSetInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        } 
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 迁移完成,更新 table 为 nextTab
            if (finishing) {
                nextTable = null;
                table = nextTab;
                // 扩容预置改为新数组长度的 0.75 倍
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // 当前线程退出扩容,将 sizeCtl - 1
            if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 校验当前 sc 是否和出初始值相等,相等的话说明全部线程迁移完成
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                // 从后往前再检查一遍是否全部被迁移完成
                i = n; 
            }
        }
        // 如果 i 的位置为空,说明一迁移完成,把头节点设置为 fwd 标志
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 若当前头节点是 fwd,说明迁移完成,向前推进
        else if ((fh = f.hash) == MOVED)
            advance = true;
        // 处理当前桶的迁移
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // 链表
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // 扩容后两条链表分别放在 i 和 i + n 的位置
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        // 原数组的位置 i 标记迁移完毕
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    // 红黑树
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                        (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                        (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

# helpTransfer

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    // 帮助扩容的前提是,扩容后的数据已经初始化,当前位置已经扩容完成
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // CAS 设置 sizeCtl,成功的话参与扩容
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

# addCount

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // 先尝试 CAS 更新整个 count,失败的话,再利用 CAS 局部更新,依旧失败的话,使用一种比较复杂的方式进行更新
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    // 需要检查是否扩容
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            // 已经有其他线程在扩容
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // 首次开始扩容
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

# 并发处理

首先,在变量上,用 volatile 保证可见性;其次,任何涉及数据修改和获取的操作,采用 CAS + Synchronized 的方式,在尽量减少冲突的前提下,保证线程安全性。

下面列举几种处理方式:

方法操作方式
get获取数组中的某个元素CAS(tabAt)
put获取数组中的某个元素CAS(tabAt)
put数组元素初始化赋值CAS(casTabAt)
put遍历数组元素所处的链表或红黑树进行插入或覆盖Synchronized
replaceNode获取数组中的某个元素CAS(tabAt)
replaceNode遍历数组元素所处的链表或红黑树进行替换或删除Synchronized
replaceNode数组元素被移除,设置链表的新头节点或树的新根为数组元素CAS(setTabAt)
transfer更新 transferIndex 和 sizeCtlCAS(compareAndSwapInt)
transfer获取数组中的某个元素CAS(tabAt)
transfer当旧数组元素为空时设置旧数组元素为 fwdCAS(casTabAt)
transfer设置新旧数组元素CAS(setTabAt)