并发容器

  • 为什么需要并发容器

java提供了很多丰富的容器,主要可以分为四大类别List、Map、Set、Queue,但是这些容器的实现类有各种各样的缺点,如我们常见的ArrayList、HashMap是线程不安全的,如Hashtable、Vector是线程安全,但是效率很差,这些线程安全的容器是通过synchronized关键字来实现的,这样做就削弱了程序的并发性,所以并发容器就诞生了。

  • 并发容器的定义

并发容器是专门针对多线程并发设计的,使用了锁分段技术,只对操作位置进行同步操作,但是其它其它线程可以对没有操作的位置继续进行访问。

1、ConcurrentHashMap

1.1数据结构

ConcurrentHashMap是java.util.concurrent(JUC)包下的成员,它支持线程安全、支持多线程操作时并发读写操作,在jdk1.7中,它采用的是Segment分段锁,但是在1.8中,使用的是CAS+synchronized,底层使用的是数组+链表+红黑树

  • jdk1.7

 

 

jdk1.7中的ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成,Segment是一种可重入锁(ReentrantLock),每个Segment都包含一个HashEntry[]数组,每个HashEntry是一个链表结构,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁。ConcurrentHashMap使用锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据分配一把锁,当一个线程占用锁访问其中一个段数据的时候,其它段的数据也能被其它线程访问。

  • jdk1.8

 

jdk1.8抛弃了Segment分段锁,采用的是CAS+synchronized,它的改进是为了解决查询链表效率太低的问题,将1.7中的HashEntry改为node,取消的ReentrantLock改为了synchronized,采用的数据结构为数组+链表+红黑树,保证了查询效率。

1.2代码测试

为了验证ConcurrentHashMap效率如何,我们编写一个Demo来测试下,将它与Hashtable比较,Hashtable容器是使用synchronized来保证线程安全的。

对比代码测试

/**
 * @Author: Simon Lang
 * @Date: 2020/5/12 10:40
 */
public class test_concurrentHashMap {
    public static void main(String[] args){
        //使用HashTable测试
        final Map<String,String> map=new Hashtable<String, String>();
        //使用concurrentHashMap测试
//        final Map<String,String> map=new ConcurrentHashMap<String, String>();
        Thread[] array=new Thread[1000];
        final Random r=new Random();
        //定义一个门栓
        final CountDownLatch latch=new CountDownLatch(array.length);
        long begin=System.currentTimeMillis();
        for(int i=0;i<array.length;i++){
            array[i]=new Thread(new Runnable() {
                @Override
                public void run() {
                    for(int j=0;j<10000;j++){
                        map.put("key"+r.nextInt(1000000),"value"+r.nextInt(1000000));
                    }
                    latch.countDown();
                }
            });
        }
        for (Thread t:array) {

            t.start();
        }
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            long end=System.currentTimeMillis();
            System.out.println("执行时间为:"+(end-begin));
        }

    }

测试结果

类型消耗时间Hashtable8947msConcurrentHashMap7265ms

在多线程并发情况下,ConcurrentHashMap比HashTable效率高。

2、CopyOnWriteArrayList

CopyOnWriteArrayList是juc包下提供的一个并发安全的ArrayList,它的主要特点是写操作时需要复制集合,会创建一个新的底层数组,读操作时只需要获得数组下标即可,它的工作方式采用的是用空间换时间,适用于读多写少的场景中。

2.1源码分析

它的底层是数据结构是数组

private transient volatile Object[] array;

它仅在写/修改/删除操作时存在线程安全,所以在这些操作中加入了ReentrantLock来保证线程安全,以写操作为例

  public boolean add(E e) {
      //获取锁
        final ReentrantLock lock = this.lock;
      //加锁
        lock.lock();
        try {
            //获得当前集合中保存数据的数组
            Object[] elements = getArray();
            //获取该数组的长度
            int len = elements.length;
            //拷贝当前的数组,并让其长度+1
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            //将加入的元素放在新数组的最后一位
            newElements[len] = e;
            //将数组的引用指向新数组
            setArray(newElements);
            return true;
        } finally {
            //释放锁
            lock.unlock();
        }
    }

图解

 

添加元素时先将原数组复制,然后再将添加的元素放置到新数组的末尾,然后将数组的引用指向新数组。

2.2、对比代码测试

Vector实现线程安全的方法采用的是synchronized关键字,即每个方法执行的时候都要去获取锁,性能会下降,而CopyOnWriteArrayList仅在写/修改/删除操作上加锁,读操作不加锁,我们编写一个Demo来测试下

测试代码

/**
 * @Author: Simon Lang
 * @Date: 2020/5/12 11:38
 */
public class test_copyonWriterlist {
    public static void main(String[] args){
        //测试Vectot
//       final List list=new Vector();
        //测试CopyOnWriterArrayList
        final List<String> list=new CopyOnWriteArrayList<String>();
        Thread[] array=new Thread[100];
        final Random r=new Random();
        //定义一个门栓
        final CountDownLatch latch=new CountDownLatch(array.length);
        long begin=System.currentTimeMillis();
        for (int i=0;i<array.length;i++){
            array[i]=new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j=0;j<1000;j++){
                        list.add("value"+r.nextInt(100000));
                    }
                    latch.countDown();
                }
            });
        }
        for (Thread t:array){
            t.start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end=System.currentTimeMillis();
        System.out.println("执行时间:"+(end-begin));
    }
}

测试结果

类型消耗时间Vector122CopyOnWriteArrayList5771

测试结果表明,在写操作时,CopyOnWriterArrayList消耗的时间明显高于Vector,所以CopyOnWriterArrayList适用于读操作的场景中。

