关于死锁,你知道多少?
本文就什么是死锁?怎么找到死锁?怎么解决死锁?怎么避免死锁等问题展开分析,通过大量的代码和案例演示向大家描述死锁的前世今生。
快速到达看这里
死锁是什么,有什么危害?
定义
- 并发情况下,当两个(或多个)线程(或进程)相互持有对方所需要的资源,又不主动释放,导致所有人都无法继续前进,程序无限阻塞,就是死锁
两个线程:
多个线程:
危害
-
死锁的影响在不同系统中是不一样的,这取决于系统对死锁的处理能力
- 数据库中:检测并放弃事务
- JVM中:无法自动处理
-
死锁的几率不高但是危害大
- 一旦发生,多是高并发场景,影响用户多
- 整个系统崩溃,子系统崩溃,性能降低
- 压力测试无法找到所有的死锁
写一个死锁的例子
案例一:必然发生死锁
第一个线程拿到锁o1后等待500毫秒,这段时间第二个线程可以拿到锁o2
然后线程1等待锁o2,线程2等待锁o1
造成程序无限阻塞的现象
代码演示如下:
/** * 〈必定发生死锁的现象〉 * * @author Chkl * @create 2020/3/9 * @since 1.0.0 */ public class MustDeadLock implements Runnable { int flag = 1; static Object o1 = new Object(); static Object o2 = new Object(); public static void main(String[] args) { MustDeadLock r1 = new MustDeadLock(); MustDeadLock r2 = new MustDeadLock(); r1.flag = 1; r2.flag = 0; Thread t1 = new Thread(r1); Thread t2 = new Thread(r2); t1.start(); t2.start(); } @Override public void run() { System.out.println(Thread.currentThread().getName()+"开始了,flag = " + flag); if (flag == 1) { synchronized (o1){ try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (o2){ System.out.println("线程1拿到两把锁"); } } } else if (flag == 0) { synchronized (o2){ try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (o1){ System.out.println("线程2拿到两把锁"); } } } } }
案例二:两个账户转账
模拟两个账户进行转账
-
如果线程获得一个锁后等待500毫秒,会出现和案例一的死锁现象
-
如果线程获得一个锁之后不等待500毫秒,只有很小的几率才会发生死锁,通常测试都会正常执行。
代码演示如下:
/** * 〈转账时出现死锁〉 * 一旦注释打开,发生死锁 * * @author Chkl * @create 2020/3/9 * @since 1.0.0 */ public class TransferMoney implements Runnable { int flag = 1; static Account a = new Account(500); static Account b = new Account(500); public static void main(String[] args) throws InterruptedException { TransferMoney r1 = new TransferMoney(); TransferMoney r2 = new TransferMoney(); r1.flag = 1; r2.flag = 0; Thread t1 = new Thread(r1); Thread t2 = new Thread(r2); t1.start(); ; t2.start(); t1.join(); t2.join(); System.out.println("a的余额" + a.balance); System.out.println("b的余额" + b.balance); } @Override public void run() { if (flag == 1) { transferMoney(a, b, 200); } if (flag == 0) { transferMoney(b, a, 200); } } public static void transferMoney(Account from, Account to, int amount) { synchronized (from) { //如果休眠500毫秒,那么另一个线程就会拿到to锁,造成相互等待的死锁现象 // try { // Thread.sleep(500); // } catch (InterruptedException e) { // e.printStackTrace(); // } synchronized (to) { if (from.balance - amount < 0) { System.out.println("余额不足,转账失败!"); } else { from.balance -= amount; to.balance += amount; System.out.println("成功转账" + amount + "元"); } } } } //账户对象,拥有属性balance static class Account { int balance; public Account(int balance) { this.balance = balance; } } }
案例三:多人多次转账
如果两个锁之间不进行等待,很难发生死锁
为了验证不等待也会发生死锁,并且死锁的发生是具有传递性的(而不是仅有少数锁住其他正常运行的),下面我们来完成多人多次转账案例
设置500个账户,每个线程进行操作,并且每个线程转账100000次。每次转账的账户和金额都是随机产生的
演示代码如下:
/** * 〈模拟多人随机转账〉 * * @author Chkl * @create 2020/3/9 * @since 1.0.0 */ public class MultiTransferMoney { //账户数 private static final int NUM_ACCOUNTS = 500; //账户金额 private static final int NUM_MONEY = 1000; //每人转账次数 private static final int NUM_ITERATIONS = 100000; //同时转账人数 private static final int NUM_THREADS = 5000; public static void main(String[] args) { Random random = new Random(); Account[] accounts = new Account[NUM_ACCOUNTS]; for (int i = 0; i < accounts.length; i++) { accounts[i] = new Account(NUM_MONEY); } class TransferThread extends Thread { @Override public void run() { for (int i = 0; i < NUM_ITERATIONS; i++) { int fromAcc = random.nextInt(NUM_ACCOUNTS); int toAcc = random.nextInt(NUM_ACCOUNTS); int amount = random.nextInt(NUM_MONEY); transferMoney(accounts[fromAcc], accounts[toAcc], amount); } System.out.println("运行结束!"); } } for (int i = 0;i<NUM_THREADS;i++){ new TransferThread().start(); } } public static void transferMoney(Account from, Account to, int amount) { synchronized (from) { synchronized (to) { if (from.balance - amount < 0) { System.out.println("余额不足,转账失败!"); } else { from.balance -= amount; to.balance += amount; System.out.println("成功转账" + amount + "元"); } } } } //账户对象,拥有属性balance static class Account { int balance; public Account(int balance) { this.balance = balance; } } }
运行一段时间之后,死锁的现象就出现了,控制台没有输出“运行结束!”,并且进程也未结束。
验证了依然会发生死锁,并且死锁具有传递性,并不是只有一两个线程死锁,而是所有线程都会被锁死
发生死锁必须满足哪些条件
四个条件缺一不可:
- 互斥条件
一个资源每一次只能被一个进程或者线程同时使用
- 请求与保持条件
一个线程去请求一把锁,同时它自身还保持一把锁
请求的时候发生阻塞了,保持的锁也不释放
- 不剥夺条件
没有外界条件来剥夺一个锁的拥有
- 循环等待条件
各个锁之间存在相互等待的情况,构成环
如何定位死锁
-
jstack
- 用命令行找到Java的pid(不同操作系统不同,详细去百度吧)
- 执行${JAVA_HOME}/bin/jstack pid,查找死锁的信息
-
ThreadMXBean
在代码中获取是否发生死锁,如果发生了就打印出信息
在线程启动后,休眠一段时间等待进入死锁,然后进行检验并打印
//等1000毫秒,等它进入死锁 Thread.sleep(1000); ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); long[] deadlockedThreads = threadMXBean.findDeadlockedThreads(); //判断是否有死锁现象 if (deadlockedThreads != null && deadlockedThreads.length > 0) { for (int i = 0; i < deadlockedThreads.length; i++) { ThreadInfo threadInfo = threadMXBean.getThreadInfo(deadlockedThreads[i]); System.out.println("发现死锁:"+threadInfo.getThreadName()); } }
在多人多次转账的案例中进行检查,运行结果如下
有哪些解决死锁问题的策略?
线上发生死锁怎么办
-
保存案发现场后立刻重启服务器
-
暂时保证线上服务的安全,然后在利用刚才保存的信息,排查死锁,修改代码,重新发版
常见修复策略
-
避免策略
- 思路:避免相反的获取锁的顺序
- 演示:将之前的两个账户转账的代码进行修改,将transferMoney方法代码修改如下
每次加锁前判断两个锁的hash值,如果两个hash值不相等,都是先获取hash值小的锁,再获取hash值大的锁;如果发送hash冲突,就再加一把锁锁住加锁的过程。保证无论什么顺序进行转账,都不会发生死锁
public static void transferMoney(Account from, Account to, int amount) { class Helper { public void transfer() { if (from.balance - amount < 0) { System.out.println("余额不足,转账失败!"); } else { from.balance -= amount; to.balance += amount; System.out.println("成功转账" + amount + "元"); } } } //获取对象的hash值 int fromHash = System.identityHashCode(from); int toHash = System.identityHashCode(to); //通过hash大小的比较,保证获取锁的顺序是一定的 //如果两个账户相互转账,都是先加hash值小的锁,保证了两次加锁的顺序一致,就不会有死锁了 if (fromHash < toHash) { synchronized (from) { synchronized (to) { new Helper().transfer(); } } } else if (fromHash > toHash) { synchronized (to) { synchronized (from) { new Helper().transfer(); } } //hash冲突发生了 } else { synchronized (lock) { synchronized (from) { synchronized (to) { new Helper().transfer(); } } } } }
- 检测与恢复策略
- 允许发生死锁
- 每次调用锁都记录在有向图中
- 定期检查“锁的调用链路图”中是否存在环路
- 一旦发生死锁,调用死锁恢复机制
- 线程终止
逐个终止线程,直到死锁解除,顺序如下:- 优先级(前台交互还是后台处理)
- 已占用资源和还需要的资源
- 已运行时间
- 资源抢占
- 发出去的锁收回来,让线程回退几步
- 缺点:可能同一个线程一直被抢占,造成饥饿
- 线程终止
- 鸵鸟策略
如果死锁发送的几率非常低,那么我们就直接忽略它,知道死锁发送的时候,再人工修复
哲学家就餐问题
问题描述
假设有五位哲学家围坐在一张圆形餐桌旁,做以下两件事情之一:吃饭,或者思考。吃东西的时候,他们就停止思考,思考的时候也停止吃东西。餐桌中间有一大碗面,每两个哲学家之间有筷子。吃面需要两支筷子,所以假设哲学家必须用两只筷子吃东西。他们只能使用自己左右手边的那两只筷子。
就餐流程
- 先拿起左手的筷子
- 然后拿起右手的筷子
- 如果筷子被人使用了,那就等别人用完
- 吃完后,把筷子放回原位
代码演示
/** * 〈演示哲学家就餐问题导致的死锁〉 * * @author Chkl * @create 2020/3/9 * @since 1.0.0 */ public class DiningPhilosophers { public static class Philosopher implements Runnable { private Object leftChopstick; private Object rightChopstick; public Philosopher(Object leftChopstick, Object rightChopstick) { this.leftChopstick = leftChopstick; this.rightChopstick = rightChopstick; } @Override public void run() { try { while (true) { doAction("Think"); synchronized (leftChopstick) { doAction("picked up left chopstick"); synchronized (rightChopstick) { doAction("picked up right chopstick"); doAction("put down right chopstick"); } doAction("put down left chopstick"); } } } catch (InterruptedException e) { e.printStackTrace(); } } } private static void doAction(String action) throws InterruptedException { //打印操作 System.out.println(Thread.currentThread().getName() + " " + action); //随机休息 Thread.sleep((long) Math.random() * 100); } public static void main(String[] args) { //定义哲学家 Philosopher[] philosophers = new Philosopher[5]; //定义筷子 Object[] chopticks = new Object[philosophers.length]; //初始化筷子 for (int i = 0; i < chopticks.length; i++) { chopticks[i] = new Object(); } //初始化哲学家 for (int i = 0; i < philosophers.length; i++) { Object leftChopstick = chopticks[i % philosophers.length]; Object rightChopstick = chopticks[(i + 1) % philosophers.length]; philosophers[i] = new Philosopher(leftChopstick, rightChopstick); new Thread(philosophers[i], "哲学家" + (i + 1)+"号 ").start(); } } }
可能的一种结果:
每个哲学家都拿起来左边的筷子,然后都在等待右边的筷子,进入循环等待的死锁现象
多种解决方案
- 服务员检查(避免策略)
由服务员进行判断分配,如果发现可能会发生死锁,不允许就餐 - 改变一个哲学家拿叉子的顺序(避免策略)
改变其中一个拿的顺序,破坏环路 - 餐票(避免策略)
吃饭必须拿餐票,餐票一共只有4张,吃完了回收 - 领导调节(检测与恢复策略)
定时检查,如果发生死锁,随机剥夺一个的筷子
改变一个哲学家拿叉子的顺序的实现
只需要修改哲学家初始化代码,将最后一个哲学家的拿筷子顺序进行交换,将代码
philosophers[i] = new Philosopher(rightChopstick, leftChopstick);
替换成
if (i == philosophers.length - 1) { philosophers[i] = new Philosopher(rightChopstick, leftChopstick); } else { philosophers[i] = new Philosopher(leftChopstick, rightChopstick); }
工程中如何避免死锁
- 设置超时时间,超时发警报
- Lock的tryLock(long timeout,TimeUnit unit)
- synchronized不具备尝试锁的能力
- 造成超时的可能性很多,发生了死锁,死循环,线程执行慢
代码演示:
/** * 〈用trylock来避免死锁〉 * * @author Chkl * @create 2020/3/9 * @since 1.0.0 */ public class TryLockDeadLock implements Runnable { static Lock lock1 = new ReentrantLock(); static Lock lock2 = new ReentrantLock(); int flag = 1; public static void main(String[] args) { TryLockDeadLock r1 = new TryLockDeadLock(); TryLockDeadLock r2 = new TryLockDeadLock(); r1.flag = 1; r2.flag = 0; new Thread(r1).start(); new Thread(r2).start(); } @Override public void run() { for (int i = 0; i < 100; i++) { if (flag == 1) { try { //尝试锁,超时时间800毫秒 if (lock1.tryLock(800, TimeUnit.MILLISECONDS)) { //随机休眠下,造成每次不一样 Thread.sleep(new Random().nextInt(1000)); System.out.println("线程1成功获取锁1"); if (lock2.tryLock(800, TimeUnit.MILLISECONDS)) { System.out.println("线程1成功获取锁2"); System.out.println("线程1成功获取两把锁了"); lock2.unlock(); lock1.unlock(); break; } else { System.out.println("线程1获取锁2失败,已重试"); lock1.unlock(); //随机休眠下,造成每次不一样 Thread.sleep(new Random().nextInt(1000)); } } else { System.out.println("线程1获取锁1失败,已重试"); } } catch (InterruptedException e) { e.printStackTrace(); } } if (flag == 0) { try { //尝试锁,超时时间3000毫秒 if (lock2.tryLock(3000, TimeUnit.MILLISECONDS)) { //随机休眠下,造成每次不一样 Thread.sleep(new Random().nextInt(1000)); System.out.println("线程2成功获取锁2"); if (lock1.tryLock(3000, TimeUnit.MILLISECONDS)) { System.out.println("线程2成功获取锁1"); System.out.println("线程2成功获取两把锁了"); lock1.unlock(); lock2.unlock(); break; } else { System.out.println("线程2获取锁2失败,已重试"); lock2.unlock(); //随机休眠下,造成每次不一样 Thread.sleep(new Random().nextInt(1000)); } } else { System.out.println("线程2获取锁1失败,已重试"); } } catch (InterruptedException e) { e.printStackTrace(); } } } } }
一次运行结果如下:
虽然互斥的拿到了锁,但是获取超时后自动释放了,解决了死锁的情况
- 多使用并发类而不是自己设计的类
- ConcurrentHashMap、ConcurrentLinkedQueue、AtomicBoolean等
- Java.util.concurrent.atomic中的方法
- 多用并发集合少用同步集合
- 降低锁的使用粒度:使用不同的锁而不是一个锁
- 如果能用同步代码块,就不用同步方法:自己指定锁的对象
- 新建线程的时候最好起个有意义的名字,方便排查
- 避免锁的嵌套实现
- 分配资源前先看看能不能收回来:银行家算法
- 尽量不要几个功能使用同一个锁:专锁专用
活锁和饥饿
活锁和饥饿也是属于活跃性问题,和死锁有一定的相似性
- 活锁:任务或者执行者没有被阻塞,由于某些条件没有满足,导致一直重复尝试,失败,尝试,失败。
- 饥饿:一个或者多个线程因为种种原因无法获得所需要的资源,导致一直无法执行的状态。
- 活锁与死锁的区别在于活锁并不是尝试一次不能获取锁就阻塞了,而是动态的一直尝试获取锁,并且有可能解开
- Java 中导致饥饿的原因:
- 高优先级线程吞噬所有的低优先级线程的 CPU 时间。
- 线程被永久堵塞在一个等待进入同步块的状态,因为其他线程总是能在它之前持续地对该同步块进行访问。
- 线程在等待一个本身也处于永久等待完成的对象(比如调用这个对象的 wait 方法),因为其他线程总是被持续地获得唤醒
本文整理自Java并发底层原理精讲
本文如有表述错误或者代码错误的,请及时联系我更正
更多Java面试复习笔记和总结可访问我的面试复习专栏《Java面试复习笔记》,或者访问我另一篇博客《Java面试核心知识点汇总》查看目录和直达链接