程序员最近都爱上了这个网站  程序员们快来瞅瞅吧!  it98k网:it98k.com

本站消息

站长简介/公众号

  出租广告位,需要合作请联系站长


+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

2023-06(4)

ConcurrentHashMap源码详解

发布于2021-05-29 19:46     阅读(889)     评论(0)     点赞(2)     收藏(4)


一、前言

HashMap源码解析篇中,我们对HashMap源码部分进行了详细透彻的分析,提出了连环追命16连问,算是把HashMap所涉及到的知识点在源码层面都分析完了。但是HashMap在多线程环境下有线程安全的问题,这篇文章,我们讲下有什么集合可以保证在多线程的环境中线程安全。

二、正文

1.HashMap在多线程环境中存在线程安全的问题,那应该如何处理呢?

通常我们会有三方替代方式:

  • 使用Collections.synchronizedMap(Map)创建线程安全的集合
  • Hashtable
  • ConcurrentHashMap

不过出于对线程并发度的考虑,我们会舍弃前面两种而选择ConcurrentHashMap,他的性能和效率明显高于前两者。

2.Collections.synchronizedMap是怎么实现线程安全的,你有了解过吗?

通过分析源码,synchronizedMap内部维护了一个普通Map对象和一个排斥锁Mutex

在这里插入图片描述

我们调用这个方法需要传入一个Map,可以看到有两个构造器,如果你传入了mutex,则将排斥锁赋值为传入对象。

如果没有,则将对象排斥锁赋值为this,即调用synchronizedMap的对象,就是上面传入的Map。

创建出synchronizedMap后,在操作Map的时候,就会对方法上锁,如图全是锁:

在这里插入图片描述

3.那Hashtable是如何实现线程安全的呢?

跟HashMap相比Hashtable是线程安全的,适合多线程情况下使用,但是它的效率比较低。通过查看其源码可以看到,它在对数据的操作的时候都会上锁,所以效率比较低。

在这里插入图片描述

4.能说说HashMap和Hashtable的区别嘛?

  • 实现方式不同:Hashtable继承Dictionary类,而HashMap继承的是AbstractMap类。

  • 初始容量不同:HashMap初始容量是16,Hashtable初始容量是11,两个的负载因子都是0.75f。

  • 扩容机制不同:当现有容量大于总容量*负载因子时,HashMap扩容规则为当前容量翻倍,Hashtable扩容规则是当前容量翻倍+1。

  • 迭代器不同:HashMap中的iterator迭代器是fail-fast的,而Hashtable的迭代器Enumerator不是fail-fast的。

  • HashMap的key和value都可以是null,Hashtable的key和value都不能为null。

    注:

    (1)Hashtable遍历可以使用两种迭代器,iterator(jdk1.2有的)和Enumerator(jdk1.0有的)这两种迭代器都可以用到,但是Hashtable中iterator是通过Enumerator去实现的。

    (2)Hashtable使用Enumerator迭代器比使用iterator迭代器效率高,毕竟iterator有fail-fast机制。

5.fail-fast是什么?

快速失败(fail-fast)是Java集合的一种机制,在用迭代器遍历集合对象时,如果在遍历过程中对集合进行了修改(增加、删除、修改),则会抛出Concurrent Modifiction Exception。

他的原理是:迭代器在遍历时直接访问集合中的内容,并且在遍历时使用一个modCount变量,集合在遍历过程中如果有任何修改,就会改变modCount的值。

迭代器使用hasNext()/next()遍历下一个元素前,都会检测modCount变量是否为expectedmodCount值,是的话返回遍历,否则抛出异常,终止遍历。

Tip:这里异常抛出条件是检测到modCount != expectedmodCount这个条件。如果集合发生变化时修改的modCount值正好等于expectedmodCount,则不会抛出异常。

因此,不能依赖这个异常是否抛出而进行并发操作的编程,这个异常只建议用于检测并发修改bug。

java.util包下的集合都是快速失败的,不能在多线程下发生并发修改(迭代过程中修改)算是一种安全机制把。

6.为什么HashMap的key和value都可以是null,Hashtable的key和value都不能为null?

(1)首先看下HashMap的源码:

在这里插入图片描述
在这里插入图片描述

