专栏名称: 石杉的架构笔记
专注原创、用心雕琢!十余年BAT一线大厂架构经验倾囊相授
目录
相关文章推荐
上海市场监管  ·  惊呆!男子口吐白沫,一查两肺全白了!竟是因为 ... ·  11 小时前  
上海市场监管  ·  惊呆!男子口吐白沫,一查两肺全白了!竟是因为 ... ·  11 小时前  
南昌晚报  ·  孙颖莎力克张本美和,国乒包揽女单4强 ·  21 小时前  
南昌晚报  ·  孙颖莎力克张本美和,国乒包揽女单4强 ·  21 小时前  
全网快资讯  ·  人性最大的恶,是不懂感恩 ·  3 天前  
51好读  ›  专栏  ›  石杉的架构笔记

Java 8 ConcurrentHashMap源码中竟然隐藏着两个BUG

石杉的架构笔记  · 公众号  ·  · 2021-04-29 09:31

正文

点击上方蓝色“ 石杉的架构笔记”,选择“设为星标”

回复“PDF”获取独家整理的学习资料!

长按扫描上方 免费领取


Java 7 ConcurrenHashMap 源码 我建议大家都看看,那个版本的源码就是 Java 多线程编程的教科书。在 Java 7 的源码中,作者对悲观锁的使用非常谨慎,大多都转换为自旋锁加 volatile 获得相同的语义,即使最后迫不得已要用,作者也会通过各种技巧减少锁的临界区。在上一篇文章中我们也有讲到,自旋锁在临界区比较小的时候是一个较优的选择是因为它避免了线程由于阻塞而切换上下文,但本质上它也是个锁,在自旋等待期间只有一个线程能进入临界区,其他线程只会自旋消耗 CPU 的时间片。 Java 8 ConcurrentHashMap 的实现通过一些巧妙的设计和技巧,避开了自旋锁的局限,提供了更高的并发性能。如果说 Java 7 版本的源码是在教我们如何将悲观锁转换为自旋锁,那么在 Java 8 中我们甚至可以看到如何将自旋锁转换为无锁的方法和技巧。

把书读薄

image

图片来源:https://www.zhenchao.org/2019/01/31/java/cas-based-concurrent-hashmap/

在开始本文之前,大家首先在心里还是要有这样的一张图,如果有同学对 HashMap 比较熟悉,那这张图也应该不会陌生。事实上在整体的数据结构的设计上 Java 8 ConcurrentHashMap HashMap 基本上是一致的。

Java 7 ConcurrentHashMap 为了提升性能使用了很多的编程技巧,但是引入 Segment 的设计还是有很大的改进空间的, Java 7 ConcurrrentHashMap 的设计有下面这几个可以改进的点:

  1. Segment 在扩容的时候非扩容线程对本 Segment 的写操作时都要挂起等待的
  2. ConcurrentHashMap 的读操作需要做两次哈希寻址,在读多写少的情况下其实是有额外的性能损失的
  3. 尽管 size() 方法的实现中先尝试无锁读,但是如果在这个过程中有别的线程做写入操作,那调用 size() 的这个线程就会给整个 ConcurrentHashMap 加锁,这是整个 ConcurrrentHashMap 唯一一个全局锁,这点对底层的组件来说还是有性能隐患的
  4. 极端情况下(比如客户端实现了一个性能很差的哈希函数) get() 方法的复杂度会退化到 O(n)

针对1和2,在 Java 8 的设计是废弃了 Segment 的使用,将悲观锁的粒度降低至桶维度,因此调用 get 的时候也不需要再做两次哈希了。 size() 的设计是 Java 8 版本中最大的亮点,我们在后面的文章中会详细说明。至于红黑树,这篇文章仍然不做过多阐述。接下来的篇幅会深挖细节,把书读厚,涉及到的模块有:初始化, put 方法, 扩容方法 transfer 以及 size() 方法,而其他模块,比如 hash 函数等改变较小,故不再深究。

准备知识

ForwardingNode

