查看历史文章,请点击上方链接关注公众号。
本节介绍一个常用的并发容器 - ConcurrentHashMap,它是HashMap的并发版本,与HashMap相比,它有如下特点:
我们分别来看下。
并发安全
我们知道,HashMap不是并发安全的,在并发更新的情况下,HashMap的链表结构可能形成环,出现死循环,占满CPU,我们看个例子:
public static void unsafeConcurrentUpdate() {
final Map map = new HashMap();
for (int i = 0; i Thread t = new Thread() {
Random rnd = new Random();
@Override
public void run() {
for (int i = 0; i map.put(rnd.nextInt(), 1);
}
}
};
t.start();
}
}
运行上面的代码,在我的机器上,每次都会出现死循环,占满CPU。
使用Collections.synchronizedMap方法可以生成一个同步容器,避免该问题,替换第一行代码即可:
final Map map = Collections.synchronizedMap(new HashMap());
在Java中,HashMap还有一个同步版本Hashtable,它与使用synchronizedMap生成的Map基本是一样的,也是在每个方法调用上加了synchronized,我们就不赘述了。
同步容器有几个问题:
ConcurrentHashMap没有这些问题,它同样实现了Map接口,也是基于哈希表实现的,上面的代码替换第一行即可:
final Map map = new ConcurrentHashMap();
原子复合操作
除了Map接口,ConcurrentHashMap还实现了一个接口ConcurrentMap,接口定义了一些条件更新操作,具体定义为:
public interface ConcurrentMap extends Map {
//条件更新,如果Map中没有key,设置key为value,
//返回原来key对应的值,如果没有,返回null
V putIfAbsent(K key, V value);
//条件删除,如果Map中有key,且对应的值为value,
//则删除,如果删除了,返回true,否则false
boolean remove(Object key, Object value);
//条件替换,如果Map中有key,且对应的值为oldValue,
//则替换为newValue,如果替换了,返回ture,否则false
boolean replace(K key, V oldValue, V newValue);
//条件替换,如果Map中有key,则替换值为value,
//返回原来key对应的值,如果原来没有,返回null
V replace(K key, V value);
}
如果使用同步容器,调用方必须加锁,而ConcurrentMap将它们实现为了原子操作。实际上,使用ConcurrentMap,调用方也没有办法进行加锁,它没有暴露锁接口,也不使用synchronized。
高并发
ConcurrentHashMap是为高并发设计的,它是怎么做的呢?具体实现比较复杂,我们简要介绍其思路,主要有两点:
同步容器使用synchronized,所有方法,竞争同一个锁,而ConcurrentHashMap采用分段锁技术,将数据分为多个段,而每个段有一个独立的锁,每一个段相当于一个独立的哈希表,分段的依据也是哈希值,无论是保存键值对还是根据键查找,都先根据键的哈希值映射到段,再在段对应的哈希表上进行操作。
采用分段锁,可以大大提高并发度,多个段之间可以并行读写。默认情况下,段是16个,不过,这个数字可以通过构造方法进行设置,如下所示:
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)
concurrencyLevel表示估计的并行更新的线程个数,ConcurrentHashMap会将该数转换为2的整数次幂,比如14转换为16,25转换为32。
在对每个段的数据进行读写时,ConcurrentHashMap也不是简单的使用锁进行同步,内部使用了CAS、对一些写采用原子方式,实现比较复杂,我们就不介绍了,实现的效果是,对于写操作,需要获取锁,不能并行,但是读操作可以,多个读可以并行,写的同时也可以读,这使得ConcurrentHashMap的并行度远远大于同步容器。
迭代
我们在66节介绍过,使用同步容器,在迭代中需要加锁,否则可能会抛出ConcurrentModificationException。ConcurrentHashMap没有这个问题,在迭代器创建后,在迭代过程中,如果另一个线程对容器进行了修改,迭代会继续,不会抛出异常。
问题是,迭代会反映别的线程的修改?还是像上节介绍的CopyOnWriteArrayList一样,反映的是创建时的副本?答案是,都不是!我们看个例子:
public class ConcurrentHashMapIteratorDemo {
public static void test() {
final ConcurrentHashMap map = new ConcurrentHashMap();
map.put("a", "abstract");
map.put("b", "basic");
Thread t1 = new Thread() {
@Override
public void run() {
for (Entry entry : map.entrySet()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println(entry.getKey() + "," + entry.getValue());
}
}
};
t1.start();
// 确保线程t1启动
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
map.put("c", "call");
}
public static void main(String[] args) {
test();
}
}
t1启动后,创建迭代器,但在迭代输出每个元素前,先睡眠1秒钟,主线程启动t1后,先睡眠一下,确保t1先运行,然后给map增加了一个元素,程序输出为:
a,abstract
b,basic
c,call
说明,迭代器反映了最新的更新,但我们将添加语句更改为:
map.put("g", "call");
你会发现,程序输出为:
a,abstract
b,basic
这说明,迭代器没有反映最新的更新,这是怎么回事呢?我们需要理解ConcurrentHashMap的弱一致性。
弱一致性
ConcurrentHashMap的迭代器创建后,就会按照哈希表结构遍历每个元素,但在遍历过程中,内部元素可能会发生变化,如果变化发生在已遍历过的部分,迭代器就不会反映出来,而如果变化发生在未遍历过的部分,迭代器就会发现并反映出来,这就是弱一致性。
类似的情况还会出现在ConcurrentHashMap的另一个方法:
//批量添加m中的键值对到当前Map
public void putAll(Map extends K, ? extends V> m)
该方法并非原子操作,而是调用put方法逐个元素进行添加的,在该方法没有结束的时候,部分修改效果就会体现出来。
小结
本节介绍了ConcurrentHashMap,它是并发版的HashMap,通过分段锁和其他技术实现了高并发,支持原子条件更新操作,不会抛出ConcurrentModificationException,实现了弱一致性。
Java中没有并发版的HashSet,但可以通过Collections.newSetFromMap方法基于ConcurrentHashMap构建一个。
我们知道HashMap/HashSet基于哈希,不能对元素排序,对应的可排序的容器类是TreeMap/TreeSet,并发包中可排序的对应版本不是基于树,而是基于Skip List(跳跃表)的,类分别是ConcurrentSkipListMap和ConcurrentSkipListSet,它们到底是什么呢?
(与其他章节一样,本节所有代码位于 https://github.com/swiftma/program-logic)
-- 长文连载,未完待续,敬请关注(长按下图二维码,或公众号搜索"老马说编程")
用心原创,保留所有版权。