创建线程的四种方式
在java中,实现多线程主要有以下四种:
- 继承Thread类,重写run()方法;
- 实现Runnable接口,实现run()方法,并将Runnable实现类的实例作为Thread构造函数的参数target;
- 实现Callable接口,实现call()方法,然后通过FutureTask包装器来创建Thread线程;
- 通过ThreadPoolExecutor创建线程池,并从线程池中获取线程用于执行任务;
第1、2种方式无法获取线程的执行结果,因为通过重写的run()方法的返回值是void;第3种方式可以获取线程的执行结果,因为通过Callable接口的call()方法的返回值是Object,可以将返回的结果放在Object对象中;第4种方式对于两种情况都支持,具体取决于任务的类型,有返回值的任务必须实现Callable接口,无返回值的任务必须实现Runnable接口。
继承Thread类的方式
Thread实现了Runnable接口,代表一个线程的实例。启动线程的唯一方法就是通过Thread类的start()方法。start()方法是一个native方法,它将启动一个新线程,并执行run()方法。
这种方式实现多线程很简单,直接extends Thread,并重写run()方法,就可以启动新线程执行自己定义的run()方法。例如:
public class MyThread extends Thread {
public void run() {
System.out.println("MyThread.run()");
}
}
MyThread myThread1 = new MyThread();
MyThread myThread2 = new MyThread();
myThread1.start();
myThread2.start();
通过实现Runnable接口
通过实现Runnable接口,实现run()方法,将Runnable接口的实现类的实例作为Thread的带参构造函数中,并通过调用start()方法启动线程,如下:
public class ThreadDemo02 {
public static void main(String[] args){
System.out.println(Thread.currentThread().getName());
Thread t1 = new Thread(new MyThread());
t1.start();
}
}
class MyThread implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"-->我是通过实现接口的线程实现方式!");
}
}
实现Runnable接口比继承Thread类所具有的优势主要有:
- 可以避免JAVA中单继承的限制
- 线程池只能放入实现Runnable或Callable类线程,不能直接放入继承Thread的类
- 代码可以被多个线程共享,代码和数据独立,适合多个相同的程序代码的线程去处理同一个资源的情况
实现Callable接口,并通过FutureTask包装器来创建Thread线程
- 实现Callable接口,并实现call()方法;
- 创建Callable接口的实现类的实例,使用FutureTask类包装Callable对象,该FutureTask对象封装了Callable对象的call()方法的返回值;
- 使用FutureTask对象作为Thread类的构造函数的target参数创建并启动线程;
- 调用FutureTask对象的get()来获取子线程执行结束的返回值;
public class ThreadDemo03 {
public static void main(String[] args) {
Callable<Object> oneCallable = new Tickets<Object>();
FutureTask<Object> oneTask = new FutureTask<Object>(oneCallable);
Thread t = new Thread(oneTask);
System.out.println(Thread.currentThread().getName());
t.start();
}
}
class Tickets<Object> implements Callable<Object>{
//重写call方法
@Override
public Object call() throws Exception {
System.out.println(Thread.currentThread().getName()+"-->我是通过实现Callable接口通过FutureTask包装器来实现的线程");
return null;
}
}
使用ThreadPoolExecutor创建线程池
使用ThreadPoolExecutor创建线程池,并从线程池中获取线程用于执行任务。在JUC中,Executor框架已经实现了几种线程池,以Executor的newFixedThreadPool来作为Demo的展示。
import java.util.concurrent.*;
import java.util.Date;
import java.util.List;
import java.util.ArrayList;
/**
* 有返回值的线程
*/
@SuppressWarnings("unchecked")
public class Test {
public static void main(String[] args) throws ExecutionException,
InterruptedException {
System.out.println("----程序开始运行----");
Date date1 = new Date();
int taskSize = 5;
// 创建一个线程池
ExecutorService pool = Executors.newFixedThreadPool(taskSize);
// 创建多个有返回值的任务
List<Future> list = new ArrayList<Future>();
for (int i = 0; i < taskSize; i++) {
Callable c = new MyCallable(i + " ");
// 执行任务并获取Future对象
Future f = pool.submit(c);
// System.out.println(">>>" + f.get().toString());
list.add(f);
}
// 关闭线程池
pool.shutdown();
// 获取所有并发任务的运行结果
for (Future f : list) {
// 从Future对象上获取任务的返回值,并输出到控制台
System.out.println(">>>" + f.get().toString());
}
Date date2 = new Date();
System.out.println("----程序结束运行----,程序运行时间【"
+ (date2.getTime() - date1.getTime()) + "毫秒】");
}
}
class MyCallable implements Callable<Object> {
private String taskNum;
MyCallable(String taskNum) {
this.taskNum = taskNum;
}
public Object call() throws Exception {
System.out.println(">>>" + taskNum + "任务启动");
Date dateTmp1 = new Date();
Thread.sleep(1000);
Date dateTmp2 = new Date();
long time = dateTmp2.getTime() - dateTmp1.getTime();
System.out.println(">>>" + taskNum + "任务终止");
return taskNum + "任务返回运行结果,当前任务时间【" + time + "毫秒】";
}
}
ExecutorService、Callable、Future实际上都是属于Executor框架。线程池支持有返回结果和无返回结果的任务,有返回值的任务必须实现Callable接口,无返回值的任务必须实现Runnable接口。对于有结果的任务,执行Callable任务后,可以获取一个Future的对象,在该对象上调用get就可以获取到Callable任务返回的Object了,但需要注意的是:get方法是阻塞的,如果线程未返回结果,那么get()方***一直等待,直到有结果返回或者超时。
参考:
线程池的创建及工作原理和Executor框架
线程池主要是为了解决新任务执行时,应用程序为任务创建一个新线程以及任务执行完毕时,销毁线程所带来的开销。通过线程池,可以在项目初始化时就创建一个线程集合,然后在需要执行新任务时重用这些线程而不是每次都新建一个线程,一旦任务已经完成了,线程回到线程池中并等待下一次分配任务,达到资源复用的效果。
线程池的主要优势有:
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可扩展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
创建线程池
通过Executors创建线程池:
在JUC包中的Executors中,提供了一些静态方法,用于快速创建线程池,常见的线程池有:
-
newSingleThreadExecutor:创建一个只有一个线程的线程池,串行执行所有任务,即使空闲时也不会被关闭。可以保证所有任务的执行顺序按照任务的提交顺序执行。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。
适用场景:需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程活动的应用场景。
-
newFixedThreadPool:创建一个固定线程数量的线程池(corePoolSize==maximunPoolSize,使用LinkedBlockingQueue作为阻塞队列)。初始化时线程数量为零,之后每次提交一个任务就创建一个线程,直到线程达到线程池的最大容量。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
适用场景:为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。
-
newCachedThreadPool:创建一个可缓存的线程池,线程的最大数量为Integer.MAX_VALUE。空闲线程会临时缓存下来,线程会等待60s若还是没有任务加入的话就会被关闭。
适用场景:适用于执行很多的短时间异步任务的小程序,或者是负载较轻的服务器。
-
newScheduledThreadPool:创建一个支持执行延迟任务或者周期性执行任务的线程池。
ThreadPoolExecutor构造函数参数的说明:
使用Executors创建的线程池,其本质都是通过不同的参数构造一个ThreadPoolExecutor对象,主要包含以下7个参数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 省略...
}
-
corePoolSize:线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列workQueue中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
-
maximunPoolSize:线程池中允许的最大线程数。如果当前workQueue满了之后可以创建的最大线程数。
-
keepAliveTime:空闲线程的存活时间。
-
unit:keepAliveTime空闲线程存活时间的单位。
-
workQueue:阻塞队列,用来存放等待被执行的任务,且任务必须实现Runnable接口,在JDK中提供了如下阻塞队列:
ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务; LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene; SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene PriorityBlockingQuene:具有优先级的无界阻塞队列; DelayQueue:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。 LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。 LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。 -
ThreadFactory:线程工厂,主要用来创建线程,默认为正常优先级、非守护线程。
-
Handler:线程池拒绝任务时的处理策略。
不要使用Executors创建线程池:
阿里巴巴开发手册并发编程有一条规定:线程池不允许使用Executors创建,而是应该通过ThreadPoolExecutor的方式创建。主要是因为这样可以避免资源耗尽的风险,因为使用Executors返回线程池对象的弊端有:
- FixedThreadPool和SingleThreadPool允许的阻塞队列长度为Integer.MAX_VALUE,这样会导致堆积大量的请求,从而导致OOM;
- CachedThreadPool允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。所以创建线程池,最好是根据线程池的用途,然后自己创建线程池。
线程池执行策略

