• ConcurrentHashMap主要分为JDK<=7跟JDK>=8的两个版本,ConcurrentHashMap的空间利用率更低一般只有10%~20%,接下来分别介绍。

    JDK7

    先宏观说下JDK7中的大致组成,ConcurrentHashMap由Segment数组结构和HashEntry数组组成。Segment是一种可重入锁,是一种数组和链表的结构,一个Segment中包含一个HashEntry数组,每个HashEntry又是一个链表结构。正是通过Segment分段锁,ConcurrentHashMap实现了高效率的并发。缺点是并发程度是有segment数组来决定的,并发度一旦初始化无法扩容。先绘制个ConcurrentHashMap的形象直观图。我是这样给阿里面试官吹 ConcurrentHashMap的-LMLPHP要想理解currentHashMap,可以简单的理解为将数据「分表分库」ConcurrentHashMap是由 Segment 数组 结构和HashEntry 数组 结构组成。

    static final class Segment<K,Vextends ReentrantLock implements Serializable {
         transient volatile HashEntry<K,V>[] table; //包含一个HashMap 可以理解为
    }

    可以理解为我们的每个segment都是实现了Lock功能的HashMap。如果我们同时有多个segment形成了segment数组那我们就可以实现并发咯。

    构造函数详解:

       //initialCapacity 是我们保存所以KV数据的初始值
       //loadFactor这个就是HashMap的负载因子
       // 我们segment数组的初始化大小
          @SuppressWarnings("unchecked")
           public ConcurrentHashMap(int initialCapacity,
                                    float loadFactor, int concurrencyLevel)
     
    {
               if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
                   throw new IllegalArgumentException();
               if (concurrencyLevel > MAX_SEGMENTS) // 最大允许segment的个数,不能超过 1< 24
                   concurrencyLevel = MAX_SEGMENTS;
               int sshift = 0// 类似扰动函数
               int ssize = 1
               while (ssize < concurrencyLevel) {
                   ++sshift;
                   ssize <<= 1// 确保segment一定是2次幂
               }
               this.segmentShift = 32 - sshift;  
               //有点类似与扰动函数,跟下面的参数配合使用实现 当前元素落到那个segment上面。
               this.segmentMask = ssize - 1// 为了 取模 专用
               if (initialCapacity > MAXIMUM_CAPACITY) //不能大于 1< 30
                   initialCapacity = MAXIMUM_CAPACITY;
       
               int c = initialCapacity / ssize; //总的数组大小 被 segment 分散后 需要多少个table
               if (c * ssize < initialCapacity)
                   ++c; //确保向上取值
               int cap = MIN_SEGMENT_TABLE_CAPACITY; 
               // 每个table初始化大小为2
               while (cap < c) // 单独的一个segment[i] 对应的table 容量大小。
                   cap <<= 1;
               // 将table的容量初始化为2的次幂
               Segment<K,V> s0 =
                   new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]);
                   // 负载因子,阈值,每个segment的初始化大小。跟hashmap 初始值类似。
                   // 并且segment的初始化是懒加载模式,刚开始只有一个s0,其余的在需要的时候才会增加。
               Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
               UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
               this.segments = ss;
           }
       //  整体思想就是通过多次不同方式的位运算来努力将数据均匀的分不到目标table中,都是些扰动函数
       private int hash(Object k) {
           int h = hashSeed;
           if ((0 != h) && (k instanceof String)) {
               return sun.misc.Hashing.stringHash32((String) k);
           }
           h ^= k.hashCode();
           // single-word Wang/Jenkins hash.
           h += (h <<  15) ^ 0xffffcd7d;
           h ^= (h >>> 10);
           h += (h <<   3);
           h ^= (h >>>  6);
           h += (h <<   2) + (h << 14);
           return h ^ (h >>> 16);
       }
       public V get(Object key) {
           Segment<K,V> s;
           HashEntry<K,V>[] tab;
           int h = hash(key); // JDK7中标准的hash值获取算法
           long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // hash值如何映射到对应的segment上
           if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
               //  无非就是获得hash值对应的segment 是否存在,
               for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                        (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                    e != null; e = e.next) {
                   // 看下这个hash值对应的是segment(HashEntry)中的具体位置。然后遍历查询该链表
                   K k;
                   if ((k = e.key)  key || (e.hash  h && key.equals(k)))
                       return e.value;
               }
           }
           return null;
       }
       @SuppressWarnings("unchecked")
       public V put(K key, V value) {
           Segment<K,V> s;
           if (value  null)
               throw new NullPointerException();
           int hash = hash(key);// 还是获得最终hash值
           int j = (hash >>> segmentShift) & segmentMask; // hash值位操作对应的segment数组位置
           if ((s = (Segment<K,V>)UNSAFE.getObject          
                (segments, (j << SSHIFT) + SBASE))  null)
               s = ensureSegment(j); 
           // 初始化时候因为只有第一个segment,如果落在了其余的segment中 则需要现初始化。
           return s.put(key, hash, value, false);
           // 直接在数据中执行put操作。
       }

    其中put操作基本思路跟HashMap几乎一样,只是在开始跟结束进行了加锁的操作tryLock and unlock,然后JDK7中都是先扩容再添加数据的,并且获得不到锁也会进行自旋的tryLock或者lock阻塞排队进行等待(同时获得锁前提前new出新数据)。

    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        // 在往该 segment 写入前,需要先获取该 segment 的独占锁,获取失败尝试获取自旋锁
        HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
        V oldValue;
        try {
            // segment 内部的数组
            HashEntry<K,V>[] tab = table;
            // 利用 hash 值,求应该放置的数组下标
            int index = (tab.length - 1) & hash;
            // first 是数组该位置处的链表的表头
            HashEntry<K,V> first = entryAt(tab, index);
     
            for (HashEntry<K,V> e = first;;) {
                if (e != null) {
                    K k;
                    if ((k = e.key)  key ||
                        (e.hash  hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            // 覆盖旧值
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    // 继续顺着链表走
                    e = e.next;
                }
                else {
                    // node 是不是 null,这个要看获取锁的过程。没获得锁的线程帮我们创建好了节点,直接头插法
                    // 如果不为 null,那就直接将它设置为链表表头;如果是 null,初始化并设置为链表表头。
                    if (node != null)
                        node.setNext(first);
                    else
                        node = new HashEntry<K,V>(hash, key, value, first);
     
                    int c = count + 1;
                    // 如果超过了该 segment 的阈值,这个 segment 需要扩容
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node); // 扩容
                    else
                        // 没有达到阈值,将 node 放到数组 tab 的 index 位置,
                        // 将新的结点设置成原链表的表头
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            // 解锁
            unlock();
        }
        return oldValue;
    }

    如果加锁失败了调用scanAndLockForPut,完成查找或新建节点的工作。当获取到锁后直接将该节点加入链表即可,「提升」了put操作的性能,这里涉及到自旋。大致过程:

    private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
        HashEntry<K,V> first = entryForHash(this, hash);
        HashEntry<K,V> e = first;
        HashEntry<K,V> node = null;
        int retries = -1// negative while locating node
     
        // 循环获取锁
        while (!tryLock()) {
            HashEntry<K,V> f; // to recheck first below
            if (retries < 0) {
                if (e  null) {
                    if (node  null// speculatively create node
                  // 进到这里说明数组该位置的链表是空的,没有任何元素
                 // 当然,进到这里的另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置
                        node = new HashEntry<K,V>(hash, key, value, null);
                    retries = 0;
                }
                else if (key.equals(e.key))
                    retries = 0;
                else
                    // 顺着链表往下走
                    e = e.next;
            }
        // 重试次数如果超过 MAX_SCAN_RETRIES(单核 1 次多核 64 次),那么不抢了,进入到阻塞队列等待锁
        //    lock() 是阻塞方法,直到获取锁后返回
            else if (++retries > MAX_SCAN_RETRIES) {
                lock();
                break;
            }
            else if ((retries & 1)  0 &&
                     // 进入这里,说明有新的元素进到了链表,并且成为了新的表头
                     // 这边的策略是,重新执行 scanAndLockForPut 方法
                     (f = entryForHash(this, hash)) != first) {
                e = first = f; // re-traverse if entry changed
                retries = -1;
            }
        }
        return node;
    }
    public int size() {
           // Try a few times to get accurate count. On failure due to
           // continuous async changes in table, resort to locking.
           final Segment<K,V>[] segments = this.segments;
           int size;
           boolean overflow; // true if size overflows 32 bits
           long sum;         // sum of modCounts
           long last = 0L;   // previous sum
           int retries = -1// first iteration isn't retry
           try {
               for (;;) {
                   if (retries++  RETRIES_BEFORE_LOCK) {  //  超过2次则全部加锁
                       for (int j = 0; j < segments.length; ++j)
                           ensureSegment(j).lock(); // 直接对全部segment加锁消耗性太大
                   }
                   sum = 0L;
                   size = 0;
                   overflow = false;
                   for (int j = 0; j < segments.length; ++j) {
                       Segment<K,V> seg = segmentAt(segments, j);
                       if (seg != null) {
                           sum += seg.modCount; // 统计的是modCount,涉及到增删该都会加1
                           int c = seg.count;
                           if (c < 0 || (size += c) < 0)
                               overflow = true;
                       }
                   }
                   if (sum  last) // 每一个前后的修改次数一样 则认为一样,但凡有一个不一样则直接break。
                       break;
                   last = sum;
               }
           } finally {
               if (retries > RETRIES_BEFORE_LOCK) {
                   for (int j = 0; j < segments.length; ++j)
                       segmentAt(segments, j).unlock();
               }
           }
           return overflow ? Integer.MAX_VALUE : size;
       }
    // 方法参数上的 node 是这次扩容后,需要添加到新的数组中的数据。
    private void rehash(HashEntry<K,V> node) {
        HashEntry<K,V>[] oldTable = table;
        int oldCapacity = oldTable.length;
        // 2 倍
        int newCapacity = oldCapacity << 1;
        threshold = (int)(newCapacity * loadFactor);
        // 创建新数组
        HashEntry<K,V>[] newTable =
            (HashEntry<K,V>[]) new HashEntry[newCapacity];
        // 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制 ‘000...00011111’
        int sizeMask = newCapacity - 1;
        // 遍历原数组,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置
        for (int i = 0; i < oldCapacity ; i++) {
            // e 是链表的第一个元素
            HashEntry<K,V> e = oldTable[i];
            if (e != null) {
                HashEntry<K,V> next = e.next;
                // 计算应该放置在新数组中的位置,
                // 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只可能是 3 或者是 3 + 16 = 19
                int idx = e.hash & sizeMask; // 新位置
                if (next  null)   // 该位置处只有一个元素
                    newTable[idx] = e;
                else { // Reuse consecutive sequence at same slot
                    // e 是链表表头
                    HashEntry<K,V> lastRun = e;
                    // idx 是当前链表的头结点 e 的新位置
                    int lastIdx = idx;
                    // for 循环找到一个 lastRun 结点,这个结点之后的所有元素是将要放到一起的
                    for (HashEntry<K,V> last = next;
                         last != null;
                         last = last.next) {
                        int k = last.hash & sizeMask;
                        if (k != lastIdx) {
                            lastIdx = k;
                            lastRun = last;
                        }
                    }
                    // 将 lastRun 及其之后的所有结点组成的这个链表放到 lastIdx 这个位置
                    newTable[lastIdx] = lastRun;
                    // 下面的操作是处理 lastRun 之前的结点,
                    //这些结点可能分配在另一个链表中,也可能分配到上面的那个链表中
                    for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                        V v = p.value;
                        int h = p.hash;
                        int k = h & sizeMask;
                        HashEntry<K,V> n = newTable[k];
                        newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                    }
                }
            }
        }
        // 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部
        int nodeIndex = node.hash & sizeMask; // add the new node
        node.setNext(newTable[nodeIndex]);
        newTable[nodeIndex] = node;
        table = newTable;
    }
         final void setNext(HashEntry<K,V> n) {
                UNSAFE.putOrderedObject(this, nextOffset, n);
            }
        static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
            return (tab  null) ? null :
                (HashEntry<K,V>) UNSAFE.getObjectVolatile
                (tab, ((long)i << TSHIFT) + TBASE);
        }
       static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
                                           HashEntry<K,V> e)
     
    {
            UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
        }

    常见问题

    JDK8

    JDK8相比与JDK7主要区别如下:

          static class Node<K,Vimplements Map.Entry<K,V{
                  final int hash;    // key的hash值
                  final K key;       // key
                  volatile V val;    // value
                  volatile Node<K,V> next; 
                   //表示链表中的下一个节点
          }
          static final class TreeNode<K,Vextends Node<K,V{
                  TreeNode<K,V> parent;  
                  // 红黑树的父亲节点
                  TreeNode<K,V> left;
                  // 左节点
                  TreeNode<K,V> right;
                 // 右节点
                  TreeNode<K,V> prev;    
                 // 前节点
                  boolean red;
                 // 是否为红点
          }
    static final class ForwardingNode<K,Vextends Node<K,V{
            final Node<K,V>[] nextTable;
            ForwardingNode(Node<K,V>[] tab) {
                //注意这里
                super(MOVED, nullnullnull);
                this.nextTable = tab;
            }
     //.....
    }
    static final class TreeBin<K,Vextends Node<K,V{
            TreeNode<K,V> root;
            volatile TreeNode<K,V> first;
            volatile Thread waiter;
            volatile int lockState;
            // values for lockState
            static final int WRITER = 1// set while holding write lock
            static final int WAITER = 2// set when waiting for write lock
            static final int READER = 4// increment value for setting read lock
    }

    构造函数

    整体的构造情况基本跟HashMap类似,并且为了跟原来的JDK7中的兼容性还可以传入并发度。不过JDK8中并发度已经有table的具体长度来控制了。

    put

    假设table已经初始化完成,put操作采用 CAS + synchronized 实现并发插入或更新操作,具体实现如下。


    // Node 节点的 hash值在HashMap中存储的就是hash值,在currenthashmap中可能有多种情况哦!
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key  null || value  nullthrow new NullPointerException(); //边界处理
        int hash = spread(key.hashCode());// 最终hash值计算
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) { //循环表
            Node<K,V> f; int n, i, fh;
            if (tab  null || (n = tab.length)  0)
                tab = initTable(); // 初始化表 如果为空,懒汉式
            else if ((f = tabAt(tab, i = (n - 1) & hash))  null) {
            // 如果对应桶位置为空
                if (casTabAt(tab, i, nullnew Node<K,V>(hash, key, value, null))) 
                             // CAS 原子性的尝试插入
                    break;
            } 
            else if ((fh = f.hash)  MOVED) 
            // 如果当前节点正在扩容。还要帮着去扩容。
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) { //  桶存在数据 加锁操作进行处理
                    if (tabAt(tab, i)  f) {
                        if (fh >= 0) { // 如果存储的是链表 存储的是节点的hash值
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // 遍历链表去查找,如果找到key一样则选择性
                                if (e.hash  hash &&
                                    ((ek = e.key)  key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next)  null) {// 找到尾部插入
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {// 如果桶节点类型为TreeBin
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) { 
                                 // 尝试红黑树插入,同时也要防止节点本来就有,选择性覆盖
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) { // 如果链表数量
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i); //  链表转红黑树哦!
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount); // 统计大小 并且检查是否要扩容。
        return null;
    }

    涉及到重要函数initTabletabAtcasTabAthelpTransferputTreeValtreeifyBinaddCount函数。

    initTable

    「只允许一个线程」对表进行初始化,如果不巧有其他线程进来了,那么会让其他线程交出 CPU 等待下次系统调度Thread.yield。这样,保证了表同时只会被一个线程初始化,对于table的大小,会根据sizeCtl的值进行设置,如果没有设置szieCtl的值,那么默认生成的table大小为16,否则,会根据sizeCtl的大小设置table大小。

    // 容器初始化 操作
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table)  null || tab.length  0) {
            if ((sc = sizeCtl) < 0// 如果正在初始化-1,-N 正在扩容。
                Thread.yield(); // 进行线程让步等待
         // 让掉当前线程 CPU 的时间片,使正在运行中的线程重新变成就绪状态,并重新竞争 CPU 的调度权。
         // 它可能会获取到,也有可能被其他线程获取到。
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { 
              //  比较sizeCtl的值与sc是否相等,相等则用 -1 替换,这表明我这个线程在进行初始化了!
                try {
                    if ((tab = table)  null || tab.length  0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 默认为16
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2); // sc = 0.75n
                    }
                } finally {
                    sizeCtl = sc; //设置sizeCtl 类似threshold
                }
                break;
            }
        }
        return tab;
    }

    unsafe

    ConcurrentHashMap中使用了unSafe方法,通过直接操作内存的方式来保证并发处理的安全性,使用的是硬件的安全机制。

     // 用来返回节点数组的指定位置的节点的原子操作
    @SuppressWarnings("unchecked")
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

    // cas原子操作,在指定位置设定值
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v)
     
    {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

    // 原子操作,在指定位置设定值
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }

    // 比较table数组下标为i的结点是否为c,若为c,则用v交换操作。否则,不进行交换操作。
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v)
     
    {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

    可以看到获得table[i]数据是通过Unsafe对象通过反射获取的,取数据直接table[index]不可以么,为什么要这么复杂?在java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的「副本」,虽然table是volatile修饰的,但不能保证线程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接获取指定内存的数据,「保证了每次拿到数据都是最新的」

    helpTransfer

    // 可能有多个线程在同时帮忙运行helpTransfer
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            // table不是空  且 node节点是转移类型,并且转移类型的nextTable 不是空 说明还在扩容ing
            int rs = resizeStamp(tab.length); 
            // 根据 length 得到一个前16位的标识符,数组容量大小。
            // 确定新table指向没有变,老table数据也没变,并且此时 sizeCtl小于0 还在扩容ing
            while (nextTab  nextTable && table  tab && (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc  rs + 1 || sc  rs + MAX_RESIZERS || transferIndex <= 0)
                // 1. sizeCtl 无符号右移16位获得高16位如果不等 rs 标识符变了
                // 2. 如果扩容结束了 这里可以看 trePresize 函数第一次扩容操作:
                // 默认第一个线程设置 sc = rs 左移 16 位 + 2,当第一个线程结束扩容了,
                // 就会将 sc 减一。这个时候,sc 就等于 rs + 1。
                // 3. 如果达到了最大帮助线程个数 65535个
                // 4. 如果转移下标调整ing 扩容已经结束了
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                // 如果以上都不是, 将 sizeCtl + 1,增加一个线程来扩容
                    transfer(tab, nextTab); // 进行转移
                    break;// 结束循环
                }
            }
            return nextTab;
        }
        return table;
    }
    static final int resizeStamp(int n) {
       return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); 
       //RESIZE_STAMP_BITS = 16
    }

    addCount

    主要就2件事:一是更新 baseCount,二是判断是否需要扩容。

    private final void addCount(long x, int check) {
     CounterCell[] as; long b, s;
     // 首先如果没有并发 此时countCells is null, 此时尝试CAS设置数据值。
     if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
         // 如果 counterCells不为空以为此时有并发的设置 或者 CAS设置 baseCount 失败了
         CounterCell a; long v; int m;
         boolean uncontended = true;
         if (as  null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m])  null ||
             !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
             // 1. 如果没出现并发 此时计数盒子为 null
             // 2. 随机取出一个数组位置发现为空
             // 3. 出现并发后修改这个cellvalue 失败了
             // 执行funAddCount
             fullAddCount(x, uncontended);// 死循环操作
             return;
         }
         if (check <= 1)
             return;
         s = sumCount(); // 吧counterCells数组中的每一个数据进行累加给baseCount。
     }
     // 如果需要扩容
     if (check >= 0) {
      Node<K,V>[] tab, nt; int n, sc;
      while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
       int rs = resizeStamp(n);// 获得高位标识符
       if (sc < 0) { // 是否需要帮忙去扩容
        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc  rs + 1 ||
         sc  rs + MAX_RESIZERS || (nt = nextTable)  null || transferIndex <= 0)
         break;
        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
         transfer(tab, nt);
       } // 第一次扩容
       else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
        transfer(tab, null);
       s = sumCount();
      }
     }
    }

    我是这样给阿里面试官吹 ConcurrentHashMap的-LMLPHP如果不是第一次扩容则直接将低16位的数字 +1 即可。

    putTreeVal

    这个操作几乎跟HashMap的操作完全一样,核心思想就是一定要决定向左还是向右然后最终尝试放置新数据,然后balance。不同点就是有锁的考虑。

    treeifyBin

    这里的基本思路跟HashMap几乎一样,不同点就是先变成TreeNode,然后是「单向链表」串联。

    private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            //如果整个table的数量小于64,就扩容至原来的一倍,不转红黑树了
            //因为这个阈值扩容可以减少hash冲突,不必要去转红黑树
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                tryPresize(n << 1);
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                synchronized (b) { //锁定当前桶
                    if (tabAt(tab, index)  b) {
                        TreeNode<K,V> hd = null, tl = null;
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            //遍历这个链表然后将每个节点封装成TreeNode,最终单链表串联起来,
                            // 最终 调用setTabAt 放置红黑树
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  nullnull);
                            if ((p.prev = tl)  null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        //通过TreeBin对象对TreeNode转换成红黑树
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }

    TreeBin

    主要功能就是链表变化为红黑树,这个红黑树用TreeBin来包装。并且要注意 转成红黑树以后以前链表的结构信息还是有的,最终信息如下:

    TreeBin(TreeNode<K,V> b) {
                super(TREEBIN, nullnullnull);   
                //创建空节点 hash = -2 
                this.first = b;
                TreeNode<K,V> r = null// root 节点
                for (TreeNode<K,V> x = b, next; x != null; x = next) {
                    next = (TreeNode<K,V>)x.next;
                    x.left = x.right = null;
                    if (r  null) {
                        x.parent = null;
                        x.red = false;
                        r = x; // root 节点设置为x 
                    }
                    else {
                        K k = x.key;
                        int h = x.hash;
                        Class<?> kc = null;
                        for (TreeNode<K,V> p = r;;) {
                       // x代表的是转换为树之前的顺序遍历到链表的位置的节点,r代表的是根节点
                            int dir, ph;
                            K pk = p.key;
                            if ((ph = p.hash) > h)
                                dir = -1;
                            else if (ph < h)
                                dir = 1;
                            else if ((kc  null &&
                                      (kc = comparableClassFor(k))  null) ||
                                     (dir = compareComparables(kc, k, pk))  0)
                                dir = tieBreakOrder(k, pk);    
                                // 当key不可以比较,或者相等的时候采取的一种排序措施
                                TreeNode<K,V> xp = p;
                            // 放一定是放在叶子节点上,如果还没找到叶子节点则进行循环往下找。
                            // 找到了目前叶子节点才会进入 再放置数据
                            if ((p = (dir <= 0) ? p.left : p.right)  null) {
                                x.parent = xp;
                                if (dir <= 0)
                                    xp.left = x;
                                else
                                    xp.right = x;
                                r = balanceInsertion(r, x); 
                         // 每次插入一个元素的时候都调用 balanceInsertion 来保持红黑树的平衡
                                break;
                            }
                        }
                    }
                }
                this.root = r;
                assert checkInvariants(root);
            }

    tryPresize

    当数组长度小于64的时候,扩张数组长度一倍,调用此函数。扩容后容量大小的核对,可能涉及到初始化容器大小。并且扩容的时候又跟2的次幂联系上了!,其中初始化时候传入map会调用putAll方法直接put一个map的话,在「putAll」方法中没有调用initTable方法去初始化table,而是直接调用了tryPresize方法,所以这里需要做一个是不是需要初始化table的判断。

    PS:默认第一个线程设置 sc = rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1,这个时候说明扩容完毕了。

         /**
         * 扩容表为指可以容纳指定个数的大小(总是2的N次方)
         * 假设原来的数组长度为16,则在调用tryPresize的时候,size参数的值为16<<1(32),此时sizeCtl的值为12
         * 计算出来c的值为64, 则要扩容到 sizeCtl ≥ c
         *  第一次扩容之后 数组长:32 sizeCtl:24
         *  第三次扩容之后 数组长:128 sizeCtl:96 退出
         */

        private final void tryPresize(int size) {
            int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
                tableSizeFor(size + (size >>> 1) + 1); // 合理范围
            int sc;
            while ((sc = sizeCtl) >= 0) {
                Node<K,V>[] tab = table; int n;
                    if (tab  null || (n = tab.length)  0) {
                    // 初始化传入map,今天putAll会直接调用这个。
                    n = (sc > c) ? sc : c;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {    
                    //初始化tab的时候,把 sizeCtl 设为 -1
                        try {
                            if (table  tab) {
                                @SuppressWarnings("unchecked")
                                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                                table = nt;
                                sc = n - (n >>> 2); // sc=sizeCtl = 0.75n
                            }
                        } finally {
                            sizeCtl = sc;
                        }
                    }
                }
                 // 初始化时候如果  数组容量<=sizeCtl 或 容量已经最大化了则退出
                else if (c <= sc || n >= MAXIMUM_CAPACITY) {
                        break;//退出扩张
                }
                else if (tab  table) {
                    int rs = resizeStamp(n);

                    if (sc < 0) { // sc = siztCtl 如果正在扩容Table的话,则帮助扩容
                        Node<K,V>[] nt;
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc  rs + 1 ||
                            sc  rs + MAX_RESIZERS || (nt = nextTable)  null ||
                            transferIndex <= 0)
                            break// 各种条件判断是否需要加入扩容工作。
                         // 帮助转移数据的线程数 + 1
                        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                            transfer(tab, nt);
                    }
                     // 没有在初始化或扩容,则开始扩容
                     // 此处切记第一次扩容 直接 +2 
                    else if (U.compareAndSwapInt(this, SIZECTL, sc,
                          (rs << RESIZE_STAMP_SHIFT) + 2)) {
                            transfer(tab, null);
                    }
                }
            }
        }

    transfer

    这里代码量比较大主要分文三部分,并且感觉思路很精髓,尤其「是其他线程帮着去扩容的骚操作」

    该函数中的finish= true 则说明整张表的迁移操作已经「全部」完成了,我们只需要重置 table的引用并将 nextTable 赋为空即可。否则,CAS 式的将 sizeCtl减一,表示当前线程已经完成了任务,退出扩容操作。如果退出成功,那么需要进一步判断当前线程是否就是最后一个在执行扩容的。

    f ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
       return;

    第一次扩容时在addCount中有写到(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 表示当前只有一个线程正在工作,「相对应的」,如果 (sc - 2) resizeStamp(n) << RESIZE_STAMP_SHIFT,说明当前线程就是最后一个还在扩容的线程,那么会将 finishing 标识为 true,并在下一次循环中退出扩容方法。

    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)    //MIN_TRANSFER_STRIDE 用来控制不要占用太多CPU
            stride = MIN_TRANSFER_STRIDE; // subdivide range    //MIN_TRANSFER_STRIDE=16 每个CPU处理最小长度个数

        if (nextTab  null) { // 新表格为空则直接新建二倍,别的辅助线程来帮忙扩容则不会进入此if条件
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n; // transferIndex 指向最后一个桶,方便从后向前遍历
        }
        int nextn = nextTab.length; // 新表长度
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 创建一个fwd节点,这个是用来控制并发的,当一个节点为空或已经被转移之后,就设置为fwd节点
        boolean advance = true;    //是否继续向前查找的标志位
        boolean finishing = false// to ensure sweep(清扫) before committing nextTab,在完成之前重新在扫描一遍数组,看看有没完成的没
         // 第一部分
        // i 指向当前桶, bound 指向当前线程需要处理的桶结点的区间下限【bound,i】 这样来跟线程划分任务。
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
           // 这个 while 循环的目的就是通过 --i 遍历当前线程所分配到的桶结点
           // 一个桶一个桶的处理
            while (advance) {//  每一次成功处理操作都会将advance设置为true,然里来处理区间的上一个数据
                int nextIndex, nextBound;
                if (--i >= bound || finishing) { //通过此处进行任务区间的遍历
                    advance = false;
                }
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;// 任务分配完了
                    advance = false;
                }
                // 更新 transferIndex
               // 为当前线程分配任务,处理的桶结点区间为(nextBound,nextIndex)
                else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
                   // nextIndex本来等于末尾数字,
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            // 当前线程所有任务完成 
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {  // 已经完成转移 则直接赋值操作
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);    //设置sizeCtl为扩容后的0.75
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // sizeCtl-1 表示当前线程任务完成。
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) { 
                    // 判断当前线程完成的线程是不是最后一个在扩容的,思路精髓
                            return;
                    }
                    finishing = advance = true;// 如果是则相应的设置参数
                    i = n; 
                }
            }
            else if ((f = tabAt(tab, i))  null// 数组中把null的元素设置为ForwardingNode节点(hash值为MOVED[-1])
                advance = casTabAt(tab, i, null, fwd); // 如果老节点数据是空的则直接进行CAS设置为fwd
            else if ((fh = f.hash)  MOVED) //已经是个fwd了,因为是多线程操作 可能别人已经给你弄好了,
                advance = true// already processed
            else {
                synchronized (f) { //加锁操作
                    if (tabAt(tab, i)  f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) { //该节点的hash值大于等于0,说明是一个Node节点
                        // 关于链表的操作整体跟HashMap类似不过 感觉好像更扰一些。
                            int runBit = fh & n; // fh= f.hash first hash的意思,看第一个点 放老位置还是新位置
                            Node<K,V> lastRun = f;

                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;    //n的值为扩张前的数组的长度
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;//最后导致发生变化的节点
                                }
                            }
                            if (runBit  0) { //看最后一个变化点是新还是旧 旧
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun; //看最后一个变化点是新还是旧 旧
                                ln = null;
                            }
                            /*
                             * 构造两个链表,顺序大部分和原来是反的,不过顺序也有差异
                             * 分别放到原来的位置和新增加的长度的相同位置(i/n+i)
                             */

                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n)  0)
                                        /*
                                         * 假设runBit的值为0,
                                         * 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同为0的节点)设置到旧的table的第一个hash计算后为0的节点下一个节点
                                         * 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点
                                         */

                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                        /*
                                         * 假设runBit的值不为0,
                                         * 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同不为0的节点)设置到旧的table的第一个hash计算后不为0的节点下一个节点
                                         * 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点
                                         */

                                    hn = new Node<K,V>(ph, pk, pv, hn);    
                            }
                            setTabAt(nextTab, i, ln);    
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) { // 该节点hash值是个负数否则的话是一个树节点
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null// 旧 头尾
                            TreeNode<K,V> hi = null, hiTail = null//新头围
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, nullnull);
                                if ((h & n)  0) {
                                    if ((p.prev = loTail)  null)
                                        lo = p;
                                    else
                                        loTail.next = p; //旧头尾设置
                                    loTail = p;
                                    ++lc;
                                }
                                else { // 新头围设置
                                    if ((p.prev = hiTail)  null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                             //ln  如果老位置数字<=6 则要对老位置链表进行红黑树降级到链表,否则就看是否还需要对老位置数据进行新建红黑树
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd); //老表中i位置节点设置下
                            advance = true;
                        }
                    }
                }
            }
        }
    }

    get

    这个就很简单了,获得hash值,然后判断存在与否,遍历链表即可,注意get没有任何锁操作!

        public V get(Object key) {
            Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
            // 计算key的hash值
            int h = spread(key.hashCode()); 
            if ((tab = table) != null && (n = tab.length) > 0 &&
                (e = tabAt(tab, (n - 1) & h)) != null) { // 表不为空并且表的长度大于0并且key所在的桶不为空
                if ((eh = e.hash)  h) { // 表中的元素的hash值与key的hash值相等
                    if ((ek = e.key)  key || (ek != null && key.equals(ek))) // 键相等
                        // 返回值
                        return e.val;
                }
                else if (eh < 0// 是个TreeBin hash = -2 
                    // 在红黑树中查找,因为红黑树中也保存这一个链表顺序
                    return (p = e.find(h, key)) != null ? p.val : null;
                while ((e = e.next) != null) { // 对于结点hash值大于0的情况链表
                    if (e.hash  h &&
                        ((ek = e.key)  key || (ek != null && key.equals(ek))))
                        return e.val;
                }
            }
            return null;
        }

    clear

    关于清空也相对简单 ,无非就是遍历桶数组,然后通过CAS来置空。

    public void clear() {
        long delta = 0L;
        int i = 0;
        Node<K,V>[] tab = table;
        while (tab != null && i < tab.length) {
            int fh;
            Node<K,V> f = tabAt(tab, i);
            if (f  null)
                ++i; //这个桶是空的直接跳过
            else if ((fh = f.hash)  MOVED) { // 这个桶的数据还在扩容中,要去扩容同时等待。
                tab = helpTransfer(tab, f);
                i = 0// restart
            }
            else {
                synchronized (f) { // 真正的删除
                    if (tabAt(tab, i)  f) {
                        Node<K,V> p = (fh >= 0 ? f :(f instanceof TreeBin) ?((TreeBin<K,V>)f).first : null);
                            //循环到链表/者红黑树的尾部
                            while (p != null) {
                                --delta; // 记录删除了多少个
                                p = p.next;
                            } 
                            //利用CAS无锁置null  
                            setTabAt(tab, i++, null);
                        }
                    }
                }
            }
            if (delta != 0L)
                addCount(delta, -1); //调整count
        }

    end

    ConcurrentHashMap是如果来做到「并发安全」,又是如何做到「高效」的并发的呢?

    总结扩容情况发生:

    3. 扩容时候是否可以进行读写。

    套路

    如果这篇文章对您有所帮助,或者有所启发的话,帮忙扫描下方二维码关注一下,我将会为您分享更多硬核干货好文。

    求一键三连:点赞、转发、在看。

    在公众号中回复:面试、代码神器、开发手册、时间管理有超赞的粉丝福利,另外回复:加群,可以跟很多BAT大厂的前辈交流和学习。我是这样给阿里面试官吹 ConcurrentHashMap的-LMLPHP

    本文分享自微信公众号 - 苏三说技术(gh_9f551dfec941)。
    如有侵权,请联系 support@oschina.cn 删除。
    本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

    03-13 18:54