并发容器
- 为什么需要并发容器
java提供了很多丰富的容器,主要可以分为四大类别List、Map、Set、Queue,但是这些容器的实现类有各种各样的缺点,如我们常见的ArrayList、HashMap是线程不安全的,如Hashtable、Vector是线程安全,但是效率很差,这些线程安全的容器是通过synchronized关键字来实现的,这样做就削弱了程序的并发性,所以并发容器就诞生了。
- 并发容器的定义
并发容器是专门针对多线程并发设计的,使用了锁分段技术,只对操作位置进行同步操作,但是其它其它线程可以对没有操作的位置继续进行访问。
1、ConcurrentHashMap
1.1数据结构
ConcurrentHashMap是java.util.concurrent(JUC)包下的成员,它支持线程安全、支持多线程操作时并发读写操作,在jdk1.7中,它采用的是Segment分段锁,但是在1.8中,使用的是CAS+synchronized,底层使用的是数组+链表+红黑树
- jdk1.7
jdk1.7中的ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成,Segment是一种可重入锁(ReentrantLock),每个Segment都包含一个HashEntry[]数组,每个HashEntry是一个链表结构,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁。ConcurrentHashMap使用锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据分配一把锁,当一个线程占用锁访问其中一个段数据的时候,其它段的数据也能被其它线程访问。
- jdk1.8
jdk1.8抛弃了Segment分段锁,采用的是CAS+synchronized,它的改进是为了解决查询链表效率太低的问题,将1.7中的HashEntry改为node,取消的ReentrantLock改为了synchronized,采用的数据结构为数组+链表+红黑树,保证了查询效率。
1.2代码测试
为了验证ConcurrentHashMap效率如何,我们编写一个Demo来测试下,将它与Hashtable比较,Hashtable容器是使用synchronized来保证线程安全的。
对比代码测试
/**
* @Author: Simon Lang
* @Date: 2020/5/12 10:40
*/
public class test_concurrentHashMap {
public static void main(String[] args){
//使用HashTable测试
final Map<String,String> map=new Hashtable<String, String>();
//使用concurrentHashMap测试
// final Map<String,String> map=new ConcurrentHashMap<String, String>();
Thread[] array=new Thread[1000];
final Random r=new Random();
//定义一个门栓
final CountDownLatch latch=new CountDownLatch(array.length);
long begin=System.currentTimeMillis();
for(int i=0;i<array.length;i++){
array[i]=new Thread(new Runnable() {
@Override
public void run() {
for(int j=0;j<10000;j++){
map.put("key"+r.nextInt(1000000),"value"+r.nextInt(1000000));
}
latch.countDown();
}
});
}
for (Thread t:array) {
t.start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long end=System.currentTimeMillis();
System.out.println("执行时间为:"+(end-begin));
}
}
测试结果
类型消耗时间Hashtable8947msConcurrentHashMap7265ms
在多线程并发情况下,ConcurrentHashMap比HashTable效率高。
2、CopyOnWriteArrayList
CopyOnWriteArrayList是juc包下提供的一个并发安全的ArrayList,它的主要特点是写操作时需要复制集合,会创建一个新的底层数组,读操作时只需要获得数组下标即可,它的工作方式采用的是用空间换时间,适用于读多写少的场景中。
2.1源码分析
它的底层是数据结构是数组
private transient volatile Object[] array;
它仅在写/修改/删除操作时存在线程安全,所以在这些操作中加入了ReentrantLock来保证线程安全,以写操作为例
public boolean add(E e) {
//获取锁
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//获得当前集合中保存数据的数组
Object[] elements = getArray();
//获取该数组的长度
int len = elements.length;
//拷贝当前的数组,并让其长度+1
Object[] newElements = Arrays.copyOf(elements, len + 1);
//将加入的元素放在新数组的最后一位
newElements[len] = e;
//将数组的引用指向新数组
setArray(newElements);
return true;
} finally {
//释放锁
lock.unlock();
}
}
图解
添加元素时先将原数组复制,然后再将添加的元素放置到新数组的末尾,然后将数组的引用指向新数组。
2.2、对比代码测试
Vector实现线程安全的方法采用的是synchronized关键字,即每个方法执行的时候都要去获取锁,性能会下降,而CopyOnWriteArrayList仅在写/修改/删除操作上加锁,读操作不加锁,我们编写一个Demo来测试下
测试代码
/**
* @Author: Simon Lang
* @Date: 2020/5/12 11:38
*/
public class test_copyonWriterlist {
public static void main(String[] args){
//测试Vectot
// final List list=new Vector();
//测试CopyOnWriterArrayList
final List<String> list=new CopyOnWriteArrayList<String>();
Thread[] array=new Thread[100];
final Random r=new Random();
//定义一个门栓
final CountDownLatch latch=new CountDownLatch(array.length);
long begin=System.currentTimeMillis();
for (int i=0;i<array.length;i++){
array[i]=new Thread(new Runnable() {
@Override
public void run() {
for (int j=0;j<1000;j++){
list.add("value"+r.nextInt(100000));
}
latch.countDown();
}
});
}
for (Thread t:array){
t.start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long end=System.currentTimeMillis();
System.out.println("执行时间:"+(end-begin));
}
}
测试结果
类型消耗时间Vector122CopyOnWriteArrayList5771
测试结果表明,在写操作时,CopyOnWriterArrayList消耗的时间明显高于Vector,所以CopyOnWriterArrayList适用于读操作的场景中。
3、Queue
3.1、ConcurrentLinkedQueue
在并发编程中,有两种方法可以实现线程安全的队列,一种使用的是阻塞算法,该算法使用同一个锁(入队出队使用同一把锁)或中两把锁(入队和出队使用不同的锁)实现,另一种是使用非阻塞算法,该算法是使用循环CAS的方法来实现,我们这里主要使用非阻塞算法的方法来实现ConcurrentLinkedQueue。
3.1.1数据结构
ConcurrentLinkedQueue有head结点和tail结点组成,每个结点有由节点元素(item)和下一个结点的引用(next)组成,结点之间就是通过这个next关联起来,组成一个链表结构的队列
volatile E item;
volatile Node<E> next;
结构图
3.1.2入队分析
入队就是将入队结点添加的队列的尾部,我们以分别添加4个元素为例来讲解
入队主要分两个步骤:①将入队结点设置为当前队尾结点的下一个结点②更新tail结点,如果tail结点的next结点不为空,则将入队结点设置为tail结点,如果tail结点的next结点为空,则将入队结点设置为tail的next结点,所以tail结点并不总是尾结点。
入队源码分析
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
// 如果e为null,则直接抛出NullPointerException异常
checkNotNull(e);
// 创建入队节点
final Node<E> newNode = new Node<E>(e);
// 循环CAS直到入队成功
// 1、根据tail节点定位出尾节点(last node);2、将新节点置为尾节点的下一个节点;3、casTail更新尾节点
for (Node<E> t = tail, p = t;;) {
// p用来表示队列的尾节点,初始情况下等于tail节点
// q是p的next节点
Node<E> q = p.next;
// 判断p是不是尾节点,tail节点不一定是尾节点,判断是不是尾节点的依据是该节点的next是不是null
// 如果p是尾节点
if (q == null) {
// 设置p节点的下一个节点为新节点,设置成功则casNext返回true;否则返回false,说明有其他线程更新过尾节点
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
// 如果p != t,则将入队节点设置成tail节点,更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
// 多线程操作时候,由于poll时候会把旧的head变为自引用,然后将head的next设置为新的head
// 所以这里需要重新找新的head,因为新的head后面的节点才是激活的节点
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
// 寻找尾节点
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
整个入队就做两件事情:①定位尾结点②使用CAS算法将入队结点设置为尾结点的next结点,不成功则重试
3.1.3出队分析
出队列就是从队列里返回一个节点元素,并清空该节点对元素的引用
并不是每次出队时都要更新head结点,当head结点有元素时,直接弹出head结点里的元素,而不会更新head结点。只有当head结点里没有元素时,出队操作才会更新head结点。
3.2、LinkedBlockingQueue
LinkedBlockingQueue是一种用链表实现的有界阻塞队列,队列的默认长度为Integer.MAX_VALUE,此队列是按照先进先出的原则对元素进行排序。
我们编写一个Demo,一个线程不断的向队列中添加元素,另外的几个线程不断的从队列中取出元素,是一种生产者--消费者模式。
测试代码
/**
* @Author: Simon Lang
* @Date: 2020/5/12 14:50
*/
/**
* put:队列满时阻塞
* take:队列为0时阻塞
*/
public class test_linkedBlockingqueue {
final BlockingQueue<String> queue=new LinkedBlockingQueue<String>();
final Random r=new Random();
public static void main(String[] args){
final test_linkedBlockingqueue t=new test_linkedBlockingqueue();
new Thread(new Runnable() {
@Override
public void run() {
while (true){
try {
t.queue.put("value"+t.r.nextInt(1000));
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
},"producer").start();
for (int i=0;i<3;i++){
new Thread(new Runnable() {
@Override
public void run() {
while (true){
try {
System.out.println(Thread.currentThread().getName()+" - "+t.queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
},"consumer"+i).start();
}
}
}
部分测试结果
3.3、ArrayBlockingQueue
ArrayBlockingQueue是一个用数组实现的有界阻塞队列,此队列按照FIFO的原则对元素进行排序。根据调用API(add/put/offer)不同,有不同的特性
我们编写一个Demo分别测试这几种方法
测试代码
/**
* @Author: Simon Lang
* @Date: 2020/5/12 15:08
*/
public class test_arrayBlockingqueue {
final BlockingQueue<String> queue=new ArrayBlockingQueue<String>(3);
public static void main(String[] args){
final test_arrayBlockingqueue t=new test_arrayBlockingqueue();
for (int i=0;i<5;i++){
//addMethod
// System.out.println("add method: "+ t.queue.add("value"+i));
// try {
// t.queue.put("put"+i);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println("put method : "+i);
// System.out.println("offer method : " +t.queue.offer("value"+i));
try {
System.out.println("offer method :"+t.queue.offer("value"+i,1, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(t.queue);
}
}
- add方法在容量不足的时候,抛出异常
- put方法在容量不足的时候,阻塞等待
- offer方法
单参数offer方法,不阻塞,当容量不足的时候,返回false,放弃超出容量的数据
三参数offer方法offer(value,times,timeout),容量不足的时候,阻塞times时长,如果在阻塞时间内,有容量空闲,新增数据返回true;如果阻塞时长范围内无容量空闲,放弃超出容量的数据,返回false。