6 读写锁

ReadWriteLock
图片说明

package com.snowdong;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 独占锁(写锁)一次只能被一个线程占有
 * 共享锁(读锁)多个线程可以同步占有
 * 读-读,可以共存
 * 读-写,不能共存
 * 写-写,不能共存
 */
public class TestReadWriteLockDemo {
    public static void main(String[] args) {
        //MyCache mycache=new MyCache();
        MyCacheLock myCache=new MyCacheLock();
        //写入
        for (int i = 0; i < 5; i++) {
            final int temp=i;
            new Thread(()->{
                myCache.put(temp+"",temp+"");
            },String.valueOf(i)).start();
        }
        //读取
        for (int i = 0; i < 5; i++) {
            final int temp=i;
            new Thread(()->{
                myCache.get(temp+"");
            },String.valueOf(i)).start();
        }
    }
}
class MyCacheLock{
    private volatile Map<String,Object> map=new HashMap<>();
    //读写锁,更加细粒度的控制
    private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
    private Lock lock=new ReentrantLock();//常用的锁,锁住调用对象
    //存,写
    public void put(String key,Object value){
        readWriteLock.writeLock().lock();
        try{
            System.out.println(Thread.currentThread().getName()+"写入"+key);
            map.put(key, value);
            System.out.println(Thread.currentThread().getName()+"写入OK");
        }catch (Exception e){
            e.printStackTrace();
        }finally{
            readWriteLock.writeLock().unlock();
        }
    }
    //取,读
    public void get(String key){
        readWriteLock.readLock().lock();
        try{
            System.out.println(Thread.currentThread().getName()+"读取"+key);
            Object o=map.get(key);
            System.out.println(Thread.currentThread().getName()+"读取OK");
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            readWriteLock.readLock().unlock();
        }

    }
}
//不加锁锁状态会出现,读写混乱
class MyCache{
    private volatile Map<String,Object> map=new HashMap<>();
    //存,写
    public void put(String key,Object value){
        System.out.println(Thread.currentThread().getName()+"写入"+key);
        map.put(key, value);
        System.out.println(Thread.currentThread().getName()+"写入OK");
    }
    //取,读
    public void get(String key){
        System.out.println(Thread.currentThread().getName()+"读取"+key);
        Object o=map.get(key);
        System.out.println(Thread.currentThread().getName()+"读取OK");
    }
}

7 阻塞队列

写入:如果队列满了,则阻塞等待。读取:如果队列空,则阻塞等待

图片说明

  • 什么情况下我们会使用 阻塞队列:多线程并发处理,线程池!
  • 学会使用队列: 添加、移除
  • 四组API
    图片说明

7.1 ArrayBlockingQueue数组阻塞队列

代码具体实现展示:

 package com.snowdong;

 import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

 public class TestBlockingQueue {
    public static void main(String[] args) throws InterruptedException {
        //test1();
        //test2();
        test4();
    }

    /**
     * 抛出异常
     */
    public static void test1(){
        /**
         *     public ArrayBlockingQueue(int capacity) {
         *         this(capacity, false);
         *     }
         */
        BlockingQueue blockingQueue=new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));
        //再添加元素则报错,java.lang.IllegalStateException: Queue full抛出异常
        //System.out.println(blockingQueue.add("d"));

        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        //再移除元素则报错,java.util.NoSuchElementException抛出异常
        System.out.println(blockingQueue.remove());
    }
    /**
     * 有返回值,没有异常
     */
    public static void test2(){
        BlockingQueue blockingQueue=new ArrayBlockingQueue<>(2);
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        //再次添加,则会返回false,不报错
        //System.out.println(blockingQueue.offer("c"));
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        //再次删除,则会返回null,不报错
        System.out.println(blockingQueue.poll());
    }
    /**
     * 等待,阻塞(一直阻塞)
     */
    public static void test3() throws InterruptedException {
        BlockingQueue blockingQueue=new ArrayBlockingQueue<>(2);
        blockingQueue.put("a");
        blockingQueue.put("b");
        //再次往队列中添加,队列满,则一直阻塞
        //blockingQueue.put("c");
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        //再次删除,队列空,则一直阻塞
        System.out.println(blockingQueue.take());
    }
    /**
     * 等待,阻塞(等待超时)
     */
    public static void test4() throws InterruptedException {
        BlockingQueue blockingQueue=new ArrayBlockingQueue<>(2);
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        //等待2秒就推出,不报错
        /**
         *     boolean offer(E e, long timeout, TimeUnit unit)
         *         throws InterruptedException;
         */
        blockingQueue.offer("c",2, TimeUnit.SECONDS);
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        //等待3秒,还不能等到队列中来值,则推出,不报错
        /**
         *      
         *E poll ( long timeout, TimeUnit unit)
         *         throws InterruptedException;
         */
        blockingQueue.poll(2,TimeUnit.SECONDS);
    }
}

