- 在并发编程中,每个Java对象都存在一组监视器方法,如wait()、notify()以及notifyAll()方法,通过这些方法,我们可以实现线程间通信与协作(也称为等待唤醒机制),但是这些方法必须配合着synchronized关键字使用;
- 但是随着Condition的出现,我们发现与synchronized的等待唤醒机制相比Condition具有更多的灵活性以及精确性,这是因为notify()在唤醒线程时是随机(同一个锁),而Condition则可通过多个Condition实例对象建立更加精细的线程控制,也就带来了更多灵活性了,简单来说:
- 通过Condition能够精细的控制多线程的休眠与唤醒。
- 对于一个锁,我们可以为多个线程间建立不同的Condition。
- 那Condition到底如何运用呢?又是怎么样的一个继承结构呢?
- 通过上图我们可以发现,AQS以及支持64位的AQS类都分别通过内部类ConditionObject来实现了这个Condition接口。
- 我们再看一下Condition接口中有什么方法?
public interface Condition {
/**
* 当前线程进入等待状态直到被通知(signal)或中断
* 当其他线程调用singal()或singalAll()方法时,该线程将被唤醒
* 当其他线程调用interrupt()方法中断当前线程
* await()相当于synchronized等待唤醒机制中的wait()方法
*/
void await() throws InterruptedException;
//当前线程进入等待状态,直到被唤醒,该方法不响应中断🧡
void awaitUninterruptibly();
//调用该方法,当前线程进入等待状态,直到被唤醒或被中断或超时;其中nanosTimeout指的等待超时时间,单位纳秒🧡
long awaitNanos(long nanosTimeout) throws InterruptedException;
//同awaitNanos,但可以指明时间单位🧡
boolean await(long time, TimeUnit unit) throws InterruptedException;
//调用该方法当前线程进入等待状态,直到被唤醒、中断或到达某个时间期限(deadline),
//如果没到指定时间就被唤醒,返回true,其他情况返回false🧡
boolean awaitUntil(Date deadline) throws InterruptedException;
//唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获取与Condition相关联的锁,功能与notify()相同🧡
void signal();
//唤醒所有等待在Condition上的线程,该线程从等待方法返回前必须获取与Condition相关联的锁,功能与notifyAll()相同
void signalAll();
}
- 前面已经分析过,AQS中存在两种队列:
- 同步队列: 同步队列中的结点由Node(ExclusiveNode、SharedNode继承Node)类组成
- 等待队列: 等待队列中的结点也由Node(ConditionNode继承Node)组成、但是使用前必须获得锁,其结点的waitStatus的值为CONDITION。
- 我们具体来看一下这个内部类ConditionObject:中的属性:
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
//等待队列的第一个等待结点🧡
private transient ConditionNode firstWaiter;
/** Last node of condition queue. */
//等待队列的最后一个等待结点🧡
private transient ConditionNode lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { } //空构造器
//.....其他代码
- 在实现类ConditionObject中有两个结点分别是firstWaiter和lastWaiter,
- firstWaiter: 代表等待队列第一个等待结点,
- lastWaiter: 代表等待队列最后一个等待结点;
- 每个Condition都对应着一个等待队列,也就是说如果:
- 一个锁上创建了多个Condition对象,那么也就存在多个等待队列。
- 等待队列是一个FIFO的队列,在队列中每一个节点都包含了一个线程的引用,而该线程就是Condition对象上等待的线程。
- 当一个线程调用了await() 相关的方法,那么该线程将会释放锁,并构建一个Node节点封装当前线程的相关信息加入到等待队列中进行等待,直到被唤醒、中断、超时才从队列中移出。
- 看一下ConditionNode类中的属性:
- ConditionNode nextWaiter; // link to next waiting node 说明是单向链表
- Condition中的等待队列模型:
- 如图所示:
- Node节点的数据结构,在等待队列中使用的变量与同步队列是不同的;
- Condtion中等待队列的结点只有直接指向后继结点并没有指明前驱结点,相当于单链表;而且使用的变量是nextWaiter而不是next,这点我们在前面分析结点Node的数据结构时讲过。firstWaiter指向等待队列的头结点,lastWaiter指向等待队列的尾结点,等待队列中结点的状态只有两种,CANCELLED和CONDITION,
- CANCELLED: 表示线程已结束需要从等待队列中移除;
- CONDITION: 表示条件结点等待被唤醒。
- 注意: AQS中只能存在一个同步队列,但可拥有多个等待队列( 因为每个Codition对象对应一个等待队列)。
JDK15的await()方法
下面从源码分析:看看被调用ConditionObject中的await()方法的线程是如何加入等待队列的,而又是如何从等待队列中被唤醒的;(JDK15的分析不懂,看JDK8的吧)
public final void await() throws InterruptedException {
if (Thread.interrupted()) //判断持有锁的线程是否被中断 🧡🧡
throw new InterruptedException(); //没有中断,继续往下执行
ConditionNode node = new ConditionNode(); //接下来准备创建新结点加入等待队列并返回 🧡🧡
long savedState = enableWait(node); //释放当前线程锁即释放同步状态🧡🧡
LockSupport.setCurrentBlocker(this); // for back-compatibility
boolean interrupted = false, cancelled = false;
while (!canReacquire(node)) { //判断结点是否在同步队列(SyncQueue)中,即是否被唤醒🧡🧡
if (interrupted |= Thread.interrupted()) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break; // else interrupted after signal
} else if ((node.status & COND) != 0) {
try {
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
interrupted = true;
}
} else
Thread.onSpinWait(); // awoke while enqueuing
}
LockSupport.setCurrentBlocker(null);
node.clearStatus();
acquire(node, savedState, false, false, false, 0L);
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}
JDK8的await()方法
public final void await() throws InterruptedException {
//首先判断线程是否被中断,中断则直接抛出异常😁
if (Thread.interrupted())
throw new InterruptedException();
//创建新结点加入等待队列😁
Node node = addConditionWaiter();
//释放当前结点的锁即释放同步状态
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断结点是否在同步队列(SyncQueue)中,即是否被唤醒
while (!isOnSyncQueue(node)) {
//如果不在同步队列中,则挂起线程(在等待队列中挂起)
LockSupport.park(this);
//判断是否被中断唤醒,如果是退出循环。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//被唤醒后执行(因此唤醒后会由等待队列转移到同步队列)自旋操作争取获得锁,同时判断线程是否被中断
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// clean up if cancelled
if (node.nextWaiter != null)
//清理等待队列中不为CONDITION状态的结点
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡
// 执行addConditionWaiter() , 添加到等待队列。💛
private Node addConditionWaiter() {
Node t = lastWaiter; // t是等待队列的尾节点💛
// 判断是否为结束状态的结点并移除,也就是将t设置为非结束状态的尾部节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建新结点状态,并设置为CONDITION状态💛
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//加入等待队列,如果等待队列为空,则将当前节点设为第一个节点,否则作为新的尾节点💛
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node; //返回等待队列尾部节点(前一个结点不是结束状态)💛
}
- await()方法主要做了3件事:
- 一是调用addConditionWaiter()方法:将当前线程封装成node结点加入到等待队列;
- 二是调用fullyRelease(node)方法:释放同步状态并唤醒后继(同步队列的)结点的线程;
- 三是调用isOnSyncQueue(node)方法:判断结点是否在同步队列中,注意是个while循环;
- 如果同步队列中没有该结点就直接挂起该线程;
- 如果线程被唤醒后就调用acquireQueued(node, savedState)执行自旋操作争取锁(当前线程结点从等待队列转移到同步队列并开始努力获取锁)。
JDK15唤醒signal()操作:
public final void signal() {
ConditionNode first = firstWaiter; //获取等待队列的第一个结点🧡
if (!isHeldExclusively()) //判断是否持有独占锁,如果不是抛出异常🧡
throw new IllegalMonitorStateException();
if (first != null) //唤醒等待队列第一个结点的线程
doSignal(first, false);
}
🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡
private void doSignal(ConditionNode first, boolean all) { //all传入的false,用来判断是不是signalAll🧡
while (first != null) {
ConditionNode next = first.nextWaiter;
//移除条件等待队列中的第一个结点如果后继结点为null,那么说明没有其他结点,因此将尾结点也设置为null🧡
if ((firstWaiter = next) == null)
lastWaiter = null;
if ((first.getAndUnsetStatus(COND) & COND) != 0) { //判断该节点是不是条件状态,是则进入同步入队
enqueue(first);
if (!all) //看用不用循环判断
break;
}
first = next; //循环找一个
}
}
🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡
final int getAndUnsetStatus(int v) { // for signalling //这里我不太会分析,但是确实比JDK8要优化了,底层用的
return U.getAndBitwiseAndInt(this, STATUS, ~v); //Unsafe类
}
@ForceInline
public final int getAndBitwiseAndInt(Object o, long offset, int mask) {
int current;
do {
current = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset,
current, current & mask));
return current;
}
- 总结:
- 这里signal()方法做了两件事:
- 一是判断等待队列的第一个结点中的当前线程是否持有独占锁,没有就抛出异常,从这点也可以看出只有独占模式先采用等待队列,而共享模式下是没有等待队列的,也就没法使用Condition。
- 二是唤醒等待队列的第一个结点(前提不为空,也就是先await(),再signal()),即执行doSignal(first, false)
- doSignal(ConditionNode first, boolean all)方法显示了在JDK8上的 doSignal(first)的优化:
- 通过参数all来判断是signal()还是signalAll();
- 将状态为conditon的,全都放入到同步队列中(这里是优化的代码)。
- 这里signal()方法做了两件事:
- 流程如下图所示(注意无论是同步队列还是等待队列使用的Node数据结构都是同一个,不过是使用的内部变量不同罢了)