线程池

背景

  • 池化技术:事先准备好一些资源,等待用的时候拿、不用的时候还回来。也就是线程复用
  • 好处:降低资源的消耗(减少资源对象的创建、销毁次数),提高程序的性能,方便管理,控制最大并发数
  • 常见池子:连接池、内存池、对象池


线程池的异常处理:



创建线程池的两种方法:

Executors 创建:

  1. 单例线程池Executors.newSingleThreadExecutor():创建一个单线程的线程池,可保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行;
  2. 缓存线程池Executors.newCachedThreadPool():创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程;
  3. 固定线程池Executors.newFixedThreadPool(5):创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待;
  4. 周期线程池Executors.newScheduledThreadPool():创建一个周期性的线程池,支持定时及周期性执行任务。

ThreadPoolExecutor创建:

  • ThreadPoolExecutor类提供了4种构造方法,可根据需要来自定义一个线程池。

补充

  • 阿里手册不建议这么创建、用原生接口创建。上面的三个方法底层都是通过ThreadPoolExecutor来创建,不安全。所以我们需要通过ThreadPoolExecutor来创建线程池而不是Executors来创建

alt



execute 与 submit 的区别:

  • execute():可以提交 Runnable 类型的任务,没有返回值 void
  • submit():可以提交 Runnable、Callable 类型的任务,返回值为 Future 类型
  • execute():如果遇到异常直接抛出
  • submit():如果遇到异常,会在使用 Future 的 get() 获取返回值的时候,才会抛出异常


ThreadPoolExecutor

参数介绍

public ThreadPoolExecutor(int corePoolSize,  //核心线程池大小,简单来说就是一直开放的线程
   int maximumPoolSize,  //最大核心线程池大小
   long keepAliveTime,  //空闲线程存活时间
   TimeUnit unit,  //超时单位
   BlockingQueue<Runnable> workQueue,  //阻塞对列,需要指定长度
   ThreadFactory threadFactory,  //线程工厂
   RejectedExecutionHandler handler) {  //拒绝策略
   if (corePoolSize < 0 ||
     maximumPoolSize <= 0 ||
     maximumPoolSize < corePoolSize ||
     keepAliveTime < 0)
     throw new IllegalArgumentException();
   if (workQueue == null || threadFactory == null || handler == null)
     throw new NullPointerException();
     this.acc = System.getSecurityManager() == null ?
     null :
     AccessController.getContext();
     this.corePoolSize = corePoolSize;
     this.maximumPoolSize = maximumPoolSize;
     this.workQueue = workQueue;
     this.keepAliveTime = unit.toNanos(keepAliveTime);
     this.threadFactory = threadFactory;
     this.handler = handler;
}
  1. corePoolSize:核心线程数,默认是不会回收的,除非设置allowCoreThreadTimeOut 参数设置为 true 的时候,核心线程在空闲了 keepAliveTime 的时间后也会被回收的,相当于线程池自动给你动态修改了;
  2. maximumPoolSize:想要开放的线程超出了CorePoolSize加上workQueue的长度,就会开放空闲的线程。CorePoolSize + 空闲线程 = maximumPoolSize;
  3. keepAliveTime:空闲线程存活时间,线程池可容纳的最大线程数:maximumPoolSize + workQueue。如果workQueue超过了规定时间还是为空,空闲线程就会关闭,就只剩下CorePoolSize在工作。这里是空闲线程的存活时间,对等待区(workQueue)有效,对核心线程是无效的(设置allowCoreThreadTimeOut 为true就有效了);
  4. threadFactory:线程工厂(创建线程)、一般不用动。默认的线程工厂Executors.defaultThreadFactory();
  5. handler:如果当前线程数超过了maximumPoolSize + workQueue。就会采取拒绝策略

四种拒绝策略

  • AbortPolicy:默认拒绝策略,丢弃任务,并抛出拒绝执行RejectedExecutionException异常;
  • DiscardPolicy:直接丢弃;
  • DiscardOldestPolicy:丢弃workQueue中最老的一个任务,并将新任务加入;
  • CallerRunsPolicy:不丢弃,超出的任务由使用调用线程直接运行任务。


如何设置参数大小

  • 我们是否可以将线程池的参数从代码中迁移到分布式配置中心上,实现线程池参数可动态配置和即时生效,线程池参数动态化前后的参数修改流程
  • 线程池调优之动态参数配置

corePoolSize

  • 可以通过 setCorePoolSize() 方法动态配置、或者根据压测调整核心线程数的大小
//  设置corePoolSize的大小
public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers();
        else if (delta > 0) {
            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }

maximumPoolSize

  • CPU 密集型:获取当前电脑CPU的核数 + 1来作为最大的线程池、保证效率最佳。Runtime.getRuntime().availableProcessors() + 1。(这里为什么加一:即使当计算(CPU)密集型的线程偶尔由于页缺失故障或者其他原因而暂停时,这个“额外”的线程也能确保 CPU 的时钟周期不会被浪费;也就是备用线程、防止线程中断带来的影响)
  • IO 密集型:一般是两倍的CPU的核数
  • 参考:压测代码片段

