线程池的工作原理,以及拒绝策略,大家都很熟悉,下面主要讲一下线程池shutdown的原理,以及一些不常用操作的原理。

shutdown

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
复制代码

启动有序关闭,在该关闭中执行先前提交的任务,但不接受任何新任务。如果已关闭,则调用不会产生任何其他影响。此方法不等待先前提交的任务完成执行。使用awaitTermination可以做到这一点。

advanceRunState

private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}
复制代码

将runState转换为给定状态,或者已经存在的状态比给定状态大时将直接返回。 循环使用CAS设置状态,设置成功返回。

interruptIdleWorkers

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
复制代码

因为work每次执行任务的时候都会先lock,完成任务后unlock, 如果tryLock可以成功说明work当前没有在执行任务。使用interrupt中断空闲的work线程。

tryTerminate

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
复制代码

shutdownNow

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
复制代码

尝试停止所有正在执行的任务,暂停正在等待的任务的处理,并返回正在等待执行的任务的列表。从此方法返回后,这些任务将从任务队列中耗尽(删除)。此方法不等待主动执行的任务终止。除了尽最大努力尝试停止处理正在执行的任务之外,没有任何保证。此实现通过中断取消任务,因此任何无法响应中断的任务都可能永远不会终止。

interruptWorkers

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
复制代码

中断所有线程,即使处于活动状态也是如此。

drainQueue

private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}
复制代码

使用drainTo方法将任务队列写入新列表。但是,如果队列是DelayQueue或其他类型的队列,但poll或drainTo可能无法删除某些元素,则将它们逐个删除。

Worker

Worker主要维护运行任务线程的中断控制状态,以及其他次要簿记。此类扩展了AbstractQueuedSynchronizer,以简化获取和释放围绕每个任务执行的锁。我们实现了一个简单的非可重入互斥锁,而不是使用ReentrantLock,因为我们不希望辅助任务在调用诸如setCorePoolSize之类的池控制方法时能够重新获取该锁。另外,为了抑制直到线程真正开始运行任务之前的中断,我们将锁定状态初始化为负值,并在启动时将其清除(在runWorker中)。

runWorker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
复制代码

Work反复从队列中获取任务并执行它们,同时解决许多问题:

  1. 我们可能从一个初始任务开始,在这种情况下,我们不需要第一个。否则,只要池是 运行时,我们从getTask获得任务。如果返回null,则 Work由于池状态或配置参数更改而退出。其他退出是由于引发异常 外部代码,在这种情况下,completedAbruptly为true,其通常导致processWorkerExit替换此线程。
  2. 在运行任何任务之前,先获取锁以防止在执行任务时其他池中断,然后我们确保除非池正在停止,否则此线程没有它的interrupt set。
  3. 在每次运行任务之前,都要调用beforeExecute,可能会引发异常,在这种情况下,我们导致线程死亡(中断循环,使用completelyAbruptly为true)无需处理 任务。
  4. 假设beforeExecute正常完成,我们运行 任务,收集其抛出的任何异常以发送给afterExecute。 我们分别处理RuntimeException,Error(两者 规范保证我们可以捕获)和任意Throwables。 因为我们无法在Runnable.run中抛出Throwables,所以我们 将它们包装在错误的出路(到线程的 UncaughtExceptionHandler)。 任何抛出的异常也保守地导致线程死亡。
  5. task.run完成后,我们调用afterExecute,这可能也会引发异常,这也会导致线程 死。 根据JLS Sec 14.20,此例外是即使task.run抛出也将有效。
  6. 异常机制的net效果是afterExecute和线程的UncaughtExceptionHandler具有相同的精度 我们可以提供的有关以下方面遇到的任何问题的信息用户代码。

processWorkerExit

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();  // shutdown状态时,每个工作线程完成工作后,终止线程池

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);  // 工作线程执行任务异常退出时,重新启动一个工作线程来完成任务
    }
}
复制代码

线程池的状态

线程池的控制状态ctl是一个atomic integer。表示workCount和runState两个字段。

workerCount指示有效线程数。int的后29位表示有效线程数。

runState,指示是否正在运行,正在关闭等。int的前三位表示线程池的状态。

  1. RUNNING: 接受新任务并处理排队的任务
  2. SHUTDOWN: 不接受新任务,而是处理排队的任务
  3. STOP: 不接受新任务,不处理排队任务以及中断进行中的任务
  4. TIDYING: 所有任务已终止,workerCount为零,线程转换为状态TIDYING将运行Terminated()挂钩方法
  5. TERMINATED: terminated()执行完成

运行状态切换

  • RUNNING -> SHUTDOWN: 在调用shutdown()时,可能隐式在finalize()中
  • (RUNNING or SHUTDOWN) -> STOP: 调用shutdownNow()
  • SHUTDOWN -> TIDYING: 队列和work pool为空
  • STOP -> TIDYING: work pool为空
  • TIDYING -> TERMINATED: terminated()执行完成

状态相关的一些代码

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }

private static int ctlOf(int rs, int wc) { return rs | wc; }

private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

private static final int COUNT_BITS = Integer.SIZE - 3;  // 29,11101
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 29位,全是1,值 536870911

private static final int RUNNING    = -1 << COUNT_BITS;  // 32位,前三位是1, 值 -536870912
private static final int SHUTDOWN   =  0 << COUNT_BITS;  // 值 0
private static final int STOP       =  1 << COUNT_BITS;  // 30位,第一位是1,值 536870912
private static final int TIDYING    =  2 << COUNT_BITS;  // 31位,第一位是1,值 1073741824
private static final int TERMINATED =  3 << COUNT_BITS;  // 31位,前两位是1, 值 1610612736

-1  // 11111111111111111111111111111111, 32位
0   // 0
1   // 1
2   // 10
3   // 11

~CAPACITY // 32位,前三位是1,11100000000000000000000000000000,值-536870912