7.2 SynchronousQueue同步队列

没有容量设置,进去一个元素,必须等待取出来后,才能往里面放元素
put,take(即阻塞,等待)

package com.snowdong;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * 同步队列和其他BlockingQueue不一样,SynchronousQueue不存储元素
 * put了一个元素,必须从里面take出来,否则不能再put进去
 */
public class TestSynchronousQueue {
    public static void main(String[] args) {
        //同步队列
        BlockingQueue blockingQueue=new SynchronousQueue<>();
        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName()+" put a");
                blockingQueue.put("a");
                System.out.println(Thread.currentThread().getName()+" put b");
                blockingQueue.put("b");
                System.out.println(Thread.currentThread().getName()+" put c");
                blockingQueue.put("c");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take());
                System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take());
                System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

有问题待研究

8 线程池

3大方法,7大参数,4中拒绝策略

线程池的好处

  • 降低资源的消耗
  • 提高响应的速度
  • 方便管理
  • 线程复用,可以控制最大并发数,管理线程*

8.1 三大方法

图片说明
具体代码说明:

package com.snowdong;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
//Executors工具类,三大方法
public class TestThreadTool {
    public static void main(String[] args) {
        //单个线程的线程池
        ExecutorService threadPool1= Executors.newSingleThreadExecutor();
        //创建一个固定大小的线程池
        ExecutorService threadPool2=Executors.newFixedThreadPool(5);
        //可伸缩的线程池
        ExecutorService threadPool3=Executors.newCachedThreadPool();
        try{
            for (int i = 0; i < 10; i++) {
                threadPool2.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " OK");
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally{
            //线程池用完,关闭线程池
            threadPool2.shutdown();
        }
    }
}

8.2 七大参数

三大方法的构造函数源码说明:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

ThreadPoolExecutor()的具体源码说明:

