1、简述 Java 内存模型(JMM)
JMM 是不存在的东西,是一种概念和约定!
JVM 中存在一个主内存(Main Memory 或 Java Heap Memory),Java 中所有变量都是存在主存中的,对于所有线程进行共享,而每个线程又存在自己的工作内存(本地内存/Working Memory),工作内容中保存的是主存中某些变量的拷贝,线程对所有变量的操作并非发生在主存区,而是发生在工作内存中,而线程之间是不能直接相互访问,变量在程序中的传递,是依赖主存来完成的。
- 本地内存是一个抽象概念,并不真实存在,它涵盖了缓存、写缓冲区、寄存器以及其他的硬件和编译器优化。
- 主内存就是硬件的内存,而为了获取更好的运行速度,虚拟机及硬件系统可能会让工作内存优先存储于寄存器和高速缓存中。
引申:Java 线程之间是如何通信的?(联想到操作系统中进程间通信方式)
不同线程之间无法直接访问对方工作内存中的变量,线程间的通信一般有两种方式进行,一是通过消息传递,二是共享内存。Java 线程间的通信采用的是共享内存方式。
线程间通信必须要经过主内存:
1)线程 A 把本地内存 A 中更新过的共享变量刷新到主内存中去。
2)线程 B 到主内存中去读取线程 A 之前已更新过的共享变量。
主内存与本地内存之间的交互(8种操作)
- lock(锁定):作用于主内存的变量,把一个变量标识为一条线程独占状态。
- unlock(解锁):作用于主内存的变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
- read(读取):作用于主内存的变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的 load 动作使用。
- load(载入):作用于工作内存的变量,把 read 操作从主内存中得到的变量值放入工作内存的变量副本中。
- use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令时将会执行这个操作。
- assign(赋值):作用于工作内存的变量,把一个从执行引擎接收到的值赋值给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时会执行这个操作。
- store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的 write 的操作。
- write(写入):作用于主内存的变量,它把 store 操作从工作内存中一个变量的值传送到主内存的变量中。
2、Java 内存模型中的可见性、原子性和有序性(并发编程的三个特性)
- 可见性(重点 volatile): 是指线程之间的可见性,一个线程修改的状态对另一个线程是可见的。Java 内存模型是通过在变量修改后将新值同步回主内存,在变量读取前从主内存刷新变量值,这种依赖主内存作为传递媒介的方法来实现可见性的。主要使用三种方法来保证可见性:volatile、synchronized、Lock锁。
- 原子性(重点 各种锁): 是指一个操作或者多个操作要么全部执行,且执行的过程不会被任何因素打断,要么就都不执行。synchronized 块之间的操作就具备原子性。volatile 可以保证变量的可见性,但是不能保证符合操作的原子性。
- 有序性(重点 指令重排): 即程序执行的顺序按照代码的先后顺序执行。Java 内存模型中的程序天然有序性可以总结为一句话:如果在本线程内观察,所有操作都是有序的;如果在一个线程中观察另一个线程,所有操作都是无序的。 前半句是指 “线程内表现为串行语义”,后半句是指 “指令重排” 现象和 “工作内存和主内存同步延迟” 现象。主要使用三种方法来保证可见性:volatile、synchronized、Lock锁。
有序性的语意有几层:
1)最常见的就是保证多线程运行的串行顺序;
2)防止重排序引起的问题;
3)程序运行的先后顺序。比方 JMM 定义的一些 Happens-before 规则。
3、happen-before 原则是什么?
happen-before 表达的意思是:前一个操作对后一个操作是可见的。
定义了哪些指令不能重排。
八大原则:
- 单线程 happen-before 原则:在同一个线程中,书写在前面的操作 happen-before 后面的操作。
- 锁的 happen-before 原则:同一个锁的 unlock 操作 happen-before 此锁的 lock 操作。
- volatile 的 happen-before 原则:对一个 volatile 变量的写操作 happen-before 对此变量的任意操作。
- happen-before 的传递性原则:如果 A 操作 happen-before B 操作,B 操作 happen-before C 操作,那么 A 操作 happen-before C 操作。
- 线程启动的 happen-before 原则:同一个线程的 start 方法 happen-before 此线程的其他方法。
- 线程中断的 happen-before 原则:对线程 interrupt 方法的调用 happen-before 被中断线程的检测到中断发送的代码。
- 线程终结的 happen-before 原则:线程中的所有操作都 happen-before 线程的终止检测。
- 对象创建的 happen-before 原则:一个对象的初始化完成先于他的 finalize 方法调用。
哪些会进行重排序?
在执行程序时为了提高性能,编译器和处理器常常会对指令做重排序。重排序分为三种类型:
- 编译器优化的重排序。编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。
- 指令级并行的重排序。现代处理器采用了指令级并行技术(ILP)来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。
- 内存系统的重排序。由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序中执行。
4、线程的状态有哪些?
(1)新建状态(NEW):线程创建之后。
(2)可运行状态(RUNNING):可能正在运行,也可能正在等待 CPU 时间片。
(3)阻塞(BLOCKED):等待获取一个排他锁,如果其他线程释放了锁就会结束此状态。
(4)无限期等待(WAITING):等待其他线程显式地唤醒,否则不会被分配 CPU 时间片。
(5)限期等待(TIME_WAITING):无需等待其他线程显式地唤醒,在一定时间之后会被系统自动唤醒
(6)终止(TERMINATED):可以是线程结束任务之后自己结束,或者产生了异常而结束。
- 线程创建之后它将处于 NEW(新建) 状态。
- 调用 start() 方法后开始运行,线程这时候处于 READY(就绪) 状态。
- 可运行状态的线程获得了 cpu 时间片后就处于 RUNNING(运行) 状态(操作系统隐藏 JVM 中的 READY 和 RUNNING 状态,它只能看到 RUNNABLE 状态)。
- 当线程执行 wait() 方法之后,线程进入 WAITING(等待) 状态,进入等待状态的线程需要依靠其他线程的通知才能够返回到运行状态。
- TIME_WAITING(超时等待) 状态相当于在等待状态的基础上增加了超时限制,比如通过 sleep(long millis)方法或 wait(long millis)方法可以将 Java 线程置于 TIME_WAITING 状态。当超时时间到达后 Java 线程将会返回 RUNNABLE 状态。
- 当线程调用同步方法时,在没有获取到锁的情况下,线程将会进入到 BLOCKED(阻塞) 状态。
- 线程在执行 Runnable 的 run() 方法之后将会进入到 TERMINATED(终止) 状态。
5、并发级别有哪些?
阻塞、无饥饿、无障碍、无锁、无等待。
-
阻塞(Blocking):
一个线程是阻塞的,那么在其他线程释放资源之前,当前线程无法继续执行。当我们使用 synchronized 关键字,或者重入锁时就会产生阻塞的线程。无论是 synchronized 或者重入锁,都会试图在执行后续代码前,得到临界区的锁,如果得不到,线程就会被挂起等待,直到占有了所需资源为止。 -
无饥饿(Starvation-Free):
这个取决于线程之间是否有优先级的存在,如果系统允许高优先级的线程插队。这样可能导致低优先级线程产生饥饿。 -
无障碍(Obstruction-Free):
无障碍是一种最弱的非阻塞调度。相对来说非阻塞的调度就是一种乐观的策略。
从这个策略中也可以看到,当临界区中存在严重的冲突时,所有的线程可能都会不断地回滚自己的操作,而没有一个线程可以走出临界区。这种情况会影响系统的正常执行。
这也是利用 CAS 理论的思想:一种可行的无障碍实现可以依赖一个 “一致性标记” 来实现。线程在操作之前,先读取并保存这个标记,在操作完成后,再次读取,检查这个标记是否被更改过,如果两者是一致的,则说明资源访问没有冲突。如果不一致,则说明资源可能在操作过程中与其他写线程冲突,需要重试操作。而任何对资源有修改操作的线程,在修改数据前,都需要更新这个一致性标记,表示数据不再安全。 -
无锁(Lock-Free):
无锁的并行都是无障碍的。在无锁的情况下,所有的线程都能尝试对临界区进行访问,但不同的是,无锁的并发保证必然有一个线程能够在有限步内完成操作离开临界区,一个典型的特点是可能会包含一个无穷循环。在这个循环中,线程会不断尝试修改共享变量。如果修改成功,程序退出,否则继续尝试修改。但无论如何,无锁的并行总能保证有一个线程是可以胜出的。至于临界区中竞争失败的线程,它们则不断重试,直到自己获胜。如果总是尝试不成功,则会出现类似饥饿的现象,线程会停止不前。 -
无等待(Wait-Free):
无锁只要求有一个线程可以在有限步内完成操作,而无等待则在无锁的基础上更进一步进行扩展。它要求所有的线程都必须在有限步内完成,这样就不会引起饥饿问题。如果限制这个步骤上限,还可以进一步分解为有界无等待和线程数无关的无等待几种,它们之间的区别只是对循环次数的限制不同。一种典型的无等待结构就是 RCU(Read-Copy-Update)。它的基本思想是,对数据的读可以不加控制。因此,所有的读线程都是无等待的,它们既不会被锁定等待也不会引起任何冲突。但在写数据的时候,先取得原始数据的副本,接着只修改副本数据(这就是为什么读可以不加控制),修改完成后,在合适的时候回写数据。
6、创建线程的几种方式。
-
继承 Thread 类创建线程
(1)定义 Thread 类的子类,并重写该类的 run 方法,该 run 方法的方法体就代表了线程要完成的任务。因此把 run() 方法称为执行体。
(2)创建 Thread 子类的实例,即创建了线程对象。
(3)调用线程对象的 start() 方法来启动该线程。 -
实现 Runnable 接口创建对象
(1)定义 runnable 接口的实现类,并重写该接口的 run() 方法,该 run() 方法的方法体同样是该线程的线程执行体。
(2)创建 Runnable 实现类的实例,并以此实例作为 Thread 的 target 来创建 Thread 对象,该 Thread 对象才是真正的线程对象。
(3)调用线程对象的 start() 方法来启动该线程。 -
使用 Callable 和 Future 创建线程
与 Runnable 相比,Callable 可以有返回值,返回值通过 FutureTask 进行封装。
(1)创建 Callable 接口的实现类,并实现 call() 方法,该 call() 方法将作为线程执行体,并且有返回值。
(2)创建 Callable 实现类的实例,使用 FutureTask 类来包装 Callable 对象,该 FutureTask 对象封装了该 Callable 对象的 call() 方法的返回值。(FutureTask 是一个包装器,它通过接受 Callable 来创建,它同时实现了 Future 和 Runnable 接口。)
(3)调用 FutureTask 对象作为 Thread 对象的 target 创建并启动新线程。
(4)调用 FutureTask 对象的 get() 方法来获取子线程执行结束后的返回值。
public static void main(String args) throws ExecutionException, InterruptedException {
MyCallable mc = new MyCallable();
FutureTask<Integer> ft = new FutureTask<>(mc);
Thread thread = new Thread(ft);
thread.start();
System.out.println(ft.get());
}
class MyCallable implements Callable<Integer> {
public Integer call() {
return 123;
}
}
- 使用线程池,例如用 Executor 框架(工厂方法)
创建线程的方式的对比
-
采用实现 Runnable、Callable 接口的方式创建多线程时,
线程类只是实现了 Runnable 接口或 Callable 接口,还可以继承其他类。在这种方式下,多个线程可以共享同一个 target 对象,所以非常适合多个相同线程来处理同一份资源的情况,从而可以将 CPU、代码和数据分开。但是,缺点是编程稍微复杂,如果要访问当前线程,则必须使用 Thread.currentThread() 方法。 -
使用继承 Thread 类的方式创建多线程时,
如果需要访问当前线程,则无需使用 Thread.currentThread() 方法,直接使用 this 即可获得当前线程。缺点是线程类已经继承了 Thread 类,所以不能再继承其他父类。 -
Runnable 和 Callable 的区别
1)Callable 规定(重写)的方法是 call(),Runnable 规定(重写)的方法是 run()。
2)Callable 的任务执行后可返回值,而 Runnable 的任务是不能返回值的。
3)call 方法可以抛出异常, run 方法不可以。
4)运行 Callable 任务可以拿到一个 Future 对象,表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并检测计算的结果。通过 Future 对象可以了解任务执行情况,可取消任务的执行,还可以获取执结果。
7、线程基本操作与线程协作
基本操作:
- Daemon:
当所有非守护线程结束时,程序也就终止,同时会杀死所有守护线程。 - sleep():
Thread.sleep(millisec) 会休眠当前正在执行的线程,millisec 单位为毫秒。 - yield():
对静态方法 Thread.yield() 的调用声明了当前线程已经完成了生命周期中最重要的部分,可以切换给其他线程来执行。
线程协作:
-
join() 和 yeild():
在线程中调用另一个线程的 join() 方法,会将当前线程挂起,而不是忙等待,直到目标线程结束。比如有线程 a 和 b,虽然 b 线程先启动,但是因为在 b 线程中调用了 a 线程的 join() 方法,b 线程会等待 a 线程结束才继续执行,因此最后能够保证 a 线程的输出先于 b 线程的输出。 -
wait()、notify()、notifyAll():
使用 wait() 挂起期间,线程会释放锁。这是因为,如果没有释放锁,那么其他线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify() 或者 notifyAll() 来唤醒挂起的线程,造成死锁。
wait() 和 sleep() 的区别
(1)wait() 是 Object 的方法,而 sleep() 是 Thread 的静态方法;
(2)wait() 会释放锁,sleep() 不会。
- await()、signal()、signalAll():
Java.util.concurrent 类库中提供了 Condition 类来实现线程之间的协调,可以在 Condition 上调用 await() 方法使线程等待,其他线程调用 signal() 或 signalAll() 方法唤醒等待的线程。相比于 wait() 这种等待的方式,await() 可以指定等待的条件,因此更加灵活。使用 Lock 来获取一个 Condition 对象。
8、线程同步的几种方式?
同步是为了在多线程环境下安全访问临界区(共享区域),程序不会产生设计之外的错误结果(举个例子,如果一个银行账户同时被两个线程操作,一个取 100 块,一个存钱 100 块。假设账户原本有 0 块,如果取钱线程和存钱线程同时发生,会出现什么结果呢?)。
-
Synchronized:
同步方法:即有 synchronized 关键字修饰的方法。由于 java 的每个对象都有一个内置锁,当用此关键字修饰方法时,内置锁会保护整个方法。在调用该方法前,需要获得内置锁,否则就处于阻塞状态。
同步代码块:即有 synchronized 关键字修饰的语句块。被该关键字修饰的语句块会自动被加上内置锁,从而实现同步。( 同步是一种高开销的操作,因此应该尽量减少同步的内容。 通常没有必要同步整个方法,使用 synchronized 代码块同步关键代码即可。引申:锁的优化之减少锁的粒度) -
ReentrantLock:
ReentrantLock 类是可重入、互斥、实现了 Lock 接口的锁,它与 synchronized 具有相同的基本行为和语义,并且扩展了其能力。(引申:Synchronized 和 ReentrantLock 区别) -
使用 ThreadLocal 原理:
底层原理:
(1)ThreadLocal 仅仅是个变量访问的入口;
(2)每一个 Thread 对象都有一个 ThreadLocalMap 对象,这个 ThreadLocalMap 持有对象的引用;
(3)ThreadLocalMap 以当前的 threadLocal 对象为 key,以真正的存储对象为 value。get() 方法时通过 threadLocal 实例就可以找到绑定在当前线程上的副本对象。
public void set(T value) {
Thread t = Thread.currentThread(); // 1、首先获取当前线程对象
ThreadLocalMap map = getMap(t); // 2、获取该线程对象的 ThreadLocalMap
if (map != null)
map.set(this, value); // 3、如果 map 不为空,执行 set 操作,以当前 threadLocal 对象为 key,实际存储对象为 value 进行 set 操作。
else
createMap(t, value); // 如果 map 为空,则为该线程创建 ThreadLocalMap
}
-
volatile 关键字:
volatile 是变量修饰符,其修饰的变量具有可见性。 -
并发包中的锁:
读写锁、CountDownLatch、信号量等或者采用并发集合 ConcurrentHashMap,原子类 Atomic、阻塞队列等。
9、Java 各种锁:悲观锁、乐观锁、自旋锁、偏向锁、轻量/重量级锁、读写锁、可重入锁
1、悲观锁和乐观锁
悲观锁和乐观锁并不是特指某个锁,而是在并发情况下的两种不同策略,是一种宏观的描述。
-
悲观锁(Pessimistic Lock)
认为冲突一定发生,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿数据就被阻塞,直到悲观锁被释放。 -
乐观锁(Optimistic Lock)
认为冲突不会发生,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是如果想要更新数据,则会在更新前检查在读取至更新这段时间别人有没有修改过这个数据。如果修改过,则重新读取,再次尝试更新,循环上述步骤直到更新成功(当然也允许更新失败的线程放弃操作)。(引申:乐观锁的一种实现方式 CAS)
悲观锁阻塞事务,乐观锁回滚重试,它们各有优缺点,不要认为一种一定号于另一种。乐观锁适用于写比较少的情况下,即冲突真的很少发生的时候,这样可以省去锁的开销,加大了系统的整个吞吐量。但如果经常产生冲突,上层应用会不断的进行重试,这样反倒是降低了性能,所以这种情况下用悲观锁就比较合适。
2、synchronized 锁升级:无锁->偏向锁->轻量级锁->重量级锁(与锁的优化一起学习)
synchronized 会从无锁升级为偏向锁,再升级为轻量级锁,最后升级为重量级锁,这个过程叫做锁膨胀。
-
偏向锁:
初次执行到 synchronized 代码块的时候,锁对象变成偏向锁(通过 CAS 修改对象头里的锁标志位),字面意思是 “偏向于第一个获得它的线程” 的锁。执行完同步代码块后,线程并不会主动释放偏向锁。当第二次到达同步代码块时,线程会判断此时持有锁的线程是否就是自己(持有锁的线程 ID 也在对象头里),如果是则正常往下执行。由于之前没有释放锁,这里也就不需要重新加锁。如果自始至终使用锁的线程只有一个,很明显偏向锁几乎没有额外开销,性能极高。(引申:Java 的对象头结构组成) -
轻量级锁(自旋锁):
一旦有第二个线程加入锁竞争,偏向锁就升级为轻量级锁(自旋锁)。只有当某线程尝试获取锁的时候,发现该锁已经被占用,只能等待其释放,这才发生了锁竞争。在锁竞争下,没有抢到锁的线程将自旋,即不停地循环判断锁是否能够被成功获取。长时间的自旋操作是非常消耗资源的,一个线程持有锁,其他线程就只能在原地空耗 CPU,执行不了任何有效的任务,这种现象叫忙等(busy-waiting)。如果多个线程用一个锁,但是没有发生锁竞争,或者发生了很轻微的锁竞争,那么 synchronized 就用轻量级锁,允许短时间的忙等现象。这是一种折衷的想法,短时间的忙等,换取线程在用户态和内核态之间的切换的开销。 -
重量级锁:
如果锁竞争情况严重,某个达到最大自旋次数的线程,会将轻量级锁升级为重量级锁。当后续线程尝试获取锁时,发现被占用的锁是重量级锁,则直接将自己挂起(而不是忙等),等待将来被唤醒。
3、公平锁和非公平锁
1)如果多个线程申请一把公平锁,先申请的先得到,非常公平。如果是非公平锁,后申请的线程可能先获取到锁,是随机或者按照其他优先级排序的。
2)ReentrantLock,通过构造函数传参可以指定该锁是否是公平锁,默认是非公平锁。一般情况下,非公平锁的吞吐量比公平锁大,如果没有特许要求,优先使用非公平锁。
3)sybchronized ,是非公平锁,没有任何办法使其变成公平锁。
4、读写锁、共享锁、互斥锁
1)读写锁其实是一对锁,一个读锁(共享锁)和一个写锁(互斥锁、排他锁)。
2)如果读取值是为了更新它,那么就直接加写锁,持有写锁的时候别的线程无论读还是写都需要等待;
3)如果我读取数据仅为了查询,那就明确地加一个读锁,其他线程如果也要加读锁,不需要等待,可以直接获取(读锁计数器+1)。JDK 提供地唯一一个 ReadWriteLock 接口实现类是 ReentrantReadWriteLock。
5、可重入锁和不可重入锁
-
不可重入锁:
如果当前线程执行中已经获取了锁,如果再次获取该锁时,就会获取不到被阻塞。 -
可重入锁:
如果当前线程执行中已经获取了锁,并且可以再次获取该锁。ReentrantLock 和 synchronized 都是可重入锁。
(引申:如何实现可重入锁?加上对锁次数的统计,是自己线程获得锁,当下次获取的时候,次数加 1 )
总结:
Java 里使用的各种锁,几乎全都是悲观锁。 Synchronized 、 JDK 提供的 Lock 实现类,其实只要有 “锁对象” 出现,一定是悲观锁。乐观锁不是锁,而是一个在循环里尝试 CAS 的算法。那 JDK 并发包里到底有没有乐观锁?JUC 包里面的原子类都是利用乐观锁实现的。(引申:原子类的源码实现过程)
10、ReenTrantLock 和 synchronized 有什么区别?
-
锁的实现
synchronized 是 JVM 实现的,而 ReentrantLock 是 JDK 实现的。 -
性能
新版本 Java 对 synchronized 进行了很多优化,例如自旋锁等,synchronized 与 ReentrantLock 大致相同。 -
等待可中断
当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。ReentrantLock 可中断,而 synchronized 不行。 -
公平锁
synchronized 中的锁时非公平的,ReentrantLock 默认情况下也是非公平的,但是也可以是公平的。 -
锁绑定多个条件
一个 ReentrantLock 可以同时绑定多个 Condition 对象 -
都是可重入锁
-
判断锁的状态
synchronized 无法判断获取锁的状态;ReentrantLock 可以判断是否获取到了锁。 -
释放锁
synchronized 会自动释放锁;ReentrantLock 必须要手动释放锁,如果不释放锁会导致死锁。
生产者消费者的实现
- synchronized 实现
public class Main() {
public static void main(String[] args) {
Data data = new Data();
new Thread(new Runnable() {
@Override
public void run() {
for(int i=0; i<10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "A").start();
new Thread(new Runnable() {
@Override
public void run() {
for(int i=0; i<10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "B").start();
new Thread(new Runnable() {
@Override
public void run() {
for(int i=0; i<10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "C").start();
}
}
class Data {
private int number = 0;
// +1
public synchronized void increment() throws InterruptedException {
while (number != 0) { // 不能用 if 来判断,因为可能会出现虚假唤醒
//等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他线程,我 +1 完毕了
this.notifyAll();
}
// -1
public synchronized void decrement() throws InterruptedException {
while (number == 0) {
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他线程,我 -1 完毕了
this.notifyAll();
}
}
- Lock 实现
public class Main() {
public static void main(String[] args) {
Data data = new Data();
new Thread(new Runnable() {
@Override
public void run() {
for(int i=0; i<10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "A").start();
new Thread(new Runnable() {
@Override
public void run() {
for(int i=0; i<10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "B").start();
new Thread(new Runnable() {
@Override
public void run() {
for(int i=0; i<10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "C").start();
}
}
class Data {
private int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
// +1
public void increment() throws InterruptedException {
lock.lock();
try {
while (number != 0) { // 不能用 if 来判断,因为可能会出现虚假唤醒
//等待
condition.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他线程,我 +1 完毕了
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
// -1
public void increment() throws InterruptedException {
lock.lock();
try {
while (number == 0) {
//等待
condition.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
// 通知其他线程,我 +1 完毕了
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
如果要求精准唤醒线程(ABC轮流打印)
public class Main() {
public static void main(String[] args) {
Data data = new Data();
new Thread(new Runnable() {
@Override
public void run() {
for(int i=0; i<10; i++) {
data.printA();
}
}
}, "A").start();
new Thread(new Runnable() {
@Override
public void run() {
for(int i=0; i<10; i++) {
data.printB();
}
}
}, "B").start();
new Thread(new Runnable() {
@Override
public void run() {
for(int i=0; i<10; i++) {
data.printC();
}
}
}, "C").start();
}
}
class Data {
private int number = 1; // 1A 2B 3C
Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
public void printA() {
lock.lock();
try {
while (number != 1) {
//等待
condition.wait();
}
number = 2;
System.out.println(Thread.currentThread().getName() + "=>" + "AAA");
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
while (number != 2) {
//等待
condition.wait();
}
number = 3;
System.out.println(Thread.currentThread().getName() + "=>" + "BBB");
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
while (number != 3) {
//等待
condition.wait();
}
number = 1;
System.out.println(Thread.currentThread().getName() + "=>" + "CCC");
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
11、volatile 和 synchronized 有什么区别?
分别针对什么问题
-
volatile:
编译器为了加快程序运行的速度,对一些变量的写操作会先在寄存器或者是 CPU 缓存上进行,最后才写入内存。而在这个过程,变量的新值对其他线程是不可见的,而 volatile 的作用就是使它修饰的变量的读写操作都必须在内存中进行。 -
synchronized:
解决的是多个线程之间访问资源的同步性,synchronized 关键字可以保证被它修饰的方法或者代码块在任意时刻只能有一个线程执行。
区别:
(1)volatile 本质是在高速 JVM 当前变量在寄存器中的值是不确定的,需要从主存中读取;synchronized 则是锁定当前变量,只有当前线程可以访问该变量,其他线程被阻塞住。
(2)volatile 仅能使用在变量级别;synchronized 仅使用在语句块、方法。
(3)volatile 仅能实现变量修改的可见性,但不具备原子性;synchronized 则可以保证变量的修改可见性和原子性。(引申:volatile中System.out.println()影响可见性问题)
(4)保证可见性的机制不同,volatile 使用机器指令(偏硬件)"lock;"的方式迫使其他线程工作内存中的数据失效,不得到主内存中进行再次加载;synchronized 借助于JVM指令 monitor enter 和 monitor exit 对通过排他的方式使得同步代码串行化,在 monitor exit时所有共享资源都将会被刷新到主内存中。
(5)volatile 不会造成线程的阻塞;synchronized 可能会造成线程的阻塞。
(6)volatile 标记的变量不会被编译器优化,而 synchronized 标记的变量可以被编译器优化。
引申:多线程对一个 volatile 变量进行 ++,最终这个变量结果是准确的吗?不准确的,因为 volatile 不保证原子性,最终结果是小于真实值的。原子类 Atomic 可以保证变量的原子性,那么他的原理是怎么实现的?采用 CAS 的无锁机制。
12、synchronized 关键字的所有问题
1、简述
synchronized 关键字解决的是多个线程之间访问资源的同步性,synchronized 关键字可以保证被它修饰的方法或者代码块在任意时刻只能有一个线程执行。另外,在 Java 早期版本中,synchronized 属于重量级锁,效率低下,因为监视器锁(monitor)是依赖于底层的操作系统的 MutexLock 来实现的,Java 的线程是映射到操作系统的原生线程之上的。如果要挂起或者唤醒一个线程,都需要操作系统帮忙完成,而操作系统实现线程之间的切换时需要从用户态转换到内核态,这个状态之间的转换需要相对比较长的时间,时间成本相对较高,这也是为什么早期 synchronized 效率低的原因。
在 Java 6 之后官方从 JVM 层面对 synchronized 较大优化,所以现在的 synchronized 锁效率也优化得很不错了。JDK1.6 对锁的实现引入了大量的优化,如自旋锁、适应性自旋锁、锁消除、锁粗化、偏向锁、轻量级锁等技术来减少锁操作的开销。
2、使用
8锁问题,就是关于锁的8个问题
- 1、标准情况下,两个线程先打印 发短信 还是 打电话? -> 发短信 打电话
在标准情况下,先启动线程 A ,调用 sendSms() 方法;再启动线程 B,调用 call() 方法。结果是先发短信,再打电话。
public class Main {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(new Runnable() {
@Override
public void run() {
phone.sendSms();
}
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1); // 延时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
phone.call();
}
}, "B").start();
}
}
class Phone {
// synchronized 锁的对象是方法的调用者
// 两个方法用的是同一个锁,谁先拿到谁执行。(phone 的锁,只有一个锁)
public synchronized void sendSms() {
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}
- 2、sendSms 延迟 4 秒,两个线程先打印 发短信 还是 打电话? -> 发短信 打电话
如果 1 成立,且原因是先调用了sendSms()方法,那么加上延时4秒后,应该是先打电话,再发短信。但实际结果还是 先发短信,再打电话。
原因是:
(1)synchronized 是给 Phone 的实例对象 phone 加上了锁,即两个方法用的是同一个锁,谁先抢到谁执行。
(2)线程 A 启动比 线程 B 要早一秒,即 sendSms() 方法先获取了锁,执行完动作后(包括延时动作),再释放锁给线程 B 执行。
public class Main {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(new Runnable() {
@Override
public void run() {
phone.sendSms();
}
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1); // 延时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
phone.call();
}
}, "B").start();
}
}
class Phone {
// synchronized 锁的对象是方法的调用者
// 两个方法用的是同一个锁,谁先拿到谁执行。(phone 的锁,只有一个锁)
public synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4); // 延时4秒
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
}
- 3、增加了一个普通方法后,先 发短信 还是 hello? -> hello 发短信
因为 hello() 方法不是同步方法,所以不受锁的影响。
先启动线程A ,调用 sendSms()方法,延时后输出 发短信;线程 B 调用后直接输出 hello ,不需要等待锁。
public class Main {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(new Runnable() {
@Override
public void run() {
phone.sendSms();
}
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1); // 延时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
phone.hello();
}
}, "B").start();
}
}
class Phone {
// synchronized 锁的对象是方法的调用者
public synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4); // 延时4秒
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
// 这里没有锁,不是同步方法,不受锁的影响。
public void hello() {
System.out.println("hello");
}
}
- 4、两个对象,两个同步方法,发短信 还是 打电话? -> 打电话 发短信
phone1 和 phone2 是两个对象,所以对应有两个锁,互不影响,各自执行。
先启动线程 A,调用 sendSms()方法,延时后打印 发短信;线程 B 调用后输出 打电话,
因为线程 A 存在延时,所以是先输出 打电话,再输出 发短信。
public class Main {
public static void main(String[] args) {
// 两个对象,两个调用者,两把锁。拿的锁不一样,按时间来。
Phone phone1 = new Phone();
Phone phone2 = new Phone();
new Thread(new Runnable() {
@Override
public void run() {
phone1.sendSms();
}
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1); // 延时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
phone2.call();
}
}, "B").start();
}
}
class Phone {
// synchronized 锁的对象是方法的调用者
public synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4); // 延时4秒
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public synchronized void call() {
System.out.println("打电话");
}
// 这里没有锁,不是同步方法,不受锁的影响。
public void hello() {
System.out.println("hello");
}
}
- 5、增加两个静态的同步方法,只有一个对象,先打印 发短信 还是打电话? -> 发短信 打电话
public class Main {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(new Runnable() {
@Override
public void run() {
phone.sendSms();
}
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1); // 延时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
phone.call();
}
}, "B").start();
}
}
// Phone唯一的一个 Class 对象,即 Class<Phone> phoneClass = Phone.class;
class Phone {
// synchronized 锁的对象是方法的调用者
// static 静态方法
// 类一加载就有了。锁的是 Class
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4); // 延时4秒
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public static synchronized void call() {
System.out.println("打电话");
}
}
- 6、两个对象。增加两个静态的同步方法,先打印 发短信 还是 打电话? -> 发短信 打电话
5 和 6 的结果都是:先 发短信,再 打电话。
synchronized 对 static 加锁实际上是对 Class 模板加锁,
两个方法都被 static 修饰了,所以用的是同一个锁。
public class Main {
public static void main(String[] args) {
// 两个对象的 Class 类模板只有一个,static,锁的是 Class
Phone phone1 = new Phone();
Phone phone2 = new Phone();
new Thread(new Runnable() {
@Override
public void run() {
phone1.sendSms();
}
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1); // 延时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
phone2.call();
}
}, "B").start();
}
}
// Phone唯一的一个 Class 对象,即 Class<Phone> phoneClass = Phone.class;
class Phone {
// synchronized 锁的对象是方法的调用者
// static 静态方法
// 类一加载就有了。锁的是 Class
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4); // 延时4秒
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public static synchronized void call() {
System.out.println("打电话");
}
}
- 7、一个静态的同步方法,一个普通的同步方法,一个对象,先打印 发短信 还是 打电话? -> 打电话 发短信
public class Main {
public static void main(String[] args) {
// 两个对象的 Class 类模板只有一个,static,锁的是 Class
Phone phone = new Phone();
new Thread(new Runnable() {
@Override
public void run() {
phone.sendSms();
}
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1); // 延时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
phone.call();
}
}, "B").start();
}
}
class Phone {
// 静态的同步方法 锁的是 Class 类模板
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4); // 延时4秒
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
// 普通同步方法 锁的是调用者
public synchronized void call() {
System.out.println("打电话");
}
}
- 8、一个静态的同步方法,一个普通的同步方法,两个对象,先打印 发短信 还是 打电话? -> 打电话 发短信
new this 具体的一个实例
static Class 类的模板
static 修饰的方法用的锁是 Class 锁,普通同步方法用的是对象锁,所以两个方法用的不是同一个锁。
public class Main {
public static void main(String[] args) {
// 两个对象的 Class 类模板只有一个,static,锁的是 Class
Phone phone1 = new Phone();
Phone phone2 = new Phone();
new Thread(new Runnable() {
@Override
public void run() {
phone1.sendSms();
}
}, "A").start();
try {
TimeUnit.SECONDS.sleep(1); // 延时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
phone2.call();
}
}, "B").start();
}
}
class Phone {
// 静态的同步方法 锁的是 Class 类模板
public static synchronized void sendSms() {
try {
TimeUnit.SECONDS.sleep(4); // 延时4秒
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("发短信");
}
// 普通同步方法 锁的是调用者
public synchronized void call() {
System.out.println("打电话");
}
}
总结:synchronized 关键字最主要的三种使用方式
(1)普通同步方法,锁是当前实例的实例对象;
(2)静态同步方法,锁是当前类的 class 对象;
(3)同步方法块,锁是自己定义的锁对象。
3、底层原理
Java 对象头和 monitor 是实现 synchronized 的基础。对象头内保存着无锁状态、偏向锁、轻量级锁、重量级锁等状态和标志位,monitor 来标识一个对象的锁定状态(线程与对象之间锁定关系)。
(1)同步代码块:
使用monitorenter 和 monitorexit 实现。Monitorenter 指向同步代码块的开始位置,monitorexit 指向结束位置。任何对象都有一个 monitor 和它关联,一个 monitor 被持有之后,将处于锁定状态。线程执行到 monitorenter 后会尝试获取对应 monitor,即对象的锁。monitorexit 释放 monitor。
(2)同步方法:
使用方法修饰符 ACC_SYNCHRONIZED 实现。
(3)可重入性:
重入锁实现可重入性原理或机制是:每一个锁关联一个线程持有者和计数器,当计数器为 0 时表示该锁没有被任何线程持有,那么任何线程都可能获得该锁而调用相应的方法;当某一线程请求成功后,JVM 会记下锁的持有线程,并且将计数器置为 1;此时其他线程请求该锁,则必须等待;而该持有锁的线程如果再次请求这个锁,就可以再次拿到这个锁,同时计数器会递增;当线程退出同步代码块时,计数器会递减,如果计数器为 0,则释放该锁。
(4)锁的优化
-
减少锁的持有时间;
-
减少锁粒度(只对需要的一部分进行加锁),相对是锁的粗化;
-
读写分离锁(读和读之间不阻塞);
-
JVM 对锁的优化(这是 synchronized 优化)
1、偏向锁:第一个线程请求锁并拿到锁之后,下次再请求锁不需要进行同步操作,直接拿到锁。当有第二个线程竞争的时候,偏向锁失效。
2、轻量级锁:偏向锁失效后,使用轻量级锁。将对象头部作为指针,指向锁记录,如果成功就获得轻量级锁,如果失败升级为自旋锁。
3、自旋锁:轻量级锁失效之后,JVM 会假设在不久的时间内,可以拿到锁。所以让线程进行几个空循环,如果可以拿到锁则进入临界区,否则阻塞等待锁。
4、锁消除:取出不可能存在共享资源竞争的锁。节省不必要的锁请求的时间。
13、Volatile 的三个特点
保证可见性、不保证原子性、禁止指令重排
- 保证可见性
当对volatile变量执行写操作后,JMM会把工作内存中的最新变量值强制刷新到主内存,写操作会导致其他线程中的缓存无效。这样,其他线程使用缓存时,发现本地工作内存中此变量无效,便从主内存中获取,这样获取到的变量便是最新的值,实现了线程的可见性。
/**
* 不加 volatile:
* 现象1:主线程不加 Thread.sleep(10);线程1将flag改为true,主线程读取不到,主线程flag=false,主线程一直循环。(这种情况如果电脑性能好的话,可能出现不了,多找几台电脑试试)
* 现象2:主线程加 Thread.sleep(10);那么主线程可以读取到线程1改变的flag值,说明线程1将flag值刷新到了公共内存中,此时两者都为true。
*
* 加volatile:
* 此时主线程不加 Thread.sleep(10);两个线程都会结束;flag 都为true,说明volatile关键字是让变量变成线程之间可见。
*/
public class Main implements Runnable {
// private boolean flag = false;
private volatile boolean flag = false;
public boolean getFlag() {
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
flag = true;
System.out.println("其他线程 flag = " + getFlag());
}
public static void main(String[] args) {
Main m = new Main();
new Thread(m, "1").start();
while (true) {
// Thread.sleep(10); // 不加这个,有的电脑就读取不到公共内存中的数据,主线程一直循环
// 就算不加volatile,可见性不一定能保证,但并不意味着线程之间就“绝对不可见”,
// 因为在cpu空闲的时候也会将变量的值从线程缓存刷新到主存中,有时候while(true)
if (m.getFlag()) {
System.out.println("主线程 flag = " + m.getFlag());
break;
}
}
}
}
- 不保证原子性
原子性:不可分割
线程A在执行任务的时候,不能被打扰的,也不能被分割。要么同时成功,要么同时失败。
如果不加 lock 和 synchronized,怎么保证原子性?
可以使用原子类,来解决原子性问题。
public class Main {
// private volatile static int num = 0; // volatile不保证原子性
// 原子类的 Integer
private volatile static AtomicInteger num = new AtomicInteger();
public static void add() {
// num++; // 不是一个原子性操作
// 过程:1、获得这个值;2、+1;3、写回这个值
num.getAndIncrement(); // AtomicInteger + 1 方法,CAS
}
public static void main(String[] args) {
for (int i=1; i<=20; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j=0; i<1000; j++) {
add();
}
}
}).start();
}
while (Thread.activeCount() > 2) { // main gc
Thread.yield();
}
System.out.println(num); // 结果小于20000
}
}
这些类的底层都直接和操作系统挂钩!在内存中修改值!Unsafe类
- 禁止指令重排
什么是指令重排:你写的程序,计算机并不是按照你写的那样去执行的。
源代码 -> 编译器优化的重排 -> 指令并行也可能会重排 -> 内存系统也会重排 -> 执行
处理器在进行指令重排的时候,考虑:数据之间的依赖性!
int x = 1; // 1
int x = 2; // 2
x = x + 5; // 3
y = x * x; // 4
所期望的:1234 但是可能执行的时候会变成 2134 1324
可能造成影响的结果:a b x y 这四个值默认都是0;
线程A | 线程B |
---|---|
x = a | y = b |
b = 1 | a = 2 |
正常的结果:x=0;y=0;但是可能由于指令重排
线程A | 线程B |
---|---|
b = 1 | a = 2 |
x = a | y = b |
指令重排导致的诡异结果:x=2;y=1;
volatile 可以避免指令重排:
内存屏障。CPU指令。作用:
1、保证特定的操作的执行顺序。(禁止上面指令和下面指令顺序交换)
2、可以保证某些变量的内存可见性(利用这些特性 volatile 实现了可见性)
14、简述线程池
1、为什么要使用线程池?(创建销毁开销大)
- 降低资源消耗。
通过重复利用已经创建的线程降低线程创建和销毁造成的消耗。 - 提高响应速度。
当任务到达时,任务可以不需要的等到线程创建就能立即执行。 - 提高线程的可管理性。
线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
2、JUC中的线程池体系
ThreadPoolExecutor 类和 ScheduledThreadPoolExecutor 实现了 ExecutorService 接口和 Executor 接口,并由 Executors 类扮演线程池工厂的角色(工厂设计模式)。
3、ThreadPoolExecutor 的创建参数
public ThreadPoolExecutor(int corePoolSize, // 核心线程池大小
int maximumPoolSize, // 最大核心线程池大小
long keepAliveTime, // 超时了没有人调用就会释放
TimeUnit unit, // 超时单位
BlockingQueue<Runnable> workQueue, // 阻塞队列
ThreadFactory threadFactory, // 线程工厂,创建线程的,一般不用动
RejectedExecutionHandler handler // 拒绝策略
) {
(1)corePoolSize:
核心运行的线程个数,当向线程池提交一个任务时,如果线程池已创建的线程数小于 corePoolSize,即便此时存在空闲线程,也会通过创建一个新线程来执行该任务,直到已创建的线程数大于或等于 corePoolSize ,(除了利用提交新任务来创建和启动线程(按需构造),也可以通过 prestartCoreThread() 或 prestartAllCoreThread() 方法来提前启动线程池中的基本线程。)
(2)maximumPoolSize:
最大线程个数,当大于这个值就会将准备新加的异步任务由一个丢弃处理机制来处理,大于 corePoolSize 且小于 maximumPoolSize 则新建 Thread 执行,但是当通过 newFixedThreadPool 创建的时候,corePoolSize 和 maximumPoolSize 是一样的,而 corePoolSize 是先执行的,所以会先被放入等待队列而不会执行到下面的丢弃处理中。另外,对于无界队列,可忽略该参数。
(3)workQueue:
任务等待队列,当达到 corePoolSize 的时候就向该等待队列放入线程信息(默认为一个 LinkedBlockingQueue)。
(4)keepAliveTime:
默认为0,当线程没有任务处理后空闲线程保持多长时间,不推荐使用。
(5)threadFactory:
是构造 Thread 的方法,一个接口类,可以使用默认的 default 实现,也可以自己去包装和传递,主要实现 newThread 方法即可。
(6)defaultHandler:
当参数 maximumPoolSize 达到后丢弃处理的方法实现,java 提供了 5 种丢弃处理的方法, java 默认使用的是 AbortPolicy,作用是当出现这种情况的时候抛出一个异常。
引申:线程池参数如何设置?
(1)如果任务为耗时 IO型,比如读取数据库、文件读写以及网络通信的话,这些任务不会占据很多 cpu 的资源但是会比较耗时:线程数量设置为 2 倍 CPU 数以上,充分的来利用 CPU 资源。
(2)如果任务为 CPU 密集型 的话,比如大量计算、解压、压缩等这些操作都会占据大量的 cpu,这种情况一般设置线程数为:1 倍 cpu + 1。为啥要加 1,很多说法是备份线程。
(3)如果既有 IO 密集型任务,又有 CPU 密集型任务,该怎么设置线程大小?这种的话最好分开用线程池处理,IO 密集的用 IO 密集型线程池处理,CPU 密集型的用 CPU 密集型处理。
4、Executors 工厂类实现的线程池
(通过创建不同的 ThreadPoolExecutor 对象)
-
newFixedThreadPool 固定大小的线程池
它是一种固定大小的线程池:
corePoolSize 和 maximumPoolSize 都为用户设定的线程数量 n Threads;
keepAliveTime 为 0,意味着一旦有多余的空闲线程,就会被立即停止掉;但这里 keepAliveTime 无效;
阻塞队列采用了 LinkedBlockingQueue,它是一个无界队列;
由于阻塞队列是一个无界队列,因此永远不能拒绝任务;
由于采用了无界队列,实际线程数量将永远维持在 n Threads,因此 maximumPoolSize 和 keepAliveTime 将无效。 -
newCachedThreadPool 可伸缩大小的线程池
它是一个可以无限扩大的线程池;
它比较适合处理执行时间比较小的任务;
corePoolSize 为 0,maximumPoolSize 为无限大,意味着线程数量可以无限大;
keepAilveTime 为 60s,意味着线程空闲时间超过 60s就会被杀死;
采用 SynchronousQueue 装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。 -
newSingleThreadExecutor 线程池里面只有单个线程
它只会创建一条工作线程处理任务;
采用的阻塞队列为 LinkedBlockingQueue; -
ScheduledThradPool 可调度的线程池
实现周期性线程调度,比较常用。
scheduleAtFixedRate 或 scheduleWithFixedDelay 区别:
scheduleAtFixedRate 表示以固定频率执行的任务,如果当前任务耗时较多,超过定时周期 period,则当前任务结束后会立即执行;
scheduleWithFixedDelay 表示以固定延时执行任务,延时是相对当前任务结束为起点计算开始时间。
以上创建线程池的缺点:
(1)FixedThreadPool 和 SingleThreadExecutor => 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而引起 OOM 异常;
(2)CachedThreadPool => 允许创建的线程数为 Integer.MAX_VALUE,可能会创建大量的线程,从而引起 OOM 异常。
一般来说,线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
5、线程池中使用的 BlockingQueue
直接提交队列、有界队列、无界队列、优先级队列。
(1)直接提交队列:
设置为 SynchronousQueue 队列,SynchronousQueue 是一个特殊的 BlockingQueue,它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。
(2)有界的任务队列:
有界的任务队列可以使用 ArrayBlockingQueue 实现。如果有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到 corePoolSize 时,则会将新的任务加入到等待队列中。如果等待队列已经满了,即超过 ArrayBlockingQueue 初始化的容量,则继续创建线程,直到线程数量达到 maximumPoolSize 设置的最大线程数量,如果大于 maximumPoolSize,则执行拒绝策略。
(3)无界的任务队列:
有界任务队列可以使用 LinkedBlockingQueue 实现。使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你 corePoolSize 设置的数量,也就是说在这种情况下 maximumPoolSize 这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到 corePoolSize 后,就不会再增加了;如果后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。
(4)优先任务队列:
优先任务队列通过 PriorityBlockingQueue 实现。通过运行的代码我们可以看出 PriorityBlockingQueue 它其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过 corePoolSize 的数量,只不过其他队列一般是按照先进先出的规则处理任务,而 PriorityBlockingQueue 队列可以自定义规则根据任务的优先级顺序先后执行。
15、线程池的增长策略。(任务调度)
当一个任务通过 execute(Runnable) 方法欲添加到线程池时:
1、如果此时线程池中的数量小于 corePoolSize,即使线程池中的线程都处于空闲状态。也要创建新的线程来处理被添加的任务。
2、如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue 未满,那么任务被放入缓冲队列。
3、如果此时线程池中的数量大于 corePoolSize,缓冲队列 workQueue 满,并且线程池中的数量小于 maximumPoolSize,创建新的线程来处理被添加的任务。
4、如果此时线程池中的数量大于 corePoolSize,缓冲队列 workQueue 满,并且线程池中的数量等于 maximumPoolSize,那么通过 handler 所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程 corePoolSize、任务队列 workQueue、最大线程 maximumPoolSize,如果三者都满了,使用 handler 处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize 时,如果某线程空闲时间超过 keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
具体增长策略要看你使用什么 workQueue。
16、线程池的拒绝策略
1、AbortPolicy(默认):这种策略直接抛出异常,丢弃任务(当都满了)。
2、CallerRunsPolicy:策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就是直接使用调用该 execute 的线程本身来执行。很有可能造成当前线程也被阻塞。
3、DiscardPolicy:不能执行的任务将被删除,这种策略和 AbortPolicy 几乎一样,也是丢弃任务,只不过他不抛出异常。
4、DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
17、实现一个阻塞队列
18、ArrayBlockingQueue 和 LinkedBlockingQueue 的区别
(1)队列大小有所不同,ArrayBlockingQueue 是有界的初始化必须指定大小,而 LinkedBlockingQueue 可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
(2)数据存储容器不同,ArrayBlockingQueue 采用的是数组作为数据存储容器,而 LinkedBlockingQueue 采用的则是以 Node 节点作为连接对象的链表。
(3)由于 ArrayBlockingQueue 采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而 LinkedBlockingQueue 则会生成一个额外的 Node 对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于 GC 可能存在较大影响。
(4)两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue 实现的队列中的锁是没有分离的,即添加操作和移除采用的同一个 ReenterLock 锁,而 LinkedBlockingQueue 实现的队列中的锁是分离的,其添加采用的是 putLock,移除采用的则是 takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
19、ConcurrentHashMap 实现原理
20、快速失败和安全失败
-
快速失败(fail-fast)
在用迭代器遍历一个集合对象时,如果遍历过程中对集合对象的内容进行了修改(增加、删除、修改),则会抛出 Concurrent Modification Exception。
场景:Java.util 包下的集合类都是快速失败的,不能在多线程下发生并发修改(迭代过程中被修改)。 -
安全失败(fail-safe)
采用安全失败机制的集合容器,在遍历时不是直接在集合内容上访问的,而是先复制原有集合内容,在拷贝的集合上进行遍历。
原理:由于迭代时是对原集合的拷贝进行遍历,所以在遍历过程中对原集合所作的修改并不能被迭代器检测到,所以不会触发 Concurrent Modification Exception。
缺点:基于拷贝内容的优点是避免了 Concurrent Modification Exception,但同样地,迭代器并不能访问到修改后的内容,即:迭代器遍历的是开始遍历那一刻拿到的集合拷贝,在遍历期间集合发生的修改迭代器是不知道的。
场景:Java.util.concurrent 包下的容器都是安全失败,可以在多线程下并发使用,并发修改。
21、CAS
CAS 是 compare and swap 的缩写,即比较交换。cas 是一种基于锁的操作,而且是乐观锁。在Java 中锁分为乐观锁和悲观锁。悲观锁是将资源锁住,等一个之前获得锁的线程释放锁之后,下一个线程才可以访问。而乐观锁采取了一种宽泛的态度,通过某种方式不加锁来处理资源,比如通过给记录加 version 来获取数据,性能较悲观锁有很大的提高。
CAS 的思想很简单,三个参数,一个当前内存值 V、旧的预期值 A、即将更新的值 B,当且仅当预期值 A 和内存值 V 相同时,将内存值修改为 B 并返回 true,否则什么都不做,并返回 false。(在 zookeeper 中版本号设计遵循 cas,很多地方都依赖,比如原子类、zk、AQS)
ABA 问题:
如果一个变量 V 初次读取的时候是 A 值,并且在准备赋值的时候检查到它仍然是 A 值,那我们就能说它值没有被其他线程改变过吗?如果在这段期间它的值曾被改成 B,后来又改成了 A,那么 CAS 操作就会误认为它没有改变过,这个漏洞称为 ABA 问题。解决的核心思想是加上时间戳来标识不同阶段的数值。比如 JUC 包为了解决这个问题,提供了一个带有标记的原子引用类“AtomicStampedReference”,它可以通过控制变量值的版本来保证 CAS 的正确性,如果需要解决 ABA 问题,改用传统的互斥同步(典型的就是 synchronized 和 Lock)可能会比原子类更高效。
22、AQS(AbstractQueuedSynchronizer)原理
- AQS 核心实现思想
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。该队列由一个一个的 Node 结点组成,每个 Node 结点维护一个 prev 引用和 next 引用,分别指向自己的前驱和后继结点。AQS 维护两个指针,分别指向队列头部 head 和尾部 tail。
本质就是个双端双向链表。
当线程获取资源失败(比如 tryAcquire 时试图设置 state 状态失败),会被构造成一个结点加入 CLH 队列中,同时当前线程会被阻塞在队列中(通过 LockSupport.park 实现,其实是等待态)。当持有同步状态的线程释放同步状态时,会唤醒后继结点,然后此结点线程继续加入到对同步状态的争夺中。
-
AQS 源码层面分析
(1)AQS 使用一个 int 成员变量来表示同步状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。AQS 使用 CAS 对该同步状态进行原子操作实现对其值的修改。private volatile int state; // 共享变量,使用 volatile 修饰保证线程可见性
(2)状态信息通过 procted 类型的 getState,setState,compareAndSetState 进行操作
// 返回同步状态的当前值 protected final int getState() { return state; } // 设置同步状态的值 protected final void setState(int newState) { stae = newState; } // 原子(CAS 操作)地将同步状态值设置为给定值 update 如果当前同步状态的值等于 expect(期望值) protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
AQS 使用了模板设计模式,自定义同步器时需要重写下面几个 AQS 提供的模板方法:
isHeldExclusively(); // 该线程是否正在独占资源。只有用到 condition 才需要去实现它。 tryAcquire(int); // 独占方式。尝试获取资源,成功则返回 true,失败则返回 false。 tryRelease(int); // 独占方式。尝试释放资源,成功则返回 true,失败则返回 false。 tryAcquireShared(int); // 共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且由剩余资源。 tryReleaseShared(int); // 共享方式。尝试释放资源,成功则返回 true,失败则返回 false。
(3)具体例子
以 ReentrantLock 为例,state 初始化为 0,表示未锁定状态。A 线程 lock() 时,会调用 tryAcquire() 独占该锁并将 state+1。此后,其他线程再 tryAcquire() 时就会失败,直到 A 线程 unlock() 到 state=0(即释放锁)为止,其他线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但是要注意,获取多少次就要释放多少次,这样才保证 state 是能回到零态的。 -
总结
AQS 是 JUC 中很多同步组件的构建基础,比如 ReentrantLock、Semaphore、ReentrantReadWriteLock、SynchronousQueue、FutureTask 等等皆是基于 AQS 的,简单来讲,它内部实现主要是状态变量 stat 和一个 FIFO 队列来完成,同步队列的头结点是当前获取到同步状态的结点,获取同步状态 state 失败的线程,会被构造成一个结点(或共享式或独占式)加入到同步队列尾部(采用自旋 CAS 来保证此操作的线程安全),随后线程会阻塞;释放时唤醒头结点的后继结点,使其加入对同步状态的争夺中。
AQS 定义了顶层的处理实现逻辑,我们在使用 AQS 构建符合我们需要的同步组件时,只需要重写 tryAcquire、tryAcquireShared、tryRelease、tryReleaseShared 几个方法,来决定同步状态的释放和获取即可,至于背后复杂的线程排队,线程阻塞/唤醒,如何保证线程安全,都由 AQS 为我们完成了,这就是非常典型的模板方法的应用。AQS 定义好顶级逻辑的骨架,并提取出公用的线程入队列/出队列,阻塞/唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑延迟到子类中去实现。
23、JUC 同步器工具(CountDownlatch、CyclicBarrier、Semaphore)
CountDownLatch
-
概念
CountDownLatch 是一个同步工具类,用来协调多个线程之间的同步,或者说起线程之间的通信(而不是用作互斥的作用)。
CountDownLatch 能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为 0 时,表示所有的线程都已经完成一些任务,然后在 CountDownLatch 上等待的线程就可以恢复执行接下来的任务。 -
CountDownLatch 的用法
(1)某一线程在开始运行前等待 n 个线程执行完毕。将 CountDownLatch 的计数器初始化为 new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减 1 countdownLatch.countDown(),当计数器的值变为 0 时,在 CountDownLatch 上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
(2)实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch(1),将其计算器初始化为 1,多个线程在开始执行任务前首先 countdownlatch.await(),当主线程调用 countDown()时,计数器变为 0,多个线程同时被唤醒。public class Main { public static void main(String[] args) throws InterruptedException { // 总数是6,必须要执行任务的时候,再使用! CountDownLatch countDownLatch = new CountDownLatch(6); for (int i=1; i<=6; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName()+" Go out"); countDownLatch.countDown(); // 数量-1 },String.valueOf(i)).start(); } countDownLatch.await(); // 等待计数器归零,然后再向下执行 System.out.println("Close Door"); } }
-
CountDownLatch 的不足
CountDownLatch 是一次性的,计算器的值只能在构造方法中初始化一次,之后没有人任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。
CyclicBarrier
-
概念
允许一组线程互相等待,直到到达某个公共屏障点。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 barrier。(例如吃饭时要等全家人都上座了才动筷子,旅游时要等全部人都到齐了才出发,比赛时要等运动员都上场后才开始。)public class Main { public static void main(String[] args) { /** * 等6个运动员上场比赛才开始 */ CyclicBarrier cyclicBarrier = new CyclicBarrier(6, ()->{ System.out.println("比赛开始!"); }); for (int i=1; i<=6; i++) { final int temp = i; new Thread(()->{ System.out.println("赛道:" + +Thread.currentThread().getName() + " ===> 运动员编号:" +temp); try { cyclicBarrier.await(); // 等待 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }, String.valueOf(i)).start(); } } }
-
与 CountDownLatch 的区别
CountDownLatch | CyclicBarrier |
---|---|
减计数方式 | 加计数方式 |
计算为 0 时释放所有等待的线程 | 计数达到指定值时释放所有等待线程 |
计数为 0 时,无法重置 | 计数达到指定值时,计数置为 0 重新开始 |
调用 countDown() 方法计数减一,调用 await() 方法只进行阻塞,对计数没有任何影响 | 调用 await() 方法计数加 1,如果加 1后的值不等于构造方法的值,则线程阻塞 |
不可重复利用 | 可重复利用 |
Semaphore
-
概念
Semaphore 也是一个线程同步的辅助类,可以维护当前访问自身的线程个数,并提供了同步机制。使用 Semaphore 可以控制同时访问资源的线程个数,例如,实现了一个文件允许的并发访问数。多个共享资源互斥的使用!并发限流,控制最大的线程数!public class Main { public static void main(String[] args) { // 线程数量,停车位!限流! Semaphore semaphore = new Semaphore(); for (int i=1; i<=6; i++) { new Thread(() -> { // acquire() 得到 try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "抢到车位"); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + "离开车位"); } catch (InterruptedException e) { e.printStackTrace(); } finally { semphore.release(); // release() 释放 } }, String.valueOf(i)).start(); } } }
-
Semaphore 的主要方法
void acquire():从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
void release():释放一个许可,将其返回给信号量。
int availablePermits():返回此信号量中当前可用的许可数。
boolean hasQueuedThreads():查询是否有线程正在等待获取。