3、Queue

3.1、ConcurrentLinkedQueue

在并发编程中,有两种方法可以实现线程安全的队列,一种使用的是阻塞算法,该算法使用同一个锁(入队出队使用同一把锁)或中两把锁(入队和出队使用不同的锁)实现,另一种是使用非阻塞算法,该算法是使用循环CAS的方法来实现,我们这里主要使用非阻塞算法的方法来实现ConcurrentLinkedQueue。

3.1.1数据结构

ConcurrentLinkedQueue有head结点和tail结点组成,每个结点有由节点元素(item)和下一个结点的引用(next)组成,结点之间就是通过这个next关联起来,组成一个链表结构的队列

volatile E item;
volatile Node<E> next;

结构图

 

3.1.2入队分析

入队就是将入队结点添加的队列的尾部,我们以分别添加4个元素为例来讲解

 

 

入队主要分两个步骤:①将入队结点设置为当前队尾结点的下一个结点②更新tail结点,如果tail结点的next结点不为空,则将入队结点设置为tail结点,如果tail结点的next结点为空,则将入队结点设置为tail的next结点,所以tail结点并不总是尾结点。

入队源码分析

public boolean add(E e) {
    return offer(e);
}
 
public boolean offer(E e) {
    // 如果e为null,则直接抛出NullPointerException异常
    checkNotNull(e);
    // 创建入队节点
    final Node<E> newNode = new Node<E>(e);
 
    // 循环CAS直到入队成功
    // 1、根据tail节点定位出尾节点(last node);2、将新节点置为尾节点的下一个节点;3、casTail更新尾节点
    for (Node<E> t = tail, p = t;;) {
        // p用来表示队列的尾节点,初始情况下等于tail节点
        // q是p的next节点
        Node<E> q = p.next;
        // 判断p是不是尾节点,tail节点不一定是尾节点,判断是不是尾节点的依据是该节点的next是不是null
        // 如果p是尾节点
        if (q == null) {
            // 设置p节点的下一个节点为新节点,设置成功则casNext返回true;否则返回false,说明有其他线程更新过尾节点
            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                // 如果p != t,则将入队节点设置成tail节点,更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        // 多线程操作时候,由于poll时候会把旧的head变为自引用,然后将head的next设置为新的head
        // 所以这里需要重新找新的head,因为新的head后面的节点才是激活的节点
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        // 寻找尾节点
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

整个入队就做两件事情:①定位尾结点②使用CAS算法将入队结点设置为尾结点的next结点,不成功则重试

3.1.3出队分析

出队列就是从队列里返回一个节点元素,并清空该节点对元素的引用

 

 

并不是每次出队时都要更新head结点,当head结点有元素时,直接弹出head结点里的元素,而不会更新head结点。只有当head结点里没有元素时,出队操作才会更新head结点。

3.2、LinkedBlockingQueue

LinkedBlockingQueue是一种用链表实现的有界阻塞队列,队列的默认长度为Integer.MAX_VALUE,此队列是按照先进先出的原则对元素进行排序。

我们编写一个Demo,一个线程不断的向队列中添加元素,另外的几个线程不断的从队列中取出元素,是一种生产者--消费者模式。

测试代码

/**
 * @Author: Simon Lang
 * @Date: 2020/5/12 14:50
 */

/**
 * put:队列满时阻塞
 * take:队列为0时阻塞
 */
public class test_linkedBlockingqueue {
    final BlockingQueue<String> queue=new LinkedBlockingQueue<String>();
    final Random r=new Random();
    public static void main(String[] args){
            final test_linkedBlockingqueue t=new test_linkedBlockingqueue();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true){
                        try {
                            t.queue.put("value"+t.r.nextInt(1000));
                            TimeUnit.SECONDS.sleep(1);
                        }catch (InterruptedException e){
                            e.printStackTrace();
                        }
                    }
                }
            },"producer").start();
            for (int i=0;i<3;i++){
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        while (true){
                            try {
                                System.out.println(Thread.currentThread().getName()+" - "+t.queue.take());
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                },"consumer"+i).start();
            }
    }
}

部分测试结果

 

3.3、ArrayBlockingQueue

ArrayBlockingQueue是一个用数组实现的有界阻塞队列,此队列按照FIFO的原则对元素进行排序。根据调用API(add/put/offer)不同,有不同的特性

我们编写一个Demo分别测试这几种方法

测试代码

/**
 * @Author: Simon Lang
 * @Date: 2020/5/12 15:08
 */
public class test_arrayBlockingqueue {
    final BlockingQueue<String> queue=new ArrayBlockingQueue<String>(3);
    public static void main(String[] args){
        final test_arrayBlockingqueue t=new test_arrayBlockingqueue();
        for (int i=0;i<5;i++){
            //addMethod
//            System.out.println("add method: "+ t.queue.add("value"+i));
//            try {
//                t.queue.put("put"+i);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//            System.out.println("put method : "+i);
//            System.out.println("offer method : " +t.queue.offer("value"+i));
            try {
                System.out.println("offer method :"+t.queue.offer("value"+i,1, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(t.queue);
    }
}
  • add方法在容量不足的时候,抛出异常

 

  • put方法在容量不足的时候,阻塞等待

 

  • offer方法

单参数offer方法,不阻塞,当容量不足的时候,返回false,放弃超出容量的数据

 

三参数offer方法offer(value,times,timeout),容量不足的时候,阻塞times时长,如果在阻塞时间内,有容量空闲,新增数据返回true;如果阻塞时长范围内无容量空闲,放弃超出容量的数据,返回false。