可以看出HashMap在put的时候会调用hash()方法来计算key的hashcode值,当key==null是会返回0,因此hash值为0,并不会调用key的hashcode方法。因此,hashMap的key和value都可以为null。

(2)再来看下Hashtable源码:

在这里插入图片描述

可以看出Hashtable的value在等于null的时候会抛出异常,而当计算hash时会调用key.hashCode()方法,当key为null的时候也会抛出异常,所以Hashtable的key和value都不能为空。

(3)Hashable使用的是安全失败(fail-safe)机制,这种机制会使你此次读取到的数据不一定是最新的数据。

如果你使用null值,就无法判断对应的key是不存在还是为空,因为你无法再次使用Contain(key)来对key是否存在进行判断,ConcurrentHashMap同理。

7.安全失败(fail-safe)机制是什么?

采用安全失败机制的集合容器,在用迭代器遍历时不是直接在集合内容上访问,而是先复制原有集合内容,在拷贝的集合上进行遍历。

由于迭代器是对原集合的拷贝进行遍历,所以在遍历过程中对原集合的修改并不能被迭代器检测到,故不会抛出ConcurrentModificationException异常。

java.util.concurrent包下的容器都是安全失败的,可以在多线程下并发使用,并发修改。

8.ConcurrentHashMap底层的数据结构?

ConcurrentHashMap在jdk1.7底层的数据结构是:数组+链表。jdk1.8底层的数据结构是:数组+链表+红黑树。

jdk1.7的实现:

在这里插入图片描述

如图所示,是由Segment数组、HashEntry组成,和HashMap一样,仍然是数组和链表。

Segment是ConcurrentHashMap的一个内部类,主要的组成如下:

static final class Segment<K,V> extends ReentrantLock implements Serializable {

    private static final long serialVersionUID = 2249069246763182397L;

    // 和 HashMap 中的 HashEntry 作用一样,真正存放数据的桶
    transient volatile HashEntry<K,V>[] table;
    
    transient int count;
    // 记得快速失败(fail—fast)么?
    transient int modCount;
    // 大小
    transient int threshold;
    // 负载因子
    final float loadFactor;

}

HashEntry跟HashMap差不多,但是不同点是,他使用Volatile修饰他的数据value和下一个节点next,源码如下所示:

在这里插入图片描述

9.Volatile的特性是啥?

  • 实现了可见性:保证了不同线程对这个变量操作时的可见性,即一个线程修改了某个变量的值,这个新值对其他线程来说立即可见。
  • 实现了有序性:禁止进行指令重排序。
  • volatile只能保证单次读/写的原子性。i++这种操作不能保证原子性。

在这就不大篇幅介绍了,具体详细的介绍在多线程章节介绍,大家知道用了安全就对了。

10.ConcurrentHashMap为什么并发度高?

对于jdk1.7来说,ConcurrentHashMap采用了分段锁技术,其中Segment继承了ReentrantLock。

不会像Hashtable那样不管put还是get操作都需要做同步处理,理论上ConcurrentHashMap支持concurrency Level(Segment数组数量)的线程并发。

每当一个线程占用锁访问一个Segment时,不会影响到其他Segment。

也就是说如果容量大小是16他的并发度就是16,可以同时允许16个线程操作16个Segment而且还是线程安全的。

public V put(K key, V value) {
    Segment<K,V> s;
    //这就是为啥他不可以put null值的原因
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          
         (segments, (j << SSHIFT) + SBASE)) == null) 
        // 定位到Segment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}

他先定位到Segment,然后再进行put操作。

我们看下put的源码,就知道线程安全是怎么做的了,如下:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 将当前 Segment 中的 table 通过 key 的 hashcode 定位到 HashEntry
    HashEntry<K,V> node = tryLock() ? null :
    scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                // 遍历该 HashEntry,如果不为空则判断传入的 key 和当前遍历的 key 是否相等,相等则覆盖旧的 value。
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                // 不为空则需要新建一个 HashEntry 并加入到 Segment 中,同时会先判断是否需要扩容。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        //释放锁
        unlock();
    }
    return oldValue;
}

首先第一步的时候尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用scanAndLockForPut()自旋获取,源码如下:

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

在scanAndLockForPut()方法中:

(1)尝试自旋获取锁

(2)如果尝试次数到达MAX_SCAN_RETRIES,则改为阻塞锁获取,保证能够获取成功。