执行逻辑说明:
-
当客户端提交任务时,线程池先判断核心线程数是否小于corePoolSize,如果是,则创建新的核心线程数运行这个任务;
-
如果正在运行的线程数大于或等于corePoolSize,则判断workQueue队列是否已满,如果未满,则将任务放入workQueue中;
-
如果workQueue队列已经满了,则判断当前线程池中的线程数量是否大于maximunPoolSize,如果小于maximunPoolSize,则启动一个非核心线程来执行任务;
-
如果线程池中线程数量大于或等于maximunPoolSize,那么线程池会根据设定的拒绝策略,做出相应的措施。
ThreadPoolExecutor.AbortPolic(默认):抛出RejectedExecutionException异常; ThereadPoolExecutor.CallerRunsPolicy:在当前正在执行的线程的execute方法中运行被拒绝的任务。 ThreadPoolExecutor.DiscardOldestPoliy:丢弃workQueue中等待最长时间的任务,并将被拒绝的任务添加到队列之中。 ThreadPoolExecutor.DiscardPolicy:将直接丢弃此线程。 -
当一个线程完成任务时,它会从workQueue中获取下一个任务来执行。
-
当一个线程空闲超过keepAliveTime设定的时间时,线程池会判断,如果当前线程数大于corePoolSize,那么这个线程就会被停掉。所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。
如何合理的配置线程池的大小?
一般是根据任务类型来配置线程池大小:
- 如果是CPU密集型:那么就意味着CPU是稀缺资源,这个时候通常不能通过增加线程数来提高计算能力,因为线程数量太多,会导致频繁的上下文切换,一般这种情况下,建议合理的线程数值=CPU+1,减少线程上下文的切换;
- 如果是IO密集型:说明需要较多的等待,因为IO操作并不占用CPU,大部分线程都阻塞,所以可以多配置线程数,让CPU处理更多的业务,线程数=CPU核数*(1+平均等待时间/平均工作时间)。参考值可以是N(CPU)核数×2。
当然这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。
一般情况下配置有界队列,在一些可能会有爆发性增长的情况下使用无界队列。任务多时,使用非阻塞队列并使用CAS操作替代锁可以获得好的吞吐量。
Executor
Executor是一个用于任务执行和调度的框架,目的是将任务的提交过程与执行过程解耦,使得用户只需关注任务的定义和提交,而不需要关注具体如何执行以及何时执行;其中,最顶层是Executor接口,它只有一个用于执行任务的execute()方法。Executor框架主要由3大部分组成:
- 任务:实现Callable接口或Runnable接口的类,其实例就可以成为一个任务提交给ExecutorService去执行:其中Callable任务可以返回执行结果,Runnable任务无返回结果。
- 任务的执行:包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架的关键类ThreadPoolExecutor也实现了ExecutorService接口;
- 任务的异步计算结果:包括Future接口和实现Future接口的FutureTask类、ForkJoinTask类。
使用步骤:
把任务(如Runnable接口或Callable接口的实现类)提交(submit、execute)给线程池执行。线程执行完毕之后,会返回一个异步计算结果Future,然后调用Future的get()方法等执行结果即可,Future的get()方***导致主线程阻塞,直到任务执行完成。
其中Runnable任务无返回结果,Callable任务可以返回执行结果,Callable任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即Future可以拿到异步执行任务各种结果;在实际业务场景中,Future和Callable基本是成对出现的,Callable负责产生结果,Future负责获取结果。
另外,还有一个Executors类,它是一个工具类,提供了创建 ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 对象的静态方法。
线程池中submit()和execute()方法有什么区别?
两个方法都可以向线程池提交任务,execute()方法的返回类型是void,它定义在Executor接口中,而submit()方法可以返回持有计算结果的Future对象,它定义在ExecutorService接口中,它扩展自Executor接口,其它线程池类向ThreadPoolExecutor和ScheduledThreadPoolExecutor都有这些方法。
Java线程模型:Java线程与操作系统线程的关系
Java线程的本质,其实就是操作系统中的线程,Java线程的实现是基于一对一的线程模型,通过语言级别层面程序去间接调用系统内核的线程模型,即在使用Java线程时,JVM是转而调用当前操作系统的内核线程来完成当前任务。内核线程就是由操作系统内核支持的线程,这种线程是由操作系统内核来完成线程切换,内核通过操作调度器进而对线程执行调度,并将线程的任务映射到各个处理器上。
由于编写的多线程程序属于语言层面的,程序不会直接去调用内核线程,取而代之的是一种轻量级的进程,也就是通常意义上的进程,由于每个轻量级进程都会映射到一个内核线程,因此可以通过轻量级进程调用内核线程,进而由操作系统内核将任务映射到各个处理器,这种轻量级进程与内核线程间一对一的关系就称为一对一的线程模型。
Java线程模型如下图所示,每个线程最终都会映射到CPU中进行处理,如果CPU存在多核,那么一个CPU将可以并行执行多个线程任务:

详情请阅读此博客:线程池详解
参考:
线程池的创建及工作原理 和 Executor 框架_张维鹏的博客-CSDN博客
线程的中断
对于线程的停止,通常情况下是不会去手动停止的,而是等待线程自然运行至结束,但在实际开发中,很多情况中需要提前手动来停止线程,比如程序中出现异常错误、使用者关闭程序等情况。如果不能很好地停止线程那么可能会导致各种问题,所以正确的停止线程是非常重要的,常见的中断线程的方式有以下几种:
使用Thread类的stop()方法来终止线程
Thread类的stop()方法虽然可以终止线程,但该方法已被表示为废弃方法,原因是stop()方法太过暴力,即使线程只执行一半,也会被强行终止,不能保证线程资源正确释放,线程不安全,从而产生不可预料的结果,因此不提倡使用。
根据volatile修饰的标志位判断线程是否需要中断
public static class ChangeObjectThread extends Thread
{
// 表示是否停止线程
private volatile boolean stopMe = true;
public void stopMe() {
stopMe = false;
}
@Override
public void run() {
while (!stopMe) {
System.out.println("I'm running");
}
}
}
在上面的代码里面,定义了一个标记变量stopME,用于标识线程是否需要退出,当stopMe()方法被调用时,stopMe就会被赋值为false,此时在代码里面的while(!stopMe)就会检测到这个改动,线程就退出了。
通过interrupt中断机制终止线程
该方式的核心就是通过interrupt()方法设置线程的中断标志位,并通过isInterrupt()/interrupted()方法监视并判断中断信息,当线程检测到为true时则说明接收到中断信息,此时需要被中断线程做相应的处理。但如何去响应这个中断信号,被中断线程有完全的自主权,也就是中断结果是死亡或是继续运行,取决于这个被中断线程本身的逻辑。
- Thread.interrupte():设置线程的中断标志位为true,表示被其它线程进行了中断操作,但它不会像stop()方法那样强制中断正在运行的线程,仅仅起到通知被停止线程的作用;而被中断线程,则需要通过监视自身的标志位是否被中断来进行响应,比如使用isInterrupted()或interrupted()方法来判断是否被中断;
- this.interrupted():测试当前线程是否已经中断。如果连续两次调用该方法,第一次返回true,第二次返回false,因为interrupted()方法具有清除状态的功能,它内部实现是调用的当前线程的isInterrupted(),并且会重置当前线程的中断状态;
- this.isInterrupted():测试线程是否已经中断,但是不会清除状态标识。
interrupt机制需要被中断线程去监视中断标志位是否发生变化并做处理,那么当线程处于阻塞状态,确切来说,线程被Object.wait、Thread.join()和Thread.sleep()三种方法之一阻塞时,应该如何处理呢?
其实interrupte()方法也是支持线程阻塞情况的,一旦在上面几种情况下,线程的中断状态被置为“”中断,就会抛出InterruptedException,从而提早地终结被阻塞状态,所以只需要捕获InterruptedException异常并对中断进行响应即可。不过在抛出InterruptedException之前,JVM会先将该线程的中断标志位复位,所以此时调用isInterrupted()方法将会返回false,但如果线程既没有被阻塞,又没有通过Interrupted()/isInterrupted()进行监视并做出相应的处理,此时调用interrupt()将不起作用。下面就针对两种interrupt()机制写两个例子:
使用interrupt()+isInterrupted()来中断线程:
public static void main(String[] args)
{
//创建 interrupt-1 线程
Thread thread = new Thread(() -> {
while (true) {
System.out.println(Thread.currentThread().getName() + "线程正在执行...");
if (Thread.currentThread().isInterrupted())
{
System.out.println("线程1 接收到中断信息,中断线程...中断标记:" + Thread.currentThread().isInterrupted());
Thread.currentThread().interrupted();
System.out.println("经过 Thread.interrupted() 复位后,中断标记:" + Thread.currentThread().isInterrupted());
break;
}
}
}, "interrupt-1");
//启动线程 1
thread.start();
//创建 interrupt-2 线程,用于设置线程1的中断状态
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("设置线程中断...." );
thread.interrupt();
},"interrupt-2").start();
}
使用interrupt()+InterruptedException来中断线程:
public static void main(String[] args)
{
//创建 interrupt-1 线程
Thread thread = new Thread(() -> {
while (true) {
System.out.println(Thread.currentThread().getName() + "线程1开始执行...");
//判断当前线程是否中断,
if (Thread.currentThread().isInterrupted())
{
System.out.println("线程1 接收到中断信息,中断线程...中断标记:" + Thread.currentThread().isInterrupted());
break;
}
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
//因为抛出InterruptedException异常后,会导致中断标志复位为false,所以再次设置线程的中断状态,也可以直接使用break中断线程
Thread.currentThread().interrupt();
//break;
}
}
System.out.println("线程1执行结果...");
}, "interrupt-1");
//启动线程 1
thread.start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("设置线程中断...." );
//用于设置线程1的中断状态
thread.interrupt();
}
从上面的执行结果可以证明,当线程处于阻塞状态时,也是可以感受到中断通知并抛出异常的,所以不用担心长时间休眠中线程感受不到中断。
对于线程的停止,最正确最优雅的方式就是通过interrupt()的方式来实现,但interrupt()仅起到通知的作用,对被停止的线程而言,它拥有完全的自主权,既可以立即停止,也可以选择一段时间后停止,也可以选择不停止。
JMM内存模型与volatile内存语义
Java内存模型是Java虚拟机定义的一种多线程访问Java内存各个变量的访问规范,主要围绕如何解决并发过程中的原子性、可见性、有序性这三个问题来解决线程的安全问题。

