什么是线程池

线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。

线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

而本文描述线程池是JDK中提供的ThreadPoolExecutor类。

当然,使用线程池可以带来一系列好处:

  • 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

线程池的创建

创建线程池主要是ThreadPoolExecutor类来完成,ThreadPoolExecutor 的有许多重载的构造方法,通过参数最多的构造方法来理解创建线程池有哪些需要配置的参数。ThreadPoolExecutor 的构造方法为:

ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

下面对参数进行说明:

  1. corePoolSize:表示核心线程池的大小。当提交一个任务时,如果当前核心线程池的线程个数没有达到 corePoolSize,则会创建新的线程来执行所提交的任务,即使当前核心线程池有空闲的线程。如果当前核心线程池的线程个数已经达到了 corePoolSize,则不再重新创建线程。如果调用了prestartCoreThread()或者 prestartAllCoreThreads(),线程池创建的时候所有的核心线程都会被创建并且启动。
  2. maximumPoolSize:表示线程池能创建线程的最大个数。如果当阻塞队列已满时,并且当前线程池线程个数没有超过 maximumPoolSize 的话,就会创建新的线程来执行任务。
  3. keepAliveTime:空闲线程存活时间。如果当前线程池的线程个数已经超过了 corePoolSize,并且线程空闲时间超过了 keepAliveTime 的话,就会将这些空闲线程销毁,这样可以尽可能降低系统资源消耗。
  4. unit:时间单位。为 keepAliveTime 指定时间单位。
  5. workQueue:阻塞队列。用于保存任务的阻塞队列,关于阻塞队列可以看这篇文章。可以使用ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue
  6. threadFactory:创建线程的工程类。可以通过指定线程工厂为每个创建出来的线程设置更有意义的名字,如果出现并发问题,也方便查找问题原因。
  7. handler:饱和策略。当线程池的阻塞队列已满和指定的线程都已经开启,说明当前线程池已经处于饱和状态了,那么就需要采用一种策略来处理这种情况。采用的策略有这几种:
    1. AbortPolicy: 直接拒绝所提交的任务,并抛出RejectedExecutionException异常;
    2. CallerRunsPolicy:只用调用者所在的线程来执行任务;
    3. DiscardPolicy:不处理直接丢弃掉任务;
    4. DiscardOldestPolicy:丢弃掉阻塞队列中存放时间最久的任务,执行当前任务

线程池的运行状态

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP =  1 << COUNT_BITS;
private static final int TIDYING =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

根据上面源码可知, COUNT_BITS的值为29, CAPACITY的值为2的29次方-1, 二进制表示为: "00011111111111111111111111111111"(明显29个1);

上面的源码中线程池的运行状态的二进制表示:

状态 二进制 意义
RUNNING 11100000000000000000000000000000 接受新execute的task, 执行已入队的task
SHUTDOWN 0 不接受新execute的task, 但执行已入队的task, 中断所有空闲的线程
STOP 00100000000000000000000000000000 不接受新execute的task, 不执行已入队的task, 中断所有的线程
TIDYING 01000000000000000000000000000000 所有线程停止, workerCount数量为0, 将执行hook方法: terminated()
TERMINATED 01100000000000000000000000000000 terminated()方法执行完毕

可以看出, 线程池的状态由32位int整型的二进制的前三位表示.

核心属性ctl源码(线程池状态和有效线程数)

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

核心属性ctl, 数据类型是AtomicInteger, 表示了两个含义:

  1. 线程池运行状态(runState)
  2. 线程池中的有效线程数(workerCount)

那是如何做到一个属性表示两个含义的呢? 那就要看看ctlOf方法

private static int ctlOf(int rs, int wc) { return rs | wc; }

ctlOf方法在线程池内部用来更新线程池的ctl属性,比如ctl初始化的时候: ctl = new AtomicInteger(ctlOf(RUNNING, 0)), 调用ThreadPoolExecutor#shutdown方法等;

rs表示runState, wc表示workerCount;

runStateworkerCount按位或运算得到ctl的值;

runStateworkerCount的值由下面两个方法packing和unpacking, 这里的形参c就是ctl.get()的值;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }

下面用表格更清晰理解:

