学习马士兵老师的公开课,整理的笔记

  1. 火车票销售的问题引出并发问题

题目:有N张火车票,每张票都有一个编号,同时有10个窗口对外售票

实现一:使用ArrayList
因为list的remove操作是非原子性的,所以多个线程在同时remove的时候可能会操作同一张票

代码如下:

public class TicketSeller1 {

	private static List<String> tickets = new ArrayList<>();
	
	static{
		for (int i = 0; i < 10000; i++) {
			tickets.add("票编号:" + i);
		}
	}
	public static void main(String[] args) {
		for (int i = 0; i < 10; i++) {
			new Thread(()->{
				while(tickets.size() > 0){
					System.out.println("销售了>>>>>" + tickets.remove(0));
				}
			}).start();
		}
	}
}

输出如下,发生了重售:

销售了>>>>>票编号:7835
销售了>>>>>票编号:7792
销售了>>>>>票编号:7565
销售了>>>>>票编号:9999
销售了>>>>>票编号:9542
销售了>>>>>票编号:9515
销售了>>>>>票编号:9242

实现二:使用Vector或者Collections.synchronizedXXX
虽然remove方法是原子性的但是判断与remove的操作合在一起就不是原子性的所以仍然会出现问题
代码如下:

public class TicketSeller2 {

	private static Vector<String> tickets = new Vector<String>();
	
	static {
		for (int i = 0; i < 10000; i++) {
			tickets.add("票编号:" + i);
		}
	}
	
	public static void main(String[] args) {
		for (int i = 0; i < 10; i++) {
			new Thread(()->{
				while(tickets.size() > 0){
					try {
						TimeUnit.MILLISECONDS.sleep(10);
					} catch (Exception e) {
						e.printStackTrace();
					}
					System.out.println("销售了>>>" + tickets.remove(0));
				}
			}).start();
		}
	}
}

运行结果如下

销售了>>>票编号:9993
销售了>>>票编号:9992
Exception in thread "Thread-9" java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 0
	at java.util.Vector.remove(Vector.java:831)
	at thread.demo_024.TicketSeller2.lambda$main$0(TicketSeller2.java:37)
	at java.lang.Thread.run(Thread.java:748)
销售了>>>票编号:9995

实现三:使用同步代码块锁住容器-保证正确性但效率低
就算操作A和B都是同步的,但A和B组成的复合操作也未必是同步的,仍然需要自己进行同步 就像这个程序,判断size和进行remove必须是一整个的原子操作
代码如下:

public class TicketSeller3 {

	private static List<String> tickets = new LinkedList<String>();
	
	static {
		for (int i = 0; i < 10000; i++) {
			tickets.add("票编号:" + i);
		}
	}
	
	public static void main(String[] args) {
		for (int i = 0; i < 10; i++) {
			new Thread(()->{
				while(true){
					synchronized(tickets){
						if(tickets.size() <= 0) break;
						
						try {
							TimeUnit.MILLISECONDS.sleep(10);
						} catch (Exception e) {
							e.printStackTrace();
						}
						
						System.out.println("销售了>>>" + tickets.remove(0));
					}
				}
			}).start();
		}
	}
}

实现四:使用ConcurrentQueue提高并发性
使用JDK1.5之后提供的并发队列ConcurrentLinkedQueue存储元素,其底层使用CAS实现而非加锁实现的,其效率较高。
并发队列ConcurrentLinkedQueuepoll()方法会尝试从队列头中取出一个元素,若获取不到,则返回null,对其返回值做判断可以实现先取票后判断,可以避免加锁。

代码如下:

public class TicketSeller4 {

	private static Queue<String> tickets = new ConcurrentLinkedQueue<String>();
	
	static {
		for (int i = 0; i < 10000; i++) {
			tickets.add("票编号:" + i);
		}
	}
	
	public static void main(String[] args) {
		for (int i = 0; i < 10; i++) {
			new Thread(()->{
				while(true){
					String s = tickets.poll();
					if(s == null) break;
					
					try {
						TimeUnit.MILLISECONDS.sleep(10);
					} catch (Exception e) {
						e.printStackTrace();
					}
					System.out.println("销售了>>>>" + s);
				}
			}).start();
		}
	}
}

2. 并发容器

