6 读写锁
ReadWriteLock
package com.snowdong;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 独占锁(写锁)一次只能被一个线程占有
* 共享锁(读锁)多个线程可以同步占有
* 读-读,可以共存
* 读-写,不能共存
* 写-写,不能共存
*/
public class TestReadWriteLockDemo {
public static void main(String[] args) {
//MyCache mycache=new MyCache();
MyCacheLock myCache=new MyCacheLock();
//写入
for (int i = 0; i < 5; i++) {
final int temp=i;
new Thread(()->{
myCache.put(temp+"",temp+"");
},String.valueOf(i)).start();
}
//读取
for (int i = 0; i < 5; i++) {
final int temp=i;
new Thread(()->{
myCache.get(temp+"");
},String.valueOf(i)).start();
}
}
}
class MyCacheLock{
private volatile Map<String,Object> map=new HashMap<>();
//读写锁,更加细粒度的控制
private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
private Lock lock=new ReentrantLock();//常用的锁,锁住调用对象
//存,写
public void put(String key,Object value){
readWriteLock.writeLock().lock();
try{
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key, value);
System.out.println(Thread.currentThread().getName()+"写入OK");
}catch (Exception e){
e.printStackTrace();
}finally{
readWriteLock.writeLock().unlock();
}
}
//取,读
public void get(String key){
readWriteLock.readLock().lock();
try{
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object o=map.get(key);
System.out.println(Thread.currentThread().getName()+"读取OK");
}catch (Exception e){
e.printStackTrace();
}finally {
readWriteLock.readLock().unlock();
}
}
}
//不加锁锁状态会出现,读写混乱
class MyCache{
private volatile Map<String,Object> map=new HashMap<>();
//存,写
public void put(String key,Object value){
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key, value);
System.out.println(Thread.currentThread().getName()+"写入OK");
}
//取,读
public void get(String key){
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object o=map.get(key);
System.out.println(Thread.currentThread().getName()+"读取OK");
}
} 7 阻塞队列
写入:如果队列满了,则阻塞等待。读取:如果队列空,则阻塞等待
- 什么情况下我们会使用 阻塞队列:多线程并发处理,线程池!
- 学会使用队列: 添加、移除
- 四组API
7.1 ArrayBlockingQueue数组阻塞队列
代码具体实现展示:
package com.snowdong;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class TestBlockingQueue {
public static void main(String[] args) throws InterruptedException {
//test1();
//test2();
test4();
}
/**
* 抛出异常
*/
public static void test1(){
/**
* public ArrayBlockingQueue(int capacity) {
* this(capacity, false);
* }
*/
BlockingQueue blockingQueue=new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
//再添加元素则报错,java.lang.IllegalStateException: Queue full抛出异常
//System.out.println(blockingQueue.add("d"));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//再移除元素则报错,java.util.NoSuchElementException抛出异常
System.out.println(blockingQueue.remove());
}
/**
* 有返回值,没有异常
*/
public static void test2(){
BlockingQueue blockingQueue=new ArrayBlockingQueue<>(2);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
//再次添加,则会返回false,不报错
//System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//再次删除,则会返回null,不报错
System.out.println(blockingQueue.poll());
}
/**
* 等待,阻塞(一直阻塞)
*/
public static void test3() throws InterruptedException {
BlockingQueue blockingQueue=new ArrayBlockingQueue<>(2);
blockingQueue.put("a");
blockingQueue.put("b");
//再次往队列中添加,队列满,则一直阻塞
//blockingQueue.put("c");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
//再次删除,队列空,则一直阻塞
System.out.println(blockingQueue.take());
}
/**
* 等待,阻塞(等待超时)
*/
public static void test4() throws InterruptedException {
BlockingQueue blockingQueue=new ArrayBlockingQueue<>(2);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
//等待2秒就推出,不报错
/**
* boolean offer(E e, long timeout, TimeUnit unit)
* throws InterruptedException;
*/
blockingQueue.offer("c",2, TimeUnit.SECONDS);
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//等待3秒,还不能等到队列中来值,则推出,不报错
/**
*
*E poll ( long timeout, TimeUnit unit)
* throws InterruptedException;
*/
blockingQueue.poll(2,TimeUnit.SECONDS);
}
} 7.2 SynchronousQueue同步队列
没有容量设置,进去一个元素,必须等待取出来后,才能往里面放元素
put,take(即阻塞,等待)
package com.snowdong;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* 同步队列和其他BlockingQueue不一样,SynchronousQueue不存储元素
* put了一个元素,必须从里面take出来,否则不能再put进去
*/
public class TestSynchronousQueue {
public static void main(String[] args) {
//同步队列
BlockingQueue blockingQueue=new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" put a");
blockingQueue.put("a");
System.out.println(Thread.currentThread().getName()+" put b");
blockingQueue.put("b");
System.out.println(Thread.currentThread().getName()+" put c");
blockingQueue.put("c");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take());
System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take());
System.out.println(Thread.currentThread().getName()+" take "+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
} 有问题待研究
8 线程池
3大方法,7大参数,4中拒绝策略
线程池的好处
- 降低资源的消耗
- 提高响应的速度
- 方便管理
- 线程复用,可以控制最大并发数,管理线程*
8.1 三大方法
具体代码说明:
package com.snowdong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
//Executors工具类,三大方法
public class TestThreadTool {
public static void main(String[] args) {
//单个线程的线程池
ExecutorService threadPool1= Executors.newSingleThreadExecutor();
//创建一个固定大小的线程池
ExecutorService threadPool2=Executors.newFixedThreadPool(5);
//可伸缩的线程池
ExecutorService threadPool3=Executors.newCachedThreadPool();
try{
for (int i = 0; i < 10; i++) {
threadPool2.execute(() -> {
System.out.println(Thread.currentThread().getName() + " OK");
});
}
}catch (Exception e){
e.printStackTrace();
}finally{
//线程池用完,关闭线程池
threadPool2.shutdown();
}
}
} 8.2 七大参数
三大方法的构造函数源码说明:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
} ThreadPoolExecutor()的具体源码说明:
public ThreadPoolExecutor(int corePoolSize,// 核心线程池大小
int maximumPoolSize,// 最大核心线程池大小
long keepAliveTime,// 超时了没有人调用就会释放,(线程池中非核心线程多久没被调用了即释放)
TimeUnit unit,// 超时单位
BlockingQueue<Runnable> workQueue,// 阻塞队列
ThreadFactory threadFactory,// 线程工厂:创建线程的,一般
不用动
RejectedExecutionHandler handler) {// 拒绝策略
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
} 对应线程池的现实场景对比
8.3 四种拒绝策略
具体代码展示:
package com.snowdong;
import java.util.concurrent.*;
// Executors 工具类、3大方法
/**
* //Abort:流产,终止,使夭折
* new ThreadPoolExecutor.AbortPolicy()//银行满了,还有人进来,不处理这个人,抛出异常
* java.util.concurrent.RejectedExecutionException
*
* new ThreadPoolExecutor.CallerRunsPolicy()//哪里来去哪里,即把这个返回原线程中.再本例中会返回给主线程main
* new ThreadPoolExecutor.DiscardPolicy()//队列满了,丢掉任务,不会抛出异常
* new ThreadPoolExecutor.DiscardOldestPolicy()//队列满了,丢掉阻塞队列中最老的那个任务,不抛出异常。不影响正在执行的任务
*/
//手动创建一个线程池
public class TestThreadPool01 {
public static void main(String[] args) {
//自定义线程池,工作ThreadPoolExecutor
ExecutorService threadPool=new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy());
try{
// 最大承载:Deque + max
for (int i = 1; i <= 9; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " OK");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
} 池的最大的大小如何去设置!
了解:IO密集型,CPU密集型:(调优)
// 最大线程到底该如何定义
// 1、CPU 密集型,几核,就是几,可以保持CPu的效率最高!
// 2、IO 密集型 > 判断你程序中十分耗IO的线程,
// 程序 15个大型任务 io十分占用资源!
//获取CPU的核数
System.out.println(Runtime.getRuntime().availableProcessors());
//自定义线程池,工作ThreadPoolExecutor
ExecutorService threadPool=new ThreadPoolExecutor(
2,
Runtime.getRuntime().availableProcessors(),
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy()); lambda表达式,链式编程,函数式接口,Stream流式计算
9 四大函数式接口
9.1 Function函数式接口
只有一个方法的接口
代码测试Function函数式接口
package com.snowdong;
import java.util.function.Function;
public class TestFunctionInterface {
public static void main(String[] args) {
/* Function<String,String> function = new Function<String,String>(){
@Override
public String apply(String str) {
return str;
}
};*/
Function<String,String> function=(str)->{
return str;
};
System.out.println(function.apply("snow"));
}
} 9.2 Predicate断定型接口
有一个输入参数,返回值是一个布尔值
代码测试如下:
package com.snowdong;
import java.util.function.Function;
import java.util.function.Predicate;
public class TestFunctionInterface {
public static void main(String[] args) {
/* //测试字符串是否为空
Predicate<String> predicate = new Predicate<String>(){
@Override
public boolean test(String s) {
return s.isEmpty();
}
};*/
Predicate<String> predicate=(s)->{
return s.isEmpty();
};
System.out.println(predicate.test("a"));
}
} 9.3 Consumer消费型接口
只有一个输入值,没有返回值
代码测试如下:
package com.snowdong;
import java.util.function.Consumer;
public class TestFunctionInterface {
public static void main(String[] args) {
/* Consumer<String> consumer = new Consumer<String>(){
@Override
public void accept(String s) {
System.out.println(s);
}
};*/
Consumer<String> consumer=(s)->{
System.out.println(s);
};
consumer.accept("snow");
}
} 9.4 Supplier供给型接口
无参数输入,只有返回值
代码测试如下:
package com.snowdong;
import java.util.function.Supplier;
public class TestFunctionInterface {
public static void main(String[] args) {
/* Supplier<String> supplier=new Supplier<String>(){
@Override
public String get() {
return "snow";
}
};*/
Supplier<String> supplier=()->{
return "snow";
};
System.out.println(supplier.get());
}
} 10 Stream流式计算
计算通过流来操作。
代码测试如下:
package com.snowdong;
import java.util.Arrays;
import java.util.List;
/**
* 题目要求:一分钟内完成此题,只能用一行代码实现!
* 现在有5个用户!筛选:
* 1、ID 必须是偶数
* 2、年龄必须大于23岁
* 3、用户名转为大写字母
* 4、用户名字母倒着排序
* 5、只输出一个用户!
*/
public class TestStream {
public static void main(String[] args) {
//新建几个User实例
User u1=new User(1,"a",21);
User u2=new User(2,"b",22);
User u3=new User(3,"c",23);
User u4=new User(4,"d",24);
User u5=new User(6,"e",25);
//集合就是存储
List<User> list= Arrays.asList(u1,u2,u3,u4,u5);
//计算交给Stream流
list.stream()
.filter(u->{return u.getId()%2==0;})
.filter(u->{return u.getAge()>23;})
.map(u->{return u.getName().toUpperCase();})
.sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
.limit(1)
.forEach(System.out::println);
/**
* default Stream<E> stream() {
* return StreamSupport.stream(spliterator(), false);
* }
*
* Stream<T> filter(Predicate<? super T> predicate);
*
* <R> Stream<R> map(Function<? super T, ? extends R> mapper);
*
* Stream<T> sorted(Comparator<? super T> comparator);
*
* void forEach(Consumer<? super T> action);
*/
}
}
//User类需要写 


京公网安备 11010502036488号