Featured image of post 经典面试题之ConcurrentHashMap

经典面试题之ConcurrentHashMap

一 ConcurrentHashMap 与 HashMap的区别?ConcurrentHashMap线程安

Image

一 ConcurrentHashMap 与 HashMap的区别?

  • ConcurrentHashMap线程安全,而HashMap非线程安全

  • HashMap允许Key和Value为null,而ConcurrentHashMap不允许

  • HashMap不允许通过Iterator遍历的同时通过HashMap修改,而ConcurrentHashMap允许该行为,并且该更新对后续的遍历可见

以上说的比较笼统,我们具体看一下ConcurrentHashMap:

先来看下ConcurrentHashMap的数据结构

1.8之前的 ConcurrentHashMap是在1.7HashMap的基础上实现了线程安全的版本。采用分段锁的概念,使锁更加细化。它默认将Hash表分为16个分段,segments数组的长度最大为65536,最大容量 1 « 30。

Image

JDK1.8 的实现已经摒弃了 Segment 的概念,而是直接用 Node 数组 + 链表 + 红黑树的数据结构来实现,并发控制使用 Synchronized 和 CAS 来操作,整个看起来就像是优化过且线程安全的 HashMap,虽然在 JDK1.8 中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本。

Image

二 concurrentHashMap最大容量?

 1/**
 2     * The largest possible table capacity.  This value must be
 3     * exactly 1<<30 to stay within Java array allocation and indexing
 4     * bounds for power of two table sizes, and is further required
 5     * because the top two bits of 32bit hash fields are used for
 6     * control purposes.
 7     */
 8    private static final int MAXIMUM_CAPACITY = 1 << 30;
 9
10```java
11
12**注意这是** The largest possible table capacity,它是否代表最多能存储到map中的元素数量?答案是否定的。至于为什么,作为思考题,留给你。(关于这个问题在前一个系列关于HashMap的文章中也提到过相似的问题
13
14提示 看一下size方法,为什么n要设计为long?实际元素数量和返回值一样吗? 
15
16```cs
17public int size() {
18        long n = sumCount();
19        return ((n < 0L) ? 0 :
20                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
21                (int)n);
22    }

三 ConcurrentHashMap 也会出现死循环?

是的,当你不当地使用computeIfAbsent 方法时

