线程池

概述

  • 例子: 10年前单核CPU电脑,假的多线程,像马戏团小丑玩多个球,CPU需要来回切换。
    现在是多核电脑,多个线程各自跑在独立的CPU上,不用切换效率高。
  • 线程池的优势:
    线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
  • 它的主要特点为:线程复用;控制最大并发数;管理线程。
  • 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
    第二:提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
    第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

架构

Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类

常见创建线程池方式

 Executors.newCachedThreadPool():无限线程池。
 Executors.newFixedThreadPool(nThreads):创建固定大小的线程池。
 Executors.newSingleThreadExecutor():创建单个线程的线程池。

线程池几个重要参数

源码:7大参数

/**7大参数*/
    public ThreadPoolExecutor(int corePoolSize,//1、corePoolSize:线程池中的常驻核心线程数
                              int maximumPoolSize,//2、maximumPoolSize:线程池中能够容纳同时执行的最大线程数,此值必须大于等于1
                              long keepAliveTime,//3、keepAliveTime:多余的空闲线程的存活时间当前池中线程数量超过corePoolSize时,当空闲时
                              					//间达到keepAliveTime时,多余线程会被销毁直到只剩下corePoolSize个线程为止
                              TimeUnit unit,//4、unit:keepAliveTime的单位 
                              BlockingQueue<Runnable> workQueue,//5、workQueue:任务队列,被提交但尚未被执行的任务
                              ThreadFactory threadFactory,//6、threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程,一般默认的即可
                              RejectedExecutionHandler handler//7、handler:拒绝策略,表示当队列满了,并且工作线程大于
																//等于线程池的最大线程数(maximumPoolSize)时如何来拒绝请求执行的runnable的策略
                              ) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

线程池状态

    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

执行任务 executer()

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();//获取当前线程池的状态 
        if (workerCountOf(c) < corePoolSize) {//当前线程数量小于 coreSize 时创建一个新的线程运行
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//如果当前线程处于运行状态,并且写入阻塞队列成功
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))  //双重检查,再次获取线程状态;如果线程状态变了(非运行状态)就需要从阻塞队列移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
                reject(command);
            else if (workerCountOf(recheck) == 0)  //如果当前线程池为空就新创建一个线程并执行。
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) //如果在第三步的判断为非运行状态,尝试新建线程,如果失败则执行拒绝策略
            reject(command);
}

线程池底层工作原理


重要提示
1、在创建了线程池后,线程池中的线程数为零。
2、当调用
execute()方法添加一个请求任务时,线程池会做出如下判断:
2.1如果正在运行的线程数量小于
corePoolSize**,那么马上创建线程运行这个任务;
2.2如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
2.3如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
2.4如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
3、当一个线程完成任务时,它会从队列中取下一个任务来执行。
4、当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。**

线程池的拒绝策略

何时用:

等待队列已经排满了,再也塞不下新任务了同时,线程池中的max线程也达到了,无法继续为新任务服务。这个是时候我们就需要拒绝策略机制合理的处理这个问题。

JDK内置的拒绝策略

  1. AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
  2. CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
  3. DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中尝试再次提交当前任务。
  4. DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。

以上内置拒绝策略均实现了RejectedExecutionHandle接口

自定义线程池

实现demo:


class MyThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                2L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(3),
                Executors.defaultThreadFactory(),
                //new ThreadPoolExecutor.AbortPolicy()
                //new ThreadPoolExecutor.CallerRunsPolicy()
                //new ThreadPoolExecutor.DiscardOldestPolicy()
                new ThreadPoolExecutor.DiscardOldestPolicy()
        );
        //10个顾客请求
        try {
            for (int i = 1; i <= 10; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t 办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }

    }

    private static void threadPool() {
        //List list = new ArrayList();
        //List list = Arrays.asList("a","b");
        //固定数的线程池,一池五线程

// ExecutorService threadPool = Executors.newFixedThreadPool(5); //一个银行网点,5个受理业务的窗口
// ExecutorService threadPool = Executors.newSingleThreadExecutor(); //一个银行网点,1个受理业务的窗口
        ExecutorService threadPool = Executors.newCachedThreadPool(); //一个银行网点,可扩展受理业务的窗口

        //10个顾客请求
        try {
            for (int i = 1; i <= 10; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t 办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}

几种常见的阻塞队列

  1. ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
  2. LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。
  3. SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。

合理配置线程池你是如何考虑的?

  • CPU密集型

    CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。

    CPU密集型任务配置尽可能少的线程数量:
    一般公式为:CPU核数+1个线程的线程的线程池。

  • IO密集型

    • 1

      由于IO密集型任务线程并不是一直执行任务,则应配置尽可能多的线程,如CPU核数*2

    • 2

      IO密集型,即该任务需要大量的IO,即大量的阻塞。

      在单线程上运行IO密集型的任务会导致浪费大量的CPU运算能力浪费在等待。
      所以IO密集型任务中使用多线程可以大大的加速程序运行,即使在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。

      IO密集型时,大部分线程都阻塞,故需要多配置线程数:
      参考公式:CPU核数/1-阻塞系数 阻塞系数在0.8-0.9之间。

死锁编程及定位分析

  • 是什么

    死锁是指两个或者两个以上的进程在执行过程中,因抢夺资源而造成的一种互相等待的现象,若无外力干涉它们将都无法推进下去,如果系统资源充足,进程的资源请求都能够得到满足,死锁出现的可能性也就很低,否则就会因争夺有限的资源而陷入死锁。

    • 产生死锁的主要原因

      • 系统资源不足
      • 进程运行推进的顺序不合适
      • 资源分配不当
  • 代码:


class HoldThread implements Runnable {

    private String lockA;
    private String lockB;

    public HoldThread(String lockA, String lockB) {
        this.lockA = lockA;
        this.lockB = lockB;
    }

    @Override
    public void run() {
        synchronized (lockA) {
            System.out.println(Thread.currentThread().getName() + "\t 自己持有锁" + lockA + "尝试获得" + lockB);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (lockB) {
                System.out.println(Thread.currentThread().getName() + "\t 自己持有锁" + lockB + "尝试获得" + lockA);
            }
        }
    }
}

/** * Description: * 死锁是指两个或者以上的进程在执行过程中, * 因争夺资源而造成的一种相互等待的现象, * 若无外力干涉那他们都将无法推进下去 * * @author veliger@163.com * @date 2019-04-14 0:05 **/
public class DeadLockDemo {
    public static void main(String[] args) {
        String lockA = "lockA";
        String lockB = "lockB";
        new Thread(new HoldThread(lockA, lockB), "threadAAA").start();
        new Thread(new HoldThread(lockB, lockA), "threadBBB").start();
    }
}

 
  • 解决

    • jps命令定位进程号
    • jstack找到死锁查看