戳这里,加关注哦~
ConcurrentHashMap是个老生常谈的集合类了,我们都知道多线程环境下不能直接使用HashMap,而需要使用ConcurrentHashMap,但有没有了解过ConcurrentHashMap到底是如何实现线程安全的呢?他到底跟传统的Hashtable和SynchronizeMap(没听过SynchronizeMap?他就是Collections.synchronizeMap方法返回的对象)到底好在哪?
ConcurrentHashMap建立在HashMap的基础上实现了线程安全。从散列表的三大要素:哈希函数、哈希冲突、扩容方案、以及线程安全展开详解HashMap的设计。关于HashMap的内容本文不再赘述,读者若对HashMap的底层设计不了解,一定要先去阅读前面的文章。ConcurrentHashMap中蕴含的并发编程智慧是非常值得我们学习的,正如文章开头的两个问题,你会如何解决呢?可能会直接上锁或用更高性能的CAS,但ConcurrentHashMap给了我们更不一样的解决方案。
本文的主要内容是讲解ConcurrentHashMap中的并发设计,重点分析ConcurrentHashMap的四个方法源码:putVal、initTable、addCount、transfer。分析每个方法前会使用图解介绍ConcurrentHashMap的核心思路。源码中我加了非常详细的注释,有时间仍建议读者阅读完源码,ConcurrentHashMap的并发智慧,都蕴含在源码中。那么我们开始吧~
CAS与自旋锁
CAS是ConcurrentHashMap中的一个重点,也是ConcurrentHashMap提升性能的根基所在。在阅读源码中,可以发现CAS无处不在。在介绍ConcurrentHashMap前,必须先介绍一下这两个重点。Java中的运算并不是原子操作,如count++可分为:
如果在第一步之后,count被其他的线程修改了,第三步的赋值会直接覆盖掉其他线程的修改。synchronize可以解决这个问题,但上锁为重量级操作,严重影响性能,CAS是更好的解决方案。
CAS的思路并不复杂。还是上面的例子:当我们需要对变量count进行自增时,我们可以认为没有发生并发冲突,先存储一个count副本,再对count进行自增,然后把副本和count本身进行比较,如果两者相同,则证明没有发生并发冲突,修改count的值;如果不同,则说明count在我们自增的过程中被修改了,把上述整个过程重新来一遍,直到修改成功为止,如下图:
img
那,如果我们在判断count==count_之后,count被修改了怎么办?比较赋值的操作操作系统会保证的原子性,保证不会出现这种情况。在java中常见的CAS方法有:
// 比较并替换 U.compareAndSwapInt(); U.compareAndSwapLong(); U.compareAndSwapObject();
在后续的源码中,我们会经常看到他们。通过这种思路,我们不需要给count变量上锁。但如果并发度过高,处理时间过长,则会导致某些线程一直在循环自旋,浪费cpu资源。自旋锁是利用CAS而设计的一种应用层面的锁。如下代码:
// 0代表锁释放,1代表锁被某个线程拿走了 int lock = 0;while (true ){ if (lock==0){ int lock_ ; if (U.compareAndSwapInt(this,lock_,0,1)){ ... // 获取锁后的逻辑处理 // 最后释放锁 lock = 0; break ; } } }
上面就是很经典自旋锁设计。判断锁是否被其他线程拥有,若没有则尝试使用CAS获得锁;前两步失败都会重新循环再次尝试直到获得锁。最后逻辑处理完成要令lock=0来释放锁。冲突时间短的并发情景下这种方法可以大大提升效率。
CAS和自旋锁在ConcurrentHashMap应用地非常广泛,在源码中我们会经常看到他们的身影。同时这也是ConcurrentHashMap的设计核心所在。
ConcurrentHashMap的并发策略概述
Hashtable与SynchronizeMap采取的并发策略是对整个数组对象加锁,导致性能及其低下。jdk1.7之前,ConcurrentHashMap采用的是锁分段策略来优化性能,如下图:
img
相当于把整个数组,拆分成多个小数组。每次操作只需要锁住操作的小数组即可,不同的segment之间不互相影响,提高了性能。jdk1.8之后,对整个策略进行了重构。锁的不是segment,而是节点,如下图:
img
锁的粒度进一步被降低,并发的效率也提高了。jdk1.8做得优化不只是细化锁粒度,还带来了CAS+synchronize的设计。那么下面,我们针对ConcurrentHashMap的常见方法:添加、删除、扩容、初始化等进行详解他的设计思路。
添加数据:putVal()
ConcurrentHashMap添加数据时,采取了CAS+synchronize结合策略。首先会判断该节点是否为null,如果为null,尝试使用CAS添加节点;如果添加失败,说明发生了并发冲突,再对节点进行上锁并插入数据。在并发较低的情景下无需加锁,可以显著提高性能。同时只会CAS尝试一次,也不会造成线程长时间等待浪费cpu时间的情况。ConcurrentHashMap的put方法整体流程如下(并不是全部流程):
img
首先会判断数组是否已经初始化,如若未初始化,会先去初始化数组;
如果当前要插入的节点为null,尝试使用CAS插入数据;
如果不为null,则判断节点hash值是否为-1;-1表示数组正在扩容,会先去协助扩容,再回来继续插入数据。(协助扩容后面会讲)
最后会执行上锁,并插入数据,最后判断是否需要返回旧值;如果不是覆盖旧值,需要更新map中的节点数,也就是图中的addCount方法。
ConcurrentHashMap是基于HashMap改造的,其中的插入数据、hash算法和HashMap都大同小异,这里不再赘述。思路清晰之后,下面我们看源码分析:
final V putVal(K key, V value, boolean onlyIfAbsent) { // 不允许插入空值或空键 // 允许value空值会导致get方法返回null时有两种情况: // 1. 找不到对应的key2. 找到了但是value为null; // 当get方法返回null时无法判断是哪种情况,在并发环境下containsKey方法已不再可靠, // 需要返回null来表示查询不到数据。允许key空值需要额外的逻辑处理,占用了数组空间,且并没有多大的实用价值。 // HashMap支持键和值为null,但基于以上原因,ConcurrentHashMap是不支持空键值。 if (key == null || value == null) throw new NullPointerException(); // 高低位异或扰动hashcode,和HashMap类似 // 但有一点点不同,后面会讲,这里可以简单认为一样的就可以 int hash = spread(key.hashCode()); // bincount表示链表的节点数 int binCount = 0; // 尝试多种方法循环处理,后续会有很多这种设计 for (Node[] tab = table;;) { Node f; int n, i, fh; // 情况一:如果数组为空则进行初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 情况二:目标下标对象为null else if ((f = tabAt(tab, i = (n - 1) & hash )) == null) { // 重点:采用CAS进行插入 if (casTabAt(tab, i, null,new Node(hash , key, value, null))) break ; } // 情况三:数组正在扩容,帮忙迁移数据到新的数组 // 同时会新数组,下次循环就是插入到新的数组 // 关于扩容的内容后面再讲,这里理解为正在扩容即可 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); // 情况四:直接对节点进行加锁,插入数据 // 下面代码很多,但逻辑和HashMap插入数据大同小异 // 因为已经上锁,不涉及并发安全设计 else { V oldVal = null; // 同步加锁 synchronized (f) { // 重复检查一下刚刚获取的对象有没有发生变化 if (tabAt(tab, i) == f) { // 链表处理情况 if (fh >= 0) { binCount = 1; // 循环链表 for (Node e = f;; ++binCount) { K ek; // 找到相同的则记录旧值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; // 判断是否需要更新数值 if (!onlyIfAbsent) e.val = value; break ; } Node pred = e; // 若未找到则插在链表尾 if ((e = e.next) == null) { pred.next = new Node(hash , key, value, null); break ; } } } // 红黑树处理情况 else if (f instanceof TreeBin) { Node p; binCount = 2; if ((p = ((TreeBin)f).putTreeVal(hash , key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } else if (f instanceof ReservationNode) throw new IllegalStateException("Recursive update" ); } } // 判断是否需要转化为红黑树,和返回旧数值 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break ; } } } // 总数+1;这是一个非常硬核的设计 // 这是ConcurrentHashMap设计中的一个重点,后面我们详细说 addCount(1L, binCount); return null; } // 这个方法和HashMap static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
我们注意到源码中有两个关键方法:初始化数组的initTable(),修改map中节点总数的addCount。这两个方法是如何实现线程安全的呢,我们继续分析。
初始化数组:initTable()
初始化操作的重点是:保证多个线程并发调用此方法,只能有一个线程成功。ConcurrentHashMap采取了CAS+自旋的方法来解决并发问题,整体流程如下图:
img
首先会判断数组是否为null,如果否说明另一个线程初始化结束了,直接返回该数组;
第二步判断是否正在初始化,如果是会让出cpu执行时间,当前线程自旋等待
如果数组为null,且没有另外的线程正在初始化,那么会尝试获取自旋锁,获取成功则进行初始化,获取失败则表示发生了并发冲突,继续循环判断。
ConcurrentHashMap并没有直接采用上锁的方式,而是采用CAS+自旋锁的方式,提高了性能。自旋锁保证了只有一个线程能真正初始化数组,同时又无需承担synchronize的高昂代价,一举两得。在看源码分析之前,我们先来了解一下ConcurrentHashMap中一个关键的变量:sizeCtl 。
sizeCtl默认为0,在正常情况下,他表示ConcurrentHashMap的阈值,是一个正数。当数组正在扩容时,他的值为-1,表示当前正在初始化,其他线程只需要判断sizeCtl==-1 ,就知道当前数组正在初始化。但当ConcurrentHashMap正在扩容时,sizeCtl是一个表示当前有多少个线程正在协助扩容的负数 ,我们下面讲到扩容时再分析。我们直接来看initTable()的源码分析:
private final Node[] initTable () { Node[] tab; int sc; // 这里的循环是采用自旋的方式而不是上锁来初始化 // 首先会判断数组是否为null或长度为0 // 没有在构造函数中进行初始化,主要是涉及到懒加载的问题 while ((tab = table) == null || tab.length == 0) { // sizeCtl是一个非常关键的变量; // 默认为0,-1表示正在初始化,0表示阈值 if ((sc = sizeCtl) Thread.yield(); // 让出cpu执行时间 // 通过CAS设置sc为-1,表示获得自选锁 // 其他线程则无法进入初始化,进行自选等待 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { // 重复检查是否为空 if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked" ) Node[] nt = (Node[])new Node,?>[n]; table = tab = nt; // 设置sc为阈值,n>>>2表示1/4*n,也就相当于0.75n sc = n - (n >>> 2); } } finally { // 把sc赋值给sizeCtl sizeCtl = sc; } break ; } } // 最后返回tab数组 return tab; }
下面我们继续看一下addCount()方法如何实现并发安全。
修改节点总数:addCount()
addCount方法的目标很简单,就是把ConcurrentHashMap的节点总数进行+1,也就是我在文章开头提出的问题。ConcurrentHashMap的作者设计了一套非常严谨的架构来保证并发安全与高性能。
ConcurrentHashMap并不是一个单独的size变量,他把size进行拆分,如下图:
img
这样ConcurrentHashMap的节点数size就等于这些拆分开的size1、size2...的总和。这样拆分有什么好处呢?好处就是每个线程可以单独修改对应的变量。如下图:
img
两个线程可以同时进行自增操作,且完全没有任何的性能消耗,是不是一个非常神奇的思路?而当需要获取节点总数时,只需要把全部加起来即可。在ConcurrentHashMap中每个size被用一个CounterCell对象包装着,CounterCell类很简单:
static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } }
仅仅只是对value值使用volatile关键字进行修饰。简单来说就是保证当前线程对value的修改其他线程马上可以知道。ConcurrentHashMap使用一个数组来存储CounterCell,如下:
img
那么每个线程如何分配到对应的自己的CounterCell呢?ConcurrentHashMap中采用了类似HashMap的思路,获取线程随机数,再对这个随机数进行取模得到对应的CounterCell。获取到对应的CounterCell之后,当前线程会尝试使用CAS进行修改,如果修改失败,则重新获取线程随机数,换一个CounterCell再来一次,直到修改成功。
以上就是addCount方法的核心思路,但源码的设计会复杂一点,还必须考虑CounterCell数组的初始化、CounterCell对象的创建、CounterCell数组的扩容。ConcurrentHashMap还保留了一个basecount,每个线程会首先使用CAS尝试修改basecount,如果修改失败,才会下发到counterCell数组中。整体的流程如下:
img
当前线程首先会使用CAS修改basecount的值,修改失败则进入数组分配CounterCell修改;
判断CounterCell数组是否为空,如果CounterCell数组为空,则初始化数组
如果CounterCell数组不为空,使用线程随机数找到下标
如果该下标的的counterCell对象还没初始化,则先创建一个CounterCell,这一步在图中我没有标出来。创建了CounterCell之后还需要考虑是否需要数组扩容
如果counterCell对象不为null,使用CAS尝试修改,失败则重新来一次
如果上面两种情况都不满足,则会回去再尝试CAS修改一下basecount
看起来好像挺复杂,但只要抓住size变量分割成多个CounterCell这个核心概念即可,其他的步骤都是细节完善。我们可以看到整个思路完全没有提到synchronize加锁,ConcurrentHashMap的作者采用CAS+自旋锁代替了synchronize,这使得在高并发情况下提升了非常大的性能。思路清晰之后,我们看源码也就简单一些了。那接下来就 read the fucking code:
private final void addCount(long x, int check) { CounterCell[] as; long b, s; // 如果数组不为空 或者 数组为空且直接更新basecount失败 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; // 表示没发生竞争 boolean uncontended = true ; // 这里有以下情况会进入fullAddCount方法: // 1. 数组为null且直接修改basecount失败 // 2. hash 后的数组下标CounterCell对象为null // 3. CAS修改CounterCell对象失败 if (as == null || (m = as.length - 1) (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { // 该方法保证完成更新,重点方法!! fullAddCount(x, uncontended); return ; } // 如果长度<=1不需要扩容(说实话我觉得这里有点奇怪) if (check <= 1) return ; s = sumCount(); } if (check >= 0) { // 扩容相关逻辑,下面再讲 } }
前面源码尝试直接修改basecount失败后,就会进入fullAddCount方法:
private final void fullAddCount(long x, boolean wasUncontended) { int h; // 如果当前线程随机数为0,强制初始化一个线程随机数 // 这个随机数的作用就类似于hashcode,不过他不需要被查找 // 下面每次循环都重新获取一个随机数,不会让线程都堵在同一个地方 if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); h = ThreadLocalRandom.getProbe(); // wasUncontended表示没有竞争 // 如果为false 表示之前CAS修改CounterCell失败,需要重新获取线程随机数 wasUncontended = true ; } // 直译为碰撞,如果他为true ,则表示需要进行扩容 boolean collide = false ; // 下面分为三种大的情况: // 1. 数组不为null,对应的子情况为CAS更新CounterCell失败或者countCell对象为null // 2. 数组为null,表示之前CAS更新baseCount失败,需要初始化数组 // 3. 第二步获取不到锁,再次尝试CAS更新baseCount for (;;) { CounterCell[] as; CounterCell a; int n; long v; // 第一种情况:数组不为null if ((as = counterCells) != null && (n = as.length) > 0) { // 对应下标的CounterCell为null的情况 if ((a = as[(n - 1) & h]) == null) { // 判断当前锁是否被占用 // cellsBusy是一个自旋锁,0表示没被占用 if (cellsBusy == 0) { // 创建CounterCell对象 CounterCell r = new CounterCell(x); // 尝试获取锁来添加一个新的CounterCell对象 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false ; try { CounterCell[] rs; int m, j; // recheck一次是否为null if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; // created=true 表示创建成功 created = true ; } } finally { // 释放锁 cellsBusy = 0; } // 创建成功也就是+1成功,直接返回 if (created) break ; // 拿到锁后发现已经有别的线程插入数据了 // 继续循环,重来一次 continue ; } } // 到达这里说明想创建一个对象,但是锁被占用 collide = false ; } // 之前直接CAS改变CounterCell失败,重新获取线程随机数,再循环一次 else if (!wasUncontended) // CAS already known to fail wasUncontended = true ; // Continue after rehash // 尝试对CounterCell进行CAS else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break ; // 如果发生过扩容或者长度已经达到虚拟机最大可以核心数,直接认为无碰撞 // 因为已经无法再扩容了 // 所以并发线程数的理论最高值就是NCPU else if (counterCells != as || n >= NCPU) collide = false ; // At max size or stale // 如果上面都是false ,说明发生了冲突,需要进行扩容 else if (!collide) collide = true ; // 获取自旋锁,并进行扩容 else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) {// Expand table unless stale // 扩大数组为原来的2倍 CounterCell[] rs = new CounterCell[n < for (int i = 0; i rs[i] = as[i]; counterCells = rs; } } finally { // 释放锁 cellsBusy = 0; } collide = false ; // 继续循环 continue ; } // 这一步是重新hash ,找下一个CounterCell对象 // 上面每一步失败都会来到这里获取一个新的随机数 h = ThreadLocalRandom.advanceProbe(h);