static final class ForwardingNode<K,Vextends Node<K,V{
    final Node[] nextTable;
    ForwardingNode(Node[] tab) {
        // MOVED = -1,ForwardingNode的哈希值为-1
        super(MOVED, nullnullnull);
        this.nextTable = tab;
    }
}

除了普通的 Node TreeNode 之外, ConcurrentHashMap 还引入了一个新的数据类型 ForwardingNode ,我们这里只展示他的构造方法, ForwardingNode 的作用有两个:

  • 在动态扩容的过程中标志某个桶已经被复制到了新的桶数组中
  • 如果在动态扩容的时候有 get 方法的调用,则 ForwardingNode 将会把请求转发到新的桶数组中,以避免阻塞 get 方法的调用, ForwardingNode 在构造的时候会将扩容后的桶数组 nextTable 保存下来。

UNSAFE.compareAndSwap***

这是在 Java 8 版本的 ConcurrentHashMap 实现 CAS 的工具,以 int 类型为例其方法定义如下:

/**
* Atomically update Java variable to x if it is currently
* holding expected.
@return true if successful
*/

public final native boolean compareAndSwapInt(Object o, long offset,
                                              int expected,
                                              int x)
;

相应的语义为:

如果对象 o 起始地址偏移量为 offset 的值等于 expected ,则将该值设为 x ,并返回 true 表明更新成功,否则返回 false ,表明 CAS 失败

初始化

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity 0 || concurrencyLevel <= 0// 检查参数
        throw new IllegalArgumentException();
    if (initialCapacity         initialCapacity = concurrencyLevel;
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size); // tableSizeFor,求不小于size的 2^n的算法,jdk1.8的HashMap中说过
    this.sizeCtl = cap; 
}

即使是最复杂的一个初始化方法代码也是比较简单的,这里我们只需要注意两个点:

  • concurrencyLevel Java 7 中是 Segment 数组的长度,由于在 Java 8 中已经废弃了 Segment ,因此 concurrencyLevel 只是一个保留字段,无实际意义
  • sizeCtl 这个值第一次出现,这个值如果等于-1则表明系统正在初始化,如果是其他负数则表明系统正在扩容,在扩容时 sizeCtl 二进制的低十六位等于扩容的线程数加一,高十六位(除符号位之外)包含桶数组的大小信息

put 方法

public V put(K key, V value) {
    return putVal(key, value, false);
}

put 方法将调用转发到 putVal 方法:

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == nullthrow new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node[] tab = table;;) {
        Node f; int n, i, fh;
        // 【A】延迟初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 【B】当前桶是空的,直接更新
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                            new Node(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 【C】如果当前的桶的第一个元素是一个ForwardingNode节点,则该线程尝试加入扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // 【D】否则遍历桶内的链表或树,并插入
        else {
            // 暂时折叠起来,后面详细看
        }
    }
    // 【F】流程走到此处,说明已经put成功,map的记录总数加一
    addCount(1L, binCount);
    return null;
}

从整个代码结构上来看流程还是比较清楚的,我用括号加字母的方式标注了几个非常重要的步骤, put 方法依然牵扯出很多的知识点

桶数组的初始化