方法 方法体 带入CAPACITY的值
runStateOf c & ~CAPACITY c & 11100000000000000000000000000000
workerCountOf c & CAPACITY c & 00011111111111111111111111111111

按位与运算, 相同位置, 同1才为1, 其余为0;

结合表格看, runStateOf方法取ctl前3位表示runState, workerCountOf方法取第4~32位的值表示workerCount;

在多线程的环境下,运行状态和有效线程数量往往需要保证统一,不能出现一个改而另一个没有改的情况,如果将他们放在同一个AtomicInteger中,利用AtomicInteger的原子操作,就可以保证这两个值始终是统一的。

代码分析

java.util.concurrent.ThreadPoolExecutor#execute:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *1.判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程,
         * 如果能完成新线程创建exexute方法结束,成功提交任务
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         * 2.在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次check,如果状态
         * 在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要
         * reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         * 3.如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池
         * 已经达到饱和状态,所以reject这个他任务
         */
        int c = ctl.get();
         // 1.如果当前有效线程数小于核心线程数,调用addWorker执行任务(即创建一条线程执行该任务)
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
         // 2.如果当前有效线程大于等于核心线程数,并且当前线程池状态为运行状态,则将任务添加到阻塞队列中,等待空闲线程取出队列workQueue执行
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);// 移除成功,拒绝该非运行的任务
            else if (workerCountOf(recheck) == 0)
                // 防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
                // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务
                addWorker(null, false);
        }
        // 3.如果阻塞队列已满,则调用addWorker执行任务(即创建一条线程执行该任务)
        else if (!addWorker(command, false))
            reject(command);// 如果创建线程失败,则调用线程拒绝策略
}

画一个简单的图:
图片说明
上图中的worker可简单理解为线程池中的一个线程, workers.size()即使线程池中的线程数;

  1. workers.size()小于corePoolSize时, 创建新的线程执行提交的task.
  2. workers.size()大于corePoolSize时, 并且workQueue没有满, 将task添加到workQueue.
  3. workers.size()大于corePoolSize时, 并且workQueue已经满了, 但是workers.size()<maximumPoolSize, 就创建一个空闲线程处理task.
  4. 当workers.size()大于corePoolSize时, 并且workQueue已经满了, 但是workers.size()>=maximumPoolSize, 执行拒绝策略.