1/**
2     * If the specified key is not already associated with a value,
3     * attempts to compute its value using the given mapping function
4     * and enters it into this map unless {@code null}.  The entire
5     * method invocation is performed atomically, so the function is
6     * applied at most once per key.  Some attempted update operations
7     * on this map by other threads may be blocked while computation
8     * is in progress, so the computation should be short and simple,
9     * and must not attempt to update any other mappings of this map.

上面的computeIfAbsent 方法注释也得很清楚了,**应该绝对避免在computeIfAbsent中有递归,或者修改map的任何操作。所以如果你在调用此方法并有上述操作时就会出现死循环问题。**至于为什么会出现这种问题,有兴趣的可以读读其他资料或源代码,本文就不详述了。好在这个问题在java 1.9中已经基本修复了。

https://bugs.openjdk.java.net/browse/JDK-8172951

问题如何规避?既然官方给出这么强烈的提示了,不作死就不会死。或者升级到JDK1.9

四 ConcurrentHashMap 在 JDK 1.8 中,为什么要使用内置锁 synchronized 来代替重入锁 ReentrantLock?

  • 粒度降低了(看下图感觉下锁粒度的变化)

  • JVM 开发团队没有放弃 synchronized,而且基于 JVM 的 synchronized 优化空间更大,更加自然。

  • 在大量的数据操作下,对于 JVM 的内存压力,基于 API 的 ReentrantLock 会开销更多的内存。

Image

Image

JDK1.8的ConcurrentHashMap(TreeBin: 红黑二叉树节点

Node: 链表节点)

Image

五 put() 方法流程?

 1final V putVal(K key, V value, boolean onlyIfAbsent) {
 2        if (key == null || value == null) throw new NullPointerException();
 3        int hash = spread(key.hashCode());
 4        int binCount = 0;
 5        for (Node<K,V>[] tab = table;;) {
 6            Node<K,V> f; int n, i, fh;
 7            if (tab == null || (n = tab.length) == 0)
 8                tab = initTable();
 9            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
10                if (casTabAt(tab, i, null,
11                             new Node<K,V>(hash, key, value, null)))
12                    break;                   // no lock when adding to empty bin
13            }
14            else if ((fh = f.hash) == MOVED)
15                tab = helpTransfer(tab, f);
16            else {
17                V oldVal = null;
18                synchronized (f) {
19                    if (tabAt(tab, i) == f) {
20                        if (fh >= 0) {
21                            binCount = 1;
22                            for (Node<K,V> e = f;; ++binCount) {
23                                K ek;
24                                if (e.hash == hash &&
25                                    ((ek = e.key) == key ||
26                                     (ek != null && key.equals(ek)))) {
27                                    oldVal = e.val;
28                                    if (!onlyIfAbsent)
29                                        e.val = value;
30                                    break;
31                                }
32                                Node<K,V> pred = e;
33                                if ((e = e.next) == null) {
34                                    pred.next = new Node<K,V>(hash, key,
35                                                              value, null);
36                                    break;
37                                }
38                            }
39                        }
40                        else if (f instanceof TreeBin) {
41                            Node<K,V> p;
42                            binCount = 2;
43                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
44                                                           value)) != null) {
45                                oldVal = p.val;
46                                if (!onlyIfAbsent)
47                                    p.val = value;
48                            }
49                        }
50                    }
51                }
52                if (binCount != 0) {
53                    if (binCount >= TREEIFY_THRESHOLD)
54                        treeifyBin(tab, i);
55                    if (oldVal != null)
56                        return oldVal;
57                    break;
58                }
59            }
60        }
61        addCount(1L, binCount);
62        return null;
63    }
  1. 如果没有初始化,就调用 initTable() 方法来进行初始化;

  2. 如果没有 hash 冲突就直接 CAS 无锁插入;

  3. 如果需要扩容,就先进行扩容;

  4. 如果存在 hash 冲突,就加锁来保证线程安全,两种情况:一种是链表形式就直接遍历到尾端插入,一种是红黑树就按照红黑树结构插入;

  5. 如果该链表的数量大于阀值 8,就要先转换成红黑树的结构,break 再一次进入循环

  6. 如果添加成功就调用 addCount() 方法统计 size,并且检查是否需要扩容。

  • 扩容方法 transfer():默认容量为 16,扩容时,容量变为原来的两倍。

  • helpTransfer():调用多个工作线程一起帮助进行扩容,这样的效率就会更高。

六 ConcurrentHashMap 的并发度是什么?

程序运行时能够同时更新 ConccurentHashMap 且不产生锁竞争的最大线程数。默认为 16,且可以在构造函数中设置。当用户设置并发度时,ConcurrentHashMap 会使用大于等于该值的最小2幂指数作为实际并发度(假如用户设置并发度为17,实际并发度则为32)

七 ConcurrentHashMap的get方法是否要加锁,为什么?

不需要。get没有加锁的话,ConcurrentHashMap是如何保证读到的数据不是脏数据的呢?

get操作全程不需要加锁是因为Node的成员val是用volatile修饰的。

八 ConcurrentHashMap 如何计算 size

size()方法返回的是一个不精确的值

我们先来看一下jdk1.8的代码注释:

大致的意思是:返回容器的大小。这个方法应该被用来代替size()方法,因为 ConcurrentHashMap的容量大小可能会大于int的最大值。返回的值是一个估计值;如果有并发插入或者删除操作,则实际的数量可能有所不同。

 1/**
 2     * Returns the number of mappings. This method should be used
 3     * instead of {@link #size} because a ConcurrentHashMap may
 4     * contain more mappings than can be represented as an int. The
 5     * value returned is an estimate; the actual count may differ if
 6     * there are concurrent insertions or removals.
 7     *(大致的意思是:返回容器的大小。这个方法应该被用来代替size()方法,因为
 8     * ConcurrentHashMap的容量大小可能会大于int的最大值。
 9     * 返回的值是一个估计值;如果有并发插入或者删除操作,则实际的数量可能有所不同。)
10     * @return the number of mappings
11     * @since 1.8
12     */
13    public long mappingCount() {
14        long n = sumCount();
15        return (n < 0L) ? 0L : n; // ignore transient negative values
16    }