private final Node[] initTable() {
    Node[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) 0)
            // 说明已经有线程在初始化了,本线程开始自旋
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            // CAS保证只有一个线程能走到这个分支
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node[] nt = (Node[])new Node,?>[n];
                    table = tab = nt;
                    // sc = n - n/4 = 0.75n
                    sc = n - (n >>> 2);
                }
            } finally {
                // 恢复sizeCtl > 0相当于释放锁
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

在初始化桶数组的过程中,系统如何保证不会出现并发问题呢,关键点在于自旋锁的使用,当有多个线程都执行 initTable 方法的时候, CAS 可以保证只有一个线程能够进入到真正的初始化分支,其他线程都是自旋等待。这段代码中我们关注三点即可:

  • 依照前文所述,当有线程开始初始化桶数组时,会通过 CAS sizeCtl 置为-1,其他线程以此为标志开始自旋等待
  • 当桶数组初始化结束后将 sizeCtl 的值恢复为正数,其值等于0.75倍的桶数组长度,这个值的含义和之前 HashMap 中的 THRESHOLD 一致,是系统触发扩容的临界点
  • finally 语句中对 sizeCtl 的操作并没有使用 CAS 是因为 CAS 保证只有一个线程能够执行到这个地方

添加桶数组第一个元素

static final  Node tabAt(Node[] tab, int i) {
    return (Node)U.getObjectVolatile(tab, ((long)i <}

static final  boolean casTabAt(Node[] tab, int i,
                                    Node c, Node v)
 
{
    return U.compareAndSwapObject(tab, ((long)i <}

put 方法的第二个分支会用 tabAt 判断当前桶是否是空的,如果是则会通过 CAS 写入, tabAt 通过 UNSAFE 接口会拿到桶中的最新元素, casTabAt 通过 CAS 保证不会有并发问题,如果 CAS 失败,则通过循环再进入其他分支

判断是否需要新增线程扩容

final Node[] helpTransfer(Node[] tab, Node f) {
    Node[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
                (sc = sizeCtl) 0) {
            // RESIZE_STAMP_SHIFT = 16
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // 这里将sizeCtl的值自增1,表明参与扩容的线程数量+1
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

在这个地方我们就要详细说下 sizeCtl 这个标志位了,临时变量 rs resizeStamp 这个方法返回

static final int resizeStamp(int n) {
    // RESIZE_STAMP_BITS = 16
    return Integer.numberOfLeadingZeros(n) | (1 <1));
}

因为入参 n 是一个 int 类型的值,所有 Integer.numberOfLeadingZeros(n) 的返回值介于0到32之间,如果转换成二进制

  • Integer.numberOfLeadingZeros(n) 的最大值是:00000000 00000000 00000000 00100000
  • Integer.numberOfLeadingZeros(n) 的最小值是:00000000 00000000 00000000 00000000

因此 resizeStampd 的返回值也就介于 00000000 00000000 10000000 00000000 00000000 00000000 10000000 00100000 之间,从这个返回值的范围可以看出来 resizeStamp 的返回值高16位全都是0,是不包含任何信息的。因此在 ConcurrrentHashMap 中,会把 resizeStamp 的返回值左移16位拼到 sizeCtl 中,这就是为什么 sizeCtl 的高16位包含整个 Map 大小的原理。有了这个分析,这段代码中比较长的 if 判断也就能看懂了

if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
    sc == rs + MAX_RESIZERS || transferIndex <= 0)
    break;
  • (sc >>> RESIZE_STAMP_SHIFT) != rs 保证所有线程要基于同一个旧的桶数组扩容
  • transferIndex <= 0 已经有线程完成扩容任务了

至于 sc == rs + 1 || sc == rs + MAX_RESIZERS 这两个判断条件如果是细心的同学一定会觉得难以理解,这个地方确实是JDK的一个 BUG ,这个 BUG 已经在 JDK 12 中修复,详细情况可以参考一下Oracle的官网:https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427,这两个判断条件应该写成这样: sc == (rs << RESIZE_STAMP_SHIFT) + 1 || sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS ,因为直接比较 rs sc 是没有意义的,必须要有移位操作。它表达的含义是

  • sc == (rs << RESIZE_STAMP_SHIFT) + 1 当前扩容的线程数为0,即已经扩容完成了,就不需要再新增线程扩容
  • sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS 参与扩容的线程数已经到了最大,就不需要再新增线程扩容

真正扩容的逻辑在 transfer 方法中,我们后面会详细看,不过有个小细节可以提前注意,如果 nextTable 已经初始化了, transfer 会返回 nextTable 的的引用,后续可以直接操作新的桶数组。

插入新值

如果桶数组已经初始化好了,该扩容的也扩容了,并且根据哈希定位到的桶中已经有元素了,那流程就跟普通的 HashMap 一样了,唯一一点不同的就是,这时候要给当前的桶加锁,且看代码:

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == nullthrow new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node[] tab = table;;) {
        Node f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)// 折叠
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 折叠}
        else if ((fh = f.hash) == MOVED)// 折叠
        else {
            V oldVal = null;
            synchronized (f) {
                // 要注意这里这个不起眼的判断条件
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) { // fh>=0的节点是链表,否则是树节点或者ForwardingNode
                        binCount = 1;
                        for (Node e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                    (ek != null && key.equals(ek)))) {
                                oldVal = e.val; // 如果链表中有值了,直接更新
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node pred = e;
                            if ((e = e.next) == null) {
                                // 如果流程走到这里,则说明链表中还没值,直接连接到链表尾部
                                pred.next = new Node(hash, key, value, null);
                                break;
                            }
                        }
                    }
                    // 红黑树的操作先略过
                }
            }
        }
    }
    // put成功,map的元素个数+1
    addCount(1L, binCount);
    return null;
}