Java内存模型将内存分为了主内存和工作内存(也称为栈空间)。主内存存放所有的共享变量,所有线程都可以访问。每个线程都有自己的工作内存,存储了该线程使用到的变量的副本,线程对变量的所有操作都必须在自己的工作内存中完成,不能直接操作主存中的变量。操作时,首先将变量从主内存拷贝到自己的工作内存中,然后在自己的工作内存中对变量进行操作,操作完成后再将变量写回主存。不同的线程间也无法直接访问对方的工作内存的变量,线程间的变量值的传递必须通过主内存来完成。
- 原子性:指的是一个操作是不可中断的,即使是在多线程环境下,一个操作一旦开始就不会被其它线程影响。
- 可见性:指的是当一个线程修改了某个共享变量的值,其它线程能够马上得知这个修改的值。
串行程序不存在可见性问题,因为在任何一个操作中修改了某个变量的值,后序的操作中都能读取这个修改过的变量值。但在多线程环境中就不一定了,因为线程对共享变量的操作都是拷贝到各自的工作内存中进行操作后才写回到主内存中的,这就可能存在一个线程A修改了共享变量x的值,还未写回主内存,另外一个线程B又对主内存中同一个共享变量x进行操作,但此时A线程工作内存***享变量x对线程B来说并不可见,这种工作内存与主内存同步延迟现象就造成了可见性问题,另外指令重排序以及编译器优化也可能导致可见性问题。
- 有序性:对于多线程环境,因为程序编译成机器码指令后可能会出现指令重排序现象,重排后的指令与原指令的顺序未必一致,有可能出现乱序现象。
指令重排序:计算机在执行程序时,为了提高性能,编译器和处理器常常会对指令做重排序,在单线程条件下,指令重排序可以保证执行结果的一致性,但是在多线程条件下,这些重排优化可能会导致程序出现内存可见性问题,不能保证多线程间语义一致性。
原子性、可见性、有序性问题的解决措施:
- 原子性问题:除了JVM自身提供对基本数据类型读写操作的原子性外,对于方法级别或者代码级别的原子性操作,可以使用synchronize关键字或者重入锁ReentrantLock保证程序执行的原子性。
- 可见性问题:工作内存与主内存同步延迟现象导致的可见性问题,可以使用synchronize、lock或者volatile关键字解决。,它们都可以使一个线程修改后的变量立即对其它线程可见。
- 有序性问题:可以利用volatile、synchronize关键字解决。
happens-before原则内容如下:
程序顺序原则:即在一个线程内必须保证语义串行性,也就是说按照代码顺序执行。
锁规则:解锁(unlock)操作必然发生在后续的同一个锁的加锁(lock)之前,也就是说,如果对于一个锁解锁后,再加锁,那么加锁的动作必须在解锁动作之后(同一个锁)。
volatile 规则:volatile 变量的写,先发生于读,这保证了volatile变量的可见性,简单的理解就是,volatile 变量在每次被线程访问时,都强迫从主内存中读该变量的值,而当该变量发生变化时,又会强迫将最新的值刷新到主内存,任何时刻,不同的线程总是能够看到该变量的最新值。
线程启动规则:线程的 start() 方法先于它的每一个动作,即如果线程A在执行线程B的 start() 方法之前修改了共享变量的值,那么当线程B执行 start() 方法时线程A对共享变量的修改对线程B可见。
传递性:A先于B ,B先于C,那么A必然先于C
线程终止规则:线程的所有操作先于线程的终结,Thread.join() 方法的作用是等待当前执行的线程终止。假设在线程B终止之前,修改了共享变量,线程从线程B的 join() 方法成功返回后,线程B对共享变量的修改将对线程A可见。
线程中断规则:对线程 interrupt() 方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过 Thread.interrupted() 方法检测线程是否中断。
对象终结规则:对象的构造函数执行,结束先于 finalize() 方法。
volatile内存语义
volatile的作用:
volatile是Java虚拟机提供的轻量级同步机制,是线程不安全的,volatile跟可见性和有序性有关,被volatile修饰的共享变量,具有以下两个作用:
-
保证不同线程对该变量操作的内存可见性:当变量被volatile修饰时,那么对它的修改会立刻刷新到主存,同时其它线程操作volatile变量时,JMM会把该线程对应的工作内存置为无效,那么该线程只能从主存中重新读取共享变量,保证读取到最新值。
通过synchronize和Lock也能够保证可见性,线程在释放锁之前,会把共享变量值都刷回主存,但是相比于volatile,synchronize和Lock的开销都更大。
-
禁止指令重排序。
内存屏障:
volatile在内存中的语义是通过内存屏障实现,即可见性和禁止重排序。把加入volatile关键字的代码和未加入volatile关键字的代码都生成汇编代码,会发现加入volatile关键字的代码会多出一个内存屏障指令,它是一个CPU指令。内存屏障提供了以下功能:
- 告诉编译器和处理器,重排序时不能把后面的指令重排序到内存屏障之前的位置,从而避免多线程环境下出现乱序执行现象。
- 保存某些变量的内存可见性,利用该特性实现volatile的内存可见性。
volatile的原子性:
volatile的两点内存语义能保证可见性和有序性,但是不能保证原子性:对单个volatile变量的读/写具有原子性,但是对于类似volatile这样的复合操作就无能为力了,要想保证原子性,只能借助于synchronize、lock或者并发包下的atomic的原子操作类了。
volatile使用场景:
-
状态量标记:这种对变量的读写操作,标记为volatile可以保证修改对线程立即可见,效率也比synchronize、Lock有一定的提升。
public class VolatileSafe { volatile boolean close; public void close(){ close=true; } public void doWork(){ while (!close){ System.out.println("safe...."); } } }对于boolean变量close值的修改属于原子性操作,因此可以通过使用volatile修饰变量close,使用该变量对其它线程立即可见,从而达到线程安全的目的。
-
单例模式的实现,典型的双重检查锁定(DCL):这是一种懒汉的单例模式,使用时才创建对象,而且为了避免初始化操作的指令重排序,给instance加上了volatile。指令重排序会导致,当一条线程访问的instance不为null时,但是实际上instance实例未必已初始化完成,也就造成了线程安全问题。
public class DoubleCheckLock { //禁止指令重排优化 private volatile static DoubleCheckLock instance; private DoubleCheckLock(){} public static DoubleCheckLock getInstance(){ //第一次检测 if (instance==null){ //同步 synchronized (DoubleCheckLock.class){ if (instance == null){ //多线程环境下可能会出现问题的地方 instance = new DoubleCheckLock(); } } } return instance; } }发生指令重排序时,有可能在对象初始化完成前就赋值完成了,在内存里面开辟了一片存储区域后直接返回内存的引用,这个时候还没有真正的初始化完对象。但是别的线程去判断instance==null发现不等于null,所以就直接拿去用了,其实这个对象是个半成品,那就有空指针异常了。
参考:
JMM内存模型与volatile内存语义_张维鹏的博客-CSDN博客
详细可阅读此博客:Volatile详解,太详细了 - Code2020 - 博客园 (cnblogs.com)
synchronized锁机制原理
线程安全是并发编程中的重要关注点,造成线程安全问题的主要原因有两点:一是存在共享数据(也称临界资源),而是存在多条线程共同操作共享数据。因此为了解决这个问题,可能需要这样一个方案,当存在多个线程操作共享数据时,需要保证同一时刻有且只有一个线程在操作共享数据,其它线程必须等到该线程处理完数据后再进行,这种方式叫互斥锁,即能达到互斥访问目的的锁,也就是说当一个共享数据被当前正在访问的线程加上互斥锁后,在同一个时刻,其它线程只能处于等待的状态,直到当前线程处理完毕释放该锁。
synchronized的作用:
synchronized通过当前线程持有对象锁,从而拥有访问权限,而其它没有持有当前对象锁的线程无法拥有访问权限,保证在同一时刻,只有一个线程可以执行某个方法或者某个代码块,从而保证线程安全。synchronized可以保证线程的可见性,synchronized属于隐式锁,锁的持有与释放都是隐式的,无需干预。synchronized最主要的三种应用方式:
- 修饰实例方法:作用于当前实例加锁,进入同步代码前要获得当前实例的锁
- 修饰静态方法:作用于当前类对象加锁,进入同步代码前要获得当前类对象的锁
- 修饰代码块:指定加锁对象,进入同步代码块前要获得给定对象的锁
synchronized作用于实例方法:
public class AccountingSync implements Runnable{
//共享资源(临界资源)
static int i=0;
/**
* synchronized 修饰实例方法
*/
public synchronized void increase(){
i++;
}
@Override
public void run() {
for(int j=0;j<1000000;j++){
increase();
}
}
public static void main(String[] args) throws InterruptedException {
AccountingSync instance=new AccountingSync();
Thread t1=new Thread(instance);
Thread t2=new Thread(instance);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
/**
* 输出结果:
* 2000000
*/
}
i++操作并不具备原子性,所以会产生线程安全问题。由以上代码可以看出,当一个线程正在访问一个对象的synchronized实例方法,那么其它线程不能访问该对象的其它synchronized方法,毕竟一个对象只有一把锁,当一个线程获取了该对象的锁之后,其它线程无法获取该对象的锁,所以无法访问该对象的synchronized实例方法,但是其它线程还是可以访问该实例对象的其它非synchronized方法。如果一个线程A需要访问实例对象obj1的synchronized方法f1(当前对象锁时obj1),另一个对象B需要访问实例对象obj2的synchronized方法f2(当前对象锁是obj2),这样是允许的,因为两个实例对象锁并不相同,此时如果两个线程操作数据并非共享的,线程安全是有保障的,遗憾的是如果两个线程操作的是共享数据,那么线程安全就有可能无法保证了,如下代码将演示出该现象:
public class AccountingSyncBad implements Runnable{
static int i=0;
public synchronized void increase(){
i++;
}
@Override
public void run() {
for(int j=0;j<1000000;j++){
increase();
}
}
public static void main(String[] args) throws InterruptedException {
//new新实例
Thread t1=new Thread(new AccountingSyncBad());
//new新实例
Thread t2=new Thread(new AccountingSyncBad());
t1.start();
t2.start();
//join含义:当前线程A等待thread线程终止之后才能从thread.join()返回
t1.join();
t2.join();
System.out.println(i);
}
}
上述代码与前面不同的是同时创建了两个新实例AccountingSyncBad,然后启动两个不同的线程对共享变量i进行操作,但操作结果是1452317而不是期望结果2000000。虽然使用synchronized修饰了increase方法,但却new了两个不同的锁,这也就意味着存在着两个不同的实例对象锁,因此t1和t2都会进入各自的对象锁,也就是说t1和t2线程使用的是不同的锁,因此线程安全是无法保证的。
解决这种困境的方式是将synchronized作用于静态的increase方法,这样的话,对象锁就是当前类对象,因此无论创建多少个实例对象,但对于类对象只有一个,所以在这样的情况下对象锁就是唯一的。下面将synchronized作用于静态increase方法。
synchronized作用于静态方法:
当synchronized作用于静态方法,其锁就是当前类的class对象锁。由于静态成员不专属于任何一个实例对象,是类成员,因此通过Class对象锁可以控制静态成员的并发操作。需要注意的是如果一个线程A调用一个实例对象的非static synchronized方法,而线程B需要调用这个实例对象所属类的静态synchronized方法,是允许的,不会发生互斥现象,因为访问静态synchronized方法占用的锁是当前类的Class对象,而访问非静态synchronized方法占用的锁是当前实例对象锁,看如下代码:
public class AccountingSyncClass implements Runnable{
static int i=0;
/**
* 作用于静态方法,锁是当前class对象,也就是
* AccountingSyncClass类对应的class对象
*/
public static synchronized void increase(){
i++;
}
/**
* 非静态,访问时锁不一样不会发生互斥
*/
public synchronized void increase4Obj(){
i++;
}
@Override
public void run() {
for(int j=0;j<1000000;j++){
increase();
}
}
public static void main(String[] args) throws InterruptedException {
//new新实例
Thread t1=new Thread(new AccountingSyncClass());
//new新实例
Thread t2=new Thread(new AccountingSyncClass());
//启动线程
t1.start();t2.start();
t1.join();t2.join();
System.out.println(i);
}
}
synchronized同步代码块:
除了使用关键字修饰实例方法和静态方法外,还可以使用同步代码块,在某些情况下,我们编写的方法体可能比较大,同时存在一些比较耗时的操作,而需要同步的代码又只有一小部分,如果直接对整个方法进行同步操作,可能会得不偿失,此时可以使用同步代码块的方式对需要同步的代码进行包裹,这样就无需对整个方法进行同步操作了,同步代码块的使用示例如下:
public class AccountingSync implements Runnable{
static AccountingSync instance=new AccountingSync();
static int i=0;
@Override
public void run() {
//省略其他耗时操作....
//使用同步代码块对变量i进行同步操作,锁对象为instance
synchronized(instance){
for(int j=0;j<1000000;j++){
i++;
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread t1=new Thread(instance);
Thread t2=new Thread(instance);
t1.start();t2.start();
t1.join();t2.join();
System.out.println(i);
}
}
从代码看出,将synchronized作用于一个给定的实例对象instance,即当前实例对象就是锁对象,每次当线程进入synchronized包裹的代码块时就会要求当前线程持有instance实例对象锁,如果当前有其它线程正持有该对象锁,那么新到的线程就必须等待,这样也就保证了每次只有一个线程执行i++操作。当然除了instance作为对象外,还可以使用this对象(代表当前实例)或者当前类的Class对象作为锁,如下代码:
//this,当前实例对象锁
synchronized(this){
for(int j=0;j<1000000;j++){
i++;
}
}
//class对象锁
synchronized(AccountingSync.class){
for(int j=0;j<1000000;j++){
i++;
}
}
synchronized底层语义原理
synchronized锁机制在Java虚拟机中的同步是基于进入和退出监视器锁对象monitor实现的(无论是显式同步还是隐式同步都是如此),每个对象的对象头都关联着一个monitor对象,当一个monitor被某个线程持有后,它便处于锁定状态。在HotSpot虚拟机中,monitor是由ObjectMonitor实现的,每个等待锁的线程都会被封装成ObjectWaiter,ObjectMonitor中有两个集合,_WaitSet和 _EntryList,用来保存ObjectWaiter对象列表,owner区域指向持有ObjectMonitor对象的线程。当多个线程同时访问同一段同步代码时,首先会进入 _EntryList集合尝试获取monitor,当线程获取到对象的monitor后进入 _Owner区域并把 _owner变量设置为当前线程,同时monitor中的计数器count加1;若线程调用wait()方法,将释放当前持有的monitor,count自减1,owner变量恢复为null,同时该线程进入 _WaitSet集合中等待被唤醒。若当前线程执行完毕也将释放monitor并复位变量的值,以便其它线程获取monitor。如下图所示:
_EntryList:存储处于Blocked状态的ObjectWaiter对象列表。
_WaitSet:存储处于wait状态的ObjectWaiter对象列表。

synchronized的显示同步与隐式同步:
synchronized分为显式同步(同步代码块)与隐式同步(同步方法),显式同步指的是有明确的monitorenter和monitorexit指令,而隐式同步并不是由monitorenter和monitorexit指令来实现同步的,而是由方法调用指令读取运行时常量池中方法的ACC_SYNCHRONIZED标志来隐式实现的。
synchronized代码块底层原理:
synchronized同步语句块的实现是显式同步的,通过monitorenter和monitorexit指令实现,其中monitorenter指令指向同步代码块的开始位置,monitorexit指令则指明同步代码块的结束位置,当执行monitorenter指令时,当前线程将尝试获取objectref(对象锁)所对应的monitor的持有权:
- 当对象锁的monitor的进入计数器为0,那线程可以成功取得monitor,并将计数器设置为1,取锁成功。
- 如果当前线程已经拥有对象锁的monitor的持有权,那它可以重入这个monitor,,重入时计数器的值也会加1。
- 若其它线程已经拥有对象锁的monitor的所有权,那当前线程将被阻塞,直到正在执行线程执行完毕,即monitor指令被执行,执行线程将释放monitor并设置计数器值为0,其它线程将有机会持有monitor。
编译器会确保无论方法通过何种方式完成,无论是正常结束还是异常结束,代码中调用过的每条monitorenter指令都有执行器对应monitorexit指令。为了确保在方法异常完成时,monitorenter和monitorexit指令依然可以正确配对执行,编译器会自动产生一个异常处理器可处理所有的异常,它的目的就是用来执行monitorexit指令。

synchronized方法底层原理:
synchronized同步方法的实现是隐式的,无需通过字节码指令来控制,它是在方法调用和返回操作之中实现。JVM可以通过方法常量池中的方法表结构中的ACC_SYNCHRONIZED访问标志判断一个方法是否同步方法。当方法调用时,调用指令将会检查方法的ACC_SYNCHRONIZED访问标志是否被设置,如果设置了,表示该方法是一个同步方法,执行线程将先持有monitor,然后再执行方法,最后在方法完成(无论是正常完成还是非正常完成)时释放monitor。在方法执行期间,执行线程持有了monitor,其它任务线程都无法再获得同一个monitor。
如果一个同步方法执行期间抛出了异常,并且在方法内部无法处理此异常,那这个同步方法所持有的monitor将在异常抛出到同步方法之外时自动释放。

JVM对synchronized锁的优化
在早期版本中,synchronized属于重量级锁,效率低下,因为监视器锁monitor是依赖于操作系统的Mutex互斥量来实现的,操作系统实现线程之间的切换时需要从用户态转换到核心态,这个状态之间的转换需要相对比较长的时间,时间成本相对较高。在JDK6之后,synchronized在JVM层面做了优化,减少锁的获取和释放所带来的性能消耗,主要优化方向有以下几点:
锁升级:偏向锁->轻量级锁->自旋锁->重量级锁
锁的状态总共有四种,无锁状态、偏向锁、轻量级锁和重量级锁。随着锁的竞争,锁可以从偏向锁升级到轻量级锁,再升级到重量级锁,但是锁的升级是单向的,只能从低到高升级,不会出现锁的降级。重量级锁基于操作系统的互斥量实现的,而偏向锁与轻量级锁不同,它们是通过CAS并配合Mark Word一起实现的。
Mark Word标志位:

Mark Word记录了对象的hashCode、分代年龄、锁标记位相关的信息,由于对象头的信息是与对象自身定义的数据没有关系的额外存储成本,因此考虑到JVM的空间效率,Mark Word被设计成为一个非固定的数据结构,以便存储更多有效的数据,它会根据对象本身的状态复用自己的存储空间。
锁升级过程
为什么要引入偏向锁?
因为经过HotSpot的作者大量的研究发现,大多数时候是不存在锁竞争的,常常是一个线程多次获得同一个锁,因此如果每次都要竞争锁会增大很多没有必要付出的代价,为了降低获取锁的代价,才引入的偏向锁。
偏向锁的升级:
当线程1访问代码块并获取锁对象时,会在java对象头和栈帧中记录偏向的锁的ThreadID,因为偏向锁不会主动释放锁,因此以后线程1再次获取锁的时候,需要比较当前线程的threadID和锁对象的对像头中的threadID是否一致,如果一致(还是线程1获取锁对象),则无需使用CAS来加锁、解锁;如果不一致(其它线程,如线程2要竞争锁对象,而偏向锁不会主动释放因此还是存储的线程1的threadID),那么需要查看锁对象中记录的线程1是否存活,如果没有存活,那么锁对象被重置为无锁状态,其它线程(线程2)可以竞争将其设置为偏向锁;如果存活,那么立刻查找线程1的栈帧信息,如果还是需要继续持有这个锁对象,那么暂停当前线程1,撤销偏向锁,升级为轻量级锁;如果线程1不再使用该锁对象,那么将锁对象状态设为无锁状态,重新偏向新的线程。
为什么要引入轻量级锁?
轻量级锁针对的是竞争锁对象的线程不多,而且线程持有锁的时间也不长的场景。因为阻塞线程需要CPU从用户态转到内核态,代价较大,如果刚刚阻塞线程之后锁就被释放了,那这个代价就有点得不偿失了,因此这个时候就干脆不阻塞这个线程,让它先等待锁释放。
轻量级锁什么时候升级为重量级锁?
线程1获取轻量级锁时会先把锁对象的对象头Mark Word复制一份到线程1的栈帧中创建的用于存储锁记录的空间,然后使用CAS把锁对象的对象头中的内容替换为线程1存储的锁记录的地址;
如果在线程1复制(对象头指的都是锁对象的对象头)对象头的同时(在线程1CAS之前),线程2也准备获取锁,复制了对象头到线程2的锁记录空间中,但是在线程2CAS的时候,发现线程1已经把对象头换了,线程2的CAS失败,那么线程2就尝试使用自旋锁来等待线程1释放锁。
但是如果自旋的时间太长也不行,因为自旋的次数是有限制的,比如10次或者100次,如果自旋次数到了,线程1还没有释放锁,或者线程1还在执行,线程2还在自旋等待,这时又有一个线程3来竞争这个锁对象,那么这个时候轻量级锁就会膨胀为重量级锁。重量级锁把除了拥有锁的线程都阻塞,防止CPU空转。
**自适应自旋锁:**自适应自旋锁解决的是“锁竞争时间不确定”的问题,自适应意味着自旋的时间不再固定了,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。
- 如果在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行中,那么虚拟机就会认为这次自旋也很有可能再次成功,进而它将允许自旋等待持续相对更长的时间,比如100个循环。
- 相反的,如果对于某个锁,自旋很少成功获得过,那在以后要获取这个锁时将可能减少自旋时间甚至省略自旋过程,以避免浪费处理器资源。
但自旋锁带来的副作用就是不公平的锁机制:处于阻塞状态的线程,并没有立刻竞争被释放的锁。然而,处于自旋状态的线程,则很有可能优先获得这把锁。
锁消除:
锁消除属于编译器对锁的优化,JIT编译时会使用逃逸分析技术,通过对运行上下文的扫描,取出不可能存在共享资源竞争的锁,通过这种方式消除没有必要的锁,可以节省毫无意义的请求锁时间。
锁粗化:
JIT编译器动态编译时,如果发现几个相邻的同步块使用的是同一个锁实例,那么JIT编译器将会把这几个同步块合并为一个大的同步块,从而避免一个线程“反复申请、释放同一个锁”所带来的性能开销。
偏向锁的废除:
在JDK6中引入的偏向锁能够减少竞争锁定的开销,使得JVM的性能得到了显著改善,但是JDK15却决定将偏向锁禁用,并在以后删除它。主要有以下几个原因:
- 为了支持偏向锁使得代码复杂度大幅度提升,并且对HotSpot的其它组件产生了影响,这种复杂性已成为理解代码的障碍,也阻碍了对同步系统进行重构。
- 在更高的JDK版本中针对多线程场景推出了性能更高的并发数据结构,所以过去看到的性能提升,在现在看来已经不那么明显了。
- 围绕线程池队列和工作线程构建的应用程序,性能通常在禁用偏向锁的情况下变的更好。
参考:
- synchronized锁机制原理 与 Lock锁机制_张维鹏的博客-CSDN博客
- Synchronized关键字和锁升级,详细分析偏向锁和轻量级锁的升级_tongdanping的博客-CSDN博客
- 深入理解Java并发之synchronized实现原理_zejian_的博客-CSDN博客
Lock接口
synchronized属于隐式锁,即锁的持有与释放都是隐式的,无需人为干预,而显式锁,即锁的持有和释放都必须由我们手动编写。在Java1.5中,官方在Concurrent并发包中加入了Lock接口,该接口中提供了lock()方法和unLock()方法对显式加锁和释放锁操作进行支持,简单了解一下代码编写如下:
Lock lock = new ReentrantLock();
lock.lock();
try{
//临界区......
}finally{
lock.unlock();
}
正如代码所显示(ReentrantLock是Lock的实现类),当前线程使用lock()方法与unlock()对临界区进行包围,其它线程由于无法持有锁将无法进入临界区直到当前线程释放锁,注意unlock()操作必须在finally代码块中,这样可以确保即使临界区执行抛出异常,线程最终也能正常释放锁。Lock接口还提供了一下相关方法:
public interface Lock {
//加锁
void lock();
//解锁
void unlock();
//可中断获取锁,与lock()不同之处在于可响应中断操作,即在获
//取锁的过程中可中断,注意synchronized在获取锁时是不可中断的
void lockInterruptibly() throws InterruptedException;
//尝试非阻塞获取锁,调用该方法后立即返回结果,如果能够获取则返回true,否则返回false
boolean tryLock();
//根据传入的时间段获取锁,在指定时间内没有获取锁则返回false,如果在指定时间内当前线程未被中断并获取到锁则返回true
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//获取等待通知组件,该组件与当前锁绑定,当前线程只有获得了锁
//才能调用该组件的wait()方法,而调用后,当前线程将释放锁。
Condition newCondition();
重入锁ReentrantLock
JDK1.5新增的类,实现了Lock接口,作用与synchronized关键字相当,但比synchronized更加灵活。ReentrantLock本身也是一种支持重进入的锁,即该锁可以支持一个线程对资源重复加锁,同时也支持公平锁和非公平锁。
所谓公平与非公平指的是在请求先后顺序上,先对锁进行请求的就一定先获取到锁,那么这就是公平锁。反之,如果对于锁的获取并没有时间上的先后顺序,如后请求的线程可能先获取到锁,这就是非公平锁,一般而言,非公平锁机制的效率往往会胜过公平锁机制,但在某些场景下,可能更注重时间先后顺序,那么公平锁自然是很好的选择。需要注意的是ReentrantLock支持对同一线程重复加锁,但是加锁多少次,就必须解锁多少次,这样才可以成功释放锁。下面看看ReentrantLock的简单使用案例:
import java.util.concurrent.locks.ReentrantLock;
public class ReenterLock implements Runnable{
public static ReentrantLock lock=new ReentrantLock();
public static int i=0;
@Override
public void run() {
for(int j=0;j<10000000;j++){
lock.lock();
//支持重入锁
lock.lock();
try{
i++;
}finally{
//执行两次解锁
lock.unlock();
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
ReenterLock tl=new ReenterLock();
Thread t1=new Thread(tl);
Thread t2=new Thread(tl);
t1.start();t2.start();
t1.join();t2.join();
//输出结果:20000000
System.out.println(i);
}
}
实际上ReentrantLock是基于AQS并发框架实现的。
AQS抽象队列同步器原理
AQS(Abstract Queued Synchronizer)抽象队列同步器,是JUC中实现锁及同步组件的基础。工作原理就是如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态,如果被请求的共享资源被占用,那么就将获取不到锁的线程加入到等待队列中。这时,就需要一套线程阻塞等待以及被唤醒时的锁分配机制,而AQS是通过CLH队列实现锁分配的机制。
CLH同步队列的模型:
CLH队列是由内部类Node构成的同步队列,是一个双向队列(不存在队列实例,仅存在节点之间的关联关系),将请求共享资源的线程封装成Node节点来实现锁的分配;同时利用内部类ConditionObject构建等待队列,当调用ConditionObject的await()方法后,线程将会加入等待队列中,当调用ConditionObject的signal()方法后,线程将从等待队列移动到同步队列中进行锁竞争。AQS中只能存在一个同步队列,但可拥有多个等待队列。AQS的CLH同步队列的模型如下图:

AQS有三个主要变量,分别是head、tail、state,其中head指向同步队列的头部,注意head为空结点,不存储信息。而tail则是同步队列的队尾,同步队列采用的是双向链表的结构是为了方便对队列进行查找操作。当Node节点被设置为head后,其thread信息和前驱结点将被清空,因为该线程已获取到同步状态,已经在执行了,也就没有必要存储相关信息了,head只保存后继结点的指针即可,便于head节点释放同步状态后唤醒后继结点。
队列的入队和出队操作都是无锁操作,基于CAS+自旋锁实现,AQS维护了一个volatile修饰的int类型的state同步状态,volatile保证线程之间的可见性,并通过CAS对该同步状态进行原子操作、实现对其值的修改。当state=0时,表示没有任何线程占有共享资源的锁,当state=1时,则说明当前有线程正在使用共享变量,其它线程必须加入同步队列进行等待;
内部类Node数据结构分析:
static final class Node {
//共享模式
static final Node SHARED = new Node();
//独占模式
static final Node EXCLUSIVE = null;
//标识线程已处于结束状态
static final int CANCELLED = 1;
//等待被唤醒状态
static final int SIGNAL = -1;
//条件状态
static final int CONDITION = -2;
//在共享模式中使用表示获得的同步状态会被传播
static final int PROPAGATE = -3;
//等待状态,存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4种取值
volatile int waitStatus;
//同步队列中前驱结点
volatile Node prev;
//同步队列中后继结点
volatile Node next;
//请求锁的线程
volatile Thread thread;
//等待队列中的后继结点,这个与Condition有关,稍后会分析
Node nextWaiter;
//判断是否为共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
//.....
}
AQS分为两种模式:独占模式EXCLUSIVE和共享模式SHARED,所谓共享模式是一个锁允许多条线程同时操作,如semaphore、CountDownLatch采用的就是基于AQS的共享模式实现的,而独占模式则是同一个时间段只能有一个线程对共享资源进行操作,多余的请求线程需要排队等待,如ReentrantLock、CyclicBarrier是基于独占模式实现的。
变量waitStatus表示当前封装成Node节点的线程的等待状态,共有4种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE:
- CANCELLED:值为1,表示在同步队列中的线程等待超时或者被中断,处于已结束状态,需要从同步队列中移除Node节点。
- SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
- CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其它线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
- PROPAGATE:值为-3,在共享模式下使用,表示该线程以及后继线程进行无条件传播。前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
总之,AQS作为基础组件,对于锁的实现存在两种不同的模式,即共享模式(Semaphore)和独占模式(ReentrantLock),无论是共享模式还是独占模式的实现类,其内部都是基于AQS实现的,也都维持着一个虚拟的同步队列,当请求锁的线程超过现有的模式的限制时,会将线程包装成Node结点并将线程当前必要的信息存储到Node结点中,然后加入同步队列等待获取锁,而这系列操作都由AQS协助完成,这也是作为基础组件的原因,无论是Semaphore还是ReentrantLock,其内部绝大多数方法都是间接调用AQS完成的,下面是AQS整体类图结构:
这里以ReentrantLock为例,简单讲解ReentrantLock与AQS的关系
ReentrantLock内部存在3个实现类,分别是sync、NonfairSync、FairSync,其中Sync继承自AQS实现了解锁tryRelease()方法,而NonfairSync、FairSync则继承自Sync,实现了获取锁的tryAcquire()方法,ReentrantLock的所有方法调用都通过间接调用AQS和Sync类及其子类来完成的。
从上述类图可以看出AQS是一个抽象类,但请注意其源码中并没有抽象的方法,这是因为AQS只是作为一个基础组件,并不希望直接作为直接操作类对外输出,而更倾向于作为基础组件,为真正的实现类提供基础设施,如构建同步队列,控制同步状态等。事实上,从设计模式角度来看,AQS是基于模板方法模式设计的,其内部处理提供并发操作核心方法以及同步队列操作外,还提供了一些模板方法让子类自己实现,如加锁操作以及解锁操作,为什么这么做?
这是因为AQS作为基础组件,封装的是核心并发操作,但是实现上分为两种模式,即共享模式与独占模式,而这两种模式的加锁与解锁实现方式是不一样的,但AQS只关注内部公共方法实现并不关心外部不同模式的实现,所以提供了模板方法给子类使用,也就是说实现独占锁,如ReentrantLock需要自己实现tryAcquire()方法和tryRelease()方法,而实现共享模式的Semaphore,则需要实现tryAcquireShared()方法和tryReleaseShared()方法。这样做的好处是无论是共享模式还是独占模式,其基础的实现都是同一套组件AQS,只不过是加锁解锁的逻辑不同罢了。更重要的是如果需要自定义锁的话,也变得非常简单,只需要选择不同的模式实现不同的加锁和解锁的模板方法即可,AQS提供给独占模式和共享模式的模板方法如下:
//AQS中提供的主要模板方法,由子类实现。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer{
//独占模式下获取锁的方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//独占模式下解锁的方法
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//共享模式下获取锁的方法
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
//共享模式下解锁的方法
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
//判断是否为持有独占锁
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
}
AQS抽象类中已经实现了线程在等待队列的维护方式(如获取资源失败入队/唤醒出队等),而对于具体共享资源的获取与释放(也就是锁的获取和释放)则交由具体的同步器来完成。
ReentrantLock中独占模式下非公平锁的获取流程:
获取独占锁的过程是定义在tryAcquire()中的,当前线程尝试获取同步状态,如果获取失败,就将线程封装成Node节点插入到CLH同步队列中。插入同步队列后,线程并没有放弃同步状态,而是根据前置节点状态判断是否继续获取,如果前置节点是head节点,继续尝试获取,否则就将线程挂起。如果成功获取同步状态则将自己设置为head节点。当持有同步状态的线程释放资源后,也会唤醒队列中的后继线程。

详情请阅读此博客:深入剖析基于并发AQS的(独占锁)重入锁(ReetrantLock)及其Condition实现原理_zejian_的博客-CSDN博客
ConditionObject阻塞队列
AQS的阻塞队列是基于内部类ConditionObject实现的,而ConditionObject实现了Condition接口。Condition主要用于线程的等待和唤醒,能够精细的控制多线程的休眠与唤醒,具备更多的灵活性,通过多个Condition实例对象建立不同的等待队列,从而实现同一个锁拥有多个等待队列。而synchronized关键字只能有一组等待唤醒队列,使用notify()唤醒线程时只能随机唤醒队列的一个线程。
ConditionObject阻塞队列实现原理:
Condition的具体实现之一是AQS的内部类ConditionObject,每个Condition都对应着一个等待队列,也就是说如果一个锁上创建了多个Condition对象,那么也就存在多个等待队列。当调用ConditionObject的await()方法后,线程将会加入等待队列中,当调用ConditionObject的signal()方法后,线程将从等待队列转移到同步队列中进行锁竞争。AQS的ConditionObject中的等待队列模型如下:

AQS的线程唤醒机制原理:
AQS的线程唤醒是通过signal()方法实现的,signal()方法线程唤醒的流程图如下:

signal()方法主要调用了doSignal(),而此方法中做了两件事:
- 从条件等待队列中移除被唤醒的节点,然后重新维护条件等待队列的firstWaiter和lastWaiter的指向。
- 将从等待队列移除的节点加入同步队列中,如果进入到同步队列失败并且条件等待队列还有不为空的节点,则继续循环唤醒后继其它节点的线程。
注意:无论是同步队列还是等待队列,使用Node数据结构都是同一个,不过是使用的内部变量不同而已。
所以signal()的流程可以概述为:
- signal()被调用后,先判断当前线程是否持有独占锁
- 如果有,那么唤醒当前Condition等待队列的第一个结点的线程,并从等待队列中移除该结点,添加到同步队列中
- 如果加入同步队列失败,那么继续唤醒等待队列中的其它节点的线程
- 如果成功加入同步队列,那么如果其前驱结点已结束或者设置前驱结点状态为Node.SIGNAL失败,则通过LockSupport.unpark() 唤醒被通知节点代表的线程
到此signal()任务完成,被唤醒后的线程,将调用AQS的acquireQueud()方法加入获取同步状态的竞争中,这就是等待唤醒机制的整个流程实现原理。
参考:
CountDownLatch同步器详解:并发编程之CountDownLatch原理与应用 - 掘金 (juejin.cn)
CyclicBarrier同步器详解:并发编程之CyclicBarrier原理与使用 - 掘金 (juejin.cn)
CountDownLatch和CyclicBarrier同步器的区别:【对线面试官】CountDownLatch和CyclicBarrier的区别 - 掘金 (juejin.cn)
Semaphore同步器详解:并发编程之Semaphore原理与应用 - 掘金 (juejin.cn)
ThreadLocal原理总结
ThreadLocal提供了线程内部的局部变量,当在多线程环境中使用ThreadLocal维护变量时,会为每个线程生成该变量的副本,每个线程只操作自己线程中的变量副本,不同线程间的数据相互隔离、互不影响,从而保证了线程的安全。
ThreadLocal适用于无状态,副本变量独立后不影响业务逻辑的高并发场景,如果业务逻辑强依赖于变量副本,则不适合用ThreadLocal解决,需要另寻解决方案。
ThreadLocal的数据结构
在JDK8中,每个线程Thread内部都维护了一个ThreadLocalMap的数据结构,ThreadLocalMap中有一个由内部类Entry组成的table数组,Entry的key就是线程的本地化对象ThreadLocal,而value则存放了当前线程所操作的变量副本。每个ThreadLocal只能保存一个副本value,并且各个线程的数据互不干扰,如果想要一个线程保存多个副本变量,就需要创建多个ThreadLocal。
一个ThreadLocal的值,会根据线程的不同,分散在N个线程中,所以获取ThreadLocal的value,有两个步骤:
- 第一步:根据线程获取ThreadLocalMap
- 第二步,根据自身从ThreadLocalMap中获取值,所以它的this就是Map的key
当执行set()方法时,其值是保存在当前线程的ThreadLocal变量副本中,当执行get()方法时,是从当前线程的ThreadLocal的变量副本获取。所以对于不同的线程,每次获取副本值时,别的线程并不能获取到当前线程的副本值,形成了线程的隔离,互不干扰。

ThreadLocal的核心方法
set()方法:
// 设置当前线程对应的ThreadLocal值
public void set(T value) {
Thread t = Thread.currentThread(); // 获取当前线程对象
ThreadLocalMap map = getMap(t);
if (map != null) // 判断map是否存在
map.set(this, value);
// 调用map.set 将当前value赋值给当前threadLocal。
else
createMap(t, value);
// 如果当前对象没有ThreadLocalMap 对象。
// 创建一个对象 赋值给当前线程
}
// 获取当前线程对象维护的ThreadLocalMap
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
// 给传入的线程 配置一个threadlocals
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
获得当前线程,根据当前线程获得map。如果map不为空,则将参数设置到map中,当前的ThreadLocal作为key。如果map为空,则给该线程创建map,设置初始值。
get()方法:
public T get() {
Thread t = Thread.currentThread();//获得当前线程对象
ThreadLocalMap map = getMap(t);//线程对象对应的map
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);// 以当前threadlocal为key,尝试获得实体
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 如果当前线程对应map不存在
// 如果map存在但是当前threadlocal没有关连的entry。
return setInitialValue();
}
// 初始化
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
执行流程:先尝试获得当前线程,再根据当前线程获取对应的map。如果获得的map不为空,以当前ThreadLocal为key尝试获得entry。如果entry不为空,返回其值。如果上述步骤无法进行,则通过initialValue函数获得初始值,然后给当前线程创建新map。
remove()方法:
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}
首先尝试获取当前线程,然后根据当前线程获得map,从map中尝试删除entry。
initialValue()方法:
protected T initialValue() {
return null;
}
如果没有调用set()直接get(),则会调用此方法,该方法只会被调用一次,默认返回一个默认值null,如果不想返回null,可以Override进行覆盖。
ThreadLocal的哈希冲突解决方法
和HashMap不同,ThreadLocalMap结构中没有next引用,也就是说ThreadLocalMap中解决哈希冲突的方式并非链表的方式,而是采用线性探测的方式,当发生哈希冲突时就将步长加1或减1,寻找下一个相邻的位置。如下图所示:

根据ThreadLocal对象的hash值,定位到table中的位置i;如果当前位置是null,就初始化一个Entry对象放在位置i;如果位置i已经有Entry对象了,如果这个Entry对象的key与即将设置的key相同,那么重新设置Entry的value;如果位置i的Entry对象和即将设置的key不同,那么寻找下一个空位置。
具体源码如下:
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);//计算索引位置
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) { // 开放定址法解决哈希冲突
ThreadLocal<?> k = e.get();
//直接覆盖
if (k == key) {
e.value = value;
return;
}
if (k == null) {// 如果key不是空value是空,垃圾清除内存泄漏防止。
replaceStaleEntry(key, value, i);
return;
}
}
// 如果ThreadLocal对应的key不存在并且没找到旧元素,则在空元素位置创建个新Entry
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
// 环形数组 下一个索引
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
ThreadLocal的内存泄漏
在使用ThreadLocal时,当使用完变量后,必须手动调用remove()方法删除Entry对象,否则会造成value的内存泄漏,严格来说,ThreadLocal是没有内存泄漏问题的,之所以会出现内存泄漏,也是忘记执行remove()引起的,这是使用不规范导致的。
不过有些人认为ThreadLocal的内存泄漏是跟Entry中使用弱引用key有关,这个结论是不对的。ThreadLocal造成内存泄漏的根本原因并不是key使用弱引用,因为即使key使用强引用,也会造成Entry对象的内存泄漏,内存泄漏的根据原因在于ThreadLocalMap的生命周期与当前线程CurrentThread的生命周期相同,且ThreadLocal使用完没有进行手动删除导致的。下面将针对这两种情况进行分析:
如果key使用强引用:
如果在业务代码中使用完ThreadLocal,则此时Stack中的ThreadLocalRef就会被回收了。但是此时ThreadLocalMap中的Entry中的Key是强引用,会造成ThreadLocal实例无法回收。如果没有删除Entry并且CurrentThread依然运行的情况下,强引用链如下图所示,会导致Entry内存泄漏。

所以强引用无法避免内存泄漏。
如果key使用弱引用:
如果在业务代码中使用完ThreadLocal,则此时Stack中的ThreadLocalRef就会被回收了。但是此时ThreadLocalMap中的Entry中的key是弱引用ThreadLocal,会造成ThreadLocal实例被回收,此时Entry中的key=null。但是当没有手动删除Entry以及CurrentThread依然运行的时候,还是存在强引用链,但因为ThreadLocalRef已经被回收了,那么此时的value就无法访问到了,导致value内存泄漏。

所以弱引用也无法避免内存泄漏。
内存泄漏的原因:
- ThreadLocalRef用完后Entry没有手动删除
- ThreadLocalRef用完后CurrentThread依然在运行
第一点表明当在使用完ThreadLocal后,调用其对应的remove()方法删除对应的Entry就可以避免内存泄漏。
第二点是由于ThreadLocalMap是CurrentThread的一个属性,被当前线程引用,生命周期跟CurrentThread一样,如果当前线程结束ThreadLocalMap被回收,自然里面的Entry也会回收了,但问题是此时的线程不一定会被回收,比如线程是从线程池中获取的,用完后就放回池子里了。
所以ThreadLocal内存泄漏的根源是ThreadLocalMap的生命周期跟Thread一样,如果用完ThreadLocal没有手动删除就会导致内存泄漏。
为什么使用弱引用:
- Entry中的key(ThreadLocal)是弱引用,目的是将ThreadLocal对象的生命周期跟线程周期解绑,用WeakReference弱引用关联的对象,只能生存到下一次垃圾回收之前,GC发生时,不管内存够不够,都会被回收。
- 当使用完ThreadLocal,而Thread仍然运行时,即使忘记调用remove()方法,弱引用也会比强引用多一层保障;当GC发生时,弱引用的ThreadLocal被回收,那么key就为null。而ThreadLocalMap中的set、get方***针对key==null的情况进行处理,如果key==null,则系统认为value也应该是无效了应该设置为null,也就是说对应的value会在下次调用ThreadLocal的set、get方法时,执行底层ThreadLocalMap的expungeStaleEntry()方法进行清除无用的value,从而避免内存泄漏。
如果想共享线程的ThreadLocal数据怎么办?
使用InheritableThreadLocal可以实现多个线程访问ThreadLocal的值,在主线程中创建一个InheritableThreadLocal的实例,然后在子线程中得到这个InheritableThreadLocal实例设置的值。
private void test() {
final ThreadLocal threadLocal = new InheritableThreadLocal();
threadLocal.set("主线程的ThreadLocal的值");
Thread t = new Thread() {
@Override
public void run() {
super.run();
Log.i( "我是子线程,我要获取其他线程的ThreadLocal的值 ==> " + threadLocal.get());
}
};
t.start();
}
为什么一般用ThreadLocal都要用static?
ThreadLocal能实现线程的数据隔离,不在于它自己本身,而在于Thread的ThreadLocalMap。所以,ThreadLocal可以只实例化一次,只分配一块存储空间就可以了,没有必要作为成员变量多次被初始化。
参考:ThreadLocal 原理总结_张维鹏的博客-CSDN博客
Atomic原子类的原理
Atomic原子操作类是基于无锁CAS+volatile实现的,并且类中的所有方法都是用final修饰,进一步保证线程安全。而CAS算法的具体实现方式在于Unsafe类中,Unsafe类的所有方法都是native修饰的,也就是说所有方法都是直接调用操作系统底层资源进行相应任务。Atomic使用乐观策略,每次操作时都假设没有冲突发生,并采用volatile配合CAS去修改内存中的变量,如果失败则重试,直到成功为止。
CAS(Compare And Swap)
执行函数:CAS(V,E,U),其包含3个参数:内存值V,旧预期值E,要修改的值U。
- 当且仅当预期值E和内存值V相同时,才会将内存值修改为U并返回true。
- 若V值和E值不同,则说明已经有其它线程做了更新,则当前线程不执行更新操作,但可以选择重新读取该变量尝试再次修改该变量,也可以放弃操作。

CAS一定要和volatile变量配合使用,这样才能保证每次拿到的变量是主内存中最新的那个值,否则旧的预期值E对某条线程来说,永远是一个不会变的值E,只要某次CAS操作失败,永远都不可能成功。由于CAS无锁操作中没有锁的存在,因此不可能出现死锁的情况,也就是天生免疫死锁。
CPU指令对CAS的支持:
由于CAS的步骤很多,那会不会存在一种情况:假设某个线程在判断V和E相同后,正要赋值时,切换了线程,更改了值,从而造成了数据不一致呢?
答案是否定的,因为CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。
CAS的ABA问题及其解决方案:
假设这样一种场景,当一个线程执行CAS(V,E,U)操作,在获取到当前变量V,准备修改为新值U前,另外两个线程已连续修改了两次变量V的值,使得该值又恢复为旧值,这样的话,就无法正确判断这个变量是否已被修改过。

解决方法:使用带版本的标志或者时间戳解决ABA问题,在更新数据时,只有要更新的数据和版本标识符合期望值,才允许替换。
Atomic原子类的使用:
anpackage cn.itcats.thread.safe.Test1;
import java.util.concurrent.atomic.AtomicInteger;
public class Sequence {
//使用原子类AtomicInteger
private AtomicInteger a = new AtomicInteger(0);
public int add() {
return a.getAndIncrement(); //先获取再自增,对应a++
//若使用++a 则对应方法是a.incrementAndGet(); 先自增再获取 ,
//多说一句 a-- 就是 a.getAndDecrement();
//若a = a + 10;————对应API a.getAndAdd(10);
}
public static void main(String[] args) {
Sequence sequence = new Sequence();
new Thread(new Runnable() {
public void run() {
while (true) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + sequence.add());
}
}
}).start();
new Thread() {
public void run() {
while (true) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + sequence.add());
}
};
}.start();
}
}
运行结果中并没有出现重复的数字。因此对数值的一些非原子性操作,都可以使用原子类转化为原子性操作。
除了对int等基本数据类型的操作,还可以对数组使用原子类进行原子性操作。
private int[] s = {1,2,3};
AtomicIntegerArray array = new AtomicIntegerArray(s);
public int arrayAdd() {
return array.getAndAdd(0, 10);
}
详情请阅读此博客:Java并发编程-无锁CAS与Unsafe类及其并发包Atomic_zejian_的博客-CSDN博客
参考:Atomic原子类与CAS原理_张维鹏的博客-CSDN博客
阻塞队列
阻塞队列最大的特性在于支持阻塞添加和阻塞删除方法:
- 阻塞添加:当阻塞队列已满时,队列会阻塞加入元素的线程,直到队列元素不满时才重新唤醒线程执行加入元素操作。
- 阻塞删除:当阻塞队列为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作。
Java中的阻塞队列接口BlockingQueue继承自Queue接口,因此先来看看阻塞队列接口提供的主要方法:
public interface BlockingQueue<E> extends Queue<E> {
// 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量)
// 在成功时返回 true,如果此队列已满,则抛IllegalStateException。
boolean add(E e);
// 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量)
// 如果该队列已满,则在到达指定的等待时间之前等待可用的空间,该方法可中断
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
//将指定的元素插入此队列的尾部,如果该队列已满,则一直等到(阻塞)。
void put(E e) throws InterruptedException;
//获取并移除此队列的头部,如果没有元素则等待(阻塞),直到有元素将唤醒等待线程执行该操作
E take() throws InterruptedException;
//获取并移除此队列的头部,在指定的等待时间前一直等到获取元素, //超过时间方法将结束
E poll(long timeout, TimeUnit unit) throws InterruptedException;
//从此队列中移除指定元素的单个实例(如果存在)。
boolean remove(Object o);
}
//除了上述方法还有继承自Queue接口的方法
//获取但不移除此队列的头元素,没有则抛异常NoSuchElementException
E element();
//获取但不移除此队列的头;如果此队列为空,则返回 null。
E peek();
//获取并移除此队列的头,如果此队列为空,则返回 null。
E poll();
插入方法:
- add(E e):添加成功返回true,失败则抛illegalStateException异常
- offer(E e):成功返回true,如果此队列已满,则返回false
- put(E e):将元素插入此队列的尾部,如果该队列已满,则一直阻塞
删除方法:
- remove(Object o):移除指定元素,成功返回true,失败返回false
- poll():获取并移除此队列的头元素,若队列为空,则返回null
- take():获取并移除此队列头元素,若没有元素则一直阻塞
检查方法:
- element():获取但不移除此队列的头元素,没有元素则抛出异常
- peek():获取但不移除此队列的头元素;若队列为空,则返回null
ArrayBlockingQueue
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 存储数据的数组 */
final Object[] items;
/**获取数据的索引,主要用于take,poll,peek,remove方法 */
int takeIndex;
/**添加数据的索引,主要用于 put, offer, or add 方法*/
int putIndex;
/** 队列元素的个数 */
int count;
/** 控制并发访问的锁 */
final ReentrantLock lock;
/**notEmpty条件对象,用于通知take方法队列已有元素,可执行获取操作 */
private final Condition notEmpty;
/** notFull条件对象,用于通知put方法队列未满,可执行添加操作 */
private final Condition notFull;
/** 迭代器 */
transient Itrs itrs = null;
}
ArrayBlockingQueue内部通过数组对象items来存储所有的数据,需要注意的是ArrayBlockingQueue通过ReentrantLock来同时控制添加线程与移除线程的并发访问,这点与LinkedBlockingQueue区别很大。而对于notEmpty条件对象则是用于存放等待或唤醒调用take()方法的线程,告诉它们队列已有元素,可以执行获取操作。同理notFull条件对象用于等待或唤醒调用put()方法的线程,告诉它们队列未满,可以执行添加元素的操作。takeIndex代表的是下一个方法(take、pool、peek、remove)被调用时获取数组元素的索引,putIndex则代表下一个方法(put、offer、add)被调用时元素添加到数组中的索引。
阻塞添加:put()
put()方法的特点是阻塞添加,当队列满时通过条件对象来阻塞当前调用put()方法的线程,直到线程又再次被唤醒执行。总的来说添加线程的执行存在以下两种情况:一是队列已满,那么新到来的put线程将添加到notFull的条件队列中等待;二是有移除线程执行移除操作,移除成功同时唤醒put线程。

