在高并发中如何获得线程安全的集合呢?

List Set Map

Posted by MasterJen on November 22, 2018

Hey Collections

While there is life there is hope.–一息若存,希望不灭.

在高并发场景中,我们首先考虑的就是线程安全问题,那么如何对数据进行安全的处理呢? 如何获得线程安全的集合呢?

一 List

* ArrayList 基于数组的集合 查询快,有序,有下标,但是线程不安全
* LinkedArrayList 基于链表的形式,增删快,无序,查询慢,线程同样不安全
* Vector 线程安全的 ,但是效率低 
* CopyArrayList 高效的读写,方便查询,线程安全  (首选) 

第一种获得线程安全的List集合 Vector 为什么是线程安全的呢? 肯定是加锁了

部分源码: add() 操作

   public synchronized boolean add(E e) {
        modCount++;
        ensureCapacityHelper(elementCount + 1);
        elementData[elementCount++] = e;
        return true;
    }

根据源码可以看到,里面的都是加锁的 所以效率会很低的.

第二种获得 线程安全的List集合 通过 Collections 工具获得

爱码如下

        ArrayList arrayList = new ArrayList();
        List<Object> objects = (List<Object>) Collections.synchronizedList(arrayList);

那么那是怎么对 ArrayList 进行 加锁的呢? 追溯源码

   // 构造方法 静态的
 public static <T> List<T> synchronizedList(List<T> list) {
        return (list instanceof RandomAccess ?
                new SynchronizedRandomAccessList<>(list) :
                new SynchronizedList<>(list));
    }
    
    // 继续   调用 super(List)
    
    SynchronizedList(List<E> list) {
        super(list);
        this.list = list;
    }
    
    //  super  这里发现了  mutex  那么mutex是什么呢?
    SynchronizedCollection(Collection<E> c) {
        this.c = Objects.requireNonNull(c);
        mutex = this;
    }
       //   继续追溯  发现  mutex就是一个锁的标志
      static class SynchronizedCollection<E> implements Collection<E>, Serializable {
            private static final long serialVersionUID = 3053995032091335093L;
    
            final Collection<E> c;  // Backing Collection
            //  其实就是一个锁  
            final Object mutex;     // Object on which to synchronize

当调用这个List里面的方法时 会发现 都有一个 mutex的标志 那么就可以说明 这是一个锁的对象 ,是线程安全的了.

        public E set(int index, E element) {
        //
            synchronized (mutex) {return list.set(index, element);}
        }
        public void add(int index, E element) {
            synchronized (mutex) {list.add(index, element);}
        }      

第三种 获得线程安全的List集合 也是首选的list 集合, CopyOnWriteArrayList

它有什么好处呢? 追溯源码

    add方法
 public boolean add(E e) {
    //  对增删改 进行了上锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        //  原始的 数组  old 
            Object[] elements = getArray();
            int len = elements.length;
            // 创建新的 数组 new
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            // 当写完之后 进行 设置当前的数组 为 new
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }
    
    // get 方法  获取当前的 数组
       public E get(int index) {
            return get(getArray(), index);
        } 

由此可以看到 就是当增删改时 总会创建新的数组,然后不影响其 get方法,因为get方法 访问的数组永远是当前的数组,而增删改时 是一个原子操作,当其更新后,会重新进行设置当前的数组,所以不会发生错误.既高效,又方便,实现了写写锁互斥,读写锁的不互斥,读读锁的不互斥.

二 Set

* HashSet HashSet集合不能保证的迭代顺序与元素存储顺序相同.HashSet集合,采用哈希表结构存储数据,保证元素唯一性的方式依赖于:hashCode()与equals()方法. 同样线程不安全
* LinkedHashSet 在HashSet下面有一个子类LinkedHashSet,它是链表和哈希表组合的一个数据存储结构.保证了存储顺序.线程不安全
* TreeSet  需要提供比较器 Comparable or Comparator 用来比较  线程不安全
* CopyOnWriteArraySet         线程安全

第一种 获得 线程集合的Set 通过Collections 的synchronized的方法

       HashSet hashSet = new HashSet();
        Set set = Collections.synchronizedSet(hashSet);
         
         和上述的 List 集合一样  都有个标志 mutex  进行加锁

第二种 CopyOnWriteArraySet 首选

源码追溯

private final CopyOnWriteArrayList<E> al;

/**
 * Creates an empty set.
 */
public CopyOnWriteArraySet() {
//  里面实现的是 CopyOnWriteArrayList  
    al = new CopyOnWriteArrayList<E>();
}           

所以外面是 Set 其实里面还是 List 那么Set是怎么实现去重的呢?

    public boolean add(E e) {
        return al.addIfAbsent(e);
    }    
    
     public boolean addIfAbsent(E e) {
            Object[] snapshot = getArray();
            return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
                addIfAbsent(e, snapshot);
        }
        // 进行了一个去重 的遍历运算 ,所以保证了  唯一性.
          private static int indexOf(Object o, Object[] elements,
                                       int index, int fence) {
                if (o == null) {
                    for (int i = index; i < fence; i++)
                        if (elements[i] == null)
                            return i;
                } else {
                    for (int i = index; i < fence; i++)
                        if (o.equals(elements[i]))
                            return i;
                }
                return -1;
            }

三 Map

1、Hashtable:内部结构是哈希表,是同步的.不支持null作为键和值.

2、HashMap:内部结构是哈希表,不是同步的,支持null作为键和值.

3、TreeMap:内部结构是二叉树,不是同步的,支持null作为键和值.

4 、 ConcurrentHashMap  线程安全的Map  分段锁

ConcurrentHashMap 究竟有何神圣呢? jdk1.7 之前使用的是 Segment 数组 之后 使用的是比较hashCode 进行计算存储.

ConcurrentHashMap所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问.有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁.这里“按顺序”是很重要的,否则极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,并且其成员变量实际上也是final的,但是,仅仅是将数组声明为final的并不保证数组成员也是final的,这需要实现上的保证.这可以确保不会出现死锁,因为获得锁的顺序是固定的.    
   
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成.Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据.一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁.

源码追溯

    // 默认 容器 16
   private static final int DEFAULT_CAPACITY = 16;
   // 默认的并发量  也是 16
   private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
   
   /** 
   * The segments, each of which is a specialized hash table 
   */  
   //  用于 存储数据的 Semgent数组  基于数组和链表的形式
   final Segment<K,V>[] segments;

ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁.如果使用传统的技术,如HashMap中的实现,如果允许可以在hash链的中间添加或删除元素,读操作不加锁将得到不一致的数据.ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的.HashEntry代表每个hash链中的一个节点,其结构如下所示

 static final class HashEntry<K,V> {  
     final K key;  
     final int hash;  
     volatile V value;  
     final HashEntry<K,V> next;  
 } 

get操作 ConcurrentHashMap的get操作是直接委托给Segment的get方法,直接看Segment的get方法:

V get(Object key, int hash) {  
// read-volatile 当前桶的数据个数是否为0 
     if (count != 0) { 
         HashEntry<K,V> e = getFirst(hash);  
         while (e != null) {  
             if (e.hash == hash && key.equals(e.key)) {  
                 V v = e.value;  
                 if (v != null)  
                     return v;  
                 return readValueUnderLock(e); 
             }  
             e = e.next;  
         }  
     }  
     returnnull;  
 }     

不对Map进行加锁,那么对分段Semgent中是如何进行加锁的呢 Semgent中的put方法

   final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        // 上锁
            HashEntry<K,V> node = tryLock() ? null :
                    scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                // 每一个segment对应一个HashEntry数组
                HashEntry<K,V>[] tab = table;
                // 计算对应HashEntry数组的下标

                // 每个segment中数组的长度都是2的N次方,所以这里经过运算之后,取的是hash的低几位数据
                int index = (tab.length - 1) & hash;
                // 定位到HashEntry数组中的某个结点(对应链表的表头结点)
                HashEntry<K,V> first = entryAt(tab, index);
                // 遍历链表
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) { // 如果链表不为空
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else { // 如果链表为空(表头为空)
                        if (node != null)
                            // 将新节点插入链表作为表头
                            node.setNext(first);
                        else
                            // 根据key value 创建结点并插入链表
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        // 判断元素个数是否超过了阈值或者segment中数组的长度超过了MAXIMUM_CAPACITY,如果满足条件则rehash扩容!
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else // 不需要扩容时,将node放到数组(HashEntry[])中对应的位置
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock(); // 解锁
            }
            return oldValue; // 返回旧value值
        }

看完这些后,大家也许知道了 ConrrentHashMap的好处了吧

 * 初始容量默认为16段(Semgent),使用分段锁的设计
 * 不对整个 Map 加锁 而是对每个Semgent 进行加锁
 * 当多个对象存入同一个Semgent时 才需要互斥
 * 最理想的状态为 16个对象分别存入16个 Semgent,并行数量 16
 * 用法和HashMap无异.

如下便完成了获得线程安全集合 总结:

获得 并发中的List 集合 CopyOnWriteArrayList
获得 并发中的Set 集合 CopyOnWriteArraySet
获得 并发中的Map 集合   ConrrentHashMap

至于其优点,信大家都已经知道了吧.