0 参考
- 《并发编程理论与实践》 张杨
- 《java并发编程艺术》
- 《java并发编程之美》
推荐依次阅读。
第一本偏向教你怎么实现并发编程,有很多实例,还解释实例代码的意思。
第二,三本注重java并发的原理,底层原理,需要了解一点jvm和操作系统。
1 引言
阅读书籍时,总是坚持不下去,经常看不到细节,时间稍微久一点就忘记了,决定记录一下,推荐leetcode多线程编程题,持续更新ing
1.1线程基础概念
1.1.1 线程与进程:
对于并发来说,进程是线程的容器,线程共享进程资源,又有局部内存(程序计数器,寄存器,栈),于是产生了并发安全问题。
对于操作系统,进程是资源的分配单位,线程是cpu调度的最小单位。进程代表一个程序的执行,上下文切换相比线程更缓慢。
1.1.2 进程/线程状态模型
对于操作系统来说,进程有三态,五态,七态。
- 三态模型:就绪,执行,阻塞
- 五态模型:三态,创建态,终止态
- 七态模型:五态,挂起操作,就绪和阻塞态可以进行挂起操作
对于java来说,线程有6种状态:
NEW(新建)、RUNNABLE(运行)、BLOCKED(锁池)、TIMED_WAITING(定时等待)、WAITING(等待)、TERMINATED(终止、结束)
1.2 java内存模型
值得注意的是,内存模型与JVM的内存区域没有任何关系。
主内存通过计算机内存模型与处理器交互。
2 并发编程基础
基于《并发编程理论与实践》
JDK1.5以前,synchronized
JDK1.5 并发工具包concurrent,可重入锁,读写锁
JDK1.6 Collection并行框架
JDK1.7 Fork/Join框架
JDK1.8 Lambda,邮戳锁,并发计数器
本节简要并发编程基本理论
2.1 线程创建方式
- 继承Thread类,重写run方法
- 实现Runnable接口,重写run方法
- 实现Callable接口,重写call方法,Future接口接收线程返回值,通常使用FutureTask类。
由于Thread实现了Runnable接口,Runnable接口是一个函数式接口,可以通过Lambda表达式新建Thread,从而实现与资源类解耦合。推荐第二种。Thread thread1 = new Thread(() -> System.out.println("hello")); Thread thread2 = new Thread(new Runnable() { @Override public void run(){ System.out.println("hello"); } }); thread1.start(); thread2.start();
2.2 线程属性
- state 线程的状态 getState()
- name 线程名称 getName()
- id 线程id getId() 长整型
- priority 优先级 setPriority() getPriority()
- daemon 守护线程 setDaemon(true)
2.3 线程管理
- join()方法,抛出InterruptedException异常
阻塞当前进程,等待线程运行完毕再继续执行,3种重载方法。
main(){ thread.start();//开启子线程thread thread.join();//阻塞main线程(当前线程),等待thread线程执行完毕后再继续执行。 }
public final void join() throws InterruptedException public final synchronized void join(long millis) throws InterruptedException // millis等待毫秒数 public final synchronized void join(long millis,int nanos) throws InterruptedException //nanos等待纳秒数
- sleep() 静态方法,抛出InterruptedException异常
让线程暂停运行一段固定时间,让出cpu,不释放锁。
2种重载方法
public static native void sleep(long millis) throws InterruptedException public static native void sleep(long millis,int nanos) throws InterruptedException
- yield() 方法
让出cpu,让当前线程转到就绪态,但不释放锁。
public static native void yield();
interrupt()中断方法
中断当前线程;
早期Thread类还有stop()方法,suspend()挂起,resume()解除挂起,destory(),不推荐使用。线程间通信方法 wait() notify() notifyAll() await() signal() signalAll()
- wait() notify() notifyAll()是object类的方法,对应一个线程类
- await() signal() signalAll()是concurrent包中Condition接口中的方法,对应锁的Condition状态
- wait()和await() 当前线程进入等待阻塞态,释放锁;
- notify()和sinal() 唤醒一个等待的线程,该线程重新获取锁;
- notifyAll()和signalAll() 唤醒所有等待线程,被唤醒的线程竞争获取锁。
2.4 线程安全同步控制
同步控制措施可以包括:
- 将线程间共享变量变为线程私有的变量,不在线程间共享;
- 将状态变量修改位不可变的变量
- 使用同步控制机制
2.4.1 同步锁 synchronized
同一时间只能有一个线程访问synchronized控制的范围,进入到该范围的线程称为持有该锁的线程。
修饰方法,synchronized作用于该方法
修饰代码块,synchronized作用于该代码块
使用synchronized 关注点在于锁的持有者
synchronized(Test.class){ //锁住静态类对象 ... }
2.4.2 可重入锁ReentrantLock
可重入锁一种无阻塞的同步机制,是互斥锁,和同步锁具有基本相同的语义和行为,增加许多功能。
可重入性意味一个线程可以再次或者多次获得该锁。
public class ReentrantLock implements Lock, java.io.Serializable{} //定义 ReentrantLock() //创建一个公平可重入锁 ReentrantLock(boolean fair) //创建一个带有公平策略的可重入锁,false非公平锁
2.4.3 读写锁ReentrantReadWriteLock
读锁是共享锁(share,S锁),写锁是排他锁(X锁),读写锁和可重入锁类似。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {} //定义
读写锁具有以下属性:
1.锁的优先级,通过fair参数。
2.可重入性
3.锁降级,允许将写锁降级位读锁。
4.锁中断,在获取锁时,如果锁被其他线程持有,可以被中断请求
5.支持条件操作,写锁支持条件操作
6.监视系统状态,提供了相应方法监视系统状态,锁是否被持有,是否被竞争。
2.4.4 邮戳锁StampedLock
public class StampedLock implements java.io.Serializable{}
邮戳锁的状态由版本号和模式组成,版本号是long stamp,线程在获取邮戳锁时返回版本号,释放邮戳锁时,版本号作为参数。
邮戳锁有三种模式:写,读,乐观读。
三种模式可以互相转换。
2.4.5 volatile 变量
java线程保存了共享域变量的私有拷贝,volatile关键字使得JVM直接与主内存交互,不再与线程私有内存交互。
volatile禁止指令重排,保证了有序性,直接与主内存交互,保证了可见性,但volatile不保证原子性,CAS算法是原子的,CAS算法全称Compare And Swap,通过比较再交换,从而保证了并发安全,但产生ABA问题,加上版本戳来解决ABA问题。
与synchronized相比,volatile是轻量级的。
2.4.6 原子类 Atomic包
原子基本类型
- AtomicInteger
- AtomicBoolean
- AtomicLong
构造方法(以AtomicInterger为例)public AtomicInteger() {}//初始值 0 public AtomicInteger(int initialValue) {}//初始值
常用方法incrementAndGet() //相当于return ++a 但是原子操作; decrementAndGet() //相当于return --a 但是原子操作; boolean compareAndSet(int expect, int update) //CAS算法:当前值为expect,则原子操作当前值为update getAndDecrement() //return a--; 先返回再减一
原子引用类型
AtomicReference
构造方法
public class AtomicReference<V> implements java.io.Serializable {} // 定义 public AtomicReference(V initialValue) {value = initialValue;} // 有参构造 public AtomicReference() {} // 无参构造
常用方法
public final V get() {return value;} boolean compareAndSet(V expect,V update)// 如果当前值等于预期值expect,则以原子方式将该值设置为给定更新值update V getAndSet(V newValue) //原子操作设置新值并返回旧值 void lazySet(V newValue) //最终设置为给定值 void set(V newValue) //设置为给定值 boolean weakCompareAndSet(V expect,V update) //jdk1.8以前,与compareAndSet一样。 //1.9加了possible语义,可能不是原子的,取决于虚拟机的实现
原子数组类
AtomicReferenceArray
public class AtomicReferenceArray<E> implements java.io.Serializable {}
原子扩展类(解决ABA问题)
- AtomicMarkableReference
该类封装了一个对象的引用reference和布尔型值mark,这两个值可以原子更新,用以设计无锁数据结构public AtomicMarkableReference(V initialRef, boolean initialMark){} //构造方法
- AtomicStampedReference
该类封装了一个对象的引用和整型值stamp,两个值可以原子更新,解决ABA问题
构造方法与常用方法public AtomicStampedReference(V initialRef, int initialStamp){} //构造方法 V getReference() //获得引用对象类型 int getStamp() //获得stamp V get(int[] stampHolder) //返回reference和版本戳stamp boolean weakCompareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp) compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp) //CAS原子操作reference和版本戳 void set(V newReference, int newStamp) //无条件设置reference和版本戳 boolean attemptStamp(V expectedReference, int newStamp) //CAS更新版本戳
2.5 线程同步障栅
2.5.1 信号量Semaphore
信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用。在进入一>关键代码段之前,线程必须获取一个信号量;一旦该关键代码段完成了,那么该线程必须释放信号量。其它想进入该关键代码段的线程必须等待直到第一个线程释放信号量。为了完成这个过程,需要创建一个信号量VI,然后将Acquire Semaphore VI以及Release Semaphore VI分别放置在每个关键代码段的首末端。确认这些信号量VI引用的是初始创建的信号量。
定义与构造方法:
public class Semaphore implements java.io.Serializable {} //定义 public Semaphore(int permits) {} //permits 许可数 public Semaphore(int permits, boolean fair){} //fair 公平参数
常用方法:
void acquire() //从当前信号量获取一个许可,没有许可可用,则阻塞,抛出InterruptedException异常 void acquireUninterruptibly() //从当前信号量获取一个许可,在有用许可之前阻塞,不抛异常 void acquire(int permits) //获取多个许可 void release() // 释放一个许可 int availablePermits() //获得可用的许可数 int drainPermits() //获取并返回立即可用的所有许可数 boolean tryAcquire() //尝试获取一个许可 int getQueueLength() //返回等待获取许可的线程队列长度 boolean hasQueuedThreads() //查询线程等待队列中是否有线程等待获得许可
2.5.2 障栅CyclicBarrier
CyclicBarrier类是同步辅助类,实现了一个障栅集合点,在所有线程到达障栅前,可以等待,直到所有线程到达继续运行。
日常例子:所有学生到操场集合,然后开始自已活动,下课前分钟后集合返***室,每个学生就是一个线程。集合就是障栅。
构造方法
CyclicBarrier(int parties) //创建一个障栅对象,parties为线程数 CyclicBarrier(int parties, Runnable barrierAction) //barrierAction定义最后一个进入障栅的线程要执行的动作
方法
int await() //在此障栅上的线程调用该方法后等待 int await(long timeout, TimeUnit unit) //所有调用该方法的线程等待一段时间 int getNumberWaiting() //获得当前障栅处等待的线程数目 int getParties() //获得要求启动此障栅的线程数目 boolean isBroken() //查询障栅是否处于损坏状态 void reset() //重置障栅到初始状态
2.5.3 倒计时门闩CountDownLatch
倒计时门闩只有在门前等待的线程达到一定数量,门才会打开,线程才能够继续运行。
与障栅不同的是:
- 不是所有线程都需要等待门闩打开
- 门闩可以由外部事件打开
- 门闩是一次性的,一旦计数器为0,就不能再重用。
构造函数和方法
CountDownLatch(int count) //构造一个倒计时门闩 void await() //使当前线程等待直到门闩计数器为0 boolean await(Long timeout,TimeUnit unit) //在指定时间范围内await() void countDown() //门闩计数器减一,当值为0释放所有线程 long getCont() //返回当前计数
2.5.4 同步队列SynchronousQueue
同步队列是一个没有数据缓冲的阻塞队列,在同步队列上的插入操作必须等待相应的删除执行完成后才能执行,反之亦然。
定义:
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {}
构造方法
public SynchronousQueue() public SynchronousQueue(boolean fair)
方法,同步队列类没有peek方法
public void put(E e) throws InterruptedException //添加元素到队列中,如有必要等待一个线程接收它 public boolean offer(E e) //添加元素到队列中,如有必要等待一个线程接收它 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException public E take() throws InterruptedException //获取并移除队列头,如有必要则等待另一个线程插入它 public E poll() // 获取并移除队列头 public E poll(long timeout, TimeUnit unit) throws InterruptedException public int drainTo(Collection<? super E> c) //移除队列中所有元素,添加到集合中 public int drainTo(Collection<? super E> c, int maxElements)
2.5.5 交换器Exchanger
JDK1.5以后开始提供线程间交换数据的功能,即Exchanger类
public class Exchanger<V> {} //定义 public Exchanger() //无参构造 public V exchange(V x) throws InterruptedException //该方法等待另一个线程到达交换点,交换指定的数据,返回交换后数据。 public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException //有时间限制的等待交换数据。
2.5.6 阶段化处理Phaser
Phase是一个可复用的同步障栅类,和CycliBarrier,CountDownLatch类似。
Phase使用并发任务数进行初始化,任务在Phase类上可以随时注册,并发任务数可以动态增加或者减少。
构造方法
Phaser() //无参构造 Phaser(int parties) //指定线程数的Phaser类 Phaser(Phaser parent) //为当前Phaser类指定一个父Phaser,从而构成树状并发阶段。 Phaser(phaser parent,int parties)
使用Phaser作为同步障栅的任务首先要注册到类Phaser对象。
一个Phaser对象有两种状态:
- 活跃态(Active):当存在参与同步的线程的时候,Phaser就是活跃的,并且在每个阶段结束的时候进行同步。
- 终止态(Termination):当所有参与同步的线程都取消注册的时候,Phaser就处于终止态,在终止状态下,Phaser没有任何参与者。当Phaser对象onAdvance()方法返回True时,Phaser对象就处于终止态。当Phaser处于终止态时,同步方法arriveAndAwaitAdvance()会立即返回,而且不会做任何同步操作。
方法
int register() //注册 int bulkRegister(int parties) //增加给定数量的未到达的线程到当前Phaser int arrive() // 到达当前Phaser,不用等待其他线程到达 int arriveAndDeregister() //到达并且从当前Phaser取消注册 int arriveAndAwaitAdvance() //到达当前Phaser,等待其他线程到达 int awaitAdvance(int phase) //在当前Phase的阶段等待进入下一阶段,如果当前阶段号和参数phase不相当或者以及终止,立即返回 int awaitAdvanceInterruptibly(int phase) int awaitAdvanceInterruptibly(int phase,long timeout, TimeUnit unit) throws InterruptedException,TimeoutException void forceTermination() //强制当前Phase进入终止状态 int getPhase() int getRegisteredParties() int getArrivedParties() getUnarrivedParties() Phaser getParent() Phaser getRoot() boolean isTerminated() boolean onAdvance(int phase, int registeredParties) //重写该方法,实现阶段转换时的动作 void releaseWaiters(int phase) int abortWait(int phase) String toString() //当前Phaser字符串标识和状态信息
2.6 线程执行器与线程池
JDK1.5引入了Executor框架,包括Executor接口和ExecutorService接口,实现这两个接口的ThreadPoolExecutor类
2.6.1 Executor接口和ExecutorService接口
Executor接口提供一个execute方法,用以异步执行Runnable对象
ExecutorService继承Executor接口,提供了线程池的shutdown()方法以及submit(),invokeAll()方法
void execute(Runnable command);
2.6.2 ThreadPoolExecutor线程池
这是线程池最常用地创建方式。
定义
public class ThreadPoolExecutor extends AbstractExecutorService{}
构造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
四种构造方法,总共包括7个构造参数,前5个参数的相同的,也是必须。
int corePoolSize //线程池中的线程数量 int maximumPoolSize //线程池中的最大线程数量 long keepAliveTime //当线程数超过了处理核数时的多余的空闲线程等待新任务的最长等待时间,非核心线程空闲存活时长 TimeUnit unit //参数keepAliveTime的shi'j workQueue //任务队列,调用execute()方法提交的Runnable任务队列 ThreadFactory threadFactory //创建新的线程所使用的工厂 RejectedExecutionHandler handler //超出线程队列容量时的拒绝策略
任务队列
- SynchornousQueue:这里表示接到新任务,直接交给线程处理,如果其他的线程都在工作,那就创建一个新的线程来处理这个任务。
- LinkedBlockingQueue:这里表示接到新任务,如果当前线程数小于核心线程数,则新建核心线程处理任务;如果当前线程数等于核心线程数,则进入队列等待。
- ArrayBlockingQueue:这里表示接到新任务,如果没有达到核心线程数,则新建核心线程执行任务,如果达到了,则入队等候,如果队列已满,则新建非核心线程执行任务,又如果总线程数到了 maximumPoolSize,并且队列也满了,则发生错误。
- DelayQueue:这里表示接到新任务,先入队,达到了指定的延时时间,才执行任务。
拒绝策略
- AbortPolicy:默认策略,在拒绝任务时,会抛出RejectedExecutionException。
- CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前的被丢弃的任务。
- DiscardOldestPolicy:该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。
- DiscardPolicy:该策略默默的丢弃无法处理的任务,不予任何处理
线程池创建线程过程
- 当线程数小于核心线程数时,创建线程。
- 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
- 当线程数大于等于核心线程数,且任务队列已满
- 若线程数小于最大线程数,创建线程
- 若线程数等于最大线程数,抛出异常,拒绝任务
2.6.3 工厂类Executors
《阿里巴巴java开发手册》中不允许使用Executors创建线程池,这里只给出工厂方法
newFixedThreadPool() //固定大小线程池 newSingleThreadExecutor() //只有一个线程的线程池 newCachedTheadPool() //根据需要创建线程的线程池 newSingleThreadScheduledExecutor() //创建只有一个线程的线程池,可以指定周期并周期地执行 newScheduledThreadPool() //创建一个线程池,可以指定周期并周期地执行
FixedThreadPool和SingleThreadPool: 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
CachedThreadPool: 允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
2.7 Fork/Join框架
Fork/Join框架实现了任务定义域任务处理的分离,用于解决递归和分治算法问题
Fork/Join通过一个阈值判断是否需要分解子问题,当问题规模超过阈值,则分解成多个子问题。
Fork/Join不能执行IO操作
工作窃取算法
工作窃取算法使得线程始终处于忙碌状态,是Fork/Join框架核心。工作窃取算法是指某个线程从其他队列里窃取任务来执行。利用双端队列Deque作为任务容器,当线程完成了自己全部任务,会从其他线程的另一端获取任务。
工作线程从自己的任务队列获取任务时使用pop()方法,而从其他线程获取任务使用take()方法
2.7.1 ForkJoinPool类
定义
public class ForkJoinPool extends AbstractExecutorService {}
``` java ForkJoinPool() ForkJoinPool(int parallelism) ForkJoinPool(int parallelism, //线程数 ForkJoinWorkerThreadFactory factory, //线程工厂 UncaughtExceptionHandler handler, // 异常处理 boolean asyncMode) //工作模式,true表示FIFO
方法
invoke(ForkJoinTask<T> task) //同步调用方法,用于处理给定任务,等待子任务完成,并返回结果 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) // 同步调用方法,等待子任务全部完成返回结果 void execute(ForkJoinTask<?> task) //异步调用,执行给定任务 void execute(Runnable task) //把Runnabel线程代表的任务交给ForkJoinPool,不会对Runnable对象使用工作窃取算法 <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) //把一个任务提交给ForkJoinPool处理,返回结果
2.7.2任务创建
通过继承RecursiveAction或者RecursiveTask,重写compute()方法,创建任务
2.8 ThreadLocal
ThreadLocal为每一个线程保留一个副本实现线程间数据隔离
构造和方法
ThreadLocal() T setInitialValue() void set(T value) //设置值 T get() //获取值 void remove() //移除值
3 线程安全容器与流式编程
3.1 同步容器
同步容器通过同步锁sychronized关键字实现:
- Vector、Stack、HashTable,jdk1.0就存在
- Collections对不安全容器加上了同步锁:
3.2 并发容器
并发容器比同步容器效率更高,有着各自的实现
- ConcurrentHashMap 分段锁
- CopyOnWriteArrayList 写时复制
- CopyOnWriteArraySet 写时复制
- Queue
- Sorted容器
java Queue有多种,阻塞队列都实现了BlockingQueue接口,代表了线程安全队列Queue queue1 = new ConcurrentLinkedQueue(); //无界非阻塞,基于链表,Concurrent说明线程安全 Queue queue2 = new ArrayBlockingQueue(10); //有界阻塞,基于数组,必须指定队列容量 Queue queue3 = new LinkedBlockingQueue(); //有界阻塞,基于链表 Queue queue4 = new PriorityBlockingQueue(); //无界阻塞,优先级队列 Queue queue5 = new DelayQueue(); //无界阻塞,延时获取元素 Queue queue6 = new SynchronousQueue(); //不存储元素的阻塞队列,每一个put操作阻塞等待一个take Queue queue7 = new LinkedTransferQueue(); //无界阻塞,基于链表, //双向队列 Queue queue9 = new ArrayDeque(); // 基于数组,非阻塞,线程不安全的 Queue queue10 = new ConcurrentLinkedDeque();//无界非阻塞 基于链表 Queue queue11 = new LinkedBlockingDeque(); //无界阻塞,基于链表
Sorted安全容器有两种ConcurrentSkipListMap<Integer, Integer> concurrentSkipListMap = new ConcurrentSkipListMap<>(); ConcurrentSkipListSet<Integer> concurrentSkipListSet = new ConcurrentSkipListSet<>();
concurrentSkipListMap,concurrentSkipListSet基于跳表实现排序,分别可以是TreeMap和TreeSet的线程安全版本,TreeMap和TreeSet是基于红黑树实现排序。
3.3 Lambda流式并发
lambda表达式
java8的新特性,通过并行流可以实现并发
- 匿名:Lambda表达式没有显式的方法名
- 功能:lambda表示式是等价匿名函数
- 优越性: lambda表达式既可以当作变量存储,也可以当作方法参数传递。
- 使用:lambda表达式一般配合函数式接口使用,比如Runnable接口使用
写法:(参数列表) -> {lanmda表达体;} // 箭头 -> 这是常规写法 类::方法名 // 双冒号 :: 方法引用,这是lambda表达式引用一个已经存在的方法
流的操作
- 中继操作:map,filter,distinct,sorted,skip,peek
- 终止操作:forEach,toArray,reduce,min,max,allMatch
一个流可以有多个中继操作,进行数据的映射过滤,但只能有一个终止操作。
通过Collection.stream获得串行流,Collection.paralleStream获得并行流//从控制台读取 1 2 3 4 5 6,将其转换成数组 String[] s=scanner.nextLine().split(" "); int[] ints= Arrays.stream(s).mapToInt(Interger::parseInt).toArray(); //串行流 int[] ints = Arrays.stream(s).parallel().mapToInt(Integer::parseInt).toArray();//并行流
并发编程原理-源码阅读