多线程
- JUC
JUC就是java.util.concurrent下面的类包,专门用于多线程的开发。
实现多线程的三种方式:
- Thread:
- Thread类是在java.lang包中定义的。
- 一个类只要继承了Thread类同时覆写了本类中的run()方法就可以实现多线程操作了
- 一个类只能继承一个父类,这是此方法的局限。
- Runnable
- 使用Runnable定义的子类中没有start()方法,只有Thread类中才有。
- Thread类,有一个构造方法:public Thread(Runnable targer)此构造方法接受Runnable的子类实例,也就是说可以通过Thread类来启动Runnable实现的多线程。
- 优势:
- 避免点继承的局限,一个类可以继承多个接口。
- 适合于资源的共享
- Callable
- Callable 和 Runnable 的使用方法大同小异, 区别在于:
- Callable 使用 call() 方法, Runnable 使用 run() 方法
- call() 可以返回值, 而 run()方法不能返回。
- call() 可以抛出受检查的异常,比如ClassNotFoundException, 而run()不能抛出受检查的异常。
- Callable 和 Runnable 的使用方法大同小异, 区别在于:
- 线程和进程
这里只是提到相关的点,如果要仔细理解线程和进程,麻烦到操作系统文件去观看
一个进程往往可以包含多个线程,至少包含一个
Java默认有2个线程
- mian(主线程)
- GC(垃圾回收线程)
对于Java而言的线程:Thread、Runnable、Callable
Java 不可以直接开启线程(因为Java是运行在虚拟机系统上的,不能直接与硬件进行交互)
public synchronized void start() { if (threadStatus != 0) throw new IllegalThreadStateException(); group.add(this); boolean started = false; try { // private native void start0(); 这是一个本地方法,底层的C++ start0(); started = true; } finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { /* do nothing. If start0 threw a Throwable then it will be passed up the call stack */ } } }
并发与并行
并发编程:并发、并行
并发编程的本质:充分利用CPU的资源
- 并发(多线程操作同一个资源)
CPU 一核 ,模拟出来多条线程,其实是线程之间轮换着占用CPU的资源 - 并行
CPU 多核 ,多个线程可以同时执行,一般利用线程池。
System.out.println(Runtime.getRuntime().availableProcessors()); // 获取cpu的核数
线程的状态
public enum State {
// 新生
NEW,
// 运行
RUNNABLE,
// 阻塞
BLOCKED,
// 等待(死等)
WAITING,
// 超时等待
TIMED_WAITING,
// 终止
TERMINATED;
}wait & sleep
- 来自不同的类
- wait => Object
- sleep => Thread
- 关于锁的释放
- wait 会释放锁
- sleep不会释放
- 使用的范围是不同的
- wait:在同步代码块中
- sleep 可以再任何地方睡
- 是否需要捕获异常
- wait 不需要捕获异常
- sleep 必须要捕获异常
- Lock锁
使用基本的买票案例来展示
Synchronized
平常我们都是创建一个线程,然后把属性和操作都写在里面,然后运行,但是实际工作中,我们应该将资源类抽象出来,然后丢入线程进行运行,这样才符合OOP,面向对象编程。
当有一些函数式接口时,可以使用lambda表达式(jdk1.8之后才有),基本的格式是(参数)->{ 代码 }。
public class Demo1 {
public static void main(String[] args) { // 并发:多线程操作同一个资源类,把资源丢入线程 Ticket ticket = new Ticket(); // @FunctionalInterface 函数式接口,jdk1.8 lambda表达式 (参数)->{ 代码 } new Thread(() -> { for (int i = 1; i < 40; i++){ ticket.sale(); } }, "A").start(); new Thread(() -> { for (int i = 1; i < 40; i++){ ticket.sale(); } }, "B").start(); new Thread(() -> { for (int i = 1; i < 40; i++){ ticket.sale(); } }, "C").start(); }}
// 资源类OOP
class Ticket {// 属性、方法 private int number = 50; // 卖票的方式 // synchronized 本质: 队列,锁 public synchronized void sale(){ if (number > 0) System.out.println(Thread.currentThread().getName() + "剩余:" + --number); }}
Lock
- 官方文档的推荐格式:
Lock lock = new ReentrantLock(); lock.lock(); try{ // 需要加锁的方法体 }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); }
- lock种类:
- ReentrantLock:常用的可重入锁
- ReentrantReadWriteLock.ReadLock:读锁
- ReentrantReadWriteLock.WriteLock:写锁
- 公平锁:十分公平,先来后到
- 非公平锁:不公平,可以插队 (默认)
public class Demo2 {
public static void main(String[] args) {
Ticket2 ticket2 = new Ticket2();
new Thread(() -> { for (int i = 1; i < 40; i++) ticket2.sale(); }, "A").start();
new Thread(() -> { for (int i = 1; i < 40; i++) ticket2.sale(); }, "B").start();
new Thread(() -> { for (int i = 1; i < 40; i++) ticket2.sale(); }, "C").start();
}
}
class Ticket2 {
private int number = 50;
Lock lock = new ReentrantLock();
// 卖票的方式
// synchronized 本质: 队列,锁
public void sale(){
lock.lock();
try{
if (number > 0)
System.out.println(Thread.currentThread().getName() + "剩余:" + --number);
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}Synchronize & ReentrantLock
相同点
- 它们都是加锁方式同步,都是阻塞性同步,可以理解为重量级锁。
- 阻塞和唤醒的代价比较高,因为总是要在内核态和用户态进行转换。
不同点
- Synchronize是Java语言的关键字,是原生语法层面上的互斥,是由JVM实现的。
- ReentrantLock是API层面上的锁,需要lock()和unlock()进行手动加锁和解锁,并结合try()和finally()语句来完成。
- Synchronize可以修饰方法和代码块,而ReentrantLock一般直接加在所要同步的代码块前后。
- Synchronized 线程 1(获得锁,阻塞)、线程2(等待,傻傻的等);Lock锁就不一定会等待下去;
- ReentrantLock是等待可中断的,如果一个线程阻塞太久,可以选择中断,然后去做其他事情,而Synchronize不行,要一直阻塞等待。
- Synchronize只能是非公平锁,ReentrantLock可以通过一个布尔值去改变公平性,默认也是非公平锁。
- ReentrantLock可以绑定多个Condition对象,但是Synchronize如果要多条件关联的话,只能多加一把锁。
- Synchronized 适合锁少量的代码同步问题,Lock 适合锁大量的同步代码
- 生产者和消费者
Synchronized 版
public class ProductConsume {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{for(int i = 0;i < 10;i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();
new Thread(()->{for(int i = 0;i < 10;i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
}
}
class Data {
private int num = 0;
public synchronized void increment() throws InterruptedException {
if (num != 0){
this.wait();
}
num++;
System.out.println(Thread.currentThread().getName()+"=>"+num);
this.notify();
}
public synchronized void decrement() throws InterruptedException {
if (num == 0){
this.wait();
}
num--;
System.out.println(Thread.currentThread().getName()+"=>"+num);
this.notify();
}
}- 问题存在,A B C D 4 个线程! 虚假唤醒
问题解决: if 改为 while 判断(因为if只判断一次,但是while会不停地判断)
public synchronized void increment() throws InterruptedException {
while (num != 0){ this.wait(); } num++; System.out.println(Thread.currentThread().getName()+"=>"+num); this.notify();}
JUC 版
Condition精准控制和唤醒线程
public class ProductConsume2 {
public static void main(String[] args) {
Data2 data = new Data2();
new Thread(()->{ for (int i = 0; i < 5; i++) data.printA(); }, "A").start();
new Thread(()->{ for (int i = 0; i < 5; i++) data.printB(); }, "B").start();
new Thread(()->{ for (int i = 0; i < 5; i++) data.printC(); }, "C").start();
}
}
class Data2 {
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
private int num = 1;
public void printA(){
lock.lock();
try{
while (num != 1){
condition1.await();
}
System.out.println(Thread.currentThread().getName() + "=> AAAAAA");
num = 2;
condition2.signal();
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}
public void printB(){
lock.lock();
try{
while (num != 2){
condition2.await();
}
System.out.println(Thread.currentThread().getName() + "=> BBBBBB");
num = 3;
condition3.signal();
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}
public void printC(){
lock.lock();
try{
while (num != 3){
condition3.await();
}
System.out.println(Thread.currentThread().getName() + "=> CCCCCC");
num = 1;
condition1.signal();
}catch(Exception e){
e.printStackTrace();
}finally{
lock.unlock();
}
}
}8个锁现象
public class Test1 {
public static void main(String[] args) { Phone phone = new Phone(); new Thread(()->{ phone.sendSms();},"A").start(); // 等待 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->{ phone.call();},"B").start(); }}
class Phone{// synchronized 锁的对象是方法的调用者!、 // 两个方法用的是同一个锁,谁先拿到谁执行! public synchronized void sendSms(){ System.out.println("发短信"); } public synchronized void call(){ System.out.println("打电话"); }}
标准情况下(也就是以上无延迟版),两个线程先打印 发短信还是 打电话?
- 是先打印发短信,然后再打印打电话
- 由于他们两个方法使用synchronized锁,它锁住的是这个对象。
- 所以要等先获得锁的方法释放锁,才可以轮到下一个方法。
- sendSms延迟4秒,和上述情况一致,因为锁的状态还是没变。
public synchronized void sendSms(){ try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); }
- 增加了一个普通方法后,先执行发短信还是Hello
public void hello(){ System.out.println("hello"); }- 这里没有锁,不是同步方法,不受锁的影响,所以先执行hello。
两个对象,两个同步方法, 发短信还是 打电话
- 两个对象拥有的锁是不同的,所以它们之间线程方法的执行不受到锁的影响。
增加两个静态的同步方法,只有一个对象,先打印 发短信?打电话
class Phone3{ // synchronized 锁的对象是方法的调用者! // static 静态方法 // 类一加载就有了!锁的是Class public static synchronized void sendSms(){ try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } public static synchronized void call(){ System.out.println("打电话"); } }- synchronized 锁的对象是这个类,也是唯一的。也会跟情况1一样考虑锁。
接下来的三种情况和上面的无异,都是判断锁的对象是否唯一,然后再看是否延迟和运行的先后顺序。
- 两个对象,增加两个静态的同步方法。
- 1个静态的同步方法,1个普通的同步方法 ,一个对象。
- 1个静态的同步方法,1个普通的同步方法 ,两个对象。
- 集合类的安全性
ArrayList & Vector & CopyOnWriteArrayList
直接使用ArrayList
public class ListTest {
public static void main(String[] args) { List<String> list = new ArrayList<>(); for (int i = 0; i < 10; i++) { new Thread(()->{ list.add(UUID.randomUUID().toString().substring(0, 5)); System.out.println(list); }, String.valueOf(i)).start(); } }}
可能会导致java.util.ConcurrentModificationException并发修改异常
可以使用 Collections.synchronizedList(new ArrayList<>()); 将该集合同步化。
- 使用List<string> list = new Vector<>();,Vector是一个线程安全的类,因为它的方法几乎都被synchronized锁住。
public synchronized void setElementAt(E obj, int index) { if (index >= elementCount) { throw new ArrayIndexOutOfBoundsException(index + " >= " + elementCount); } elementData[index] = obj; }</string> - 使用List<string> list = new CopyOnWriteArrayList<>();,CopyOnWriteArrayList也是一个线程安全的类,它读时候不加锁,写的时候复制副本。(不能保证实时一致性,只能保证最终一致性)
public boolean add(E e) { final ReentrantLock lock = this.lock; // 使用新版锁 lock.lock(); try { Object[] elements = getArray(); int len = elements.length; // 先复制一份 Object[] newElements = Arrays.copyOf(elements, len + 1); // 写入 newElements[len] = e; // 再从新设置回去 setArray(newElements); return true; } finally { lock.unlock(); } }</string>
HashSet & CopyOnWriteHashSet
HashSet也是会导致并发修改异常,可以使用以下两种方式解决:
- Collections.synchronizedList(new HashSet<>())
- CopyOnWriteHashSet
HashMap & ConcurrentHashMap
HashMap也是线程不安全的,而且还要明白:
- 工作中一般不用 HashMap
- 默认等价于 new HashMap<>(16,0.75)
解决方法同上。
- Callable ( 简单 )
Callable 和 Runnable 的使用方法大同小异, 区别在于:
- Callable 使用 call() 方法, Runnable 使用 run() 方法
- call() 可以返回值, 而 run()方法不能返回。
- call() 可以抛出受检查的异常,比如ClassNotFoundException, 而run()不能抛出受检查的异常。
Callable使用
- 我们比较好的启动线程的方式new Thread().start();
- 但是Thread类只能传入Runnable接口,而无法传入Callable
- 所以我们应该要借助FutureTask,使得Runnable和Callable可以建立联系
- FutureTask是Runnable的实现类,在它的构造方法中可以传入Callable
使用方法:
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 资源类
MyThread myThread = new MyThread();
// 适配类
FutureTask futureTask = new FutureTask(myThread);
new Thread(futureTask, "A").start();
new Thread(futureTask, "B").start();
Integer o = (Integer)futureTask.get(); // 这个get 方法可能会产生阻塞!把他放到最后\
System.out.println(o);
}
}
// 泛型是它返回值的类型
class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("call()");
return 1024;
}
}- futureTask里面用的是一个state存放线程状态,outcome存放线程结果。所以只会打印一次call。
- 结果会被阻塞,可能会需要等待。
- 常用的辅助类
CountDownLatch
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
// 总计数是6,等到任务要执行的时候再使用
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6 ; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName() + " Go out");
// 计数减一
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
// 等待计数完成,继续下面的操作
countDownLatch.await();
System.out.println("Close door");
}
}每次有线程调用 countDown() 数量-1,假设计数器变为0,countDownLatch.await() 就会被唤醒,继续执行!
CyclicBarrier
其实这是一个加法计数器,到达一定数量之后才可以继续进行。
public class CyclicBarrierTest {
public static void main(String[] args) {
// 到达七个线程才可以继续
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, ()->{
System.out.println("到达计数器之后的操作!");
});
for (int i = 1; i <= 10 ; i++) {
new Thread(()->{
System.out.println("This is Thread-" + Thread.currentThread().getName());
try{
// 等待
cyclicBarrier.await();
}catch(InterruptedException e){
e.printStackTrace();
}catch (BrokenBarrierException e){
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}- 就是在线程内部设置一个屏障,阻隔这些线程的下一步操作,然后当达到计数标准后,统一放行。
Semaphore
Semaphore:信号量
一般使用在多个共享资源互斥的使用!并发限流,控制最大的线程数!
public class SemaphoreTest {
public static void main(String[] args) { Semaphore semaphore = new Semaphore(3); for (int i = 1; i <= 6 ; i++) { new Thread(()->{ try{ // 尝试获取信号量,假设如果已经满了,等待,等待被释放为止 semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "get"); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + "out"); }catch(Exception e){ e.printStackTrace(); }finally{ // 释放信号量,会将当前的信号量释放 + 1,然后唤醒等待的线程 semaphore.release(); } }, String.valueOf(i)).start(); } }}
- 读写锁
/**
* 独占锁(写锁) 一次只能被一个线程占有
* 共享锁(读锁) 多个线程可以同时占有
* ReadWriteLock
* 读-读 可以共存!
* 读-写 不能共存!
* 写-写 不能共存!
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
myCache.put(temp+"",temp+"");
},String.valueOf(i)).start();
}
// 读取
for (int i = 1; i <= 5 ; i++) {
final int temp = i;
new Thread(()->{
myCache.get(temp+"");
},String.valueOf(i)).start();
}
}
}
// 自定义缓存
class MyCache {
private volatile Map<String, Object> map = new HashMap<>();
// 读写锁: 更加细粒度的控制
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// 存,写入的时候,只希望同时只有一个线程写
public void put(String key,Object value){
readWriteLock.writeLock().lock();
try{
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入OK");
}catch(Exception e){
e.printStackTrace();
}finally{
readWriteLock.writeLock().unlock();
}
}
// 取,读,所有人都可以读!
public void get(String key){
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取OK");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}- 阻塞队列
FIFO,队列满了必须要阻塞等待才可进入,队列空了必须阻塞等待队列有元素了才可以取
使用场景:多线程并发处理,线程池
队列的相关继承树
四组API
方式 抛出异常 有返回值,不抛出异常 阻塞 等待 超时等待 添加 add offer() put() offer(,,) 移除 remove poll() take() poll(,)
检测队首元素 element peek - -
/**
* 抛出异常
*/
public static void test1(){
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
// IllegalStateException: Queue full 抛出异常!
// System.out.println(blockingQueue.add("d"));
System.out.println("=-===========");
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
// java.util.NoSuchElementException 抛出异常!
// System.out.println(blockingQueue.remove());
}
/**
* 有返回值,没有异常
*/
public static void test2(){
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
// System.out.println(blockingQueue.offer("d")); // false 不抛出异常!
System.out.println("============================");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll()); // null 不抛出异常!
}
/**
* 等待,阻塞(一直阻塞)
*/
public static void test3() throws InterruptedException {
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
// 一直阻塞
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
// blockingQueue.put("d"); // 队列没有位置了,一直阻塞
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take()); // 没有这个元素,一直阻塞
}
/**
* 等待,阻塞(等待超时)
*/
public static void test4() throws InterruptedException {
// 队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.offer("a");
blockingQueue.offer("b");
blockingQueue.offer("c");
// blockingQueue.offer("d",2,TimeUnit.SECONDS); // 等待超过2秒就退出
System.out.println("===============");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
blockingQueue.poll(2,TimeUnit.SECONDS); // 等待超过2秒就退出
}SynchronousQueue 同步队列
没有容量,进去一个元素,必须等待取出来之后,才能再往里面放一个元素!
/**
* 同步队列
* 和其他的BlockingQueue 不一样, SynchronousQueue 不存储元素
* put了一个元素,必须从里面先take取出来,否则不能在put进去值!
*/
public class SynchronousQueueDemo {
public static void main(String[] args) {
SynchronousQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(()->{
try{
System.out.println(Thread.currentThread().getName()+" put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+" put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+" put 3");
blockingQueue.put("3");
}catch(InterruptedException e){
e.printStackTrace();
}
}, "T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
}
}- 线程池
线程池的好处
- 降低资源的消耗。可以重复利用已创建的线程,减少大量的线程池创建和销毁的消耗。
- 提高响应速度。不需要等待线程创建完成,任务到达可以直接立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
- 创建和销毁线程由线程池管理,能设置非核心线程的存活时间。
- 能够控制最大线程数。
- 提供定时执行的功能。
Executors的三大方法
/**
* 1.创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程
* 2.当任务数增加时,此线程池又可以智能的添加新线程来处理任务
* 3.此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小
* 4.适用:执行很多短期异步的小程序或者负载较轻的服务器
*/
public void testCacheThreadPool(){
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 1; i <= 10; i++) {
final int ii = i;
try {
Thread.sleep(ii * 1);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(()->System.out.println("线程名称:" + Thread.currentThread().getName() + ",执行" + ii));
}
}
/**
* 1.创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小
* 2.线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程
* 3.因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字,和线程名称
* 4.适用:执行长期的任务,性能好很多
*/
public void testFixedThreadPool(){
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int ii = i;
fixedThreadPool.execute(()-> System.out.println("线程名称:" + Thread.currentThread().getName() + ",执行" + ii));
try {
Thread.sleep(ii);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
* 适用:一个任务一个任务执行的场景
*/
public void testSingleThreadExecutor(){
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int ii = i;
try {
Thread.sleep(ii);
} catch (InterruptedException e) {
e.printStackTrace();
}
singleThreadExecutor.execute(()-> System.out.println("线程名称:" + Thread.currentThread().getName() + ",执行" + ii));
}
}
/**
* 创建一个定长线程池,支持定时及周期性任务执行。延迟执行
* 这是一种按照超时时间排序的队列结构
* 适用:周期性执行任务的场景
*/
public void testScheduledThreadPool(){
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
Runnable r1 = () -> System.out.println("线程名称:" + Thread.currentThread().getName() + ",执行:3秒后执行");
scheduledThreadPool.schedule(r1, 3, TimeUnit.SECONDS);
Runnable r2 = () -> System.out.println("线程名称:" + Thread.currentThread().getName() + ",执行:延迟2秒后每3秒执行一次");
scheduledThreadPool.scheduleAtFixedRate(r2, 2, 3, TimeUnit.SECONDS);
Runnable r3 = () -> System.out.println("线程名称:" + Thread.currentThread().getName() + ",执行:普通任务");
for (int i = 0; i < 5; i++) {
scheduledThreadPool.execute(r3);
}
}ThreadPoolExecutor七大参数
阿里巴巴的规范
三大方法源码分析:
其实都是调用了ThreadPoolExecutor创建的线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// 本质ThreadPoolExecutor(),使用七大参数进行控制
public ThreadPoolExecutor(int corePoolSize, // 核心线程池大小
int maximumPoolSize, // 最大核心线程池大小
long keepAliveTime, // 超时了没有人调用就会释放
TimeUnit unit, // 超时单位
BlockingQueue<Runnable> workQueue, // 阻塞队列
ThreadFactory threadFactory, // 线程工厂:创建线程的,一般不用动
RejectedExecutionHandler handle // 拒绝策略) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}自定义线程池(推荐)
/**
* new ThreadPoolExecutor.AbortPolicy() // 银行满了,还有人进来,不处理这个人的,抛出异
常
* new ThreadPoolExecutor.CallerRunsPolicy() // 哪来的去哪里!
* new ThreadPoolExecutor.DiscardPolicy() //队列满了,丢掉任务,不会抛出异常!
* new ThreadPoolExecutor.DiscardOldestPolicy() //队列满了,尝试去和最早的竞争,也不会
抛出异常!
*/
public class Demo01 {
public static void main(String[] args) {
// 自定义线程池!工作 ThreadPoolExecutor
ExecutorService threadPool = new ThreadPoolExecutor(2, 5, 3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()); //队列满了,尝试去和最早的竞争,也不会抛出异常!
try {
// 最大承载:Deque + max
// 超过 RejectedExecutionException
for (int i = 1; i <= 9; i++) {
// 使用了线程池之后,使用线程池来创建线程
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 线程池用完,程序结束,关闭线程池
threadPool.shutdown();
}
}
}线程池的工作原理
先看一个例子帮忙理解:
- 一家银行总共有5个前台服务(模拟线程池最大线程数),但是一开始只开启2个(模拟核心线程数)。
- 人来了,如果开放的前台服务有空闲,直接去前台办理业务。
- 如果前台已经满了,就到候客区进行等待(模拟阻塞队列)。
- 如果阻塞队列也满了,那么就要开放其他的前台服务,不然不够用。
- 如果所有的5个前台服务都已经开放,且已经有人在办理业务,那么你就只能是在门口等,或者直接回家,或者去其他银行办理(模拟拒绝策略)。
线程池工作流程:
- 一个线程进入线程池,先判断核心线程线程池里的线程是否都在执行状态,如果还没满,那么久建立一个新的工作线程来执行,如果已满就执行下一步。
- 查看工作队列是否已经满了,如果还没满,那么就加入工作队列,如果已经满了,那么只能执行第三步。
- 查看线程池是否所有线程都在工作状态,如果不是,创建一个新线程执行,如果已满,执行饱和策略。
线程池饱和策略
- AbortPolicy:为java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。
- DiscardPolicy:直接抛弃,任务不执行,空方法
- DiscardOldestPolicy:从队列里面抛弃head的一个任务,并再次execute 此task。
- CallerRunsPolicy
在调用execute的线程里面执行此command,会阻塞入口 - 用户自定义拒绝策略(最常用)实现RejectedExecutionHandler,并自己定义策略模式
线程池调优
其实就是两种调整最大线程数量的方向
- CPU密集型:几核,就是几,可以保持CPU的效率最高!
- IO密集型:判断你程序中十分耗IO的线程,然后比它更多的线程数量,一般是两倍。
- 四大函数式接口
函数式接口: 只有一个方法的接口
- Function:函数型接口
- Predicate:断定型接口
- Consumer:消费型接口
- Supplier:供给型接口
Function函数式接口
/**
* Function 函数型接口, 有一个输入参数,有一个输出
* 只要是 函数型接口 可以 用 lambda表达式简化
*/
public class Demo1 {
public static void main(String[] args) {
// Function<String, String> function = new Function<String, String>() {
// @Override
// public String apply(String s) {
// return "hello! " + s;
// }
// };
Function<String, String> function = (s) -> {return "hello! " + s;};
System.out.println(function.apply("wex"));
}
}Predicate断定型接口
有一个输入参数,返回值只能是 布尔值!
/**
* 断定型接口:有一个输入参数,返回值只能是 布尔值!
*/
public class Demo02 {
public static void main(String[] args) {
// 判断字符串是否为空
// Predicate<String> predicate = new Predicate<String>(){
//// @Override
//// public boolean test(String str) {
//// return str.isEmpty();
//// }
//// };
Predicate<String> predicate = (str)->{return str.isEmpty(); };
System.out.println(predicate.test(""));
}
}Consumer 消费型接口
public class Demo3 {
public static void main(String[] args) {
// Consumer<String> consumer = new Consumer<String>() {
// @Override
// public void accept(String str) {
// System.out.println(str);
// }
// };
Consumer<String> consumer = (str)->{System.out.println(str);};
consumer.accept("wex");
}
}Supplier 供给型接口
/**
* Supplier 供给型接口 没有参数,只有返回值
*/
public class Demo4 {
public static void main(String[] args) {
// Supplier supplier = new Supplier<Integer>() {
// @Override
// public Integer get() {
// System.out.println("get()");
// return 1024;
// }
// };
Supplier supplier = ()->{ return 1024; };
System.out.println(supplier.get());
}
}- Stream流式计算
大数据:存储 + 计算
集合、MySQL 本质就是存储东西的。
计算都应该交给流来操作。
/**
* 题目要求:一分钟内完成此题,只能用一行代码实现!
* 现在有5个用户!筛选:
* 1、ID 必须是偶数
* 2、年龄必须大于23岁
* 3、用户名转为大写字母
* 4、用户名字母倒着排序
* 5、只输出一个用户!
*/
public class StreamTest {
public static void main(String[] args) {
User u1 = new User(1,"a",21);
User u2 = new User(2,"b",22);
User u3 = new User(3,"c",23);
User u4 = new User(4,"d",24);
User u5 = new User(6,"e",25);
// 集合就是存储
List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
// 计算交给Stream流
// lambda表达式、链式编程、函数式接口、Stream流式计算
list.stream()
.filter(u->{return u.getId() % 2 == 0;})
.filter(u->{return u.getAge() > 23;})
.map(u->{return u.getName().toUpperCase();})
.sorted((uu1, uu2)->{return uu1.compareTo(uu2);})
.limit(1)
.forEach(System.out::println);
}
}- ForkJoin 分支合并
ForkJoin 在 JDK 1.7 , 并行执行任务!提高效率。大数据量!
大数据:Map Reduce (把大任务拆分为小任务)
工作窃取
- 线程里面维护的都是双端队列
- 当一个线程执行完之后,可以窃取其他线程未完成的任务进行完成
ForkJoin使用
- 执行传入ForkJoinTask,有点像之前说的那个FutureTask
- ForkJoinTask的常用实现类
/**
* 求和计算的任务!
* 3000 6000(ForkJoin) 9000(Stream并行流)
* 如何使用 forkjoin:
*
* 1、forkjoinPool 通过它来执行
* 2、计算任务 forkjoinPool.execute(ForkJoinTask task)
* 3. 计算类要继承 ForkJoinTask
*/
public class ForkJoinTest extends RecursiveTask<Long> {
private Long start;
private Long end;
// 临界值
private Long temp = 10000L;
public ForkJoinTest(Long start, Long end) {
this.start = start;
this.end = end;
}
public static void main(String[] args) {
}
@Override
protected Long compute() {
if ((end-start)<temp){
Long sum = 0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
}else { // forkjoin 递归
long middle = (start + end) / 2; // 中间值
ForkJoinTest task1 = new ForkJoinTest(start, middle);
task1.fork(); // 拆分任务,把任务压入线程队列
ForkJoinTest task2 = new ForkJoinTest(middle+1, end);
task2.fork(); // 拆分任务,把任务压入线程队列
return task1.join() + task2.join();
}
}
}测试:
public class ForkTest {
public static void main(String[] args) throws ExecutionException, InterruptedException{
// test1(); // 12224
// test2(); // 10038
// test3(); // 153
}
// 普通程序员
public static void test1(){
Long sum = 0L;
long start = System.currentTimeMillis();
for (Long i = 1L; i <= 10_0000_0000; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum="+sum+" 时间:"+(end-start));
}
// 会使用ForkJoin
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinTest(0L, 10_0000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(task);// 提交任务
Long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("sum="+sum+" 时间:"+(end-start));
}
public static void test3(){
long start = System.currentTimeMillis();
// Stream并行流 () (]
long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum=" + sum +"时间:"+ (end-start));
}
}- 异步回调
Future 设计的初衷: 对将来的某个事件的结果进行建模
- 用下面这个类进行异步回调
public class CompletableFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 没有返回值的 runAsync 异步回调
// CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
// try {
// TimeUnit.SECONDS.sleep(2);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println(Thread.currentThread().getName()+"runAsync=>Void");
// });
//
// System.out.println("1111");
// completableFuture.get(); // 获取阻塞执行结果
// 有返回值的 supplyAsync 异步回调
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+"supplyAsync=>Integer");
int i = 10/0;
return 1024;
});
System.out.println(completableFuture.whenComplete((t, u)->{
System.out.println("t=>" + t); // 正常的返回结果
System.out.println("u=>" + u); // 错误信息:
}).exceptionally((e)->{
// java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
System.out.println(e.getMessage());
return 233; // 可以获取到错误的返回结果
}).get());
}
}- JMM - Java内存模型
Java的并发采用的是共享内存模型
JMM的存在
在现代计算机中,cpu的指令速度远超内存的存取速度,由于计算机的存储设备与处理器的运算速度有几个数量级的差距,所以现代计算机系统都不得不加入一层读写速度尽可能接近处理器运算速度的高速缓存(Cache)来作为内存与处理器之间的缓冲:将运算需要使用到的数据复制到缓存中,让运算能快速进行,当运算结束后再从缓存同步回内存之中,这样处理器就无须等待缓慢的内存读写了。
基于高速缓存的存储交互很好地解决了处理器与内存的速度矛盾,但是也为计算机系统带来更高的复杂度,因为它引入了一个新的问题:缓存一致性(Cache Coherence)。
JMM定义了Java 虚拟机(JVM)在计算机内存(RAM)中的工作方式。JVM是整个计算机虚拟模型,所以JMM是隶属于JVM的。从抽象的角度来看,JMM定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存(Main Memory)中,每个线程都有一个私有的本地内存(Local Memory),本地内存中存储了该线程以读/写共享变量的副本。本地内存是JMM的一个抽象概念,并不真实存在。它涵盖了缓存、写缓冲区、寄存器以及其他的硬件和编译器优化。
JMM的一些同步的约定
- 线程解锁前,必须把共享变量立刻刷回主存。
- 线程加锁前,必须读取主存中的最新值到工作内存中!
- 加锁和解锁是同一把锁
八种操作
内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外)
- lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
- unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
- read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
- load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
- use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
- assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
- store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
- write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中
JMM对这八种指令的使用,制定了如下规则:
- 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
- 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
- 不允许一个线程将没有assign的数据从工作内存同步回主内存
- 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是对变量实施use、store操作之前,必须经过assign和load操作
- 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
- 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
- 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
- 对一个变量进行unlock操作之前,必须把此变量同步回主内存
- Volatile
Volatile 是 Java 虚拟机提供轻量级的同步机制
保证可见性
public class JMMDemo {
// 不加 volatile 程序就会死循环!
// 加 volatile 可以保证可见性
private static int num = 0;
public static void main(String[] args) { // main
new Thread(()->{ // 线程 1 对主内存的变化不知道的
while (num==0){
}
}).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 修改完以上线程还是不会停止,因为它们保存的是之前的副本
num = 1;
System.out.println(num);
}
}volatile修饰的变量,如果改变的话,会通知所有的线程,然后进行更新。
不保证原子性
原子性 : 不可分割
线程A在执行任务的时候,不能被打扰的,也不能被分割。要么同时成功,要么同时失败。
// volatile 不保证原子性
public class VDemo02 {
// volatile 不保证原子性
// 可以使用原子类解决,使用CAS
private volatile static int num = 0;
public static void add(){
// 不能实现原子性,还是可能多个线程取到同个值,然后加一放进去
num++;
}
public static void main(String[] args) {
//理论上num结果应该为 2 万
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000 ; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){ // main gc
Thread.yield();
}
System.out.println(Thread.currentThread().getName() + " " + num);
}
}禁止指令重排
指令重排:你写的程序,计算机并不是按照你写的那样去执行的。
源代码-->编译器优化的重排--> 指令并行也可能会重排--> 内存系统也会重排---> 执行
处理器在进行指令重排的时候,考虑:数据之间的依赖性!
但是在多线程下,可能会由于指令重排导致异常结果。可能每个线程中数据无依赖性,但是不同线程中会操作同个变量。
实现: volatile使用内存屏障禁止指令重排。
- 单例模式
饿汉式
一上来就先创建对象,这样可能会导致空间上的浪费
无线程安全问题
public class Hungry {
// 一定要有的私有无参构造器
private Hungry() {
}
private static final Hungry HUNGRY = new Hungry();
public static Hungry getInstance(){
return HUNGRY;
}
}懒汉式
双重检验,线程安全,增加效率
public class LazyMan {
private LazyMan(){
}
// 防止指令重排
/**
* 1. 分配内存空间
* 2、执行构造方法,初始化对象
* 3、把这个对象指向这个空间
*
* 123
* 132 A
* B // 此时lazyMan还没有完成构造
*/
private volatile static LazyMan lazyMan;
// 双重检测锁模式的 懒汉式单例 DCL懒汉式
public static LazyMan getInstance(){
if (lazyMan == null)
synchronized (LazyMan.class){
if (lazyMan == null){
lazyMan = new LazyMan(); // 不是一个原子性操作
}
}
return lazyMan;
}
}但是这并不安全,因为可因通过反射区获取单例对象
public static void main(String[] args) throws IllegalAccessException, InvocationTargetException, InstantiationException, NoSuchMethodException {
LazyMan lazyMan1 = LazyMan.getInstance(); Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null); declaredConstructor.setAccessible(true); LazyMan lazyMan2 = declaredConstructor.newInstance(); // 两个对象是不一样的 System.out.println(lazyMan1); System.out.println(lazyMan2); }
我们可以在构造器加锁,然后判断是否lazyMan属性已经有了对应的值,有就返回。(但这只能阻止系统创建完单例模式后,无法用反射区进行创建,但只要在系统创建前,属性未赋值之前都可以)
private LazyMan(){
synchronized (LazyMan.class){ if (lazyMan != null) throw new RuntimeException("不要试图用反射区破坏异常!"); }}
使用一个加密或者外部位置的属性标识。(但是这也不是最安全的,因为别人也可以解密获得这个属性)
private static boolean flag = false;
private LazyMan(){
synchronized (LazyMan.class){ if (flag == false) flag = true; else throw new RuntimeException("不要试图用反射区破坏异常!"); }}
静态内部类
静态内部类只有在创建的时候再加载
线程安全
public class Holder {
private Holder(){
}
public static Holder getInstance(){
return InnerClass.HOLDER;
}
public static class InnerClass{
private static final Holder HOLDER = new Holder();
}
}枚举
进入newInstance方法,我们看到,当类是枚举类型的时候,就会提醒不能使用反射破坏枚举,也就是枚举类型是自带单例模式的。
// enum 是一个什么? 本身也是一个Class类
public enum EnumSingle {
INSTANCE;
public EnumSingle getInstance(){
return INSTANCE;
}
}
class Test{
public static void main(String[] args) throws NoSuchMethodException,
IllegalAccessException, InvocationTargetException, InstantiationException {
EnumSingle instance1 = EnumSingle.INSTANCE;
Constructor<EnumSingle> declaredConstructor =
EnumSingle.class.getDeclaredConstructor(String.class,int.class);
declaredConstructor.setAccessible(true);
EnumSingle instance2 = declaredConstructor.newInstance();
// NoSuchMethodException: com.kuang.single.EnumSingle.<init>()
System.out.println(instance1);
System.out.println(instance2);
}
}直接使用javap反编译会发现也是有一个空参构造器,但是其实是没有的,我们用jad进行反编译,最后发现它是有两个参数的:
public final class EnumSingle extends Enum
{
public static EnumSingle[] values()
{
return (EnumSingle[])$VALUES.clone();
}
public static EnumSingle valueOf(String name)
{
return (EnumSingle)Enum.valueOf(com/kuang/single/EnumSingle, name);
}
private EnumSingle(String s, int i)
{
super(s, i);
}
public EnumSingle getInstance()
{
return INSTANCE;
}
public static final EnumSingle INSTANCE;
private static final EnumSingle $VALUES[];
static
{
INSTANCE = new EnumSingle("INSTANCE", 0);
$VALUES = (new EnumSingle[] {
INSTANCE
});
}
}- CAS
CAS:是CPU的并发原语,compareAndSet,比较并设置
public class CASDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
// 期望、更新
// public final boolean compareAndSet(int expect, int update)
// 如果我期望的值达到了,那么就更新,否则,就不更新, CAS 是CPU的并发原语!
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
}
}
------------------------------------------------------
true
2021
false
2021Unsafe类
Java无法直接操作内存,但是可以调用C++的native本地方法操作内存
unsafe相当于是一个后门,里面的方法大多数都是本地native方法
解读源码:atomicInteger.getAndIncrement()
public final int getAndIncrement() {
// 对应三个参数,一个是自身的对象,一个是偏移值,还有一个是自增的数
return unsafe.getAndAddInt(this, valueOffset, 1);
}
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
// 通过对象和偏移量获取内存中的值
var5 = this.getIntVolatile(var1, var2);
// 用一个while循环来实现自旋锁
// 通过var1和var2获取值与var5做比较,如果相等,把值设置为var5+var4,也就是加一
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}CAS : 比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就一直循环!
缺点:
- 循环会耗时
- 一次性只能保证一个共享变量的原子性
- ABA问题
- 原子引用
带版本号 的原子操作!
public class CASDemo {
//AtomicStampedReference 注意,如果泛型是一个包装类,注意对象的引用问题
// 正常在业务操作,这里面比较的都是一个个对象
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1,1);
// CAS compareAndSet : 比较并交换!
public static void main(String[] args) {
new Thread(()->{
int stamp = atomicStampedReference.getStamp(); // 获得版本号
System.out.println("a1=>"+stamp);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(1, 2,
atomicStampedReference.getStamp(),
atomicStampedReference.getStamp() + 1);
System.out.println("a2=>"+atomicStampedReference.getStamp());
System.out.println(atomicStampedReference.compareAndSet(2, 1,
atomicStampedReference.getStamp(),
atomicStampedReference.getStamp() + 1));
System.out.println("a3=>"+atomicStampedReference.getStamp());
},"a").start();
// 乐观锁的原理相同!
new Thread(()->{
int stamp = atomicStampedReference.getStamp(); // 获得版本号
System.out.println("b1=>"+stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicStampedReference.compareAndSet(1, 6,
stamp, stamp + 1));
System.out.println("b2=>"+atomicStampedReference.getStamp());
},"b").start();
}
}注意:
Integer 使用了对象缓存机制,默认范围是 -128 ~ 127 ,推荐使用静态工厂方法valueOf 获取对象实
例,而不是 new,因为 valueOf 使用缓存,而 new 一定会创建新的对象分配新的内存空间;
- 各种锁的理解
公平锁、非公平锁
公平锁: 非常公平, 不能够插队,必须先来后到!
非公平锁:非常不公平,可以插队 (默认都是非公平)
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}可重入锁(递归锁)
可重入就是说某个线程已经获得某个锁,可以再次获取锁而不会出现死锁。
Synchronized
// Synchronized
public class Demo01 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{
phone.sms();
},"A").start();
new Thread(()->{
phone.sms();
},"B").start();
}
}
class Phone{
public synchronized void sms(){
System.out.println(Thread.currentThread().getName() + "sms");
call(); // 这里也有锁
}
public synchronized void call(){
System.out.println(Thread.currentThread().getName() + "call");
}
}Lock 版(需要手动加锁释放锁)
public class Demo02 {
public static void main(String[] args) {
Phone2 phone = new Phone2();
new Thread(()->{
phone.sms();
},"A").start();
new Thread(()->{
phone.sms();
},"B").start();
}
}
class Phone2{
Lock lock = new ReentrantLock();
public void sms(){
lock.lock(); // 细节问题:lock.lock(); lock.unlock(); // lock 锁必须配对,否则就会死在里面
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "sms");
call(); // 这里也有锁
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
lock.unlock();
}
}
public void call(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "call");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}自旋锁
自定义一个锁测试
/**
* 自旋锁
*/
public class SpinlockDemo {
// int 0
// Thread null
AtomicReference<Thread> atomicReference = new AtomicReference<>();
// 加锁
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "==> mylock");
// 自旋锁
while (!atomicReference.compareAndSet(null,thread)){
}
}
// 解锁
// 加锁
public void myUnLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "==> myUnlock");
atomicReference.compareAndSet(thread,null);
}
}
public class TestSpinLock {
public static void main(String[] args) throws InterruptedException {
// 底层使用的自旋锁CAS
SpinlockDemo lock = new SpinlockDemo();
new Thread(()-> {
lock.myLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.myUnLock();
}
},"T1").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()-> {
lock.myLock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.myUnLock();
}
},"T2").start();
}
}死锁
public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new MyThread(lockA, lockB), "T1").start();
new Thread(new MyThread(lockB, lockA), "T2").start();
}
}
class MyThread implements Runnable{
private String lockA;
private String lockB;
public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA){
System.out.println(Thread.currentThread().getName() +
"lock:"+lockA+"=>get"+lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB){
System.out.println(Thread.currentThread().getName() +
"lock:"+lockB+"=>get"+lockA);
}
}
}
}解决问题:
使用 jps -l 定位进程号
使用 jstack 进程号 找到死锁问题
锁相关
从Atomic到CAS
Java内存模型要保证可见性、原子性和有序性。
Java 虚拟机又提供了一个轻量级的同步机制——volatile
但是 volatile 算是乞丐版的 synchronized,并不能保证原子性 ,所以,又增加了java.util.concurrent.atomic包, 这个包下提供了一系列原子类。
Atomic原子类
Atomic原子类可以保证在多线程的环境下,当某个线程在执行atomic的方法时,其他的线程不会打断它,只会像自选算自旋锁一样,等到该方法完成之后才等分配线程。Atomic 类在软件层面上是非阻塞的,它的原子性其实是在硬件层面上借助相关的指令来保证的。
Atomic类可以分成四组:
- 基本类型:AtomicBoolean,AtomicInteger,AtomicLong
- 数组类型:tomicIntegerArray,AtomicLongArray,AtomicReferenceArray
- 引用类型:AtomicReference,AtomicMarkableReference,AtomicStampedReference
- 对象的属性修改类型 :AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater
- JDK1.8新增:DoubleAccumulator、LongAccumulator、DoubleAdder、LongAdder、Striped64
以 AtomicInteger 为例了解常用方法
方法 描述
get() 直接返回值
addAndGet(int) 增加指定的数据后返回增加后的数据,相当于 i++
getAndAdd(int) 增加指定的数据,返回变化前的数据,相当于 ++i
getAndIncrement() 增加1,返回增加前的数据
getAndDecrement() 减少1,返回减少前的数据
getAndSet(int) 设置指定的数据,返回设置前的数据
decrementAndGet() 减少1,返回减少后的值
incrementAndGet() 增加1,返回增加后的值
floatValue() 转化为浮点数返回
intValue() 转化为int 类型返回
set(int) 设置为给定值
lazySet(int) 仅仅当get时才会set http://ifeve.com/juc-atomic-class-lazyset-que/
compareAndSet(int, int) 尝试新增后对比,若增加成功则返回true否则返回false
/*
compareAndSet() 尝试新增后对比,若增加成功则返回true否则返回false。其实就是比较并交换,判断用当前值和期望值 (第一个参数),是否一致,如果一致,修改为更新值(第二个参数),这就是大名鼎鼎的 CAS。
*/
public class CASDemo {
public static void main(String[] args) {
System.out.println(num.compareAndSet(6, 7) + "\t + current num:" + num);
System.out.println(num.compareAndSet(6, 7) + "\t current num:" + num);
}
}
------------------------------------------------------
true + current num:7
false current num:7CAS
- CAS:全称 Compare and swap,即比较并交换,它是一条 CPU 同步原语。是一种硬件对并发的支持,针对多处理器操作而设计的一种特殊指令,用于管理对共享数据的并发访问。
- CAS 是一种无锁的非阻塞算法的实现。
- CAS 包含了 3 个操作数:
- 需要读写的内存值 V
- 旧的预期值 A
- 要修改的更新值 B
- 当且仅当 V 的值等于 A 时,CAS 通过原子方式用新值 B 来更新 V 的 值,否则不会执行任何操作(他的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的。)
CAS 并发原语体现在 Java 语言中的 sum.misc.Unsafe 类中的各个方法。调用 Unsafe 类中的 CAS 方法, JVM 会帮助我们实现出 CAS 汇编指令。这是一种完全依赖于硬件的功能,通过它实现了原子操作。再次强调,由于 CAS是一种系统原语,原语属于操作系统用于范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,CAS 是一条 CPU 的原子指令,不会造成数据不一致问题。
我们常用的 java.util.concurrent 包就建立在CAS之上。
用 CAS 分析 AtomicInteger 类:
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
// 该类下的方法大部分是 调用了 Unsafe 类
private static final Unsafe unsafe = Unsafe.getUnsafe();
// valueOffset 表示该变量值在内存中的偏移地址,因为 UnSafe 就是根据内存偏移地址获取数据。
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
// volatile保证了多线程之间的内存可见性。
private volatile int value;
// ... (methods) ...
}UnSafe类:
是 CAS 的核心类,由于 Java 方法无法直接访问底层系统,需要通过本地(native)方法来访问,UnSafe 相当于一个后门,基于该类可以直接操作特定内存的数据。UnSafe 类存在与 sum.misc 包中,其内部方法可以像 C 语言的指针一样直接操作内存,因为 Java 中 CAS 操作的执行依赖于 UnSafe 类的方法。
UnSafe 类中的所有方法都是 native 修饰的,也就是说该类中的方法都是直接调用操作系统底层资源执行相应任务。
// Unsafe 类为一单例实现,提供静态方法 getUnsafe 获取 Unsafe 实例,当且仅当调用 getUnsafe 方法的类为引导类加载器所加载时才合法,否则抛出 SecurityException 异常
public final class Unsafe {
private static final Unsafe theUnsafe;
// ......
@CallerSensitive
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe;
}
}
public native int getInt(Object var1, long var2);
public native void putInt(Object var1, long var2, int var4);
public native Object getObject(Object var1, long var2);
public native void putObject(Object var1, long var2, Object var4);
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
// ......
}逐层看 Unsafe 类中的 getAndAdd() 的源码如下:
// val1:AtomicInteger 对象本身
// var2:该对象值的引用地址,内存偏移量
// var4:需要变动的数量,即 ++i 的 i
public final int getAndAddInt(Object var1, long var2, int var4) {
// 用var1, var2 找出的主内存中真实的值(通过内存偏移量)
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
// this.compareAndSwapInt 用该对象当前的值与 var5 比较,如果相同,更新 var5 + var4 并且返回 true,如果不同,继续取值然后再比较,直到更新完成。
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}这一操作没有加锁,反复执行,既保证了一致性,又保证了并发性。
假设线程A和线程B两个线程同时执行 getAndAddInt 操作(分别跑在不同CPU上):
- AtomicInteger 里面的 value 原始值为 3,即主内存中 AtomicInteger 的 value 为 3,根据 JMM 模型,线程A和线程B各自持有一份值为 3 的 value 的副本分别到各自的工作内存;
- 线程A通过 getIntVolatile(var1,var2) 拿到 value 值3,这时线程A被挂起;
- 线程B也通过 getIntVolatile(var1,var2) 方法获取到 value 值 3,此时刚好线程B没有被挂起并执行compareAndSwapInt 方法比较内存值为 3,成功修改内存值为 4,线程B结束,一切正常
- 这时线程A恢复,执行compareAndSwapInt() 方法比较,发现自己手里的3和主内存的值4不一致,说明该值已经被其他线程抢先一步修改过了,那线程A本次修改失败,重新读取;
- 线程A重新获取value值,因为变量value 被 volatile 修饰,所以其他线程对它的修改,线程A总是能够看到,线程A继续执行compareAndSwapInt进行比较替换,直到成功
获取UnSafe类
从getUnsafe 方法的使用限制条件出发,通过Java命令行命令 -Xbootclasspath/a 把调用 Unsafe 相关方法的类A所在 jar 包路径追加到默认的 bootstrap 路径中,使得A被引导类加载器加载,从而通过Unsafe.getUnsafe方法安全的获取 Unsafe 实例。
java -Xbootclasspath/a: ${path} // 其中path为调用Unsafe相关方法的类所在jar包路径
通过反射技术暴力获取 Unsafe 对象
private static Unsafe reflectGetUnsafe() { try { Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); return (Unsafe) field.get(null); } catch (Exception e) { log.error(e.getMessage(), e); return null; } }
缺点
- 循环时间长,开销很大。CAS算法需要不断地自旋来读取最新的内存值,长时间读取不到就会造成不必要的CPU开销。do while 如果CAS失败,会一直进行尝试,如果CAS长时间一直不成功,可能会给CPU带来很大的开销
- 只能保证一个共享变量的原子操作。当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是,对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁来保证原子性。
- ABA 问题
比如线程1从内存位置 V 中取出A,这时线程2也从内存中取出A,并且线程2进行了一些操作将值变成了B,然后线程2又将V位置的数据变成A,这个时候线程1进行CAS操作发现内存中仍然是A,线程1就会误认为它没有被修改过,这个漏洞就是CAS操作的"ABA"问题。
解决:
各种乐观锁的实现中通常都会用版本戳 version 来对记录或对象标记,避免并发操作带来的问题
在Java中,AtomicStampedReference<v> 也实现了这个作用,它通过包装[E,int]的元组来对对象标记版本戳stamp,从而避免ABA问题</v>



京公网安备 11010502036488号