java.util.concurrent.ThreadPoolExecutor#addWorker:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
          // 如果rs大于SHUTDOWN,则说明此时线程池不在接受新任务了
          // 如果rs等于SHUTDOWN,同时满足firstTask为空,且阻塞队列如果有任务,则继续执行任务
          // 也就说明了如果线程池处于SHUTDOWN状态时,可以继续执行阻塞队列中的任务,但不能继续往线程池中添加任务了
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
            // 如果有效线程数大于等于线程池所容纳的最大线程数(基本不可能发生),不能添加任务
            // 或者有效线程数大于等于当前限制的线程数,也不能添加任务
            // 限制线程数量有任务是否要核心线程执行决定,core=true使用核心线程执行任务
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c)) // 使用CAS增加有效线程数量
                    break retry;
                c = ctl.get();  // 如果再次获取ctl变量值
                if (runStateOf(c) != rs)// 再次对比运行状态,如果不一致,再次循环执行,说明此处有其他并发操作,修改了State
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 任务是否已执行
        boolean workerStarted = false;
        // 任务是否已添加
        boolean workerAdded = false;
        // 任务包装类,我们的任务都需要添加到Worker中
        Worker w = null;
        try {
            w = new Worker(firstTask);
             // 获取Worker中的Thread值
            final Thread t = w.thread;
            if (t != null) {
                 // 操作workers HashSet 数据结构需要同步加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    // 获取当前线程池的运行状态
                    int rs = runStateOf(ctl.get());

                    // rs < SHUTDOWN表示是RUNNING状态;
                    // 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
                    // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
                    // rs是RUNNING状态时,直接创建线程执行任务
                    // 当rs等于SHUTDOWN时,并且firstTask为空,也可以创建线程执行任务,也就说明了SHUTDOWN状态时不再接受新任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                 // 启动线程执行任务
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
}
这里特别强调,firstTask是开启线程执行的首个任务,之后常驻在线程池中的线程执行的任务都是从阻塞队列中取出的,需要注意。

创建一个Worker对象,并将新的任务装进Worker中,开启同步将Worker添加进workers中,这里需要注意workers的数据结构为HashSet,非线程安全,所以操作workers需要加同步锁。

如何执行任务?

我们注意到上面的代码中:

// 启动线程执行任务
if (workerAdded) {
  t.start();
  workerStarted = true;
}

这里的t是w.thread得到的,即是Worker中用于执行任务的线程,该线程由ThreadFactory创建,我们再看看生成Worker的构造方法:

Worker(Runnable firstTask) {
  setState(-1); // inhibit interrupts until runWorker
  this.firstTask = firstTask;
  this.thread = getThreadFactory().newThread(this);
}

newThread传的参数是Worker本身,而Worker实现了Runnable接口,所以当我们执行t.start()时,执行的是Worker的run()方法,找到入口了:

java.util.concurrent.ThreadPoolExecutor.Worker#run:

public void run() {
  runWorker(this);
}

java.util.concurrent.ThreadPoolExecutor#runWorker:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 循环从workQueue阻塞队列中获取任务并执行
            while (task != null || (task = getTask()) != null) {
                // 加同步锁的目的是为了防止同一个任务出现多个线程执行的问题
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 如果线程池正在关闭,须确保中断当前线程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 执行任务前可以做一些操作,交由子类实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 将task置为空,让线程自行调用getTask()方法从workQueue阻塞队列中获取任务   
                    task = null;
                    // 记录Worker执行了多少次任务,用于统计
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
             // 线程回收过程
            processWorkerExit(w, completedAbruptly);
            //将task置为空,目的是为了让线程自行调用getTask()方法从workQueue阻塞队列中获取任务。
        }
}

如何保证核心线程不被销毁?

我们之前已经知道线程池中可维持corePoolSize数量的常驻核心线程,那么它们是如何保证执行完任务而不被线程池回收的呢?在前面的章节中你可能已经到从workQueue队列中会阻塞式地获取任务,如果没有获取任务,那么就会一直阻塞下去

java.util.concurrent.ThreadPoolExecutor#getTask:

private Runnable getTask() {
        // 超时标记,默认为false,如果调用workQueue.poll()方法超时了,会标记为true
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 如果当前状态大于等于SHUTDOWN,并且workQueue中的任务为空或者状态大于等于STOP
            // 则CAS操作减少工作线程数量,并且返回null,线程被回收
            // 也说明假设状态为SHUTDOWN的情况下,如果workQueue不为空,那么线程池还是可以继续执行剩下的任务
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();// CAS操作将线程池中的线程数量减一
                return null;
            }
             // 获取线程池中的有效线程数量
            int wc = workerCountOf(c);

            // Are workers subject to culling?
    // 如果开发者主动开启allowCoreThreadTimeOut并且获取当前工作线程大于corePoolSize,那么该线程是可以被超时回收的
    // allowCoreThreadTimeOut默认为false,即默认不允许核心线程超时回收
    // 这里也说明了在核心线程以外的线程都为“临时”线程,随时会被线程池回收
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 这里说明了两点销毁线程的条件:
        // 1.原则上线程池数量不可能大于maximumPoolSize,但可能会出现并发时操作了setMaximumPoolSize方法,如果此时将最大线程数量调少了,很可能会出现当前工作线程大于最大线程的情况,这时就需要线程超时回收,以维持线程池最大线程小于maximumPoolSize,
        // 2.timed && timedOut 如果为true,表示当前操作需要进行超时控制,这里的timedOut为true,说明该线程已经从workQueue.poll()方法超时了
        // 以上两点满足其一,都可以触发线程超时回收
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                // 尝试用CAS将线程池线程数量减一
                if (compareAndDecrementWorkerCount(c))
                    // 减一成功后返回null,线程被回收
                    return null;
                // 否则循环重试
                continue;
            }

            try {
                // 如果timed为true,阻塞超时获取任务,否则阻塞获取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                // 如果poll超时获取任务超时了, 将timeOut设置为true
                // 继续循环执行,如果碰巧开发者开启了allowCoreThreadTimeOut,那么该线程就满足超时回收了
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
}