我们可以根据自己的业务场景,从“N+1”和“2N”两个公式中选出一个适合的,计算出一个大概的线程数量,之后通过实际压测(IO密集型可以调用IO流等操作、CPU密集型可以计算素数等操作),逐渐往“增大线程数量”和“减小线程数量”这两个方向调整,然后观察整体的处理时间变化,最终确定一个具体的线程数量。

//  修改maximumPoolSize大小
 public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        this.maximumPoolSize = maximumPoolSize;
        if (workerCountOf(ctl.get()) > maximumPoolSize)
            interruptIdleWorkers();
    }

workQueue

  • 没有设置workQueue大小的方法。所以和这个时候就需要我们自定义一个阻塞对列。如何定义呢,我们想要设置对列的长度的时候一般会用到capacity这个关键词,通过查看源码。以LinkedBlockIngQueue队列为例
//  设置为final,并且没有setter()、getter()方法
private final int capacity;

自定义的阻塞对列。步骤:复制LinkedBlockIngQueue的内容,将category去掉final换成volatile,加上get、set方法

/** The capacity bound,&nbs***bsp;Integer.MAX_VALUE if none */
    private volatile int capacity;

    public void setCapacity(int capacity){
        this.capacity = capacity;
    }

    public int getCapacity() {
        return capacity;
    }

这样就可以修改参数了。

ResizableCapacityLinkedBlockIngQueue queue = (ResizableCapacityLinkedBlockIngQueue) executor.getQueue();
//  修改的数值
queue.setCapacity(temp);


问题

  • 在springboot容器里面写,由于没有响应容器的关闭或者Tomcat的关闭信号导致shutdown()关闭线程池失效。

1)、创建线程池(函数式接口):

/**
 * @author SHshuo
 * @data 2021/8/20--9:33
 * 创建线程池,核心线程为2、最大线程池为本机CPU的最大线程数+1、
 * 空闲线程存活时间为30秒、链式阻塞对列长度为2、默认线程工厂、默认拒绝策略超出会抛出异常
 * <p>
 * 没有输入值,只有返回值所以用supplier函数接口
 */
public class ThreadUtils {

    //    创建线程
    private Supplier<ExecutorService> supplier = () -> {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                Runtime.getRuntime().availableProcessors()+1,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(2),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
        return threadPool;
    };

    //    提供对外开放的接口
    public ExecutorService getThreadPool() {
        return supplier.get();
    }

}

2)、调用线程:shutdown()方法失效

/**
 * @author SHshuo
 * @data 2021/8/20--16:27
 * 这个例子很有感触,我根据视频讲解的创建线程池的方法,优化我写项目对应的内容。
 * 我第一次用到函数接口,但是发现一个大问题:就是shutdown语句一直失效
 * 项目的结果就是用了springboot集合的线程池ThreadPoolTaskExecutor。
 *
 * 具体参考:https://blog.csdn.net/francislpx/article/details/105249189
 */
public class ThreadService {

    ThreadUtils threadUtils = new ThreadUtils();

    public void test(List<String> list) {
        list.parallelStream().forEach(p -> {
//            这里的多线程并发执行
            threadUtils.getThreadPool().execute(() -> {
                execute(p);
            });
        });
        System.out.println("线程池关闭");
        threadUtils.getThreadPool().shutdown();
    }

    public void execute(String account) {
        System.out.println("执行内容");
    }
}

参考资料



ThreadPoolTaskExecutor

  • springboot集成的ThreadPoolTaskExecutor,不得不说,真的太好了。对ThreadPoolExecutor的封装已经帮我们实现了修改corePoolSize、maximumPoolSize、workQueue的方法。

源码:

//  修改corePoolSize的大小
public void setCorePoolSize(int corePoolSize) {
		synchronized (this.poolSizeMonitor) {
			this.corePoolSize = corePoolSize;
			if (this.threadPoolExecutor != null) {
				this.threadPoolExecutor.setCorePoolSize(corePoolSize);
			}
		}
	}

//  修改maxPoolSize的大小
public void setMaxPoolSize(int maxPoolSize) {
		synchronized (this.poolSizeMonitor) {
			this.maxPoolSize = maxPoolSize;
			if (this.threadPoolExecutor != null) {
				this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
			}
		}
	}

//  修改QueueCapacity的大小
public void setQueueCapacity(int queueCapacity) {
		this.queueCapacity = queueCapacity;
	}

完整案例:

@Configuration
@EnableAsync
public class ThreadConfig implements AsyncConfigurer {

//      创建线程池
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() + 1);
        executor.setQueueCapacity(2);
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}

参考:



动态线程池

很好的完善了线程池的参数配置问题