6 读写锁
ReadWriteLock
package com.snowdong; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * 独占锁(写锁)一次只能被一个线程占有 * 共享锁(读锁)多个线程可以同步占有 * 读-读,可以共存 * 读-写,不能共存 * 写-写,不能共存 */ public class TestReadWriteLockDemo { public static void main(String[] args) { //MyCache mycache=new MyCache(); MyCacheLock myCache=new MyCacheLock(); //写入 for (int i = 0; i < 5; i++) { final int temp=i; new Thread(()->{ myCache.put(temp+"",temp+""); },String.valueOf(i)).start(); } //读取 for (int i = 0; i < 5; i++) { final int temp=i; new Thread(()->{ myCache.get(temp+""); },String.valueOf(i)).start(); } } } class MyCacheLock{ private volatile Map<String,Object> map=new HashMap<>(); //读写锁,更加细粒度的控制 private ReadWriteLock readWriteLock=new ReentrantReadWriteLock(); private Lock lock=new ReentrantLock();//常用的锁,锁住调用对象 //存,写 public void put(String key,Object value){ readWriteLock.writeLock().lock(); try{ System.out.println(Thread.currentThread().getName()+"写入"+key); map.put(key, value); System.out.println(Thread.currentThread().getName()+"写入OK"); }catch (Exception e){ e.printStackTrace(); }finally{ readWriteLock.writeLock().unlock(); } } //取,读 public void get(String key){ readWriteLock.readLock().lock(); try{ System.out.println(Thread.currentThread().getName()+"读取"+key); Object o=map.get(key); System.out.println(Thread.currentThread().getName()+"读取OK"); }catch (Exception e){ e.printStackTrace(); }finally { readWriteLock.readLock().unlock(); } } } //不加锁锁状态会出现,读写混乱 class MyCache{ private volatile Map<String,Object> map=new HashMap<>(); //存,写 public void put(String key,Object value){ System.out.println(Thread.currentThread().getName()+"写入"+key); map.put(key, value); System.out.println(Thread.currentThread().getName()+"写入OK"); } //取,读 public void get(String key){ System.out.println(Thread.currentThread().getName()+"读取"+key); Object o=map.get(key); System.out.println(Thread.currentThread().getName()+"读取OK"); } }
7 阻塞队列
写入:如果队列满了,则阻塞等待。读取:如果队列空,则阻塞等待
- 什么情况下我们会使用 阻塞队列:多线程并发处理,线程池!
- 学会使用队列: 添加、移除
- 四组API
7.1 ArrayBlockingQueue数组阻塞队列
代码具体实现展示:
package com.snowdong; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class TestBlockingQueue { public static void main(String[] args) throws InterruptedException { //test1(); //test2(); test4(); } /** * 抛出异常 */ public static void test1(){ /** * public ArrayBlockingQueue(int capacity) { * this(capacity, false); * } */ BlockingQueue blockingQueue=new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("b")); System.out.println(blockingQueue.add("c")); //再添加元素则报错,java.lang.IllegalStateException: Queue full抛出异常 //System.out.println(blockingQueue.add("d")); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); //再移除元素则报错,java.util.NoSuchElementException抛出异常 System.out.println(blockingQueue.remove()); } /** * 有返回值,没有异常 */ public static void test2(){ BlockingQueue blockingQueue=new ArrayBlockingQueue<>(2); System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); //再次添加,则会返回false,不报错 //System.out.println(blockingQueue.offer("c")); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); //再次删除,则会返回null,不报错 System.out.println(blockingQueue.poll()); } /** * 等待,阻塞(一直阻塞) */ public static void test3() throws InterruptedException { BlockingQueue blockingQueue=new ArrayBlockingQueue<>(2); blockingQueue.put("a"); blockingQueue.put("b"); //再次往队列中添加,队列满,则一直阻塞 //blockingQueue.put("c"); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); //再次删除,队列空,则一直阻塞 System.out.println(blockingQueue.take()); } /** * 等待,阻塞(等待超时) */ public static void test4() throws InterruptedException { BlockingQueue blockingQueue=new ArrayBlockingQueue<>(2); System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); //等待2秒就推出,不报错 /** * boolean offer(E e, long timeout, TimeUnit unit) * throws InterruptedException; */ blockingQueue.offer("c",2, TimeUnit.SECONDS); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); //等待3秒,还不能等到队列中来值,则推出,不报错 /** * *E poll ( long timeout, TimeUnit unit) * throws InterruptedException; */ blockingQueue.poll(2,TimeUnit.SECONDS); } }
7.2 SynchronousQueue同步队列
没有容量设置,进去一个元素,必须等待取出来后,才能往里面放元素
put,take(即阻塞,等待)
package com.snowdong; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; /** * 同步队列和其他BlockingQueue不一样,SynchronousQueue不存储元素 * put了一个元素,必须从里面take出来,否则不能再put进去 */ public class TestSynchronousQueue { public static void main(String[] args) { //同步队列 BlockingQueue blockingQueue=new SynchronousQueue<>(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+" put a"); blockingQueue.put("a"); System.out.println(Thread.currentThread().getName()+" put b"); blockingQueue.put("b"); System.out.println(Thread.currentThread().getName()+" put c"); blockingQueue.put("c"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take()); System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take()); System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
有问题待研究
8 线程池
3大方法,7大参数,4中拒绝策略
线程池的好处
- 降低资源的消耗
- 提高响应的速度
- 方便管理
- 线程复用,可以控制最大并发数,管理线程*
8.1 三大方法
具体代码说明:
package com.snowdong; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; //Executors工具类,三大方法 public class TestThreadTool { public static void main(String[] args) { //单个线程的线程池 ExecutorService threadPool1= Executors.newSingleThreadExecutor(); //创建一个固定大小的线程池 ExecutorService threadPool2=Executors.newFixedThreadPool(5); //可伸缩的线程池 ExecutorService threadPool3=Executors.newCachedThreadPool(); try{ for (int i = 0; i < 10; i++) { threadPool2.execute(() -> { System.out.println(Thread.currentThread().getName() + " OK"); }); } }catch (Exception e){ e.printStackTrace(); }finally{ //线程池用完,关闭线程池 threadPool2.shutdown(); } } }
8.2 七大参数
三大方法的构造函数源码说明:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
ThreadPoolExecutor()的具体源码说明:
public ThreadPoolExecutor(int corePoolSize,// 核心线程池大小 int maximumPoolSize,// 最大核心线程池大小 long keepAliveTime,// 超时了没有人调用就会释放,(线程池中非核心线程多久没被调用了即释放) TimeUnit unit,// 超时单位 BlockingQueue<Runnable> workQueue,// 阻塞队列 ThreadFactory threadFactory,// 线程工厂:创建线程的,一般 不用动 RejectedExecutionHandler handler) {// 拒绝策略 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
对应线程池的现实场景对比
8.3 四种拒绝策略
具体代码展示:
package com.snowdong; import java.util.concurrent.*; // Executors 工具类、3大方法 /** * //Abort:流产,终止,使夭折 * new ThreadPoolExecutor.AbortPolicy()//银行满了,还有人进来,不处理这个人,抛出异常 * java.util.concurrent.RejectedExecutionException * * new ThreadPoolExecutor.CallerRunsPolicy()//哪里来去哪里,即把这个返回原线程中.再本例中会返回给主线程main * new ThreadPoolExecutor.DiscardPolicy()//队列满了,丢掉任务,不会抛出异常 * new ThreadPoolExecutor.DiscardOldestPolicy()//队列满了,丢掉阻塞队列中最老的那个任务,不抛出异常。不影响正在执行的任务 */ //手动创建一个线程池 public class TestThreadPool01 { public static void main(String[] args) { //自定义线程池,工作ThreadPoolExecutor ExecutorService threadPool=new ThreadPoolExecutor( 2, 5, 3, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy()); try{ // 最大承载:Deque + max for (int i = 1; i <= 9; i++) { threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + " OK"); }); } }catch (Exception e){ e.printStackTrace(); }finally { threadPool.shutdown(); } } }
池的最大的大小如何去设置!
了解:IO密集型,CPU密集型:(调优)
// 最大线程到底该如何定义 // 1、CPU 密集型,几核,就是几,可以保持CPu的效率最高! // 2、IO 密集型 > 判断你程序中十分耗IO的线程, // 程序 15个大型任务 io十分占用资源! //获取CPU的核数 System.out.println(Runtime.getRuntime().availableProcessors()); //自定义线程池,工作ThreadPoolExecutor ExecutorService threadPool=new ThreadPoolExecutor( 2, Runtime.getRuntime().availableProcessors(), 3, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy());
lambda表达式,链式编程,函数式接口,Stream流式计算
9 四大函数式接口
9.1 Function函数式接口
只有一个方法的接口
代码测试Function函数式接口
package com.snowdong; import java.util.function.Function; public class TestFunctionInterface { public static void main(String[] args) { /* Function<String,String> function = new Function<String,String>(){ @Override public String apply(String str) { return str; } };*/ Function<String,String> function=(str)->{ return str; }; System.out.println(function.apply("snow")); } }
9.2 Predicate断定型接口
有一个输入参数,返回值是一个布尔值
代码测试如下:
package com.snowdong; import java.util.function.Function; import java.util.function.Predicate; public class TestFunctionInterface { public static void main(String[] args) { /* //测试字符串是否为空 Predicate<String> predicate = new Predicate<String>(){ @Override public boolean test(String s) { return s.isEmpty(); } };*/ Predicate<String> predicate=(s)->{ return s.isEmpty(); }; System.out.println(predicate.test("a")); } }
9.3 Consumer消费型接口
只有一个输入值,没有返回值
代码测试如下:
package com.snowdong; import java.util.function.Consumer; public class TestFunctionInterface { public static void main(String[] args) { /* Consumer<String> consumer = new Consumer<String>(){ @Override public void accept(String s) { System.out.println(s); } };*/ Consumer<String> consumer=(s)->{ System.out.println(s); }; consumer.accept("snow"); } }
9.4 Supplier供给型接口
无参数输入,只有返回值
代码测试如下:
package com.snowdong; import java.util.function.Supplier; public class TestFunctionInterface { public static void main(String[] args) { /* Supplier<String> supplier=new Supplier<String>(){ @Override public String get() { return "snow"; } };*/ Supplier<String> supplier=()->{ return "snow"; }; System.out.println(supplier.get()); } }
10 Stream流式计算
计算通过流来操作。
代码测试如下:
package com.snowdong; import java.util.Arrays; import java.util.List; /** * 题目要求:一分钟内完成此题,只能用一行代码实现! * 现在有5个用户!筛选: * 1、ID 必须是偶数 * 2、年龄必须大于23岁 * 3、用户名转为大写字母 * 4、用户名字母倒着排序 * 5、只输出一个用户! */ public class TestStream { public static void main(String[] args) { //新建几个User实例 User u1=new User(1,"a",21); User u2=new User(2,"b",22); User u3=new User(3,"c",23); User u4=new User(4,"d",24); User u5=new User(6,"e",25); //集合就是存储 List<User> list= Arrays.asList(u1,u2,u3,u4,u5); //计算交给Stream流 list.stream() .filter(u->{return u.getId()%2==0;}) .filter(u->{return u.getAge()>23;}) .map(u->{return u.getName().toUpperCase();}) .sorted((uu1,uu2)->{return uu2.compareTo(uu1);}) .limit(1) .forEach(System.out::println); /** * default Stream<E> stream() { * return StreamSupport.stream(spliterator(), false); * } * * Stream<T> filter(Predicate<? super T> predicate); * * <R> Stream<R> map(Function<? super T, ? extends R> mapper); * * Stream<T> sorted(Comparator<? super T> comparator); * * void forEach(Consumer<? super T> action); */ } } //User类需要写