实现思路大致是:

将timedOut超时标记默认设置为false;
计算timed的值,该值决定了线程的生死大权,(timed && timedOut) 即是线程超时回收的条件之一,需要注意的是第一次(timed && timedOut) 为false,因为timedOut默认值为false,此时还没到poll超时获取的操作;
根据timed值来决定是用阻塞超时获取任务还是阻塞获取任务,如果用阻塞超时获取任务,超时后timedOut会被设置为true,接着继续循环,若(timed && timedOut) 为true,满足线程超时回收。

processWorkerExit

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 正常的话再runWorker的getTask方法workerCount已经被减一了
        if (completedAbruptly)
            decrementWorkerCount();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 累加线程的completedTasks
            completedTaskCount += w.completedTasks;
            // 从线程池中移除超时或者出现异常的线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        // 尝试停止线程池
        tryTerminate();
        int c = ctl.get();
        // runState为RUNNING或SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // 线程不是异常结束
            if (!completedAbruptly) {
                // 线程池最小空闲数,允许core thread超时就是0,否则就是corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果min == 0但是队列不为空要保证有1个线程来执行队列中的任务
                if (min == 0 && !workQueue.isEmpty())
                    min = 1;
                // 线程池还不为空那就不用担心了
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 1.线程异常退出
            // 2.线程池为空,但是队列中还有任务没执行,看addWoker方法对这种情况的处理
            addWorker(null, false);
        }
  }

tryTerminate

processWorkerExit方法中会尝试调用tryTerminate来终止线程池。这个方法在任何可能导致线程池终止的动作后执行:比如减少wokerCount或SHUTDOWN状态下从队列中移除任务。

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 以下状态直接返回:
            // 1.线程池还处于RUNNING状态
            // 2.SHUTDOWN状态但是任务队列非空
            // 3.runState >= TIDYING 线程池已经停止了或在停止了
            if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
                return;
            // 只能是以下情形会继续下面的逻辑:结束线程池。
            // 1.SHUTDOWN状态,这时不再接受新任务而且任务队列也空了
            // 2.STOP状态,当调用了shutdownNow方法
            // workerCount不为0则还不能停止线程池,而且这时线程都处于空闲等待的状态
            // 需要中断让线程“醒”过来,醒过来的线程才能继续处理shutdown的信号。
            if (workerCountOf(c) != 0) { // Eligible to terminate
                // runWoker方法中w.unlock就是为了可以被中断,getTask方法也处理了中断。
                // ONLY_ONE:这里只需要中断1个线程去处理shutdown信号就可以了。
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 进入TIDYING状态
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 子类重载:一些资源清理工作
                        terminated();
                    } finally {
                        // TERMINATED状态
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 继续awaitTermination
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }

shutdown

shutdown这个方***将runState置为SHUTDOWN,会终止所有空闲的线程。shutdownNow方法将runState置为STOP。和shutdown方法的区别,这个方***终止所有的线程。主要区别在于shutdown调用的是interruptIdleWorkers这个方法,而shutdownNow实际调用的是Worker类的interruptIfStarted方法:
他们的实现如下:

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 线程池状态设为SHUTDOWN,如果已经至少是这个状态那么则直接返回
            advanceRunState(SHUTDOWN);
            // 注意这里是中断所有空闲的线程:runWorker中等待的线程被中断 → 进入processWorkerExit →
            // tryTerminate方法中会保证队列中剩余的任务得到执行。
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // STOP状态:不再接受新任务且不再执行队列中的任务。
        advanceRunState(STOP);
        // 中断所有线程
        interruptWorkers();
        // 返回队列中还没有被执行的任务。
        tasks = drainQueue();
    }
    finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // w.tryLock能获取到锁,说明该线程没有在运行,因为runWorker中执行任务会先lock,
            // 因此保证了中断的肯定是空闲的线程。
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    }
    finally {
        mainLock.unlock();
    }
}
void interruptIfStarted() {
    Thread t;
    // 初始化时state == -1
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}