环境

jdk1.8

相关类

ThreadPoolExecutor;Executors;ExecutorService;Executor
Executor是一个接口,有一个execute的抽象方法,用于运行新任务
ExecutorService也是一个接口,它继承了Executor接口,并且额外提供了shutdown,submit等线程池方法
ThreadPoolExecutor是我们常说的线程池,它实现了ExecutorService,并且可以让我们精细地调整各种参数来满足特定需求;
Executors是一个工厂类,它有许多静态方法可以生成一个ThreadPoolExecutor实例,常说的四种线程池就是由它包装提供的,它的缺点是固定参数且因为最大线程数或等待队列长度设定过大会导致OOM

 

简单使用实例

    public static void main(String[] argv) {
        // 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,5,TimeUnit.MINUTES,new ArrayBlockingQueue<Runnable>(5));
        // 执行3个任务
        for (int i = 0; i < 3; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(1);
                }
            });
        }
        executor.shutdown();
    }

线程池参数

1.corePoolSize

核心线程池大小:最小活线程数,除非设定了allowCoreThreadTimeOut否则这些线程不会因过长时间空闲而销毁;

    /**
     * Core pool size is the minimum number of workers to keep alive
     * (and not allow to time out etc) unless allowCoreThreadTimeOut
     * is set, in which case the minimum is zero.
     */
    private volatile int corePoolSize;
/**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     */
    private volatile boolean allowCoreThreadTimeOut;

当allowCoreThreadTimeOut默认为false,核心线程即使空闲也会保持alive,当为true时使用keepAliveTime去限制核心线程的存活时间

(1)初始时线程池里的线程数为0,然后接到Runnable任务才逐渐增加直到达到corePoolSize

(2)此时再继续添加新任务线程池不会开新的线程,而是将其放入workQueue队列等待

(3)若workQueue队列数已满才会开新的线程处理,直到总线程数CAPACITY达到maximumPoolSize

2.maximumPoolSize

最大线程数:并不是实际线程数,实际线程数是CAPACITY

    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY.
     */
    private volatile int maximumPoolSize;

3.keepAliveTime

空闲线程等待时间,超过核心线程数corePoolSize或allowCoreThreadTimeOut时会被执行判断--超过keepAliveTime的线程会被销毁

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout when there are more than corePoolSize
     * present or if allowCoreThreadTimeOut. Otherwise they wait
     * forever for new work.
     */
    private volatile long keepAliveTime;

4.threadFactory

线程工厂:所有线程都通过这个工厂去创建,一般使用它的default默认的就行

    /**
     * Factory for new threads. All threads are created using this
     * factory (via method addWorker).  All callers must be prepared
     * for addWorker to fail, which may reflect a system or user's
     * policy limiting the number of threads.  Even though it is not
     * treated as an error, failure to create threads may result in
     * new tasks being rejected or existing ones remaining stuck in
     * the queue.
     *
     * We go further and preserve pool invariants even in the face of
     * errors such as OutOfMemoryError, that might be thrown while
     * trying to create threads.  Such errors are rather common due to
     * the need to allocate a native stack in Thread.start, and users
     * will want to perform clean pool shutdown to clean up.  There
     * will likely be enough memory available for the cleanup code to
     * complete without encountering yet another OutOfMemoryError.
     */
    private volatile ThreadFactory threadFactory;

5.handler

当队列与线程池都满了的时候的处理策略

    /**
     * Handler called when saturated or shutdown in execute.
     */
    private volatile RejectedExecutionHandler handler;

 6.workQueue

阻塞等待队列

    /**
     * The queue used for holding tasks and handing off to worker
     * threads.  We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING).  This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     */
    private final BlockingQueue<Runnable> workQueue;

 

有了以上几个参数就可以new一个线程池了

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

   // 创建线程池
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,5,TimeUnit.MINUTES,new ArrayBlockingQueue<Runnable>(5));

 几种阻塞队列

1.LinkedBlockingQueue:采用链表结构,基于先进先出的顺序,若初始化时未指定长度则默认无限长

2.ArrayBlockingQueue:采用数组结构,基于先进先出的顺序,因为时数组所以初始化时必须指定长度

3.synchronousQueue:采用该阻塞队列意味着来一个任务就创建一个任务

4.PriorityBlockingQueue:按某种规则指定的优先队列

Executors提供的四种线程池

需要注意的是不推荐使用Executors提供的线程池

1FixedThreadPool SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM 

2CachedThreadPool ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE, 可能会创建大量的线程,从而导致 OOM  

1.FixedThreadPool

固定线程数的线程池,其corePoolSize与maximumPoolSize相等,因为没有设定allowCoreThreadTimeOut为true所以就算是没有任务,这些线程都会一直处于active状态,直到关闭线程池

因为采用LinkedBlockingQueue策略所以多的任务需要按先后顺序排队,并且该阻塞队列没有长度限制,可能会导致OOM

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

2.SingleThreadPool

只有一个线程的线程池,因为没有设定allowCoreThreadTimeOut为true所以就算是没有任务,这个线程也会处于激活状态

因为采用LinkedBlockingQueue策略所以多的任务需要按先后顺序排队,并且该阻塞队列没有长度限制,可能会导致OOM

它与创建单线程相比,意义在于不必重新创建和销毁一个线程,重复利用此线程

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newFixedThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

3.newCachedThreadPool

corePoolSize为0而maximumPoolSize无限大,这意味着线程池当有新的任务到来它就会开新的线程去处理,这适用于短耗时任务,但若是同时来了很多很多任务可能会因为创建太多线程导致OOM

每个线程的空闲时间被设定为60LSeconds,当超过时就会被线程池销毁

    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

4.ScheduledThreadPool

创建一个有定时功能的线程池,给定corepoolsize是线程池一直存在的线程

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

 

利用Future与Callable获取线程执行结果

实例

    public static void main(String[] argv) throws ExecutionException, InterruptedException{
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2,4,0,TimeUnit.MILLISECONDS,new LinkedBlockingDeque<Runnable>());
        Future<Integer> future = pool.submit(new Callable<Integer>(){
            @Override
            public Integer call() throws Exception{
                return 1;
            }
        });
        System.out.println(future.get());
    }

Callable相似于Runnable,两者都可被ExcutorService执行,但是Runnable的run()方法固定为void,而Callable可以返回任意类型

ExcutorService有一个submit方法,执行线程最终是要靠execute()方法,但是submit会将结果返回为一个Future

<T> Future<T> submit(Callable<T> task);

由上可知Future是线程执行的结果的载体,通过调用get方法可以获取结果值(当未限定时间则会一直阻塞等待线程执行完成)

    V get() throws InterruptedException, ExecutionException;