    public ThreadPoolExecutor(int corePoolSize,// 核心线程池大小
                              int maximumPoolSize,// 最大核心线程池大小
                              long keepAliveTime,// 超时了没有人调用就会释放,(线程池中非核心线程多久没被调用了即释放)

                              TimeUnit unit,// 超时单位

                              BlockingQueue<Runnable> workQueue,// 阻塞队列
                              ThreadFactory threadFactory,// 线程工厂:创建线程的,一般
不用动
                              RejectedExecutionHandler handler) {// 拒绝策略
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

对应线程池的现实场景对比
图片说明

8.3 四种拒绝策略

图片说明
具体代码展示:

package com.snowdong;
import java.util.concurrent.*;
// Executors 工具类、3大方法

/**
 * //Abort:流产,终止,使夭折
 * new ThreadPoolExecutor.AbortPolicy()//银行满了,还有人进来,不处理这个人,抛出异常
 * java.util.concurrent.RejectedExecutionException
 *
 * new ThreadPoolExecutor.CallerRunsPolicy()//哪里来去哪里,即把这个返回原线程中.再本例中会返回给主线程main
 * new ThreadPoolExecutor.DiscardPolicy()//队列满了,丢掉任务,不会抛出异常
 * new ThreadPoolExecutor.DiscardOldestPolicy()//队列满了,丢掉阻塞队列中最老的那个任务,不抛出异常。不影响正在执行的任务
 */

//手动创建一个线程池
public class TestThreadPool01 {
    public static void main(String[] args) {
        //自定义线程池,工作ThreadPoolExecutor
        ExecutorService threadPool=new ThreadPoolExecutor(
                2,
                5,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy());
            try{
                // 最大承载:Deque + max
                for (int i = 1; i <= 9; i++) {
                    threadPool.execute(() -> {
                        System.out.println(Thread.currentThread().getName() + " OK");
                    });
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                threadPool.shutdown();
            }
    }
}

池的最大的大小如何去设置!
了解:IO密集型,CPU密集型:(调优)

// 最大线程到底该如何定义
// 1、CPU 密集型,几核,就是几,可以保持CPu的效率最高!
// 2、IO 密集型 > 判断你程序中十分耗IO的线程,
// 程序 15个大型任务 io十分占用资源!

        //获取CPU的核数
        System.out.println(Runtime.getRuntime().availableProcessors());

        //自定义线程池,工作ThreadPoolExecutor
        ExecutorService threadPool=new ThreadPoolExecutor(
                2,
                Runtime.getRuntime().availableProcessors(),
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy());

lambda表达式,链式编程,函数式接口,Stream流式计算

9 四大函数式接口

9.1 Function函数式接口

只有一个方法的接口

代码测试Function函数式接口
图片说明

package com.snowdong;

import java.util.function.Function;

public class TestFunctionInterface {
    public static void main(String[] args) {
/*        Function<String,String> function = new Function<String,String>(){
            @Override
            public String apply(String str) {
                return str;
            }
        };*/
        Function<String,String> function=(str)->{
            return str;
        };
        System.out.println(function.apply("snow"));
    }
}

9.2 Predicate断定型接口

有一个输入参数,返回值是一个布尔值

图片说明
代码测试如下:

package com.snowdong;

import java.util.function.Function;
import java.util.function.Predicate;

public class TestFunctionInterface {
    public static void main(String[] args) {
/*        //测试字符串是否为空
        Predicate<String> predicate = new Predicate<String>(){
            @Override
            public boolean test(String s) {
                return s.isEmpty();
            }
        };*/
        Predicate<String> predicate=(s)->{
            return s.isEmpty();
        };
        System.out.println(predicate.test("a"));
    }
}

9.3 Consumer消费型接口

只有一个输入值,没有返回值

图片说明
代码测试如下:

package com.snowdong;

import java.util.function.Consumer;

public class TestFunctionInterface {
    public static void main(String[] args) {
/*        Consumer<String> consumer = new Consumer<String>(){
            @Override
            public void accept(String s) {
                System.out.println(s);
            }
        };*/
        Consumer<String> consumer=(s)->{
            System.out.println(s);
        };
        consumer.accept("snow");
    }
}

9.4 Supplier供给型接口

无参数输入,只有返回值

代码测试如下:

package com.snowdong;
import java.util.function.Supplier;

public class TestFunctionInterface {
    public static void main(String[] args) {
/*        Supplier<String> supplier=new Supplier<String>(){
            @Override
            public String get() {
                return "snow";
            }
        };*/
        Supplier<String> supplier=()->{
            return "snow";
        };
        System.out.println(supplier.get());
    }
}

10 Stream流式计算

计算通过流来操作。

代码测试如下:

package com.snowdong;

import java.util.Arrays;
import java.util.List;

/**
 * 题目要求:一分钟内完成此题,只能用一行代码实现!
 * 现在有5个用户!筛选:
 * 1、ID 必须是偶数
 * 2、年龄必须大于23岁
 * 3、用户名转为大写字母
 * 4、用户名字母倒着排序
 * 5、只输出一个用户!
 */

public class TestStream {
    public static void main(String[] args) {
        //新建几个User实例
        User u1=new User(1,"a",21);
        User u2=new User(2,"b",22);
        User u3=new User(3,"c",23);
        User u4=new User(4,"d",24);
        User u5=new User(6,"e",25);
        //集合就是存储
        List<User> list= Arrays.asList(u1,u2,u3,u4,u5);
        //计算交给Stream流
        list.stream()
                .filter(u->{return u.getId()%2==0;})
                .filter(u->{return u.getAge()>23;})
                .map(u->{return u.getName().toUpperCase();})
                .sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
                .limit(1)
                .forEach(System.out::println);

        /**
         *     default Stream<E> stream() {
         *         return StreamSupport.stream(spliterator(), false);
         *     }
         *
         *     Stream<T> filter(Predicate<? super T> predicate);
         *
         *     <R> Stream<R> map(Function<? super T, ? extends R> mapper);
         *
         *     Stream<T> sorted(Comparator<? super T> comparator);
         *
         *     void forEach(Consumer<? super T> action);
         */

    }
}
//User类需要写