Java 9中新增了反应式/响应式编程的Api-Flow
。反应式/响应式编程
是一种关注于数据流
和变化传递
的异步编程方式,通过响应式编程可以很好的解决背压
问题。响应式编程
的实现方式是基于观察者模式
的扩展,在Flow
中存在Publisher(发布者)
、Subscriber(订阅者)
、Subscription(订阅)
和Processor(处理器)
。Flow
结构如下:
背压:由消费者控制生产者的生产速度,以解决生产者生产的速度远大于消费者消费的速度时所造成的消息的积压
Publisher
Publisher
是函数式接口
,负责发布数据。Publisher
中函数subscribe
负责绑定Subscriber
Subscriber
Subscriber
是静态内部接口,负责订阅消费数据。Subscriber
中定义四个方法,分别是onSubscribe
、onNext
、onError
和onComplete
,这四个方***在相应情况下触发
onSubscribe
;订阅成功后触发,并且表明可以开始接收订阅数据了onNext
:获取接受数据的下一项onError
:在发布者或订阅遇到错误时触发
= onComplete
:接受完所有的数据后触发
Subscription
Subscription
关联Publisher
和Subscriber
,表示Publisher
和Subscriber
的订阅关系。Subscription
使用request
和cancel
方法管理Publisher
到Subscriber
的消费
request
:请求获取数据的次数cancel
:取消订阅,订阅者不在接受数据
在 cancel 方法调用之后,发布者仍然有可能继续发布通知。但订阅最终会被取消
Processor
Processor
继承了Publisher
和Subscriber
接口,表示既是生产者,又是订阅者的特殊对象。Processor
一般作为数据的中转处理站
, 将数据处理之后发给下个订阅者
使用示例
- 通过实现
Publisher
接口自定义一个发布者
public class IntPublisher implements Flow.Publisher<Integer>{
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
for(int i = 1; i <= 5; i++) {
System.out.println("Publishing = " + i);
// 将数据发给订阅者
subscriber.onNext(i);
}
// 发出完成信号
subscriber.onComplete();
}
}
复制代码
- 通过实现
Subscriber
接口自定义一个订阅者
public class IntSubscriber implements Flow.Subscriber<Integer>{
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(10);
}
@Override
public void onNext(Integer item) {
System.out.println("订阅者接收数据: " + item);
}
@Override
public void onError(Throwable throwable) {
System.out.println("订阅者接收数据出现异常:" + throwable);
}
@Override
public void onComplete() {
System.out.println("订阅完成");
}
}
复制代码
- new出发布者和订阅者,订阅者订阅发布者
public class FlowTest {
@Test
public void test(){
IntPublisher intPublisher = new IntPublisher();
IntSubscriber intSubscriber = new IntSubscriber();
intPublisher.subscribe(intSubscriber);
}
}
复制代码
背压演示
- 通过实现
Subscriber
接口自定义一个订阅者,请使用request
方法控制请求数据的次数
public class IntSubscriber implements Flow.Subscriber<Integer> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println(Thread.currentThread().getName() + " | 订阅,开始请求数据");
this.subscription = subscription;
// 第一次请求获取数据的个数,如果不行将不会获取数据
this.subscription.request(1);
}
@SneakyThrows
@Override
public void onNext(Integer item) {
// 消费者5秒消费一个
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + " | 订阅者接收数据: " + item);
// 这里只在item等于0的时候使用request请求五次,以演示`request`的作用
if (item == 0){
// 下一次请求获取数据的个数,如果这里不写将不在请求数据
this.subscription.request(5);
}
}
@Override
public void onError(Throwable throwable) {
System.out.println("订阅者接收数据出现异常:" + throwable);
}
@Override
public void onComplete() {
System.out.println(Thread.currentThread().getName() + " | 订阅完成");
}
}
复制代码
- 使用jdk自带的
Publisher
实现类SubmissionPublisher
发布数据
@Test
public void test() throws InterruptedException {
// 定义有一个线程的线程池,使生产者和消费者在两个线程上工作
ExecutorService executorService = Executors.newSingleThreadExecutor();
// SubmissionPublisher第二个参数控制缓存数据的个数(内部有个计算的公式)
SubmissionPublisher<Integer> sb = new SubmissionPublisher<>(executorService, 10);
System.out.println("getMaxBufferCapacity: " + sb.getMaxBufferCapacity());
IntSubscriber intSubscriber = new IntSubscriber();
sb.subscribe(intSubscriber);
for (int i = 0; i < 20; i++) {
System.out.println(Thread.currentThread().getName() + " | 发布数据: " + i);
sb.submit(i);
}
sb.close();
executorService.shutdown();
Thread.sleep(10000);
}
}
复制代码
运行结果
发布者
在主线程上发布数据,订阅者
在子线程上订阅消费数据。主线程发布数据的速率远大于订阅者消费的速率,如运行结果所示,发布者
很快的发布18个数据达到最大容量
,之后消费者
每消费一个数据发布者
才发布一个数据
作者:不可食用盐
链接:https://juejin.cn/post/6994712328197373989
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。