关于死锁,你知道多少?

本文就什么是死锁?怎么找到死锁?怎么解决死锁?怎么避免死锁等问题展开分析,通过大量的代码和案例演示向大家描述死锁的前世今生。

死锁是什么,有什么危害?

定义

  • 并发情况下,当两个(或多个)线程(或进程)相互持有对方所需要的资源,又不主动释放,导致所有人都无法继续前进,程序无限阻塞,就是死锁

两个线程:

多个线程:

危害

  • 死锁的影响在不同系统中是不一样的,这取决于系统对死锁的处理能力

    • 数据库中:检测并放弃事务
    • JVM中:无法自动处理
  • 死锁的几率不高但是危害大

    • 一旦发生,多是高并发场景,影响用户多
    • 整个系统崩溃,子系统崩溃,性能降低
    • 压力测试无法找到所有的死锁

写一个死锁的例子

案例一:必然发生死锁

第一个线程拿到锁o1后等待500毫秒,这段时间第二个线程可以拿到锁o2

然后线程1等待锁o2,线程2等待锁o1

造成程序无限阻塞的现象

代码演示如下:

/** * 〈必定发生死锁的现象〉 * * @author Chkl * @create 2020/3/9 * @since 1.0.0 */ public class MustDeadLock implements Runnable { int flag = 1; static Object o1 = new Object(); static Object o2 = new Object(); public static void main(String[] args) { MustDeadLock r1 = new MustDeadLock(); MustDeadLock r2 = new MustDeadLock(); r1.flag = 1; r2.flag = 0; Thread t1 = new Thread(r1); Thread t2 = new Thread(r2); t1.start(); t2.start(); } @Override public void run() { System.out.println(Thread.currentThread().getName()+"开始了,flag = " + flag); if (flag == 1) { synchronized (o1){ try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (o2){ System.out.println("线程1拿到两把锁"); } } } else if (flag == 0) { synchronized (o2){ try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (o1){ System.out.println("线程2拿到两把锁"); } } } } } 
案例二:两个账户转账

模拟两个账户进行转账

  • 如果线程获得一个锁后等待500毫秒,会出现和案例一的死锁现象

  • 如果线程获得一个锁之后不等待500毫秒,只有很小的几率才会发生死锁,通常测试都会正常执行。

代码演示如下:

