由于引入了
segment
,因此不管是调用
get
方法读还是调用
put
方法写,都需要做两次哈希,还记得在上文我们讲初始化的时候系统做了一件重要的事:
确定哈希寻址时的偏移量,这个偏移量在确定元素在
segment
数组中的位置时会用到
没错就是这段代码:
this.segmentShift = 32 - sshift;
这里用32去减是因为
int
型的长度是32,有了
segmentShift
,
ConcurrentHashMap
是如何做第一次哈希的呢?
public V put(K key, V value){ Segment s; if (value == null) thrownew NullPointerException(); int hash = hash(key); // 变量j代表着数据项处于segment数组中的第j项 int j = (hash >>> segmentShift) & segmentMask; // 如果segment[j]为null,则下面的这个方法负责初始化之 s = ensureSegment(j); return s.put(key, hash, value, false); }
我们以
put
方法为例,变量
j
代表着数据项处于
segment
数组中的第
j
项。如下图所示假如
segment
数组的大小为2的n次方,则
hash >>> segmentShift
正好取了key的哈希值的高n位,再与掩码
segmentMask
相与相当与仍然用key的哈希的高位来确定数据项在
segment
数组中的位置。
image-20210409232020703
hash
方法与非线程安全的
HashMap
相似,这里不再细说。
细节三:
在延迟初始化
Segment
数组时,作者采用了
CAS
避免了加锁,而且
CAS
可以保证最终的初始化只能被一个线程完成。在最终决定调用
CAS
进行初始化前又做了两次检查,第一次检查可以避免重复初始化
tab
数组,而第二次检查则可以避免重复初始化
Segment
对象,每一行代码作者都有详细的考虑。
private SegmentensureSegment(int k){ final Segment[] ss = this.segments; long u = (k Segment seg; if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) { Segment proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); HashEntry[] tab = (HashEntry[])new HashEntry[cap]; if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck 再检查一次是否已经被初始化 Segment s = new Segment(lf, threshold, tab); while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) // 使用 CAS 确保只被初始化一次 break; } } } return seg; }
put
方法
final V put(K key, int hash, V value, boolean onlyIfAbsent){ HashEntry node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry[] tab = table; int index = (tab.length - 1) & hash; HashEntry first = entryAt(tab, index); for (HashEntry e = first;;) { if (e != null) { K k; // 如果找到key相同的数据项,则直接替换 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if
(!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) // node不为空说明已经在自旋等待时初始化了,注意调用的是setNext,不是直接操作next node.setNext(first); else // 否则,在这里新建一个HashEntry node = new HashEntry(hash, key, value, first); int c = count + 1; // 先加1 if (c > threshold && tab.length rehash(node); else // 将新节点写入,注意这里调用的方法有门道 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }
staticfinalvoidsetEntryAt(HashEntry[] tab, int i, HashEntry e){ UNSAFE.putOrderedObject(tab, ((long)i <}
putOrderedObject
这个接口写入的数据不会马上被其他线程获取到,而是在
put
方法最后调用
unclock
后才会对其他线程可见,参见前文中对JMM的描述:
对一个变量执行unlock操作之前,必须先把此变量同步到主内存中(执行store和write操作)
这样的好处有两个,第一是性能,因为在持有锁的临界区不需要有同步主存的操作,因此持有锁的时间更短。第二是保证了数据的一致性,在
put
操作的
finally
语句执行完之前,
put
新增的数据是不对其他线程展示的,这是
ConcurrentHashMap
实现无锁读的关键原因。
我们在这里稍微总结一下
put
方法里面最重要的三个细节,首先将
volatile
变量转为普通变量提升性能,因为在
put
中需要读取到最新的数据,因此接下来调用
UNSAFE.getObjectVolatile
获取到最新的头结点,但是通过调用
UNSAFE.putOrderedObject
让变量写入主存的时间延迟到
put
方法的结尾,一来缩小临界区提升性能,而来也能保证其他线程读取到的是完整数据。
细节九:
如果
put
真的需要往链表头插入数据项,那也得注意了,
ConcurrentHashMap
相应的语句是:
因为
next
变量是用
volatile
关键字修饰的,这里调用
UNSAFE.putOrderedObject
相当于是改变了
volatile
的语义,这里面的考量有两个,第一个仍然是性能,这样的实现性能明显更高,这一点前文已经详细的分析过,第二点是考虑了语义的一致性,对于
put
方法来说因为其调用的是
UNSAFE.getObjectVolatile
,仍然能获取到最新的数据,对于
get
方法,在
put
方法未结束之前,是不希望不完整的数据被其他线程通过
get
方法读取的,这也是合理的。