这段代码中要特备注意一个不起眼的判断条件(上下文在源码上已经标注出来了): tabAt(tab, i) == f ,这个判断的目的是为了处理调用 put 方法的线程和扩容线程的竞争。因为 synchronized 是阻塞锁,如果调用 put 方法的线程恰好和扩容线程同时操作同一个桶,且调用 put 方法的线程竞争锁失败,等到该线程重新获取到锁的时候,当前桶中的元素就会变成一个 ForwardingNode ,那就会出现 tabAt(tab, i) != f 的情况。

多线程动态扩容

private final void transfer(Node[] tab, Node[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n)         stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // 初始化新的桶数组
        try {
            @SuppressWarnings("unchecked")
            Node[] nt = (Node[])new Node,?>[n <1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode fwd = new ForwardingNode(nextTab);
    boolean advance = true;
    boolean finishing = false// to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                        (this, TRANSFERINDEX, nextIndex,
                        nextBound = (nextIndex > stride ?
                                    nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n <1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 判断是会否是最后一个扩容线程
                if ((sc - 2) != resizeStamp(n) <                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED) // 只有最后一个扩容线程才有机会执行这个分支
            advance = true// already processed
        else { // 复制过程与HashMap类似,这里不再赘述
            synchronized (f) {
               // 折叠
            }
        }
    }
}

在深入到源码细节之前我们先根据下图看一下在 Java 8 ConcurrentHashMap 扩容的几个特点:

  • 新的桶数组 nextTable 是原先桶数组长度的2倍,这与之前 HashMap 一致

  • 参与扩容的线程也是分段将 table 中的元素复制到新的桶数组 nextTable

  • 桶一个桶数组中的元素在新的桶数组中均匀的分布在两个桶中,桶下标相差n(旧的桶数组的长度),这一点依然与 HashMap 保持一致

image-20210424202636495

各个线程之间如何通力协作

先看一个关键的变量 transferIndex ,这是一个被 volatile 修饰的变量,这一点可以保证所有线程读到的一定是最新的值。

private transient volatile int transferIndex;

这个值会被第一个参与扩容的线程初始化,因为只有第一个参与扩容的线程才满足条件 nextTab == null

if (nextTab == null) {            // initiating
    try {
        @SuppressWarnings("unchecked")
        Node[] nt = (Node[])new Node,?>[n <1];
        nextTab = nt;
    } catch (Throwable ex) {      // try to cope with OOME
        sizeCtl = Integer.MAX_VALUE;
        return;
    }
    nextTable = nextTab;
    transferIndex = n;
}

在了解了 transferIndex 属性的基础上,上面的这个循环就好理解了

while (advance) {
    int nextIndex, nextBound;
      // 当bound <= i <= transferIndex的时候i自减跳出这个循环继续干活
    if (--i >= bound || finishing)
        advance = false;
    // 扩容的所有任务已经被认领完毕,本线程结束干活
    else if ((nextIndex = transferIndex) <= 0) {
        i = -1;
        advance = false;
    }
    // 否则认领新的一段复制任务,并通过`CAS`更新transferIndex的值
    else if (U.compareAndSwapInt
                (this, TRANSFERINDEX, nextIndex,
                nextBound = (nextIndex > stride ?
                            nextIndex - stride : 0))) {
        bound = nextBound;
        i = nextIndex - 1;
        advance = false;
    }
}

transferIndex 就像是一个游标,每个线程认领一段复制任务的时候都会通过CAS将其更新为 transferIndex - stride CAS 可以保证 transferIndex 可以按照 stride 这个步长降到0。

最后一个扩容线程需要二次确认?

对于每一个扩容线程, for 循环的变量 i 代表要复制的桶的在桶数组中的下标,这个值的上限和下限通过游标 transferIndex 和步长 stride 计算得来,当 i 减小为负数,则说明当前扩容线程完成了扩容任务,这时候流程会走到这个分支:

// i >= n || i + n >= nextn现在看来取不到
if (i 0 || i >= n || i + n >= nextn) {
    int sc;
    if (finishing) { // 【A】完成整个扩容过程
        nextTable = null;
        table = nextTab;
        sizeCtl = (n <1) - (n >>> 1); 
        return;
    }
    // 【B】判断是否是最后一个扩容线程,如果是,则需要重新扫描一遍桶数组,做二次确认
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        // (sc - 2) == resizeStamp(n) <
        if ((sc - 2) != resizeStamp(n) <            return;
        // 重新扫描一遍桶数组,做二次确认
        finishing = advance = true;
        i = n; // recheck before commit
    }
}

因为变量 finishing 被初始化为 false ,所以当线程第一次进入这个 if 分支的话,会先执行注释为【B】的这个分支,同时因为 sizeCtl 的低16位被初始化为参与扩容的线程数加一,因此,当条件 (sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT 满足时,就能证明当前线程就是最后一个扩容线程了,这这时候将 i 置为 n 重新扫描一遍桶数组,并且将 finishing 置为 true 保证当桶数组被扫描结束后能够进入注释为【A】的分支结束扩容。

这里就有一个问题,按照我们前面的分析,扩容线程能够通力协作,保证各自负责的桶数组的分段不重不漏,这里为什么还需要做二次确认么?有一个开发者在 concurrency-interest 这个邮件列表中也关于这件事咨询了 Doug Lea (地址:http://cs.oswego.edu/pipermail/concurrency-interest/2020-July/017171.html),他给出的回复是:

Yes, this is a valid point; thanks. The post-scan was needed in a previous version, and could be removed. It does not trigger often enough to matter though, so is for now another minor tweak that might be included next time CHM is updated.

虽然 Doug 在邮件中的措辞用了could be, not often enough等,但也确认了最后一个扩容线程的二次检查是没有必要的。具体的复制过程与 HashMap 类似,感兴趣的读者可以翻一下 高端的面试从来不会在HashMap的红黑树上纠缠太多 这篇文章。

size() 方法

addCount()方法

// 记录map元素总数的成员变量
private transient volatile long baseCount;

put 方法的最后,有一个 addCount 方法,因为 putVal 执行到此处说明已经成功新增了一个元素,所以 addCount 方法的作用就是维护当前 ConcurrentHashMap 的元素总数,在 ConcurrentHashMap 中有一个变量 baseCount 用来记录 map 中元素的个数,如下图所示,如果同一时刻有n个线程通过CAS同时操作 baseCount 变量,有且仅有一个线程会成功,其他线程都会陷入无休止的自旋当中,那一定会带来性能瓶颈。

image-20210420221407349

为了避免大量线程都在自旋等待写入 baseCount ConcurrentHashMap 引入了一个辅助队列,如下图所示,现在操作 baseCount 的线程可以分散到这个辅助队列中去了,调用 size() 的时候只需要将 baseCount 和辅助队列中的数值相加即可,这样就实现了调用 size() 无需加锁。

image-20210420222306734

辅助队列是一个类型为 CounterCell 的数组:

@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

可以简单理解为只是包装了一个 long 型的变量 value ,还需要解决一个问题是,对于某个具体的线程它是如何知道操作辅助队列中的哪个值呢?答案是下面的这个方法:

static final int getProbe() {
    return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

getProbe 方法会返回当前线程的一个唯一身份码,这个值是不会变的,因此可以将 getProbe 的返回值与辅助队列的长度作求余运算得到具体的下标,它的返回值可能是0,如果返回0则需要调用 ThreadLocalRandom.localInit() 初始化。 addCount 方法中有两个细节需要注意

private final void addCount(long x, int check) {
    CounterCell[] as; long






请到「今天看啥」查看全文