使用线程池是为了减少创建和销毁线程带来的资源消耗。线程池适用于单任务处理时间短且任务量较大的场景。
线程池核心参数
参数 | 说明 |
---|---|
int corePoolSize | 核心线程数 |
int maximumPoolSize | 最大线程数 |
long keepAliveTime | 非核心线程最大可空闲时间 |
TimeUnit unit | 非核心线程最大可空闲时间单位 |
BlockingQueue<Runnable> workQueue | 阻塞队列 |
ThreadFactory threadFactory | 线程工厂 |
RejectedExecutionHandler handler | 拒绝策略 |
运行过程
- 初始状态,每来一个任务都会创建一个核心线程,直到线程数等于
corePoolSize
。if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); }
- 核心线程达到最大时,会将新任务加入到阻塞队列中,核心线程会从队列中取任务。
if (isRunning(c) && workQueue.offer(command)) { // ... }
- 阻塞队列满时,再创建非核心线程来执行新任务。
else if (!addWorker(command, false))
- 线程数达到最大后,还有新任务提交进来,就执行拒绝策略。
线程池状态
- RUNNING
初始状态,能够接收新任务,执行新任务 - SHUTDOWN
调用shutdown方法时会变成该状态,该状态下不再接收新任务,但是可以将已提交的任务执行完。 - STOP
调用shutdownNow方法时会变成该状态, 该状态下不接收新任务,不处理已提交的任务且会中断正在执行的任务。 - TIDYING
所有任务已终止且workCount=0时,线程池会进入该状态。 - TERMINATED
线程池彻底终止状态。
拒绝策略
策略 | 说明 |
---|---|
AbortPolicy | 抛RejectedExecutionException 异常 |
CallerRunsPolicy | 使用调用者线程执行任务 |
DiscardPolicy | 直接丢弃任务 |
DiscardOldestPolicy | 丢弃等待最久的任务,即队列最前面的任务,并继续执行新任务 |
Worker
private boolean addWorker(Runnable firstTask, boolean core) { //... boolean workerAdded = false; Worker w = null; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //... workers.add(w); workerAdded = true; //... if (workerAdded) { // 线程启动后会执行worker的run方法 t.start(); workerStarted = true; } } } // 构造方法 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 创建线程时将worker本身传递进去了。 this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 取得任务 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 线程重用关键逻辑 // 循环执行当前任务 或 从队列取任务执行 while (task != null || (task = getTask()) != null) { w.lock(); // ... try { beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务 task.run(); } catch (RuntimeException x) { // .. } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } // 获取任务 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { // ... // 允许空闲核心线程被销毁或者线程数大于核心线程数 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) // 返回null时,工作线程会结束并销毁 return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
线程数设置建议
- 对于CPU密集型任务,CPU闲置时间较少,设置为CPU核数+1
- 对于IO密集型任务,CPU闲置时间相对较多,设置为2*CPU核数+1
但是实际场景还是需要根据压测结果来设置的