/** * 〈转账时出现死锁〉 * 一旦注释打开,发生死锁 * * @author Chkl * @create 2020/3/9 * @since 1.0.0 */ public class TransferMoney implements Runnable { int flag = 1; static Account a = new Account(500); static Account b = new Account(500); public static void main(String[] args) throws InterruptedException { TransferMoney r1 = new TransferMoney(); TransferMoney r2 = new TransferMoney(); r1.flag = 1; r2.flag = 0; Thread t1 = new Thread(r1); Thread t2 = new Thread(r2); t1.start(); ; t2.start(); t1.join(); t2.join(); System.out.println("a的余额" + a.balance); System.out.println("b的余额" + b.balance); } @Override public void run() { if (flag == 1) { transferMoney(a, b, 200); } if (flag == 0) { transferMoney(b, a, 200); } } public static void transferMoney(Account from, Account to, int amount) { synchronized (from) { //如果休眠500毫秒,那么另一个线程就会拿到to锁,造成相互等待的死锁现象 // try { // Thread.sleep(500); // } catch (InterruptedException e) { // e.printStackTrace(); // } synchronized (to) { if (from.balance - amount < 0) { System.out.println("余额不足,转账失败!"); } else { from.balance -= amount; to.balance += amount; System.out.println("成功转账" + amount + "元"); } } } } //账户对象,拥有属性balance static class Account { int balance; public Account(int balance) { this.balance = balance; } } } 
案例三:多人多次转账

如果两个锁之间不进行等待,很难发生死锁

为了验证不等待也会发生死锁,并且死锁的发生是具有传递性的(而不是仅有少数锁住其他正常运行的),下面我们来完成多人多次转账案例

设置500个账户,每个线程进行操作,并且每个线程转账100000次。每次转账的账户和金额都是随机产生的

演示代码如下:

/** * 〈模拟多人随机转账〉 * * @author Chkl * @create 2020/3/9 * @since 1.0.0 */ public class MultiTransferMoney { //账户数 private static final int NUM_ACCOUNTS = 500; //账户金额 private static final int NUM_MONEY = 1000; //每人转账次数 private static final int NUM_ITERATIONS = 100000; //同时转账人数 private static final int NUM_THREADS = 5000; public static void main(String[] args) { Random random = new Random(); Account[] accounts = new Account[NUM_ACCOUNTS]; for (int i = 0; i < accounts.length; i++) { accounts[i] = new Account(NUM_MONEY); } class TransferThread extends Thread { @Override public void run() { for (int i = 0; i < NUM_ITERATIONS; i++) { int fromAcc = random.nextInt(NUM_ACCOUNTS); int toAcc = random.nextInt(NUM_ACCOUNTS); int amount = random.nextInt(NUM_MONEY); transferMoney(accounts[fromAcc], accounts[toAcc], amount); } System.out.println("运行结束!"); } } for (int i = 0;i<NUM_THREADS;i++){ new TransferThread().start(); } } public static void transferMoney(Account from, Account to, int amount) { synchronized (from) { synchronized (to) { if (from.balance - amount < 0) { System.out.println("余额不足,转账失败!"); } else { from.balance -= amount; to.balance += amount; System.out.println("成功转账" + amount + "元"); } } } } //账户对象,拥有属性balance static class Account { int balance; public Account(int balance) { this.balance = balance; } } } 

运行一段时间之后,死锁的现象就出现了,控制台没有输出“运行结束!”,并且进程也未结束。

验证了依然会发生死锁,并且死锁具有传递性,并不是只有一两个线程死锁,而是所有线程都会被锁死

发生死锁必须满足哪些条件

四个条件缺一不可:

  • 互斥条件

一个资源每一次只能被一个进程或者线程同时使用

  • 请求与保持条件

一个线程去请求一把锁,同时它自身还保持一把锁

请求的时候发生阻塞了,保持的锁也不释放

  • 不剥夺条件

没有外界条件来剥夺一个锁的拥有

  • 循环等待条件

各个锁之间存在相互等待的情况,构成环

如何定位死锁

  • jstack

    • 用命令行找到Java的pid(不同操作系统不同,详细去百度吧)
    • 执行${JAVA_HOME}/bin/jstack pid,查找死锁的信息
  • ThreadMXBean

在代码中获取是否发生死锁,如果发生了就打印出信息

在线程启动后,休眠一段时间等待进入死锁,然后进行检验并打印

//等1000毫秒,等它进入死锁 Thread.sleep(1000); ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); long[] deadlockedThreads = threadMXBean.findDeadlockedThreads(); //判断是否有死锁现象 if (deadlockedThreads != null && deadlockedThreads.length > 0) { for (int i = 0; i < deadlockedThreads.length; i++) { ThreadInfo threadInfo = threadMXBean.getThreadInfo(deadlockedThreads[i]); System.out.println("发现死锁:"+threadInfo.getThreadName()); } } 

在多人多次转账的案例中进行检查,运行结果如下

有哪些解决死锁问题的策略?

线上发生死锁怎么办
  • 保存案发现场后立刻重启服务器

  • 暂时保证线上服务的安全,然后在利用刚才保存的信息,排查死锁,修改代码,重新发版

常见修复策略
  1. 避免策略

    • 思路:避免相反的获取锁的顺序
    • 演示:将之前的两个账户转账的代码进行修改,将transferMoney方法代码修改如下

每次加锁前判断两个锁的hash值,如果两个hash值不相等,都是先获取hash值小的锁,再获取hash值大的锁;如果发送hash冲突,就再加一把锁锁住加锁的过程。保证无论什么顺序进行转账,都不会发生死锁

public static void transferMoney(Account from, Account to, int amount) { class Helper { public void transfer() { if (from.balance - amount < 0) { System.out.println("余额不足,转账失败!"); } else { from.balance -= amount; to.balance += amount; System.out.println("成功转账" + amount + "元"); } } } //获取对象的hash值 int fromHash = System.identityHashCode(from); int toHash = System.identityHashCode(to); //通过hash大小的比较,保证获取锁的顺序是一定的 //如果两个账户相互转账,都是先加hash值小的锁,保证了两次加锁的顺序一致,就不会有死锁了 if (fromHash < toHash) { synchronized (from) { synchronized (to) { new Helper().transfer(); } } } else if (fromHash > toHash) { synchronized (to) { synchronized (from) { new Helper().transfer(); } } //hash冲突发生了 } else { synchronized (lock) { synchronized (from) { synchronized (to) { new Helper().transfer(); } } } } } 
  1. 检测与恢复策略
  • 允许发生死锁
  • 每次调用锁都记录在有向图中
  • 定期检查“锁的调用链路图”中是否存在环路
  • 一旦发生死锁,调用死锁恢复机制
    • 线程终止
      逐个终止线程,直到死锁解除,顺序如下:
      • 优先级(前台交互还是后台处理)
      • 已占用资源和还需要的资源
      • 已运行时间
    • 资源抢占
      • 发出去的锁收回来,让线程回退几步
      • 缺点:可能同一个线程一直被抢占,造成饥饿
  1. 鸵鸟策略

如果死锁发送的几率非常低,那么我们就直接忽略它,知道死锁发送的时候,再人工修复

哲学家就餐问题

问题描述

假设有五位哲学家围坐在一张圆形餐桌旁,做以下两件事情之一:吃饭,或者思考。吃东西的时候,他们就停止思考,思考的时候也停止吃东西。餐桌中间有一大碗面,每两个哲学家之间有筷子。吃面需要两支筷子,所以假设哲学家必须用两只筷子吃东西。他们只能使用自己左右手边的那两只筷子。

就餐流程

  • 先拿起左手的筷子
  • 然后拿起右手的筷子
  • 如果筷子被人使用了,那就等别人用完
  • 吃完后,把筷子放回原位
代码演示
/** * 〈演示哲学家就餐问题导致的死锁〉 * * @author Chkl * @create 2020/3/9 * @since 1.0.0 */ public class DiningPhilosophers { public static class Philosopher implements Runnable { private Object leftChopstick; private Object rightChopstick; public Philosopher(Object leftChopstick, Object rightChopstick) { this.leftChopstick = leftChopstick; this.rightChopstick = rightChopstick; } @Override public void run() { try { while (true) { doAction("Think"); synchronized (leftChopstick) { doAction("picked up left chopstick"); synchronized (rightChopstick) { doAction("picked up right chopstick"); doAction("put down right chopstick"); } doAction("put down left chopstick"); } } } catch (InterruptedException e) { e.printStackTrace(); } } } private static void doAction(String action) throws InterruptedException { //打印操作 System.out.println(Thread.currentThread().getName() + " " + action); //随机休息 Thread.sleep((long) Math.random() * 100); } public static void main(String[] args) { //定义哲学家 Philosopher[] philosophers = new Philosopher[5]; //定义筷子 Object[] chopticks = new Object[philosophers.length]; //初始化筷子 for (int i = 0; i < chopticks.length; i++) { chopticks[i] = new Object(); } //初始化哲学家 for (int i = 0; i < philosophers.length; i++) { Object leftChopstick = chopticks[i % philosophers.length]; Object rightChopstick = chopticks[(i + 1) % philosophers.length]; philosophers[i] = new Philosopher(leftChopstick, rightChopstick); new Thread(philosophers[i], "哲学家" + (i + 1)+"号 ").start(); } } } 

可能的一种结果:

每个哲学家都拿起来左边的筷子,然后都在等待右边的筷子,进入循环等待的死锁现象

多种解决方案
  • 服务员检查(避免策略)
    由服务员进行判断分配,如果发现可能会发生死锁,不允许就餐
  • 改变一个哲学家拿叉子的顺序(避免策略)
    改变其中一个拿的顺序,破坏环路
  • 餐票(避免策略)
    吃饭必须拿餐票,餐票一共只有4张,吃完了回收
  • 领导调节(检测与恢复策略)
    定时检查,如果发生死锁,随机剥夺一个的筷子
改变一个哲学家拿叉子的顺序的实现

只需要修改哲学家初始化代码,将最后一个哲学家的拿筷子顺序进行交换,将代码

philosophers[i] = new Philosopher(rightChopstick, leftChopstick); 

替换成

if (i == philosophers.length - 1) { philosophers[i] = new Philosopher(rightChopstick, leftChopstick); } else { philosophers[i] = new Philosopher(leftChopstick, rightChopstick); } 

工程中如何避免死锁

  1. 设置超时时间,超时发警报
    • Lock的tryLock(long timeout,TimeUnit unit)
    • synchronized不具备尝试锁的能力
    • 造成超时的可能性很多,发生了死锁,死循环,线程执行慢

代码演示:

/** * 〈用trylock来避免死锁〉 * * @author Chkl * @create 2020/3/9 * @since 1.0.0 */ public class TryLockDeadLock implements Runnable { static Lock lock1 = new ReentrantLock(); static Lock lock2 = new ReentrantLock(); int flag = 1; public static void main(String[] args) { TryLockDeadLock r1 = new TryLockDeadLock(); TryLockDeadLock r2 = new TryLockDeadLock(); r1.flag = 1; r2.flag = 0; new Thread(r1).start(); new Thread(r2).start(); } @Override public void run() { for (int i = 0; i < 100; i++) { if (flag == 1) { try { //尝试锁,超时时间800毫秒 if (lock1.tryLock(800, TimeUnit.MILLISECONDS)) { //随机休眠下,造成每次不一样 Thread.sleep(new Random().nextInt(1000)); System.out.println("线程1成功获取锁1"); if (lock2.tryLock(800, TimeUnit.MILLISECONDS)) { System.out.println("线程1成功获取锁2"); System.out.println("线程1成功获取两把锁了"); lock2.unlock(); lock1.unlock(); break; } else { System.out.println("线程1获取锁2失败,已重试"); lock1.unlock(); //随机休眠下,造成每次不一样 Thread.sleep(new Random().nextInt(1000)); } } else { System.out.println("线程1获取锁1失败,已重试"); } } catch (InterruptedException e) { e.printStackTrace(); } } if (flag == 0) { try { //尝试锁,超时时间3000毫秒 if (lock2.tryLock(3000, TimeUnit.MILLISECONDS)) { //随机休眠下,造成每次不一样 Thread.sleep(new Random().nextInt(1000)); System.out.println("线程2成功获取锁2"); if (lock1.tryLock(3000, TimeUnit.MILLISECONDS)) { System.out.println("线程2成功获取锁1"); System.out.println("线程2成功获取两把锁了"); lock1.unlock(); lock2.unlock(); break; } else { System.out.println("线程2获取锁2失败,已重试"); lock2.unlock(); //随机休眠下,造成每次不一样 Thread.sleep(new Random().nextInt(1000)); } } else { System.out.println("线程2获取锁1失败,已重试"); } } catch (InterruptedException e) { e.printStackTrace(); } } } } } 

一次运行结果如下:

虽然互斥的拿到了锁,但是获取超时后自动释放了,解决了死锁的情况

  1. 多使用并发类而不是自己设计的类
  • ConcurrentHashMap、ConcurrentLinkedQueue、AtomicBoolean等
  • Java.util.concurrent.atomic中的方法
  • 多用并发集合少用同步集合
  1. 降低锁的使用粒度:使用不同的锁而不是一个锁
  2. 如果能用同步代码块,就不用同步方法:自己指定锁的对象
  3. 新建线程的时候最好起个有意义的名字,方便排查
  4. 避免锁的嵌套实现
  5. 分配资源前先看看能不能收回来:银行家算法
  6. 尽量不要几个功能使用同一个锁:专锁专用

活锁和饥饿

活锁和饥饿也是属于活跃性问题,和死锁有一定的相似性

  • 活锁:任务或者执行者没有被阻塞,由于某些条件没有满足,导致一直重复尝试,失败,尝试,失败。
  • 饥饿:一个或者多个线程因为种种原因无法获得所需要的资源,导致一直无法执行的状态。
  • 活锁与死锁的区别在于活锁并不是尝试一次不能获取锁就阻塞了,而是动态的一直尝试获取锁,并且有可能解开
  • Java 中导致饥饿的原因:
    • 高优先级线程吞噬所有的低优先级线程的 CPU 时间。
    • 线程被永久堵塞在一个等待进入同步块的状态,因为其他线程总是能在它之前持续地对该同步块进行访问。
    • 线程在等待一个本身也处于永久等待完成的对象(比如调用这个对象的 wait 方法),因为其他线程总是被持续地获得唤醒

本文整理自Java并发底层原理精讲

本文如有表述错误或者代码错误的,请及时联系我更正


更多Java面试复习笔记和总结可访问我的面试复习专栏《Java面试复习笔记》,或者访问我另一篇博客《Java面试核心知识点汇总》查看目录和直达链接