专栏名称: 石杉的架构笔记
专注原创、用心雕琢!十余年BAT一线大厂架构经验倾囊相授
目录
相关文章推荐
成都本地宝  ·  成都市中心新地标+1!最新进展→ ·  昨天  
清廉蓉城  ·  中国纪检监察报关注成都:居有所安 ·  2 天前  
成都本地宝  ·  太美了!成都10个赏梅花好去处!大部分免费! ·  3 天前  
成都本地宝  ·  成都能待一整天的6个室内场馆!部分免费! ·  4 天前  
成都本地宝  ·  正式命名!四川再添一座机场! ·  5 天前  
51好读  ›  专栏  ›  石杉的架构笔记

ConcurrentHashMap中有十个提升性能的细节,你都知道吗?

石杉的架构笔记  · 公众号  ·  · 2021-04-23 09:26

正文

点击上方蓝色“ 石杉的架构笔记”,选择“设为星标”

回复“PDF”获取独家整理的学习资料!

长按扫描上方 免费领取


一些题外话

如何在高并发下提高系统吞吐是所有后端开发者追求的目标,Java并发的开创者 Doug Lea在 Java 7 ConcurrentHashMap的设计中给出了一些参考答案,本文详细的总结了 ConcurrentHashMap源码中影响并发性能的十个细节,有常见的自旋锁,CAS的使用,也有延迟写内存,volatile语义退化等不常见的技巧,希望对大家的开发设计有所帮助。

由于 ConcurrentHashMap 的内容比较多,而且 Java 7 Java 8 两个版本的实现相差比较大,如果采用我们上篇中对比的那种行文思路,在有限的篇幅中难免会遗漏一些细节,因此我决定采用两篇文章去详细阐述两个版本中 ConcurrentHashMap 技术细节,不过为了帮助读者体系化的理解,三篇文章(包含 HashMap 的那一篇)整体文章的结构将保持一致。

把书读薄

《阿里巴巴Java开发手册》的作者孤尽对 ConcurrentHashMap 的设计十分推崇,他说:“ ConcurrentHashMap 源码是学习 Java 代码开发规范的一个非常好的学习材料,我建议同学们可以时常去看一看,总会有新的收货的”,相信大家平常也能听到很多对于 ConcurrentHashMap 设计的溢美之词,在展开隐藏在 ConcurrentHashMap 所有小秘密之前,大家在大脑中首先要有这样的一幅图:
img
对于 Java 7 来说,这张图已经能完全把 ConcurrentHashMap 的架构说清楚了:
  1. ConcurrentHashMap 是一个线程安全的 Map 实现,其读取不需要加锁,通过引入 Segment ,可以做到写入的时候加锁力度足够小
  2. 由于引入了 Segment ConcurrentHashMap 在读取和写入的时候需要需要做两次哈希,但这两次哈希换来的是更细力粒度的锁,也就意味着可以支持更高的并发
  3. 每个桶数组中的 key-value 对仍然以链表的形式存放在桶中,这一点和 HashMap 是一致的。

把书读厚

关于 Java 7 ConcurrentHashMap 的整体架构,用上面三两句话就可以概括,这张图应该很快就可以在大家的大脑中留下印象,接下来我们通过几个问题来尝试吸引大家继续看下去,把书读厚:
  1. ConcurrentHashMap 的哪些操作需要加锁?
  2. ConcurrentHashMap 的无锁读是如何实现的?
  3. 在多线程的场景下调用 size() 方法获取 ConcurrentHashMap 的大小有什么挑战? ConcurrentHashMap 是怎么解决的?
  4. 在有 Segment 存在的前提下,应该如何扩容的?
在上一篇文章中我们总结了 HashMap 中最重要的点有四个: 初始化 数据寻址- hash 方法 数据存储- put 方法 , 扩容- resize 方法 ,对于 ConcurrentHashMap 来说,这四个操作依然是最重要的,但由于其引入了更复杂的数据结构,因此在调用 size() 查看整个 ConcurrentHashMap 的数量大小的时候也有不小的挑战,我们也会重点看下Doug Lea在 size() 方法中的设计