具体代码如下:
//put方法,阻塞时可中断
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//该方法可中断
try {
//当队列元素个数与数组长度相等时,无法添加元素
while (count == items.length)
//将当前调用线程挂起,添加到notFull条件队列中等待唤醒
notFull.await();
enqueue(e);//如果队列没有满直接添加。。
} finally {
lock.unlock();
}
}
//入队操作
private void enqueue(E x) {
//获取当前数组
final Object[] items = this.items;
//通过putIndex索引对数组进行赋值
items[putIndex] = x;
//索引自增,如果已是最后一个位置,重新设置 putIndex = 0;
if (++putIndex == items.length)
putIndex = 0;
count++;//队列中元素数量加1
//唤醒调用take()方法的线程,执行元素获取操作。
notEmpty.signal();
}
阻塞删除:take()
take()方法其实很简单,有就删除没有就阻塞,注意这个阻塞是可以中断的,如果队列没有数据那么就加入notEmpty()条件队列等待(有数据就直接取走,方法结束),如果有新的put线程添加了数据,那么put操作将会唤醒take线程,执行take操作,图示如下:

具体代码如下:
//从队列头部删除,队列没有元素就阻塞,可中断
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//中断
try {
//如果队列没有元素
while (count == 0)
//执行阻塞操作
notEmpty.await();
return dequeue();//如果队列有元素执行删除操作
} finally {
lock.unlock();
}
}
//删除队列头元素并返回
private E dequeue() {
//拿到当前数组的数据
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//获取要删除的对象
E x = (E) items[takeIndex];
将数组中takeIndex索引位置设置为null
items[takeIndex] = null;
//takeIndex索引加1并判断是否与数组长度相等,
//如果相等说明已到尽头,恢复为0
if (++takeIndex == items.length)
takeIndex = 0;
count--;//队列个数减1
if (itrs != null)
itrs.elementDequeued();//同时更新迭代器中的元素数据
//删除了元素说明队列有空位,唤醒notFull条件对象添加线程,执行添加操作
notFull.signal();
return x;
}
LinkedBlockingQueue
LinkedBlockingQueue是一个基于链表的阻塞队列,其内部维持一个基于链表的数据队列,但大小默认为Integer.MAX_VALUE,建议使用LinkedBlockingQueue时手动传值,避免队列过大造成机器负载或者内存爆满等情况。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 节点类,用于存储数据
*/
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
/** 阻塞队列的大小,默认为Integer.MAX_VALUE */
private final int capacity;
/** 当前阻塞队列中的元素个数 */
private final AtomicInteger count = new AtomicInteger();
/** 阻塞队列的头结点 */
transient Node<E> head;
/** 阻塞队列的尾节点 */
private transient Node<E> last;
/** 获取并移除元素时使用的锁,如take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程 */
private final Condition notEmpty = takeLock.newCondition();
/** 添加元素时使用的锁如 put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** notFull条件对象,当队列数据已满时用于挂起执行添加的线程 */
private final Condition notFull = putLock.newCondition();
}
每个添加到LinkedBlockingQueue队列中的数据都将被封装成Node节点,添加到链表队列中,其中head和last分别指向队列的头结点和尾结点。与ArrayBlockingQueue不同的是,LinkedBlockingQueue内部分别使用了takeLock和putLock对并发进行控制,也就是说,添加和删除操作并不是互斥操作,可以同时进行,可以大大提高吞吐量。这里再次强调如果没有给LinkedBlockingQueue指定容量大小,其默认值将是Integer.MAX_VALUE,如果存在添加速度大于删除速度的时候,有可能会内存溢出。

