• 在并发编程中,每个Java对象都存在一组监视器方法,如wait()、notify()以及notifyAll()方法,通过这些方法,我们可以实现线程间通信与协作(也称为等待唤醒机制),但是这些方法必须配合着synchronized关键字使用;
  • 但是随着Condition的出现,我们发现与synchronized的等待唤醒机制相比Condition具有更多的灵活性以及精确性,这是因为notify()在唤醒线程时是随机(同一个锁),而Condition则可通过多个Condition实例对象建立更加精细的线程控制,也就带来了更多灵活性了,简单来说:
    • 通过Condition能够精细的控制多线程的休眠与唤醒。
    • 对于一个锁,我们可以为多个线程间建立不同的Condition。
  • 那Condition到底如何运用呢?又是怎么样的一个继承结构呢?

alt

  • 通过上图我们可以发现,AQS以及支持64位的AQS类都分别通过内部类ConditionObject来实现了这个Condition接口。
  • 我们再看一下Condition接口中有什么方法?
public interface Condition {
	 /**
	  * 当前线程进入等待状态直到被通知(signal)或中断
	  * 当其他线程调用singal()或singalAll()方法时,该线程将被唤醒
	  * 当其他线程调用interrupt()方法中断当前线程
	  * await()相当于synchronized等待唤醒机制中的wait()方法
	  */
    void await() throws InterruptedException;		
	//当前线程进入等待状态,直到被唤醒,该方法不响应中断🧡
    void awaitUninterruptibly();		
	//调用该方法,当前线程进入等待状态,直到被唤醒或被中断或超时;其中nanosTimeout指的等待超时时间,单位纳秒🧡
    long awaitNanos(long nanosTimeout) throws InterruptedException;
	//同awaitNanos,但可以指明时间单位🧡
    boolean await(long time, TimeUnit unit) throws InterruptedException;
	//调用该方法当前线程进入等待状态,直到被唤醒、中断或到达某个时间期限(deadline),
	//如果没到指定时间就被唤醒,返回true,其他情况返回false🧡
    boolean awaitUntil(Date deadline) throws InterruptedException;
	//唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获取与Condition相关联的锁,功能与notify()相同🧡
    void signal();
	//唤醒所有等待在Condition上的线程,该线程从等待方法返回前必须获取与Condition相关联的锁,功能与notifyAll()相同
    void signalAll();
}
  • 前面已经分析过,AQS中存在两种队列:
    • 同步队列: 同步队列中的结点由Node(ExclusiveNode、SharedNode继承Node)类组成
    • 等待队列: 等待队列中的结点也由Node(ConditionNode继承Node)组成、但是使用前必须获得锁,其结点的waitStatus的值为CONDITION。

alt

  • 我们具体来看一下这个内部类ConditionObject:中的属性:
    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
       	//等待队列的第一个等待结点🧡
        private transient ConditionNode firstWaiter; 
        /** Last node of condition queue. */
        //等待队列的最后一个等待结点🧡
        private transient ConditionNode lastWaiter; 
        /**
         * Creates a new {@code ConditionObject} instance.
         */
        public ConditionObject() { }		 //空构造器
        //.....其他代码
  • 在实现类ConditionObject中有两个结点分别是firstWaiter和lastWaiter,
    • firstWaiter: 代表等待队列第一个等待结点,
    • lastWaiter: 代表等待队列最后一个等待结点;
  • 每个Condition都对应着一个等待队列,也就是说如果:
    • 一个锁上创建了多个Condition对象,那么也就存在多个等待队列。
  • 等待队列是一个FIFO的队列,在队列中每一个节点都包含了一个线程的引用,而该线程就是Condition对象上等待的线程。
  • 当一个线程调用了await() 相关的方法,那么该线程将会释放锁,并构建一个Node节点封装当前线程的相关信息加入到等待队列中进行等待,直到被唤醒、中断、超时才从队列中移出。
  • 看一下ConditionNode类中的属性:
  • ConditionNode nextWaiter; // link to next waiting node 说明是单向链表
  • Condition中的等待队列模型:

alt

  • 如图所示:
    • Node节点的数据结构,在等待队列中使用的变量与同步队列是不同的;
    • Condtion中等待队列的结点只有直接指向后继结点并没有指明前驱结点,相当于单链表;而且使用的变量是nextWaiter而不是next,这点我们在前面分析结点Node的数据结构时讲过。firstWaiter指向等待队列的头结点,lastWaiter指向等待队列的尾结点,等待队列中结点的状态只有两种,CANCELLED和CONDITION,
      • CANCELLED: 表示线程已结束需要从等待队列中移除;
      • CONDITION: 表示条件结点等待被唤醒。
    • 注意: AQS中只能存在一个同步队列,但可拥有多个等待队列( 因为每个Codition对象对应一个等待队列)。

JDK15的await()方法

下面从源码分析:看看被调用ConditionObject中的await()方法的线程是如何加入等待队列的,而又是如何从等待队列中被唤醒的;(JDK15的分析不懂,看JDK8的吧)

   public final void await() throws InterruptedException {
        if (Thread.interrupted())									//判断持有锁的线程是否被中断	🧡🧡	
            throw new InterruptedException();						//没有中断,继续往下执行
            
        ConditionNode node = new ConditionNode();					//接下来准备创建新结点加入等待队列并返回 🧡🧡	
        
        long savedState = enableWait(node);							//释放当前线程锁即释放同步状态🧡🧡	
        LockSupport.setCurrentBlocker(this); // for back-compatibility
        boolean interrupted = false, cancelled = false;
        
        while (!canReacquire(node)) {							//判断结点是否在同步队列(SyncQueue)中,即是否被唤醒🧡🧡	
            if (interrupted |= Thread.interrupted()) {
                if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                    break;              // else interrupted after signal
            } else if ((node.status & COND) != 0) {
                try {
                    ForkJoinPool.managedBlock(node);
                } catch (InterruptedException ie) {
                    interrupted = true;
                }
            } else
                Thread.onSpinWait();    // awoke while enqueuing
        }
        LockSupport.setCurrentBlocker(null);
        node.clearStatus();
        acquire(node, savedState, false, false, false, 0L);
        if (interrupted) {
            if (cancelled) {
                unlinkCancelledWaiters(node);
                throw new InterruptedException();
            }
            Thread.currentThread().interrupt();
        }
    }

JDK8的await()方法

public final void await() throws InterruptedException {
		//首先判断线程是否被中断,中断则直接抛出异常😁
      if (Thread.interrupted())			
          throw new InterruptedException();
          
   		//创建新结点加入等待队列😁
      Node node = addConditionWaiter();		
       //释放当前结点的锁即释放同步状态
      int savedState = fullyRelease(node);	
      int interruptMode = 0;
   	  //判断结点是否在同步队列(SyncQueue)中,即是否被唤醒
      while (!isOnSyncQueue(node)) {	
       	  //如果不在同步队列中,则挂起线程(在等待队列中挂起)
          LockSupport.park(this);		
          //判断是否被中断唤醒,如果是退出循环。
          if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)   
              break;
      }
       //被唤醒后执行(因此唤醒后会由等待队列转移到同步队列)自旋操作争取获得锁,同时判断线程是否被中断
      if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 
          interruptMode = REINTERRUPT;
       // clean up if cancelled
      if (node.nextWaiter != null) 
          //清理等待队列中不为CONDITION状态的结点
          unlinkCancelledWaiters();
      if (interruptMode != 0)
          reportInterruptAfterWait(interruptMode);
  }
🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡  🧡🧡🧡🧡
// 执行addConditionWaiter() , 添加到等待队列。💛  
	private Node addConditionWaiter() {
	    Node t = lastWaiter;		// t是等待队列的尾节点💛
		   // 判断是否为结束状态的结点并移除,也就是将t设置为非结束状态的尾部节点
	      if (t != null && t.waitStatus != Node.CONDITION) {      
	          unlinkCancelledWaiters();
	          t = lastWaiter;
	      }
	     //创建新结点状态,并设置为CONDITION状态💛
	      Node node = new Node(Thread.currentThread(), Node.CONDITION);
		//加入等待队列,如果等待队列为空,则将当前节点设为第一个节点,否则作为新的尾节点💛
	      if (t == null) 
	          firstWaiter = node;
	      else
	          t.nextWaiter = node;
	      lastWaiter = node;
	      return node;  //返回等待队列尾部节点(前一个结点不是结束状态)💛
	        }

  • await()方法主要做了3件事:
    • 一是调用addConditionWaiter()方法:将当前线程封装成node结点加入到等待队列;
    • 二是调用fullyRelease(node)方法:释放同步状态并唤醒后继(同步队列的)结点的线程;
    • 三是调用isOnSyncQueue(node)方法:判断结点是否在同步队列中,注意是个while循环;
      • 如果同步队列中没有该结点就直接挂起该线程;
      • 如果线程被唤醒后就调用acquireQueued(node, savedState)执行自旋操作争取锁(当前线程结点从等待队列转移到同步队列并开始努力获取锁)。

JDK15唤醒signal()操作:


		 public final void signal() {
		     ConditionNode first = firstWaiter;			//获取等待队列的第一个结点🧡
		     if (!isHeldExclusively())					//判断是否持有独占锁,如果不是抛出异常🧡
		        throw new IllegalMonitorStateException();
		     if (first != null)							//唤醒等待队列第一个结点的线程
		        doSignal(first, false);
		   }
🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡  🧡🧡🧡🧡

        private void doSignal(ConditionNode first, boolean all) {   //all传入的false,用来判断是不是signalAll🧡       
            while (first != null) {
                ConditionNode next = first.nextWaiter;	
           //移除条件等待队列中的第一个结点如果后继结点为null,那么说明没有其他结点,因此将尾结点也设置为null🧡	
                if ((firstWaiter = next) == null)
                    lastWaiter = null;
                if ((first.getAndUnsetStatus(COND) & COND) != 0) {  //判断该节点是不是条件状态,是则进入同步入队
                    enqueue(first);
                    if (!all)			//看用不用循环判断
                        break;
                }
                first = next;							//循环找一个
            }
        }
🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡 🧡🧡🧡🧡  🧡🧡🧡🧡
       final int getAndUnsetStatus(int v) {     // for signalling       //这里我不太会分析,但是确实比JDK8要优化了,底层用的
          return U.getAndBitwiseAndInt(this, STATUS, ~v);				//Unsafe类
       }
       
          @ForceInline
	    public final int getAndBitwiseAndInt(Object o, long offset, int mask) {
	        int current;
	        do {
	            current = getIntVolatile(o, offset);
	        } while (!weakCompareAndSetInt(o, offset,
	                                                current, current & mask));
	        return current;
	    }

  • 总结:
    • 这里signal()方法做了两件事:
      • 一是判断等待队列的第一个结点中的当前线程是否持有独占锁,没有就抛出异常,从这点也可以看出只有独占模式先采用等待队列,而共享模式下是没有等待队列的,也就没法使用Condition。
      • 二是唤醒等待队列的第一个结点(前提不为空,也就是先await(),再signal()),即执行doSignal(first, false)
    • doSignal(ConditionNode first, boolean all)方法显示了在JDK8上的 doSignal(first)的优化:
      • 通过参数all来判断是signal()还是signalAll();
      • 将状态为conditon的,全都放入到同步队列中(这里是优化的代码)。
  • 流程如下图所示(注意无论是同步队列还是等待队列使用的Node数据结构都是同一个,不过是使用的内部变量不同罢了)

alt