使用线程池是为了减少创建和销毁线程带来的资源消耗。线程池适用于单任务处理时间短且任务量较大的场景。

线程池核心参数

参数 说明
int corePoolSize 核心线程数
int maximumPoolSize 最大线程数
long keepAliveTime 非核心线程最大可空闲时间
TimeUnit unit 非核心线程最大可空闲时间单位
BlockingQueue<Runnable> workQueue 阻塞队列
ThreadFactory threadFactory 线程工厂
RejectedExecutionHandler handler 拒绝策略

运行过程

  1. 初始状态,每来一个任务都会创建一个核心线程,直到线程数等于corePoolSize
    if (workerCountOf(c) < corePoolSize) {
     if (addWorker(command, true))
         return;
     c = ctl.get();
    }
  2. 核心线程达到最大时,会将新任务加入到阻塞队列中,核心线程会从队列中取任务。
    if (isRunning(c) && workQueue.offer(command)) {
     // ...
    }
  3. 阻塞队列满时,再创建非核心线程来执行新任务。
    else if (!addWorker(command, false))
  4. 线程数达到最大后,还有新任务提交进来,就执行拒绝策略。

线程池状态

图片说明

  1. RUNNING
    初始状态,能够接收新任务,执行新任务
  2. SHUTDOWN
    调用shutdown方法时会变成该状态,该状态下不再接收新任务,但是可以将已提交的任务执行完。
  3. STOP
    调用shutdownNow方法时会变成该状态, 该状态下不接收新任务,不处理已提交的任务且会中断正在执行的任务。
  4. TIDYING
    所有任务已终止且workCount=0时,线程池会进入该状态。
  5. TERMINATED
    线程池彻底终止状态。

拒绝策略

RejectPolicy

策略 说明
AbortPolicy RejectedExecutionException异常
CallerRunsPolicy 使用调用者线程执行任务
DiscardPolicy 直接丢弃任务
DiscardOldestPolicy 丢弃等待最久的任务,即队列最前面的任务,并继续执行新任务

Worker

private boolean addWorker(Runnable firstTask, boolean core) {
    //...
    boolean workerAdded = false;
    Worker w = null;
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        //...
        workers.add(w);
        workerAdded = true;
        //...
        if (workerAdded) {
            // 线程启动后会执行worker的run方法
            t.start();
            workerStarted = true;
        }
    }
}
// 构造方法
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // 创建线程时将worker本身传递进去了。
    this.thread = getThreadFactory().newThread(this);
}
public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 取得任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 线程重用关键逻辑
        // 循环执行当前任务 或 从队列取任务执行
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // ...
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    // ..
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
// 获取任务
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        // ...
        // 允许空闲核心线程被销毁或者线程数大于核心线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                // 返回null时,工作线程会结束并销毁
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

线程数设置建议

  1. 对于CPU密集型任务,CPU闲置时间较少,设置为CPU核数+1
  2. 对于IO密集型任务,CPU闲置时间相对较多,设置为2*CPU核数+1
    但是实际场景还是需要根据压测结果来设置的