阻塞添加:put()
public void put(E e) throws InterruptedException {
//添加元素为null直接抛出异常
if (e == null) throw new NullPointerException();
int c = -1;
//构建节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
//获取队列的个数
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//判断队列是否已满,如果已满则阻塞当前线程
while (count.get() == capacity) {
notFull.await();
}
//添加元素并更新count值
enqueue(node);
c = count.getAndIncrement();
//如果队列容量还没满,唤醒下一个添加线程,执行添加操作
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//由于存在添加锁和消费锁,而消费锁和添加锁都会持续唤醒等待线程,因此count肯定会变化
//这里的if条件表示如果队列中还有1条数据,由于队列中存在数据那么就唤醒消费锁
if (c == 0)
signalNotEmpty();
}
这里的put()方法做了两件事,第一件事是判断队列是否满,满了的话将当前线程加入等待队列,没满就将结点封装成Node入队,然后再次判断队列添加完成后是否已满,不满就继续唤醒等待在条件对象notFull上的添加线程。第二件事是,判断是否需要唤醒等待在notEmpty条件对象上的消费线程。
这里可能会有点疑惑,为什么添加完成后是继续唤醒在条件对象notFull上的添加线程而不是向ArrayBlockingQueue那样直接唤醒notEmpty条件对象上的消费线程》而又为什么要当if(c==0)时才去唤醒消费线程呢?
-
唤醒添加线程的原因:在添加新元素完成后,会判断队列是否已满,不满就继续唤醒周期条件对象notFull上的添加线程,这点与前面分析的ArrayBlockingQueue很不相同,在ArrayBlockingQueue内部完成添加操作后,会直接唤醒消费线程对元素进行获取,这是因为ArrayBlockingQueue只用了一个ReentrantLock同时对添加线程和消费线程进行控制,这样如果在添加完成后再次唤醒添加线程的话,消费线程可能永远无法执行,而对于LinkedBlockingQueue来说就不一样了,其内部对添加线程和消费线程分别使用了各自的ReentrantLock锁对并发进行控制,也就是说添加线程和消费线程是不会互斥的,所以添加锁只要管好自己的添加线程即可,添加线程自己直接唤醒自己的其它添加线程,如果没有等待的添加线程,直接结束了。如果有就直到队列元素已满才结束挂起,注意消费线程的执行过程也是如此。这也是为什么LinkedBlockingQueue的吞吐量要相对大些的原因。
-
为什么if(c==0)时才去唤醒消费线程:这是因为消费线程一旦被唤醒,就一直处于消费的状态,直到队列为空才结束,所以c值是一直变化的(c值是添加完元素前队列的大小),此时c只可能是等于0或大于0,如果c=0,那么说明之前消费线程已停止,条件对象上可能存在等待的消费线程,添加完数据后应该是c+1,那么有数据就直接唤醒消费线程,如果没有存在等待的消费线程就直接结束了,等待下一次的take消费操作。如果c>0那么消费线程就不会被唤醒,只能等待下一个消费操作(pool、take、remove)的调用,那为什么不是条件c>0就去唤醒呢?
因为消费线程一旦被唤醒后会和添加线程一样,一直不断唤醒其它消费线程,如果添加前c>0,那么很可能上一次调用消费线程后,数据并没有被消费完,条件队列上也就不存在等待的消费线程,所以c>0唤醒消费线程的意义不大当然如果添加线程一直添加元素,那么一直c>0,消费线程执行的话就要等待下一次调用消费操作remove等才会执行了。
阻塞删除:take()
public E take() throws InterruptedException {
E x;
int c = -1;
//获取当前队列大小
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//可中断
try {
//如果队列没有数据,挂机当前线程到条件对象的等待队列中
while (count.get() == 0) {
notEmpty.await();
}
//如果存在数据直接删除并返回该数据
x = dequeue();
c = count.getAndDecrement();//队列大小减1
if (c > 1)
notEmpty.signal();//还有数据就唤醒后续的消费线程
} finally {
takeLock.unlock();
}
//满足条件,唤醒条件对象上等待队列中的添加线程
if (c == capacity)
signalNotFull();
return x;
}
take()方法是一个可阻塞可中断的移除方法,主要做了两件事:一是如果队列没有数据就挂起当前线程到notEmpty条件对象的等待队列中,如果有数据就删除节点并返回数据项,同时唤醒后续消费线程;二是尝试唤醒条件对象notFull上等待队列中的添加线程。
两种队列阻塞队列的不同
- 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(默认是Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界情况下,可能会造成内存溢出等问题。
- 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
- 创建与销毁对象的开销不同,ArrayBlockingQueue采用数组作为存储容器,在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。在长时间内需要高效并发地处理大批量数据时,对于GC可能存在较大影响。
- 队列添加或移除的锁不一样,ArrayBlockingQueue的锁是没有分离的,添加操作和移除操作采用同一个ReentrantLock锁,而LinkedBlockingQueue的锁是分离的,添加采用的是putLock,移除采用的是takeLock,这样可以大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。



京公网安备 11010502036488号