Map/Set 它们比较类似 set没有value只有key而已


(1)非并发容器
HashMap,TreeMap,LinkedHashMap
(2)并发容器
HashTable,Collections.sychronizedXXX,ConcurrentMap

  • 并发量不大的时候使用HashTable,Collections.sychronizedXXX 他们同步实现的原理类似都是直接给整个容器加上锁,所以效率会比较差

Collections.sychronizedXXX 的用法是使用装饰器模式,调用其构造方法并传入一个XXX的实现类,返回一个同步的XXX容器
代码如下:

public class T03_SynchronizedMap {

	public static void main(String[] args) {
		HashMap<String> hashMap = new HashMap<String>();
		Map<String> synchronizedMap = Collections.synchronizedMap(hashMap); //此时的集合是同步的	
	}
}
  • 并发量比较高的时候使用ConcurrentMap,有两个实现类:
    • ConcurrentHashMap: 使用哈希表实现,key是无序的
    • ConcurrentSkipListMap: 使用跳表实现,key是有序的

其同步的实现原理在JDK1.8前后不同

  • 在JDK1.8以前,其实现同步使用的是分段锁,将整个容器分为16段(Segment),每次操作只锁住操作的那一段,是一种细粒度更高的锁.
  • 在JDK1.8及以后,其实现同步用的是Node+CAS.
public class T01_ConcurrentMap {

	public static void main(String[] args) {
		Map<String, String> map = new ConcurrentHashMap<String, String>();
		//Map<String, String> map = new ConcurrentSkipListMap<String, String>(); //高并发并且排序
		
		//Map<String, String> map = new Hashtable<>();
		//Map<String, String> map = new HashMap<String, String>();
		
		Random random = new Random();
		Thread[] threads = new Thread[100];
		CountDownLatch latch = new CountDownLatch(threads.length);
		long start = System.currentTimeMillis();
		for (int i = 0; i < threads.length; i++) {
			threads[i] = new Thread(()->{
				for(int j=0; j<10000;j++) map.put("a" + random.nextInt(100000), "a" + random.nextInt(100000));
				latch.countDown();
			});
		}
		
		Arrays.asList(threads).forEach(t->t.start());
		try {
			latch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		long end = System.currentTimeMillis();
		System.out.println(end-start);
	}
}

Queue

低并发队列

适用于并发量比较低的队列有:VectorSynchronizedList,其中vector类似HashTable,是JDK1.2就提供的类;SynchronizedList类似SynchronizedMap使用装饰器模式,其构造函数接受一个List实现类并返回同步List,在java.util.Collections包下.
它们实现同步的原理都是将所有方法用同步代码块包裹起来.

写时复制CopyOnWriteList
CopyOnWriteArrayList位于java.util.concurrent包下,它实现同步的方式是: 当发生写操作(添加,删除,修改)时,就会复制原有容器然后对新复制出的容器进行写操作,操作完成后将引用指向新的容器.其写效率非常低,读效率非常高

  • 为什么读操作不需要加锁?因为本身CopyOnWriteList就不存在脏读的情况

优点: 读写分离,使得读操作不需要加锁,效率极高
缺点: 写操作效率极低
应用场合: 应用在读少写多的情况,如事件***

高并发队列

方法 抛出异常 返回特殊值 一直阻塞 (非阻塞队列不可用) 阻塞一段时间 (非阻塞队列不可用)
插入元素 add(element) offer(element) put(element) offer(element,time,unit)
移除首个元素 remove() poll() take() poll(time,unit)
返回首个元素 element() peek()

对于高并发队列,若使用不同的方法对空队列执行查询和删除,以及对满队列执行插入,会产生不同行为