再来看下get源码:

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
             (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

get的逻辑比较简单,只需要将key通过hash之后定位到具体的Segment,然后在通过一次hash定位到具体的元素上。

由于HashEntry中的value值是通过volatile关键字修饰的,保证了内存可见性,所以每次获取的值都是最新的。

ConcurrentHashMap的get方法非常高效,因为整个过程不需要加锁。

虽然1.7支持每个Segment分段锁并发访问,但是还是存在一些问题。因为基本上是数据加链表的方式,我们查询的时候还的遍历链表,当链表很长的时候,会导致遍历效率低下,这个和HashMap1.7的HashMap有着同样的问题,所有在jdk1.8中进行了优化。

11.ConcurrentHashMap 1.8的数据结构?

(1)底层的数据结构和HashMap有点像,把之前的HashEntry改为了Node,但是作用不变,把值和next采用了Volatile去修饰,保证了可见性,并且也引入了红黑树,当链表长度大于等于8时转为红黑树。

Node继承于HashMap中的Entry,用于存储数据,源码如下:

在这里插入图片描述

数据结构大概如下:

在这里插入图片描述

(2)在并发安全方面:dk1.8放弃了原先的Segment分段锁,而是采用CAS+Synchronized来保证并发的安全性。

(3)TreeNode继承了Node,但是数据结构换成了树结构,他是红黑树的数据的存储结构,用于红黑树中数据存储,当链表长度大于等于8时会转化为红黑树,他是通过TreeNode作为存储结构代替Node来转化为红黑树,部分源码如下:
在这里插入图片描述

(4)TreeBin从字面含义中可以理解为存储树形结构的容器,而属性结构是指TreeNode,所以TreeBin就是封装了TreeNode的容器,他提供了转换红黑树的一些条件和锁的控制,部分源码如下:

在这里插入图片描述

12.ConcurrentHashMap1.8的存取是如何保证线程安全的?

ConcurrentHashMap在进行put操作还是比较复杂的,大致可以分为以下步骤:

1.根据key计算出hashCode。

2.判断是否需要进行初始化。

3.为当前key定位出Node,如果为空表示当前位置可以写入数据,利用CAS尝试写入,失败则自旋保证成功。

4.如果当前位置的hashcode == MOVED == -1,则需要进行扩容。

5.如果都不满足,表示存在hash冲突,就需要利用Synchronized锁保证安全,这里有两种情况,一种是链表形式就直接遍历到尾部插入,一种是红黑树就是按照红黑树结构插入。

6.如果链表长度大于等8,就转换为红黑树。

7.如果添加成功就调用addCount()方法统计size,并且检查是否需要扩容那个。

下面通过源码,把上面的步骤进行具体表示:

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 1.根据key计算出hashCode
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 2.判断是否需要进行初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 3.为当前key定位出Node,如果为空表示当前位置可以写入数据,利用CAS尝试写入,失败则自旋保证成功
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 4.如果当前位置的hashcode == MOVED == -1,则需要进行扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 5.如果都不满足,表示存在hash冲突,就需要利用Synchronized锁保证安全,这里有两种情况,一种是链表形式就直接遍历到尾部插入,一种是红黑树就是按照红黑树结构插入
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // 6.如果链表长度大于等8,就转换为红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 7.如果添加成功就调用addCount()方法统计size,并且检查是否需要扩容那个
    addCount(1L, binCount);
    return null;
}

13.CAS是什么?自旋又是什么?

CAS是一种乐观锁的实现方式,是一种轻量级的锁,JUC中很多工具类的实现就是基于CAS的。

CAS操作的流程如下图所示,线程在读取数据时的时候不进行加锁,在准备写回数据时,比较原值是否修改,若没有被修改则写回,若已经被修改,则重新执行读取流程。这是一种乐观的策略,认为并发操作并不是总发生的。

在这里插入图片描述

在说个在实际开发中用到乐观锁的例子:

就比如我们现在要修改数据库中的一条数据,修改前我先拿到他原先的值,然后在SQL中还有加一个判断,原来的值和我手上的值是一样,一样就代表这期间没有被修改过,可以进行修改,不一样就证明被别的线程修改了,就return错误就好。

SQL伪码大概如下:

update a set value = newValue where value = oldValue // oldValue就是我们之前查出来的值

14.CAS就一定能保证没有被别的线程修改过嘛?

