当存在多线程执行任务,需要等待指定数量的子线程都执行完后,再执行主线程任务场景时,可以使用CountDownLatch类来实现。其静态内部类Sync继承了AbstractQueuedSynchronizer,主要实现了AQS的tryAcquireShared和tryReleaseShared方法。
基本使用
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
// 定义计数器值为10的CountDownLatch对象
CountDownLatch countDownLatch = new CountDownLatch(10);
// 创建10个子线程执行任务,最少需要10个子线程
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 任务执行完后将计数器减1
countDownLatch.countDown();
}
System.out.println(Thread.currentThread().getName() + " end...");
}, "Thread-" + i).start();
}
// 在此等待所有任务完成
countDownLatch.await();
System.out.println("Thread-main end...");
}
} 源码解析
- 构造一个计数器10的
CountDownLatch对象,设置state值为10
// step 1
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
// step 2
Sync(int count) {
setState(count);
} countDown以及await方法分析
// step 1: 调用了sync实例的releaseShared方法
public void countDown() {
sync.releaseShared(1);
}
// step 2: 接着调用了sync实例的tryReleaseShared方法
public final boolean releaseShared(int arg) {
// 尝试将计数器state进行减一
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// step3: 接着查看`tryReleaseShared`方法,该方法具体实现是在`Sync`类中
private static final class Sync extends AbstractQueuedSynchronizer {
// 省略不相关代码...
protected boolean tryReleaseShared(int releases) {
// 此处的for循环是为了解决在高并发场景下,CAS操作失败时,进行重试
for (;;) {
int c = getState();
// 当state已经为0时,直接返回false。
if (c == 0)
return false;
int nextc = c-1;
// 基于CAS操作,保证state变量的原子性更新
if (compareAndSetState(c, nextc))
// 当state更新为0时,返回true。
return nextc == 0;
}
}
}
// step 4: 根据tryReleaseShared方法的分析得知:
// 只有将state成功更新为0的线程,才会执行doReleaseShared逻辑,即step 2中的if块逻辑
// 在分析这个方法之前,先看下await方法,直接看step 5
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
// step 5: 判断子线程是否执行完,反之将主线程阻塞。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// 线程是否已被中断
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
// 将主线程阻塞
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) { // 改方法也是在CountDownLatch重写了
// 这里逻辑很简单,就是判断state是否为0,为0时才返回1
// 所以如果在执行await之前,state为已经为0时,不会将主线程阻塞。
// 有且仅当还有等待的线程时,才会将主线程阻塞。
return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 将主线程添加到阻塞队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取前继节点
final Node p = node.predecessor();
// 该队列里中只有一个主线程节点,所以此处为true
if (p == head) {
// 再次检查state是否为0
int r = tryAcquireShared(arg);
// 条件满足表示state为0,所需数量的子线程任务已执行完
if (r >= 0) {
// 将head指针指向新加的主线程阻塞节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 如果子线程未执行完,将head节点设为可通知运行状态
// 并在第二次for循环后,设置为阻塞状态,通过LockSupport$park方法设置
// 注意:此处阻塞之后,会在子线程执行完后,进行唤醒,唤醒后,会从此处继续执行
// 继续执行后,会再进入一次循环,下一次循环进来时,state即为0了
// head指针也会指向主线程阻塞的那个节点,退出循环
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// step 6: 再回到step 4源码处
// 此方法进入条件为state为0,即子线程执行完毕, 将唤醒主线程
private void doReleaseShared() {
for (;;) {
Node h = head;
// 如果阻塞队列中存在阻塞线程节点
if (h != null && h != tail) {
int ws = h.waitStatus;
// 第五步已分析到,主线程阻塞前,head的waitStatus值会被修改为Node.SIGNAL
if (ws == Node.SIGNAL) {
// 若设置失败,通过for循环重试
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 将主线程唤醒,主线程继续执行,通过LockSupport$unpark方法设置
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
} 
京公网安备 11010502036488号