  • 抛出异常: 使用add(),remove(),element()方法,若执行错误操作会直接抛出异常
  • 返回特殊值: 若使用offer(),poll(),peek()方法执行错误操作会返回falsenull,并放弃当前错误操作,不抛出异常.
  • 一直阻塞: 若使用put(),take()方法执行错误操作,当前线程会一直阻塞直到条件允许才唤醒线程执行操作.
  • 阻塞一段时间: 若使用offer(),poll()方法并传入时间单位,会将当前方法阻塞一段时间,若阻塞时间结束后仍不满足条件则返回falsenull,并放弃当前错误操作,不抛出异常.

非阻塞队列ConcurrentLinkedQueue
非阻塞队列使用CAS保证操作的原子性,不会因为加锁而阻塞线程.类似于ConcurrentMap

阻塞队列BlockingQueue
阻塞队列的常用实现类有LinkedBlockingQueue,ArrayBlockingQueue,DelayedQueue,TransferQueue,SynchronousQueue. 分别对应于不同的应用场景.

延迟队列DelayedQueue
它是一种无界队列,延迟队列DelayedQueue中存储的元素必须实现Delay接口,其中定义了getDelay()方法;而Delay接口继承自Comparable接口,其中定义了compareTo()方法.各方法作用如下:

  • getDelay(): 规定当前元素的延时,Delay类型的元素必须要等到其延时过期后才能从容器中取出,提前取会取不到.
  • compareTo(): 规定元素在容器中的排列顺序,按照compareTo()的结果升序排列.
public class T08_TransferQueue {

	public static void main(String[] args) throws InterruptedException {
		LinkedTransferQueue<String> strings = new LinkedTransferQueue<String>();
		
		/*new Thread(()->{
			try {
				System.out.println(strings.take());
			} catch (Exception e) {
				e.printStackTrace();
			}
		}).start();*/
		
		//strings.transfer("aaa");
		
		strings.put("aaa");
		
		new Thread(()->{
			try {
				System.out.println(strings.take());
			} catch (Exception e) {
				e.printStackTrace();
			}
		}).start();
	}
}

执行结果:等待时间长的先往外拿走

[1570266175973, 1570266177973, 1570266177473, 1570266176473, 1570266176973]
1570266175973
1570266176973
1570266176473
1570266177473

阻塞消费队列TransferQueue
TransferQueue继承自BlockingQueue,向其中添加元素的方法除了BlockingQueueadd(),offer(),put()之外,还有一个transfer()方法,该方法会使当前线程阻塞直到消费者将该线程消费为止.

  • DelayedQueue的应用场景:服务器端的消息队列

transfer()put()的区别: put()方法会阻塞直到元素成功添加进队列,transfer()方法会阻塞直到元素成功被消费.

TransferQueue特有的方法如下:

  • transfer(E): 阻塞当前线程直到元素E成功被消费者消费

  • tryTransfer(E): 尝试将当前元素送给消费者线程消费,若没有消费者接受则返回false且放弃元素E,不将其放入容器中.

  • tryTransfer(E,long,TimeUnit): 阻塞一段时间等待消费者线程消费,超时则返回false且放弃元素E,不将其放入容器中.

  • hasWaitingConsumer(): 指示是否有阻塞在当前容器上的消费者线程

  • getWaitingConsumerCount(): 返回阻塞在当前容器上的消费者线程的个数

public class T08_TransferQueue {

	public static void main(String[] args) throws InterruptedException {
		LinkedTransferQueue<String> strings = new LinkedTransferQueue<String>();
		
		/*new Thread(()->{
			try {
				System.out.println(strings.take());
			} catch (Exception e) {
				e.printStackTrace();
			}
		}).start();*/
		
		//这种情况就会一直等待消费者的出现而不执行下面的代码
		strings.transfer("aaa");
		
		//strings.put("aaa");
		
		new Thread(()->{
			try {
				System.out.println(strings.take());
			} catch (Exception e) {
				e.printStackTrace();
			}
		}).start();
	}
}

零容量的阻塞消费队列SynchronousQueue

SynchronousQueue是一种特殊的TransferQueue,特殊之处在于其容量为0. 因此对其调用add(),offer()方法都会使程序发生错误(抛出异常或阻塞线程).只能对其调用put()方法,其内部调用transfer()方法,将元素直接交给消费者而不存储在容器中。

public class T09_Synchronized {

	public static void main(String[] args) throws InterruptedException {
		BlockingQueue<String> strings = new SynchronousQueue<String>();
		
		new Thread(()->{
			try {
				System.out.println(strings.take());
			} catch (Exception e) {
				e.printStackTrace();
			}
		}).start();
		
		strings.put("aaa"); //阻塞等待消费者消费
		//strings.add("aaa");
		System.out.println(strings.size());
	}
}