Java 9中新增了反应式/响应式编程的Api-Flow反应式/响应式编程是一种关注于数据流变化传递的异步编程方式,通过响应式编程可以很好的解决背压问题。响应式编程的实现方式是基于观察者模式的扩展,在Flow中存在Publisher(发布者)Subscriber(订阅者)Subscription(订阅)Processor(处理器)Flow结构如下:

背压:由消费者控制生产者的生产速度,以解决生产者生产的速度远大于消费者消费的速度时所造成的消息的积压

Publisher

Publisher函数式接口,负责发布数据。Publisher中函数subscribe负责绑定Subscriber

Subscriber

Subscriber是静态内部接口,负责订阅消费数据。Subscriber中定义四个方法,分别是onSubscribeonNextonErroronComplete,这四个方***在相应情况下触发

  • onSubscribe;订阅成功后触发,并且表明可以开始接收订阅数据了
  • onNext:获取接受数据的下一项
  • onError:在发布者或订阅遇到错误时触发

= onComplete:接受完所有的数据后触发

Subscription

Subscription关联PublisherSubscriber,表示PublisherSubscriber的订阅关系。Subscription使用requestcancel方法管理PublisherSubscriber的消费

  • request:请求获取数据的次数
  • cancel:取消订阅,订阅者不在接受数据

在 cancel 方法调用之后,发布者仍然有可能继续发布通知。但订阅最终会被取消

Processor

Processor继承了PublisherSubscriber接口,表示既是生产者,又是订阅者的特殊对象。Processor一般作为数据的中转处理站, 将数据处理之后发给下个订阅者

使用示例

  1. 通过实现Publisher接口自定义一个发布者
public class IntPublisher implements Flow.Publisher<Integer>{

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {

        for(int i = 1; i <= 5; i++) {
            System.out.println("Publishing = " + i);
            // 将数据发给订阅者
            subscriber.onNext(i);
        }
        // 发出完成信号
        subscriber.onComplete();
    }
}
复制代码
  1. 通过实现Subscriber接口自定义一个订阅者
public class IntSubscriber implements Flow.Subscriber<Integer>{

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(10);
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("订阅者接收数据: " + item);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("订阅者接收数据出现异常:" + throwable);
    }

    @Override
    public void onComplete() {
        System.out.println("订阅完成");
    }
}
复制代码
  1. new出发布者和订阅者,订阅者订阅发布者
public class FlowTest {

    @Test
    public void test(){

        IntPublisher intPublisher = new IntPublisher();
        IntSubscriber intSubscriber = new IntSubscriber();
       
        intPublisher.subscribe(intSubscriber);
        
    }
}
复制代码

背压演示

  1. 通过实现Subscriber接口自定义一个订阅者,请使用request方法控制请求数据的次数
public class IntSubscriber implements Flow.Subscriber<Integer> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {

        System.out.println(Thread.currentThread().getName() + " | 订阅,开始请求数据");
        this.subscription = subscription;

        // 第一次请求获取数据的个数,如果不行将不会获取数据
        this.subscription.request(1);
    }

    @SneakyThrows
    @Override
    public void onNext(Integer item) {

        // 消费者5秒消费一个
        Thread.sleep(5000);
        System.out.println(Thread.currentThread().getName() + " | 订阅者接收数据: " + item);
        
        // 这里只在item等于0的时候使用request请求五次,以演示`request`的作用
        if (item == 0){
            // 下一次请求获取数据的个数,如果这里不写将不在请求数据
            this.subscription.request(5);
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("订阅者接收数据出现异常:" + throwable);
    }

    @Override
    public void onComplete() {

        System.out.println(Thread.currentThread().getName() + " | 订阅完成");
    }
}
复制代码
  1. 使用jdk自带的Publisher实现类SubmissionPublisher发布数据
    @Test
    public void test() throws InterruptedException {
        
        // 定义有一个线程的线程池,使生产者和消费者在两个线程上工作
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        // SubmissionPublisher第二个参数控制缓存数据的个数(内部有个计算的公式)
        SubmissionPublisher<Integer> sb = new SubmissionPublisher<>(executorService, 10);
        System.out.println("getMaxBufferCapacity: " + sb.getMaxBufferCapacity());

        IntSubscriber intSubscriber = new IntSubscriber();
        sb.subscribe(intSubscriber);
        for (int i = 0; i < 20; i++) {
            System.out.println(Thread.currentThread().getName() + " |  发布数据: " + i);
            sb.submit(i);
        }

        sb.close();
        executorService.shutdown();

        Thread.sleep(10000);
    }

}
复制代码

运行结果

发布者在主线程上发布数据,订阅者在子线程上订阅消费数据。主线程发布数据的速率远大于订阅者消费的速率,如运行结果所示,发布者很快的发布18个数据达到最大容量,之后消费者每消费一个数据发布者才发布一个数据


作者:不可食用盐
链接:https://juejin.cn/post/6994712328197373989
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。