并不是,经典的ABA问题,CAS就无法判断了。

15.什么是ABA问题,如何解决?

就是一个线程把值改成了B,又有一个线程把这个值改回了A,对于这个时候判断的线程,发下他的值还是A,所以就不知道这个值是否被修改过,其实很多场景如果值最求最后结果的准确性,就是没有关系的。

但是实际过程中还是需要记录修改过程的,比如资金的修改,你每次都应该有记录,方便回溯。所以我们可以用下面的方法解决ABA问题:

(1)用版本好保证就好,就比如说,我们在修改前去查询他原来的值的时候在带一个版本号,每次判断就连值和版本号一起判断,判断成功就给版本号+1.

update a set value = newValue, version = version + 1 where value = oldVale and version = version //判断原来的值和版本号是否匹配,中间别的线程修改,值可能一样,但是版本号100%不一样 

(2)还有其他方式也可以保证,比如时间戳也可以。查询的时候把时间一起查出来,对的上的才修改并且更新值的时候把时间一起更新了,这样也能够保证,方法和版本号有异曲同工之妙,就是大家怎么设计。

16.CAS性能很高,但是我们知道Synchronized性能可不咋地,为什么jdk1.8升级就之后反而用了Synchronized?

synchronized一直都是重量级锁,但是后来Java官方对他进行了升级,他现在采用锁升级的方式去做的。

针对synchronized获取锁的方式,JVM使用了锁升级的方式优化,就是先使用偏向锁优先同一线程然后再次获取所,如果失败了,就升级为CAS轻量级锁,如果失败就会短暂自旋,防止线程被系统挂起。最后如果以上都失败就升级为重量级锁。

所以是一步步升级上去的,最初也是通过很多轻量级的方式锁定的。

17.ConcurrentHashMap 1.8的get操作又是怎么样子的?

  • 计算hash值,定位到该table索引位置,如果是首节点符合就返回
  • 如果遇到扩容的时候,会调用标志正在扩容节点ForwardingNode的find方法,查找该节点,匹配就返回
  • 以上都不符合的话,就往下遍历节点,匹配就返回,否则最后返回null

在get源码中进行了具体的标识:

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 计算两次hash
    int h = spread(key.hashCode());
    // 读取首节点的Node元素
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 如果该节点就是首节点就返回
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到nextTable来查找,查找到就返回
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // 既不是首节点也不是ForwardingNode,那就往下遍历
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

18.ConcurrentHashMap 1.8如何统计元素总数?

为了有一个高性能的size()方法,ConcurrentHashMap使用了单独的方法统计元素总数,元素数量统计在CounterCell数组中:

在这里插入图片描述

CounterCell使用伪共享优化,具有很高的读写性能,ConterCells所有的成员的value相加,就是整个map的大小,这里使用数组,也是为了防止冲突。

如果简单使用一个变量,那么多线程累加一个计数器时,难免要有竞争,现在分散到一个数组中,这种竞争就小了很多,对并发就更加友好了。

累加的主要逻辑如下:

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            //不同线程映射到不同的数组元素,防止冲突
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            //使用CAS直接增加对应的数据
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            //如果有竞争,在这里会重试,如果竞争严重还会将CounterCell[]数组扩容,以减少竞争
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }

三、总结

ConcurrentHashMap是HasnMap的线程安全版,可以保证在多线程中的线程安全,并且jdk1.8相比jdk1.7做了很多的改变和优化,极大的提升了性能。我们在运用API的时候,不能只是会用,而应该知道为什么,总是流于表面,只会CRUD,我们又如何提高呢?

最后引用我很佩服的一个人经常说的话:你知道的越多,你不知道的越多!

文章参考:

https://mp.weixin.qq.com/s/AixdbEiXf3KfE724kg2YIw

https://blog.csdn.net/xingxiupaioxue/article/details/88062163

https://mp.weixin.qq.com/s/f2gcvZIUDhrXVA-Up96kHA

原文链接:https://blog.csdn.net/SeekN/article/details/117264938



所属网站分类: 技术文章 > 博客

作者:怎么没有鱼儿上钩呢

链接:http://www.javaheidong.com/blog/article/207061/69188ca2d4027b0aeee0/

来源:java黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

2 0
收藏该文
已收藏

评论内容:(最多支持255个字符)