初始化

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // 保证ssize是大于concurrencyLevel的最小的2的整数次幂
    while (ssize         ++sshift;
        ssize <<= 1;
    }
    // 寻址需要两次哈希,哈希的高位用于确定segment,低位用户确定桶数组中的元素
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize         ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap         cap <<= 1;
    Segment s0 = new Segment(loadFactor, (int)(cap * loadFactor), (HashEntry[])new HashEntry[cap]);
    Segment[] ss = (Segment[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
初始化方法中做了三件重要的事:
  1. 确定了 segments 的数组的大小 ssize ssize 根据入参 concurrencyLevel 确定,取大于 concurrencyLevel 的最小的2的整数次幂
  2. 确定哈希寻址时的偏移量,这个偏移量在确定元素在 segment 数组中的位置时会用到
  3. 初始化 segment 数组中的第一个元素,元素类型为 HashEntry 的数组,这个数组的长度为 initialCapacity / ssize ,即初始化大小除以 segment 数组的大小, segment 数组中的其他元素在后续 put 操作时参考第一个已初始化的实例初始化
static final class HashEntry<K,V{
    final int hash; 
    final K key;
    volatile V value;
    volatile HashEntry next; 
 
    HashEntry(int hash, K key, V value, HashEntry next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }
    final void setNext(HashEntry n) {
        UNSAFE.putOrderedObject(this, nextOffset, n);
    }
}
这里的 HashEntry HashMap 中的 HashEntry 作用是一样的,它是 ConcurrentHashMap 的数据项,这里要注意两个细节:
细节一:
HashEntry 的成员变量 value next 是被关键字 volatile 修饰的,也就是说所有线程都可以及时检查到其他线程对这两个变量的改变,因而可以在不加锁的情况下读取到这两个引用的最新值
细节二:
HashEntry setNext 方法中调用了 UNSAFE.putOrderedObject ,这个接口是属于 sun 安全库中的 api ,并不是 J2SE 的一部分,它的作用和 volatile 恰恰相反,调用这个 api 设值是使得 volatile 修饰的变量延迟写入主存,那到底是什么时候写入主存呢?
JMM中有一条规定:
对一个变量执行unlock操作之前,必须先把此变量同步到主内存中(执行store和write操作)
后文在讲 put 方法的时候我们再详细看 setNext 的用法

哈希

由于引入了 segment ,因此不管是调用 get 方法读还是调用 put 方法写,都需要做两次哈希,还记得在上文我们讲初始化的时候系统做了一件重要的事:
  • 确定哈希寻址时的偏移量,这个偏移量在确定元素在 segment 数组中的位置时会用到
没错就是这段代码:
this.segmentShift = 32 - sshift;
这里用32去减是因为 int 型的长度是32,有了 segmentShift ConcurrentHashMap 是如何做第一次哈希的呢?
public V put(K key, V value) {
    Segment s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    // 变量j代表着数据项处于segment数组中的第j项
    int j = (hash >>> segmentShift) & segmentMask;
        // 如果segment[j]为null,则下面的这个方法负责初始化之
        s = ensureSegment(j); 
    return s.put(key, hash, value, false);
}
我们以 put 方法为例,变量 j 代表着数据项处于 segment 数组中的第 j 项。如下图所示假如 segment 数组的大小为2的n次方,则 hash >>> segmentShift 正好取了key的哈希值的高n位,再与掩码 segmentMask 相与相当与仍然用key的哈希的高位来确定数据项在 segment 数组中的位置。
image-20210409232020703
hash 方法与非线程安全的 HashMap 相似,这里不再细说。
细节三:
在延迟初始化 Segment 数组时,作者采用了 CAS 避免了加锁,而且 CAS 可以保证最终的初始化只能被一个线程完成。在最终决定调用 CAS 进行初始化前又做了两次检查,第一次检查可以避免重复初始化 tab 数组,而第二次检查则可以避免重复初始化 Segment 对象,每一行代码作者都有详细的考虑。
private Segment ensureSegment(int k) {
    final Segment[] ss = this.segments;
    long u = (k     Segment seg;
    if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) {
        Segment proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry[] tab = (HashEntry[])new HashEntry[cap];
        if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck 再检查一次是否已经被初始化
            Segment s = new Segment(lf, threshold, tab);
            while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) // 使用 CAS 确保只被初始化一次
                    break;
            }
        }
    }
    return seg;
}

