个人技术博客:www.zhenganwen.top
引言
众所周知,JUC出自并发大师Doug Lea之手,他对Java并发性能的提升做出了巨大的贡献。而在JDK1.5未引入JUC之前,Doug Lea其实就已经写了一套JUC类库并受到社区的大力支持。本文就是介绍Doug Lea写JUC之前提出的一些方法论,JUC是基于这些方法论实践的结果(外国学者的这一点品质值得我们学习:先研究出一套可行的方法论,那么实践便是有据可依,必有成效)。
观察者模式
如图所示观察者模式角色关系图,主题暴露addObserver
方法让任何观察者都可以观察自己感兴趣的主题,当主题发生变动时自己会主动通知通过addObserver
注册到自己身上的所有观察者。相比较观察者不断轮询主题而言,这种机制能够大大减轻观察者的负担而使观察者专注于当主题发生变化后应执行的业务。
模板
Observablle/Subject
public interface Observable {
void addObserver(Observer observer);
void deleteObserver(Observer observer);
void notifyAllObservers();
}
复制代码
Observer
public interface Observer {
void notifyUpdate();
}
复制代码
IntegerObservable(concrete subject)
public class IntegerObservable implements Observable {
private int state;
private Collection<Observer> observers;
public IntegerObservable() {
this.observers = new ArrayList<>();
}
public int getState() {
return state;
}
public void setState(int state) {
if (state == this.state) {
return;
}
this.state = state;
notifyAllObservers();
}
@Override
public void addObserver(Observer observer) {
observers.add(observer);
}
@Override
public void deleteObserver(Observer observer) {
observers.remove(observer);
}
@Override
public void notifyAllObservers() {
observers.forEach(Observer::notifyUpdate);
}
}
复制代码
BinaryObserver(concrete observer)
public class BinaryObserver implements Observer {
private IntegerObservable observable;
public BinaryObserver(IntegerObservable observable) {
this.observable = observable;
observable.addObserver(this);
}
@Override
public void notifyUpdate() {
System.out.println(this.getClass().getName() + " received the notify: state -> " + Integer.toBinaryString(this.observable.getState()));
}
}
复制代码
OctalObserver(concrete observer)
public class OctalObserver implements Observer {
private IntegerObservable observable;
public OctalObserver(IntegerObservable observable) {
this.observable = observable;
observable.addObserver(this);
}
@Override
public void notifyUpdate() {
System.out.println(this.getClass().getName() + " received the notify: state -> " + Integer.toOctalString(this.observable.getState()));
}
}
复制代码
ObserverPatternClient(for test)
public class ObserverPatternClient {
public static void main(String[] args) {
IntegerObservable integerObservable = new IntegerObservable();
Observer observer1 = new OctalObserver(integerObservable);
Observer observer2 = new BinaryObserver(integerObservable);
integerObservable.setState(5); // changed
integerObservable.setState(5); // do write, but not changed
integerObservable.setState(10); // changed
}
}
designpattern.observer.OctalObserver received the notify: state -> 5
designpattern.observer.BinaryObserver received the notify: state -> 101
designpattern.observer.OctalObserver received the notify: state -> 12
designpattern.observer.BinaryObserver received the notify: state -> 1010
复制代码
Observer to monitor Thread Lifecycle
需求
我们需要监控任务的执行状态,比如任务何时开始被执行、何时执行结束、是否正常执行完毕、如果执行时出现异常那么原因是什么。
解决方案
-
不断轮询
我们可以不断轮询判断该线程是否存活,
Thread#join
就是一个典型的例子,它通过while(isAlive)
轮询判断它等待的线程是否终结从而使当前线程从join
的调用中返回。这种方案的缺点是观察线程的负担较重从而导致CPU负担较重并且灵活性较低。 -
使用观察者模式将要观察的可执行任务封装成一个可观察的对象
当线程的状态发生改变时主动通知观察线程,这样不仅减小了观察线程的负担,与此同时观察线程还能着手其他的事而没必要在一直无限循环中耗着。
代码示例
我们不一定要按照模板生搬硬套,而应根据需求灵活变更,比如模板中的notifyUpdate()
无參方法在此例中变成onEvent(T event)
,通过一个事件对象将观察者和事件源解耦。
-
可观察的任务
public class ObservableRunnable implements Runnable{ private Runnable task; private LifecycleObserver observer; public ObservableRunnable(Runnable task, LifecycleObserver observer) { this.task = task; this.observer = observer; } public void notifyObserver(Event event) { observer.onEvent(event); } public static enum ThreadState { RUNNING,DONE,ERROR } public static class Event{ private ThreadState state; private Throwable cause; public ThreadState getState() { return state; } public Throwable getCause() { return cause; } public Event(ThreadState state, Throwable cause) { this.state = state; this.cause = cause; } } @Override public void run() { notifyObserver(new Event(ThreadState.RUNNING,null)); try { task.run(); } catch (Throwable e) { notifyObserver(new Event(ThreadState.ERROR, e)); } notifyObserver(new Event(ThreadState.DONE, null)); } } 复制代码
-
生命周期观察者抽象,在目标生命周期发生变化时根据接收的状态做相应的处理
public interface LifecycleObserver<T> { void onEvent(T event); } 复制代码
-
具体的生命周期观察者,观察任务的执行状态:
public class RunnableLifecycleObserver implements LifecycleObserver<ObservableRunnable.Event>{ @Override public void onEvent(ObservableRunnable.Event event) { if (event.getCause() == null) { System.out.println(Thread.currentThread().getName() + " notify the state of task : state -> " + event.getState()); } else { System.err.println(Thread.currentThread().getName() +" execute fail and the cause is " + event.getCause().getMessage()); try { // you can deal with the failing task TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } } } 复制代码
-
测试1:任务正常执行完毕:
public class RunnableLifecycleMonitorClient { public static void main(String[] args) { conccurentQueryForIds(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9), new RunnableLifecycleObserver()); } public static void conccurentQueryForIds(List<Integer> ids, RunnableLifecycleObserver listener) { ids.stream().forEach(id -> { new Thread(new ObservableRunnable(() -> { try { // sleep some seconds to simulate doing work TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5)); } catch (InterruptedException e) { e.printStackTrace(); } },listener)).start(); }); } } Thread-5 notify the state of task : state -> RUNNING Thread-2 notify the state of task : state -> RUNNING Thread-6 notify the state of task : state -> RUNNING Thread-1 notify the state of task : state -> RUNNING Thread-3 notify the state of task : state -> RUNNING Thread-4 notify the state of task : state -> RUNNING Thread-0 notify the state of task : state -> RUNNING Thread-7 notify the state of task : state -> RUNNING Thread-8 notify the state of task : state -> RUNNING Thread-8 notify the state of task : state -> DONE Thread-4 notify the state of task : state -> DONE Thread-6 notify the state of task : state -> DONE Thread-2 notify the state of task : state -> DONE Thread-1 notify the state of task : state -> DONE Thread-3 notify the state of task : state -> DONE Thread-0 notify the state of task : state -> DONE Thread-7 notify the state of task : state -> DONE Thread-5 notify the state of task : state -> DONE 复制代码
-
测试2:添加算术异常模拟程序执行异常:
public static void conccurentQueryForIds(List<Integer> ids, RunnableLifecycleObserver listener) { ids.stream().forEach(id -> { new Thread(new ObservableRunnable(() -> { try { // sleep some seconds to simulate doing work TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5)); if (id % 2 == 0) { int i = 1 / 0; } } catch (InterruptedException e) { e.printStackTrace(); } },listener)).start(); }); } 复制代码
测试结果:
Thread-2 notify the state of task : state -> RUNNING Thread-3 notify the state of task : state -> RUNNING Thread-5 execute fail and the cause is / by zero Thread-1 execute fail and the cause is / by zero Thread-1 notify the state of task : state -> RUNNING Thread-0 notify the state of task : state -> RUNNING Thread-4 notify the state of task : state -> RUNNING Thread-5 notify the state of task : state -> RUNNING Thread-6 notify the state of task : state -> RUNNING Thread-7 notify the state of task : state -> RUNNING Thread-8 notify the state of task : state -> RUNNING Thread-4 notify the state of task : state -> DONE Thread-7 execute fail and the cause is / by zero Thread-0 notify the state of task : state -> DONE Thread-2 notify the state of task : state -> DONE Thread-8 notify the state of task : state -> DONE Thread-5 notify the state of task : state -> DONE Thread-3 execute fail and the cause is / by zero Thread-1 notify the state of task : state -> DONE Thread-6 notify the state of task : state -> DONE Thread-7 notify the state of task : state -> DONE Thread-3 notify the state of task : state -> DONE 复制代码
单线程执行设计模式
假设我们有这样一个门:每个人想通过这扇门的时候需要报上自己的名号和去往的地方(第9~10
行),只有当名号和目的地的首字母相同时才允许通过(第15
行)
public class Gate {
private String name = "nobody";
private String destination = "nowhere";
private int counter;
public void pass(String name, String destination) {
counter++;
this.name = name;
this.destination = destination;
verify();
}
private void verify() {
if (name.charAt(0) != destination.charAt(0)) {
System.out.println("No." + counter + "==============broken : name -> " + name + " , destination -> " + destination);
}
}
}
复制代码
有门自然就有过门者,假设过门者的使命就是不断地重复通过一扇门:
public class Walker extends Thread{
private String name;
private String destination;
private Gate gate;
public Walker(String name, String destination,Gate gate) {
this.name = name;
this.destination = destination;
this.gate = gate;
}
@Override
public void run() {
System.out.println(name + " start walking...");
while (true) {
gate.pass(name, destination);
}
}
}
复制代码
在这个故事中,世界上只有一扇门,来者都需从此门过:
public class ConcurrentPassOneGate {
public static void main(String[] args) {
Gate gate = new Gate();
Walker bob = new Walker("bob","beijing",gate);
Walker jack = new Walker("jack","jinan",gate);
Walker tom = new Walker("tom", "tianjin", gate);
bob.start();
jack.start();
tom.start();
}
}
bob start walking...
tom start walking...
jack start walking...
No.255==============broken : name -> tom , destination -> tianjin
No.460==============broken : name -> tom , destination -> tianjin
No.736==============broken : name -> tom , destination -> tianjin
No.935==============broken : name -> tom , destination -> tianjin
No.1167==============broken : name -> bob , destination -> beijing
No.1386==============broken : name -> jack , destination -> jinan
No.1658==============broken : name -> tom , destination -> tianjin
No.1887==============broken : name -> tom , destination -> tianjin
......
复制代码
你会发现虽然这三个人都符合过门的条件,但是当三个人一同涌入时(同时报名号和目的地,而守门者只能听清其中的一对)可能会发生:守门者只听见了其中的bob
和tianjin
(对应Gate
类的第9,10
行喊了bob
和beijing
,但在第16
行被tom
喊的tianjin
盖住了beijing
)于是不让他过。
这也就是为什么各大景区入口都要设置单行检票口的原因,不然检票员拿着张三的身份证对着李四的脸看那不得闹误会吗,自然只能一个一个来。Single Threaded Execution Pattern
就对应这个单行检票口,而技巧就是使用synchronized
修饰临界区,就本例而言,临界区是pass
方法,verify
是private
且内联入了pass
因此可以不予考虑。
public synchronized void pass(String name, String destination) {
counter++;
this.name = name;
this.destination = destination;
verify();
}
复制代码
再次运行:发现再也没有过门失败的了
bob start walking...
jack start walking...
tom start walking...
复制代码
不可变设计模式
以下摘自Oracle官网:
An object is considered immutable if its state cannot change after it is constructed. Maximum reliance on immutable objects is widely accepted as a sound strategy for creating simple, reliable code.
Immutable objects are particularly useful in concurrent applications. Since they cannot change state, they cannot be corrupted by thread interference or observed in an inconsistent state.
Programmers are often reluctant to employ immutable objects, because they worry about the cost of creating a new object as opposed to updating an object in place. The impact of object creation is often overestimated, and can be offset by some of the efficiencies associated with immutable objects. These include decreased overhead due to garbage collection, and the elimination of code needed to protect mutable objects from corruption.
当一个对象的状态自其被创建之后就不可改变那么该对象就成为不可变对象。推崇尽可能地依赖不可变对象,因为其创建简单且线程安全。
不可变对象尤其适合用在并发应用程序,因为他们不会改变自身的状态,自然就不会出现在多线程访问下一个线程因为执行写操作而导致另一个线程读取到不一致的数据(对后者来说并不知道前者的存在,前者的写操作导致后者的读操作就像见了鬼一样数据发生了莫名其妙的变化)。
开发者通常不希望使用不可变对象,他们担心的是创建对象时就赋予不可变的状态而不是创建后无法更新状态。对象创建所带来的开销通常是无法估量的(自动内存管理系统需要维护它),但是如果有效的使用不可变对象就能够降低甚至消除一些开销(比如降低垃圾回收的开销和消除保证可变对象线程安全的同步代码块)。
规则
- 类不可被继承,使用
final
修饰 - 实例字段私有化且不可改变,使用
private final
修饰,静态字段常量化static final
- 实例字段需显式初始化或在构造方法中初始化,且仅可提供读方法而不不提供写方法
- 引用字段的读方法需返回一个不可变对象或者返回一个可变对象的副本
以下就是一个不可变对象的例子:
public final class ImmutableObject {
private static final String COUNTRY = "CHINA";
private final String name;
private final String sex;
private final Collection<String> hobbies;
public ImmutableObject(String name, String sex, Collection<String> hobbies) {
this.name = name;
this.sex = sex;
this.hobbies = hobbies;
}
public String getName() {
return name;
}
public String getSex() {
return sex;
}
public Collection<String> getHobbies() {
return Collections.unmodifiableCollection(hobbies);
}
}
复制代码
绝对安全和相对安全
不可变对象是绝对线程安全的,也就是说在任何时候调用它的行为都能够得到一致的结果。像我们常说的线程安全的StringBuffer
则是相对安全的,即保证单次操作是原子的、不***扰的,但复合操作如str += “xxx”
就不一定了。
读写锁分离设计模式
有的并发系统中通常是读多写少的,门户新闻网站就是一个典例。而读操作是不会产生线程间干扰的,如果张三的读请求还要等李四的读请求处理完之后响应,那么系统就是自己给自己找不快了。因此我们需要分析一下此种情况下的同步能否优化:
read | write | |
---|---|---|
read | yes | no |
write | no | no |
经分析可知,一个线程的读并不会干扰另一个线程的读,因此我们可以考虑将锁一分为二:一把读锁和一把写锁,当读锁被持有时,后续的所有读线程可继续获取读锁从而实现并发地读,只有当写锁被持有时后续的读写线程才会被阻塞在临界区之外。
读共享
public class ReadWriteLock {
private HashSet<Thread> readers; //可以有多个线程同时获取读锁
private Thread writer; //同一时刻只有一个线程能够持有写锁
public ReadWriteLock() {
readers = new HashSet<>();
writer = null;
}
public synchronized void getReadLock() throws InterruptedException {
while (writer != null) {
this.wait();
}
readers.add(Thread.currentThread());
System.out.println(Thread.currentThread().getName() + " got the read lock and do working...");
}
public synchronized void releaseReadLock() {
Thread currentThread = Thread.currentThread();
if (!readers.contains(currentThread)) {
throw new IllegalStateException(currentThread.getName()+" didn't hold the read lock");
}
readers.remove(currentThread);
System.out.println(currentThread.getName() + " release the read lock");
this.notifyAll();
}
public synchronized void getWriteLock() throws InterruptedException {
while (!readers.isEmpty() || writer != null) {
this.wait();
}
writer = Thread.currentThread();
System.out.println(writer.getName()+" got the write lock and start working...");
}
public synchronized void releaseWriteLock() {
Thread currentThread = Thread.currentThread();
if (currentThread != writer) {
throw new IllegalStateException(currentThread.getName() + " is not owner of the write lock");
}
System.out.println(writer.getName() + " release the write lock");
writer = null;
this.notifyAll();
}
}
复制代码
并发测试:10个线程读、1个线程写
public class ReadWriteLockTest {
public static void main(String[] args) {
ReadWriteLock readWriteLock = new ReadWriteLock();
IntStream.range(0, 10).forEach(id -> {
new Thread(() -> {
try {
readWriteLock.getReadLock();
// do read
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
}finally { // ensure the release of lock
readWriteLock.releaseReadLock();
}
}).start();
});
new Thread(() -> {
try {
readWriteLock.getWriteLock();
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
readWriteLock.releaseWriteLock();
}
}).start();
}
}
Thread-0 got the read lock and do working...
Thread-2 got the read lock and do working...
Thread-1 got the read lock and do working...
Thread-3 got the read lock and do working...
Thread-4 got the read lock and do working...
Thread-5 got the read lock and do working...
Thread-6 got the read lock and do working...
Thread-7 got the read lock and do working...
Thread-8 got the read lock and do working...
Thread-9 got the read lock and do working...
Thread-5 release the read lock
Thread-8 release the read lock
Thread-1 release the read lock
Thread-9 release the read lock
Thread-7 release the read lock
Thread-0 release the read lock
Thread-2 release the read lock
Thread-4 release the read lock
Thread-6 release the read lock
Thread-3 release the read lock
Thread-10 got the write lock and start working...
Thread-10 release the write lock
复制代码
写饥饿
至此读写分离的功能我们是实现了,但是上述运行结果表露出一个问题,在读锁以被持有后加入的写线程会因为源源不断加入的读线程一直霸占读锁而迟迟不能执行写操作,这叫写饥饿。
写饥饿势必会造成更新操作的延迟,这对讯息顺时万变的互联网并不友好,那么我们能否再改进一下,当有写线程等待时优先让其获取写锁。
public class ReadWriteLock {
private HashSet<Thread> readers;
private Thread writer;
private boolean preferWrite;
private int waitingWriters;
public ReadWriteLock() {
this(true);
}
public ReadWriteLock(boolean preferWrite) {
readers = new HashSet<>();
writer = null;
this.preferWrite = preferWrite;
waitingWriters = 0;
}
public synchronized void getReadLock() throws InterruptedException {
while (writer != null || (preferWrite && waitingWriters > 0)) {
this.wait();
}
readers.add(Thread.currentThread());
System.out.println(Thread.currentThread().getName() + " got the read lock and do working...");
}
public synchronized void releaseReadLock() {
Thread currentThread = Thread.currentThread();
if (!readers.contains(currentThread)) {
throw new IllegalStateException(currentThread.getName() + " didn't hold the read lock");
}
readers.remove(currentThread);
System.out.println(currentThread.getName() + " release the read lock");
this.notifyAll();
}
public synchronized void getWriteLock() throws InterruptedException {
waitingWriters++;
while (!readers.isEmpty() || writer != null) {
this.wait();
}
waitingWriters--;
writer = Thread.currentThread();
System.out.println(writer.getName() + " got the write lock and start working...");
}
public synchronized void releaseWriteLock() {
Thread currentThread = Thread.currentThread();
if (currentThread != writer) {
throw new IllegalStateException(currentThread.getName() + " is not owner of the write lock");
}
System.out.println(writer.getName() + " release the write lock");
writer = null;
this.notifyAll();
}
}
复制代码
并发测试:
public class ReadWriteLockTest {
public static void main(String[] args) {
ReadWriteLock readWriteLock = new ReadWriteLock();
IntStream.range(0, 10).forEach(id -> {
new Thread(() -> {
if (id % 4 != 0) {
try {
readWriteLock.getReadLock();
// do read
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
} finally { // ensure the release of lock
readWriteLock.releaseReadLock();
}
} else {
try {
readWriteLock.getWriteLock();
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readWriteLock.releaseWriteLock();
}
}
}).start();
});
}
}
Thread-1 got the read lock and do working...
Thread-2 got the read lock and do working...
Thread-2 release the read lock
Thread-1 release the read lock
Thread-0 got the write lock and start working...
Thread-0 release the write lock
Thread-8 got the write lock and start working...
Thread-8 release the write lock
Thread-4 got the write lock and start working...
Thread-4 release the write lock
Thread-9 got the read lock and do working...
Thread-7 got the read lock and do working...
Thread-6 got the read lock and do working...
Thread-5 got the read lock and do working...
Thread-3 got the read lock and do working...
Thread-3 release the read lock
Thread-6 release the read lock
Thread-5 release the read lock
Thread-9 release the read lock
Thread-7 release the read lock
复制代码
测试发现,有写请求时,读写锁会根据preferWrite
的设置来判断是否偏向写请求而搁置读请求,倘若设置成false
则输出如下:
Thread-0 got the write lock and start working...
Thread-0 release the write lock
Thread-9 got the read lock and do working...
Thread-7 got the read lock and do working...
Thread-6 got the read lock and do working...
Thread-5 got the read lock and do working...
Thread-5 release the read lock
Thread-3 got the read lock and do working...
Thread-2 got the read lock and do working...
Thread-1 got the read lock and do working...
Thread-2 release the read lock
Thread-9 release the read lock
Thread-3 release the read lock
Thread-1 release the read lock
Thread-6 release the read lock
Thread-7 release the read lock
Thread-8 got the write lock and start working...
Thread-8 release the write lock
Thread-4 got the write lock and start working...
Thread-4 release the write lock
复制代码
Future设计模式
即未来者模式,当前线程不想阻塞着执行某个任务时可以将其提交给FutureService
(未来者模式的入口),FutureService
会立即返回一个票据(Future
子类),当前线程可以通过此票据在未来的某个时间点调用get
方法获取任务执行结果(异步票据的了逻辑是:任务执行完时会主动将执行结果注入到票据中,因此如果任务未执行完那么get
将会陷入阻塞)。
比如当前线程可能需要从数据库或磁盘或网络请求一份数据,这通常会花费一些时间,于是当前线程可以该任务提交给FutureService
,而FutureService
则另开一个线程异步的执行该任务并立即返回一个Future
给当前线程让其在未来的某个时间点通过Future
获取结果,当前线程就不用阻塞在这个任务上而可以先去做其他事从而高效利用线程资源。
Future接口,票据
public interface Future<T> {
T get();
}
复制代码
T
即任务返回的结果类型
AsyncFuture,异步返回的票据
提交任务后获得AsyncFuture
并不代表已经获取了执行结果,只有当任务被执行完并主动将结果注入到AsyncFuture
时,方可通过get
无阻塞地拿到结果,否则提前调用get
仍会陷入阻塞
public class AsyncFuture<T> implements Future<T> {
private T result;
private boolean done;
public AsyncFuture() {
result = null;
done = false;
}
public synchronized void done(T result) {
this.result = result;
done = true;
this.notifyAll();
}
@Override
public synchronized T get() throws InterruptedException {
while (!done) {
this.wait();
}
return result;
}
}
复制代码
FutureService,屏蔽Future实现,接受异步执行请求
public class FutureService<T> {
public Future<T> submit(FutureTask<T> task) {
AsyncFuture future = new AsyncFuture();
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+" execute task asynchronously...");
T result = task.call();
System.out.println(Thread.currentThread().getName()+" execute task asynchronously done...");
future.done(result);
}).start();
return future;
}
}
复制代码
第5-10
行另开一个线程异步执行提交的任务,第11
立即返回创建的票据,任务异步执行结束时主动将结果注入回票据。
测试
public class AsyncFutureTest {
public static void main(String[] args) throws InterruptedException {
FutureService futureService = new FutureService();
Future future = futureService.submit(() -> {
try {
// as if query from db
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "document";
});
System.out.println(Thread.currentThread().getName() + " continue do other things...");
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + " continue do other things finished");
System.out.println(Thread.currentThread().getName() + " try to get future result:" + future.get());
}
}
main continue do other things...
Thread-0 execute task asynchronously...
Thread-0 execute task asynchronously done...
main continue do other things finished
main try to get future result:document
复制代码
增加异步回调
上述模式仍存在一个缺点,那就是当前线程并不知道异步任务何时会执行结束,也就是说当前线程如果在异步任务执行结束之前调用future.get
仍会陷入阻塞,上例测试中第15
行预判异步任务5秒后执行结束因此17
行的get
能够直接返回,但实际上如果第7
行异步任务耗时较久呢,当前线程仍将阻塞,这也是JDK8之前Future
为人所诟病的地方。
对于get
的不确定性,我们可以引入异步回调机制,让异步线程在执行完异步任务后直接执行我们提供的回调函数对执行结果进行消费:
public interface Consumer<T> {
void consum(T result);
}
public class FutureService<T> {
public Future<T> submit(FutureTask<T> task, Consumer<T> consumer) {
AsyncFuture future = new AsyncFuture();
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+" execute task asynchronously...");
T result = task.call();
System.out.println(Thread.currentThread().getName() + " execute task asynchronously done...");
if (consumer != null) {
consumer.consum(result);
}
future.done(result);
}).start();
return future;
}
}
public class AsyncFutureTest {
public static void main(String[] args) throws InterruptedException {
FutureService futureService = new FutureService();
Future future = futureService.submit(() -> {
try {
// as if query from db
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "document";
}, System.out::println);
System.out.println(Thread.currentThread().getName() + " continue do other things and don't care the task");
TimeUnit.SECONDS.sleep(10);
}
}
main continue do other things and don't care the task
Thread-0 execute task asynchronously...
Thread-0 execute task asynchronously done...
document
复制代码
通过第12-14
行、32
行,将异步任务执行结果的处理封装成回调函数交给FutureService
,你异步执行完后也带着处理结果好了,当前线程(main
线程)就无需关心异步执行结果了,因为它直到FutureService
根据自己设置的回调(第32
行)帮忙处理的。
于此同时,main
线程还是能通过future
去主动地拿结果。
Guarded Suspension设计模式
现实生活中有很多Guarded Suspension
的典型应用,诸如:
- 我正在厨房做饭,此时有快递需要签收但我抽不开身,因此让其代签并将快递放在门口,我稍后会拿进来
- 送信小哥会在按门铃没有响应后将信件放入门口的信箱中并做上标记,家主看到后会去查收
以上暂放快递、信件的门口、邮箱就是该设计模式的核心,也即消息缓冲,用于平衡消息发送、接收两份速率不对等的问题,以使生产者仅关注于消息的生产,而消费者仅关注于消息的接收,不必受双方速率不对等的牵制。该模式也被广泛用于服务端接收客户端请求的消息队列。
MessageQueue
常用的消息缓冲的实现方案就是队列:
public class MessageQueue<T> {
private LinkedList<T> queue;
private int messageSize;
public static final int DEFAULT_MAX_QUEUE_SIZE = Integer.MAX_VALUE;
public MessageQueue() {
this(DEFAULT_MAX_QUEUE_SIZE);
}
public MessageQueue(int messageSize) {
this.queue = new LinkedList<>();
this.messageSize = messageSize;
}
public synchronized void put(T message) throws InterruptedException {
while (queue.size() == messageSize) {
wait();
}
queue.addLast(message);
notifyAll();
}
public synchronized T take() throws InterruptedException {
while (queue.isEmpty()) {
wait();
}
T message = queue.pollFirst();
notifyAll();
return message;
}
}
复制代码
测试:
public class MessageQueueTest {
public static void main(String[] args) {
MessageQueue<String> messageQueue = new MessageQueue<>();
for (int i = 0; i < 2; i++) {
new Thread(() -> {
while (true) {
try {
System.out.println(Thread.currentThread().getName() + " consume the message : " + messageQueue.take());
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "consumer-" + i).start();
}
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 2; i++) {
new Thread(() -> {
while (true) {
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(3));
String message = "message-" + counter.getAndIncrement();
System.out.println(Thread.currentThread().getName() + " produce the message : " + message);
messageQueue.put(message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "producer-" + i).start();
}
}
}
producer-1 produce the message : message-0
consumer-1 consume the message : message-0
producer-1 produce the message : message-1
consumer-0 consume the message : message-1
producer-1 produce the message : message-2
consumer-0 consume the message : message-2
producer-0 produce the message : message-3
....
复制代码
Thread-Specific Storage
多线程引发的线程安全问题就是因为共享数据,如果我们能够让各线程仅访问各自的本地变量,那么就算有再多的线程也无需顾虑同步问题,因为线程间互不干扰。
线程保险箱ThreadLocal
public class ThreadLocalTest {
public static void main(String[] args) {
ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
IntStream.range(0,10).forEach(i->{
new Thread(() -> {
threadLocal.set(i);
System.out.println(Thread.currentThread().getName()+" save value to threadLocal : "+i);
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
System.out.println(Thread.currentThread().getName() + " get value from threadLocal : " + threadLocal.get());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
});
}
}
Thread-0 save value to threadLocal : 0
Thread-1 save value to threadLocal : 1
Thread-2 save value to threadLocal : 2
Thread-3 save value to threadLocal : 3
Thread-4 save value to threadLocal : 4
Thread-5 save value to threadLocal : 5
Thread-6 save value to threadLocal : 6
Thread-7 save value to threadLocal : 7
Thread-8 save value to threadLocal : 8
Thread-0 get value from threadLocal : 0
Thread-4 get value from threadLocal : 4
Thread-9 save value to threadLocal : 9
Thread-6 get value from threadLocal : 6
Thread-3 get value from threadLocal : 3
Thread-5 get value from threadLocal : 5
Thread-9 get value from threadLocal : 9
Thread-2 get value from threadLocal : 2
Thread-8 get value from threadLocal : 8
Thread-7 get value from threadLocal : 7
Thread-1 get value from threadLocal : 1
复制代码
上述代码通过ThreadLocal
,每个线程都设置了一个Integer
变量到本地内存(JMM的抽象概念,对应缓存行,线程间不可见)中,每个变量都只能被其隶属的线程访问,因此无论这些线程读写如何交错,都不会发生线程安全问题。
我们来来追溯一下threadLocal.set
(JDK8):
public class ThreadLocal<T>{
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
}
class Thread implements Runnable {
ThreadLocal.ThreadLocalMap threadLocals = null;
}
复制代码
发现threadLocal
会取出当前线程的ThreadLocal.ThreadLocalMap map
,然后以threadLocal
为key
将value
保存到该map
中。
threadLocal.get
则是到当前线程的ThreadLocal.ThreadLocalMap map
以threadLocal
为key
取出对应的value
。如果当前线程的map
为空或者map
中没有对应的key
,那么就会帮当前线程初始化一个map
并返回null
。
public class ThreadLocal<T>{
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
protected T initialValue() {
return null;
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
}
复制代码
每个线程的ThreadLocal.ThreadLocalMap map
就是该线程的保险箱(Thread-Specific Storage
),可以以ThreadLocal<T>
实例为key
不限数量地保存各种类型的变量,这些变量只有map
隶属的线程能够访问到。
借鉴这种机制,你可以很容易地自定义一个线程保险箱:
public class MyThreadStorage<T> {
private Map<Thread, T> storage = new HashMap<>();
public synchronized void put(T value) {
Thread currentThread = Thread.currentThread();
storage.put(currentThread, value);
}
public synchronized T take() {
Thread currentThread = Thread.currentThread();
if (!storage.containsKey(currentThread)) {
return null;
}
return storage.get(currentThread);
}
}
复制代码
多线程运行上下文
我们在MVC
的开发中,常会有如下的编码场景:
public class User { //model
private String name;
private long cardId;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public long getCardId() {
return cardId;
}
public void setCardId(long cardId) {
this.cardId = cardId;
}
}
public class QueryNameFromDB { //dao
public void execute(User user) {
System.out.println(Thread.currentThread().getName() + " query name from db by username...");
try {
TimeUnit.SECONDS.sleep(3);
user.setName("Cowboy");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class QueryCardIdFromHttp { //dao
public void execute(User user) {
if (user.getName() == null) {
throw new IllegalArgumentException("name can't be null");
}
try {
System.out.println(Thread.currentThread().getName()+" query card id from http");
TimeUnit.SECONDS.sleep(5);
user.setCardId(13546543156L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class UserAction { //controller
QueryNameFromDB queryNameFromDB = new QueryNameFromDB();
QueryCardIdFromHttp queryCardIdFromHttp = new QueryCardIdFromHttp();
public void query() {
User user = new User();
queryNameFromDB.execute(user);
queryCardIdFromHttp.execute(user);
System.out.println("query finished, the name is " + user.getName() + " and the card id is " + user.getCardId());
}
public static void main(String[] args) {
new UserAction().query();
}
}
复制代码
在这个例子中我们不知不觉的就用了线程保险箱这个设计模式,只是我们不知道而已:55
中创建了一个局部的context
,因此并发调用query
的多个线程在整个查询逻辑中都只是在访问自己本地内存中的context
,因此不会引发线程安全问题。
但是这样写有一个瑕疵,那就是局部变量context
会被频繁地在方法之间以入参的形式传递,这样未免显得有些重复赘余,这时我们可以通过线程保险箱ThreadLocal
实现线程上下文,使得在需要的时候直接通过它来拿。
核心思想就是利用ThreadLocal
实现一个对应上下文的静态工厂类:
public final class UserContext {
private ThreadLocal<User> threadLocal = new ThreadLocal<User>() {
@Override
protected User initialValue() {
return new User();
}
};
private UserContext() {}
private static class ContextHolder{
private static final UserContext USER_CONTEXT = new UserContext();
}
public static UserContext getUserContext() {
return ContextHolder.USER_CONTEXT;
}
public User getUser() {
return threadLocal.get();
}
}
public class UserAction {
QueryNameFromDB queryNameFromDB = new QueryNameFromDB();
QueryCardIdFromHttp queryCardIdFromHttp = new QueryCardIdFromHttp();
public void query() {
queryNameFromDB.execute();
queryCardIdFromHttp.execute();
User user = UserContext.getUserContext().getUser();
System.out.println("query finished, the name is " + user.getName() + " and the card id is " + user.getCardId());
}
public static void main(String[] args) {
new UserAction().query();
}
}
public class QueryNameFromDB {
public void execute() {
System.out.println(Thread.currentThread().getName() + " query name from db by username...");
try {
TimeUnit.SECONDS.sleep(3);
UserContext.getUserContext().getUser().setName("Cowboy");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class QueryCardIdFromHttp {
public void execute() {
User user = UserContext.getUserContext().getUser();
if (user.getName() == null) {
throw new IllegalArgumentException("name can't be null");
}
try {
System.out.println(Thread.currentThread().getName()+" query card id from http...");
TimeUnit.SECONDS.sleep(5);
user.setCardId(13546543156L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
复制代码
Balking设计模式
balk
的意思是突然止步不前,该设计模式可以联想生活中的如下场景:
- 场景1:咖啡厅中有两类服务员,一类是对自己接待的顾客周期性的(比如每半小时)主动给顾客续杯;另一类是巡视服务员,职责是巡逻观察是否有主动提出服务要求的客户以及时满足。这就好比有两个线程,一个线程看到我招手示意来给我续杯,这被另一个从远处走来的周期性给我续杯的线程看到了,于是后者立即
balk
(突然止步不前,取消准备执行的行为) - 场景2:很多文档编辑都有周期性自动保存的设置,假设我们设置了每30秒自动保存,如果在某个间隔中我们手动保存了一次并且在本次间隔的自动保存到来之前没有其它修改,那么自动保存被触发时会首先检查一下文件是否是修改状态,如果不是则并不会执行更新写入磁盘的操作(
balk
)。
下面以代码的形式模拟场景2:
- 文档(共享数据)
public class Document {
private String filename;
private String content;
private boolean changed;
public Document(String filename) {
this.filename = filename;
content = "";
changed = false;
}
public String getContent() {
return content;
}
public void setContent(String content) {
if (content.equals(this.content)) {
return;
}
this.content = content;
changed = true;
}
public void save() {
if (!changed) {
System.out.println(Thread.currentThread().getName()+"'s update operation is balking");
return; // balk
}
doSave();
changed = false;
}
private void doSave() {
File file = new File(filename);
try (FileWriter writer = new FileWriter(file)) {
writer.write(content);
System.out.println(Thread.currentThread().getName()+" execute update operation successfully");
} catch (IOException e) {
e.printStackTrace();
}
}
}
复制代码
- 编辑文档的用户
public class CustomerThread extends Thread { // 使用文档编辑软件的顾客
private Document document;
public CustomerThread(Document document) {
super("Customer");
this.document = document;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
document.setContent("No." + i);
System.out.println("Customer change content to "+document.getContent());
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10)); //顾客可能不定期手动保存
document.save();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
复制代码
- 编辑软件后台自动保存服务
public class WaiterThread extends Thread{ //文档软件后台服务线程,按照顾客设定执行周期性自动保存
static final int AUTOSAVE_INTERVAL = 3000; //ms
private Document document;
public WaiterThread(Document document) {
super("Waiter");
this.document = document;
}
@Override
public void run() {
while (true) {
synchronized (this) {
try {
wait(AUTOSAVE_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Waiter execute auto save");
document.save();
}
}
}
复制代码
- 测试
public class BalkingTest {
public static void main(String[] args) {
Document document = new Document("C:\\Users\\zaw\\Desktop\\balk.txt");
new CustomerThread(document).start();
new WaiterThread(document).start();
}
}
Customer change content to No.0
Waiter execute auto save
Waiter execute update operation successfully
Waiter execute auto save
Waiter's update operation is balking
Waiter execute auto save
Waiter's update operation is balking
Customer's update operation is balking
Customer change content to No.1
Customer execute update operation successfully
Customer change content to No.2
Waiter execute auto save
Waiter execute update operation successfully
...
复制代码
CountDonw设计模式
CountDown
就是一个计数器,每调用一次down()
就将其递减1,只有当若干个线程调用其down()
使得计数器为0时,所有阻塞在其await
上的方法才能够返回。
public class CountDown {
private int counter;
public CountDown(int counter) {
if (counter<=0) {
throw new IllegalArgumentException("counter must >= 0");
}
this.counter = counter;
}
public synchronized void down() {
counter--;
notifyAll();
}
public synchronized void await() throws InterruptedException {
while (counter > 0) {
wait();
}
}
}
复制代码
测试:
public class CountDownTest {
public static void main(String[] args) {
int threadCount = 5;
CountDown countDown = new CountDown(threadCount);
IntStream.range(0, threadCount).forEach(id -> {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " start working...");
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
System.out.println(Thread.currentThread().getName() + " finished done");
countDown.down();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
});
try {
countDown.await();
System.out.println("all finished done");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Thread-0 start working...
Thread-1 start working...
Thread-2 start working...
Thread-3 start working...
Thread-4 start working...
Thread-3 finished done
Thread-0 finished done
Thread-4 finished done
Thread-2 finished done
Thread-1 finished done
all finished done
复制代码