1.7中 Segment继承ReentrantLock,这样就很容易对每个Segment加锁了。类似于get或remove这些操作,都只需要在操作前对一个Segment加锁。但是有些操作需要跨段,比如size()、containsValue()和isEmpty()方法,因此为了保证并发效率,允许size返回的是一个近似值而不是精确值。

1.7的 put、remove和get操作只需要关心一个Segment,而size操作需要遍历所有的Segment才能算出整个Map的大小。一个简单的方案是,先锁住所有Sgment,计算完后再解锁。但这样做,在做size操作时,不仅无法对Map进行写操作,同时也无法进行读操作,不利于对Map的并行操作。为更好支持并发操作,**ConcurrentHashMap会在不上锁的前提逐个Segment计算3次size,**如果某相邻两次计算获取的所有Segment的更新次数(每个Segment都与HashMap一样通过modCount跟踪自己的修改次数,Segment每修改一次其modCount加一)相等,说明这两次计算过程中无更新操作,则这两次计算出的总size相等,可直接作为最终结果返回。如果这三次计算过程中Map有更新,则对所有Segment加锁重新计算Size。

jdk 1.8 put方法和remove方法都会通过addCount方法维护Map的size。size方法通过sumCount获取由addCount方法维护的Map的size。

 1 final long sumCount() {
 2        CounterCell[] as = counterCells; CounterCell a;
 3        long sum = baseCount;
 4        if (as != null) {
 5            for (int i = 0; i < as.length; ++i) {
 6                if ((a = as[i]) != null)
 7                    sum += a.value;
 8            }
 9        }
10        return sum;
11    }
12
13 private final void addCount(long x, int check) {
14        CounterCell[] as; long b, s;
15        if ((as = counterCells) != null ||
16            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
17            CounterCell a; long v; int m;
18            boolean uncontended = true;
19            if (as == null || (m = as.length - 1) < 0 ||
20                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
21                !(uncontended =
22                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
23                fullAddCount(x, uncontended);
24                return;
25            }
26            if (check <= 1)
27                return;
28            s = sumCount();
29        }

注意两个属性 :baseCount 和 counterCells。

  • baseCount 一个 volatile 的变量,在 addCount 方法中会使用它,而 addCount 方法在 put 结束后会调用。在 addCount 方法中,会对这个变量做 CAS 加法。

  • counterCells 一种用于分配计数的填充单元。改编自LongAdder和Striped64

总结

  • JDK1.7 和 JDK1.8 对 size 的计算是不一样的。1.7 中是先不加锁计算三次,如果三次结果不一样在加锁

  • JDK1.8 size 是通过对 baseCount 和 counterCell 进行 CAS 计算,最终通过 baseCount 和 遍历 CounterCell 数组得出 size。

  • JDK 8 推荐使用mappingCount 方法,因为这个方法的返回值是 long 类型,不会因为 size 方法是 int 类型限制最大值。

九 用了ConcurrentHashMap 就一定是线程安全的吗?

不一定,ConcurrenetHashMap 只能保证提供的原子性读写操作是线程安全的,换句话说,如果你的读写操作不是原子性的,那么无法保证绝对的线程安全。如果你希望在一整段业务逻辑中,对容器的操作都保持整体一致性的话,需要另外加锁处理。

ConcurrentHashMap 对外提供的方法或能力的限制:

  • 使用了 ConcurrentHashMap,不代表对它的多个操作之间的状态是一致的,是没有其他线程在操作它的,如果需要确保需要手动加锁。

  • 诸如 size、isEmpty 和 containsValue 等聚合方法,在并发情况下可能会反映 ConcurrentHashMap 的中间状态。因此在并发情况下,这些方法的返回值只能用作参考,而不能用于流程控制。

  • 诸如 putAll 这样的聚合方法也不能确保原子性,在 putAll 的过程中去获取数据可能会获取到部分数据。

Image

关注公众号 获取更多精彩内容

位旅人路过 次翻阅 初次见面