put 方法

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    HashEntry node = tryLock() ? null : scanAndLockForPut(key, hash, value); 
    V oldValue;
    try {
        HashEntry[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry first = entryAt(tab, index);
        for (HashEntry e = first;;) {
            if (e != null) {
                K k; // 如果找到key相同的数据项,则直接替换
                if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if  (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount; 
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    // node不为空说明已经在自旋等待时初始化了,注意调用的是setNext,不是直接操作next
                    node.setNext(first); 
                else
                    // 否则,在这里新建一个HashEntry
                    node = new HashEntry(hash, key, value, first);
                int c = count + 1// 先加1
                if (c > threshold && tab.length                     rehash(node);
                else
                    // 将新节点写入,注意这里调用的方法有门道
                    setEntryAt(tab, index, node); 
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}
这段代码在整个 ConcurrentHashMap 的设计中非常出彩,在这短短的40行代码中,Doug Lea就像一位神奇的魔术师,转眼间已经变换了好几种魔法,让人目瞪口呆,感叹其对并发的理解之深,让我们慢慢的解析Doug Lea在这段代码中使用的魔法:
细节四:
CPU的调度是公平的,好不容易轮到的时间片如果因为获取不到锁就将本线程挂起无疑会降低本线程的效率,更何况挂起之后还要重新调度,切换上下文,又是一笔不小的开销。如果可以遇见其他线程占有锁的时间不会很长,采用自旋将会是一个比较好的选择,在这里面也有一个权衡,如果别的线程占有锁的时间过长,反而是挂起阻塞等待性能好一点,我们来看下 ConcurrentHashMap 的做法:
private HashEntry scanAndLockForPut(K key, int hash, V value) {
    HashEntry first = entryForHash(this, hash);
    HashEntry e = first;
    HashEntry node = null;
    int retries = -1// negative while locating node
    while (!tryLock()) { // 自旋等待
        HashEntry f; // to recheck first below
        if (retries 0) {
            if (e == null) { // 这个桶中还没有写入k-v项
                if (node == null// speculatively create node 直接创建一个新的节点
                    node = new HashEntry(hash, key, value, null);
                retries = 0;  
            }
            // key值相等,直接跳出去尝试获取锁
            else if (key.equals(e.key))
                retries = 0;
            else // 遍历链表
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            // 自旋等待超过一定次数之后只能挂起线程,阻塞等待了
            lock();
            break;
        }
        else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { 
            // 如果头节点改变了,则重置次数,继续自旋等待
            e = first = f; 
            retries = -1
        }
    }
    return node;
}
ConcurrentHashMap 的策略是自旋 MAX_SCAN_RETRIES 次,如果还没有获取到锁则调用 lock 挂起阻塞等待,当然如果其他线程采用头插法改变了链表的头结点,则重置自旋等待次数。
细节五:
要知道,如果要从编码的角度提升系统的并发度,一个黄金法则就是减少并发临界区的大小。在 scanAndLockForPut 这个方法的设计上,有个小细节让我眼前一亮,就是在自旋的过程中初始化了一个 HashEntry ,这样做的好处就是线程在拿到锁之后不用初始化 HashEntry 了,占有锁的时间相应减小,进而提升性能。
细节六:
put 方法的开头,有这么一行不起眼的代码:
HashEntry[] tab = table;
看起来好像就是简单的临时变量赋值,其实大有来头,我们看一下 table 的声明:
transient volatile HashEntry[] table;
table 变量被关键字 volatile 修饰, CPU 在处理 volatile 修饰的变量的时候会有下面的行为:
嗅探
每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器对这个数据进行修改操作的时候,会重新从系统内存中把数据读到处理器缓存里
因此直接读取这类变量的读取和写入比普通变量的性能消耗更大,因此在 put 方法的开头将 table 变量赋值给一个普通的本地变量目的是为了消除 volatile 带来的性能损耗。这里就有另外一个问题:那这样做会不会导致 table 的语义改变,让别的线程读取不到最新的值呢?别着急,我们接着看。
细节七:
注意 put 方法中的这个方法: entryAt() :
static final  HashEntry entryAt(HashEntry[] tab, int i) {
    return (tab == null) ? null : (HashEntry) UNSAFE.getObjectVolatile(tab, ((long)i <}
这个方法的底层会调用 UNSAFE.getObjectVolatile ,这个方法的目的就是对于普通变量读取也能像 volatile 修饰的变量那样读取到最新的值,在前文中我们分析过,由于变量 tab 现在是一个普通的临时变量,如果直接调用 tab[i] 不一定能拿到最新的首节点的。细心的读者读到这里可能会想:Doug Lea是不是糊涂了,兜兜转换不是回到了原点么,为啥不刚开始就操作 volatile 变量呢,费了这老大劲。我们继续往下看。
细节八:
put 方法的实现中,如果链表中没有 key 值相等的数据项,则会把新的数据项插入到链表头写入到数组中,其中调用的方法是:
static final  void setEntryAt(HashEntry[] tab, int i, HashEntry e) {
    UNSAFE.putOrderedObject(tab, ((long)i <}
putOrderedObject 这个接口写入的数据不会马上被其他线程获取到,而是在 put 方法最后调用 unclock 后才会对其他线程可见,参见前文中对JMM的描述:
对一个变量执行unlock操作之前,必须先把此变量同步到主内存中(执行store和write操作)
这样的好处有两个,第一是性能,因为在持有锁的临界区不需要有同步主存的操作,因此持有锁的时间更短。第二是保证了数据的一致性,在 put 操作的 finally 语句执行完之前, put 新增的数据是不对其他线程展示的,这是 ConcurrentHashMap 实现无锁读的关键原因。
我们在这里稍微总结一下 put 方法里面最重要的三个细节,首先将 volatile 变量转为普通变量提升性能,因为在 put 中需要读取到最新的数据,因此接下来调用 UNSAFE.getObjectVolatile 获取到最新的头结点,但是通过调用 UNSAFE.putOrderedObject 让变量写入主存的时间延迟到 put 方法的结尾,一来缩小临界区提升性能,而来也能保证其他线程读取到的是完整数据。
细节九:
如果 put 真的需要往链表头插入数据项,那也得注意了, ConcurrentHashMap 相应的语句是:
node.setNext(first);
我们看下 setNext 的具体实现:
final void setNext(HashEntry n) {
    UNSAFE.putOrderedObject(this, nextOffset, n);
}
因为 next 变量是用 volatile 关键字修饰的,这里调用 UNSAFE.putOrderedObject 相当于是改变了 volatile 的语义,这里面的考量有两个,第一个仍然是性能,这样的实现性能明显更高,这一点前文已经详细的分析过,第二点是考虑了语义的一致性,对于 put 方法来说因为其调用的是 UNSAFE.getObjectVolatile ,仍然能获取到最新的数据,对于 get 方法,在 put 方法未结束之前,是不希望不完整的数据被其他线程通过 get 方法读取的,这也是合理的。






请到「今天看啥」查看全文