Condition接口

Condition主要是为了配合lock使用,类似于wait和notify配合synchronize使用一样,起到线程之间的通信作用。

public interface Condition {

     /** 
      * 暂停此线程直至一下四种情况发生
      * 1.此Condition被signal()
      * 2.此Condition被signalAll()
      * 3.Thread.interrupt()
      * 4.伪wakeup
      * 以上情况.在能恢复方法执行时,当前线程必须要能获得锁
      */
    void await() throws InterruptedException;

 
    //跟上面类似,不过不响应中断
    void awaitUninterruptibly();

    //带超时时间的await()
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    //带超时时间的await()
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    
    //带deadline的await()
    boolean awaitUntil(Date deadline) throws InterruptedException;

    //唤醒某个等待在此condition的线程
    void signal();
  
    //唤醒所有等待在此condition的所有线程
    void signalAll();
}

AQS中的Condition

这个其实底层和AQS中的clh队列一样,在java中使用了clh的变种形式开发juc里面的并发工具。底层有个Node类


这里有个属性waitStatus,上面那些常量事先定义 一些节点的状态,有个状态代表CONDITION代表此状态代表着现在该节点是在Condition等待队列中,因为waitStatus的不同可以看成有两条队列,等待锁的同步等待队列(waitStatus=0),等待唤醒的Condition队列。


这个对象里面也比较简单,主要是两个引用指向clh队列中的节点。

await方法源码解读

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            long savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

首先先总体解读一下,再细致下去到具体方法中

  1. 首先判断中断一下,因为这个方法本来就是响应中断的
  2. 添加一个节点到Condition等待队列尾部
  3. 释放锁
  4. 如果还在Condition等待队列中就等待park。如果不在那就是去了AQS的同步等待队列了那就去尝试竞争锁。
  5. 在signal之后,AQS一样的等待锁(执行acquireQueued方法)(寻找安全休息点,等着前驱唤醒)

第一步好理解,直接看第二步addConditionWaiter方法

private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

可以看到首先拿到Condition等待队列队尾,如果队尾已经失效了,那就清理掉,执行unlinkCancelledWaiters方法

private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

这里有点复杂,大概就是从开始遍历整个Condition等待队列如果Node状态不是Condition就移除修改nextWaitter的指向

这步完成后,就是创建个新的Node节点把它放在队列尾部就行,很简单的代码

再之后就是释放锁资源了fullyRelease方法的执行

final long fullyRelease(Node node) {
        boolean failed = true;
        try {
            long savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

调用的还是release方法,底层还是AQS的释放资源的方法,这个了解AQS的人应该很熟悉,大概讲解下流程,调用自己重写的tryRelease方法,修改state的值,然后修改node节点的状态,找到下一个要唤醒的节点,唤醒它。让它自己去获取锁,这就是简单到流程。不做过多描述,只要知道这是释放锁就行

然后就是isOnSyncQueue方法,从字面意思上看,是否在sync队列上,也就是说刚刚添加到Condition队列尾部的节点已经在Sync队列中了,也就是同步等待队列中,那么就没有必要park了,可能其他线程唤醒了也可能中断了,失效了都有可能

final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

很简单的代码,十分容易理解

然后就是调用一个park方法让线程阻塞就行。

signal方法源码解读

 public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

按照惯例先总结一下流程

  1. 如果没有获取到锁就异常,因为获取锁会把exclusiveOwnerThread属性改变为当前线程。isHeldExclusive方法就是检测exclusiveOwnerThread属性和当前线程是否相等,不同就报错
  2. 得到Condition队列中第一个
  3. 不为空就唤醒??这里有点疑问以前以为是随机唤醒,但这居然是第一个Condition队列中的节点

因为AQS分为独占和共享两个模式,也就是说这个Condition只能作用于独占模式的juc工具,好像我所知道的就ReentrantLock可以new出这个对象来进行阻塞

doSignal

private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

首先把要signal的节点从Condition队列上剔除,也就是将节点的nextWaiter=null,然后调用transferForSignal方法去转变状态,转变到sync队列当中去

final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

这里的enq方法就是把这个队列放到sync队列的尾部,因为在doSignal方法中已经改变了nextwaiter的指向,并且改变了waitStatus的状态,所以这个节点也就是成功了一个标准的sync队列的节点。然后这个有个地方注意一下,因为现在节点在sync队列最尾部,如果前驱节点的waitStatus能成功改变状态为SIGNAL。这个状态意味着如果这个节点释放资源的时候会唤醒一下后面的节点。如果成功就没必要unpark,按AQS的流程走去竞争锁的资源,如果失败也就是怕没有其他节点唤醒这个节点,自己先唤醒一下。但是这里有个疑惑,这里唤醒线程不会让线程不安全的情况发生吗?

最后我们看下signal之后线程怎么获取锁


signal之后执行await后面的代码,执行到acquireQueued方法中

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;//标记是否成功拿到资源
    try {
        boolean interrupted = false;//标记等待过程中是否被中断过
        
        //又是一个“自旋”!
        for (;;) {
            final Node p = node.predecessor();//拿到前驱
            //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
            if (p == head && tryAcquire(arg)) {
                setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
                p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
                failed = false;
                return interrupted;//返回等待过程中是否被中断过
            }
            
            //如果自己可以休息了,就进入waiting状态,直到被unpark()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

然后执行,执行到shouldParkAfterFailedAcquire方法,前面代码注释很详细应该没问题,肯容易看懂

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//拿到前驱的状态
    if (ws == Node.SIGNAL)
        //如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
        return true;
    if (ws > 0) {
        /*
         * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
         * 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
         //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

现在就很明确了,这代码是找到安全的休息点,也就是前驱有效节点,然后告诉前驱你释放锁的时候通知下我,把我唤醒。这里就解决了在signal方法中为什么有一下线程唤醒而不会造成线程不安全的情况,因为最终都要到这里来,找安全点休息。


总结

  1. 首先判断中断一下,因为这个方法本来就是响应中断的
  2. 添加一个节点到Condition等待队列尾部
  3. 释放锁
  4. 如果还在Condition等待队列中就等待park。如果不在那就是去了AQS的同步等待队列了那就去尝试竞争锁。
  5. 在signal之后,AQS一样的等待锁(执行acquireQueued方法)(寻找安全休息点,等着前驱唤醒)但是唯一一个不同的地方在于park的点不一样,AQS是在acquirQueued方法中的parkAndCheckInterrupt,而Condition是在await方法中、意味着如果被前驱节点唤醒后,就执行acquireQueued进去获取资源state。第二种情况,也就是上面说的signal方法就唤醒了这个线程就直接执行acquireQueued方法找到合适的安全点。换句话说,正常情况,前驱唤醒就能获取到资源,类似AQS,其他情况下,前驱节点waitStatus不是signal那就直接唤醒,然后进入acquireQueued方法找到合适的休息点,也就是前驱waitStatus是signal的地方,然后自己park等待唤醒