Reactor-Core源码解析(三)

本章解析reactor.core包下的顶层接口,有助于理解reactorAPI的顶层设计思路和理念

1. Fuseable

控制变量解析 [?]

/** Indicates the QueueSubscription can't support the requested mode. */
int NONE = 0;
/** Indicates the QueueSubscription can perform sync-fusion. */
int SYNC = 1;
/** Indicates the QueueSubscription can perform only async-fusion. */
int ASYNC = 2;
/** Indicates the QueueSubscription should decide what fusion it performs (input only). */
int ANY = 3;
/** * Indicates that the queue will be drained from another thread * thus any queue-exit computation may be invalid at that point. * <p> * For example, an {@code asyncSource.map().publishOn().subscribe()} sequence where {@code asyncSource} * is async-fuseable: publishOn may fuse the whole sequence into a single Queue. That in turn * could invoke the mapper function from its {@code poll()} method from another thread, * whereas the unfused sequence would have invoked the mapper on the previous thread. * If such mapper invocation is costly, it would escape its thread boundary this way. */
int THREAD_BARRIER = 4;

融合模式

内部接口解析

ConditionalSubscriber [?]
interface ConditionalSubscriber<T> extends CoreSubscriber<T> {
    

	boolean tryOnNext(T t);
}

一种订阅者变体,它可以立即判断是否使用了该值,如果没有使用,则直接允许发送新值。这就避免了对丢弃的值进行通常的请求(1)往返。

QueueSubscription [?]
interface QueueSubscription<T> extends Queue<T>, Subscription {
   

	int requestFusion(int requestedMode);

	......
}

支持基于订阅的基于队列融合优化的契约。 同步源具有固定的大小,可以以拉方式发出它们的项,因此在许多情况下避免了请求计算开销。 异步源可以同时充当队列和订阅,节省了分配另一个队列的大部分时间。

int requestFusion(int requestedMode);

从该队列订阅请求特定的融合模式。 一个人应该请求SYNC, ASYNC或任何模式(从不是NONE),实现者应该返回NONE, SYNC或ASYNC(从不是ANY)。 例如,如果一个源只支持异步融合,而中间操作符只支持同步融合源,那么操作符可以请求同步融合,源可以通过NONE拒绝它,这样操作符也可以向下游返回NONE,而融合不会发生。

SynchronousSubscription
interface SynchronousSubscription<T> extends QueueSubscription<T> {
   
	@Override
	default int requestFusion(int requestedMode) {
   
		if ((requestedMode & Fuseable.SYNC) != 0) {
   
			return Fuseable.SYNC;
		}
		return NONE;
	}
}

同步源的基类,它有固定的大小,可以以拉的方式发出它们的项,因此在许多情况下避免了请求计算开销。

仅当请求模式为SYNC时,返回SYNC进行握手;否则,返回NONE拒绝。

ScalarCallable
interface ScalarCallable<T> extends Callable<T> {
    }

指示目标可以返回值或null的标记接口,否则立即失败,从而成为程序集时优化的可行目标。

2. CoreSubscriber

public interface CoreSubscriber<T> extends Subscriber<T> {
   
	/** * Request a {@link Context} from dependent components which can include downstream * operators during subscribing or a terminal {@link org.reactivestreams.Subscriber}. * * @return a resolved context or {@link Context#empty()} */
	default Context currentContext(){
   
		return Context.empty();
	}
	/** * Implementors should initialize any state used by {@link #onNext(Object)} before * calling {@link Subscription#request(long)}. Should further {@code onNext} related * state modification occur, thread-safety will be required. * <p> * Note that an invalid request {@code <= 0} will not produce an onError and * will simply be ignored or reported through a debug-enabled * {@link reactor.util.Logger}. * * {@inheritDoc} */
	@Override
	void onSubscribe(Subscription s);
}

一个上下文感知的订阅者,与反应流的原始订阅者相比,放宽了§1.3和§3.9的规则。如果在接收到的订阅上执行了一个无效的请求<= 0,该请求将不会产生一个onError,并且会被忽略。 规则松弛是在反应流公共域下初步建立的。

一个持有上下文的订阅者

3. CorePublisher

public interface CorePublisher<T> extends Publisher<T> {
   
	/** * An internal {@link Publisher#subscribe(Subscriber)} that will bypass * {@link Hooks#onLastOperator(Function)} pointcut. * <p> * In addition to behave as expected by {@link Publisher#subscribe(Subscriber)} * in a controlled manner, it supports direct subscribe-time {@link Context} passing. * * @param subscriber the {@link Subscriber} interested into the published sequence * @see Publisher#subscribe(Subscriber) */
	void subscribe(CoreSubscriber<? super T> subscriber);
}

支持CoreSubscriber的发布者。

允许包含上下文的发布者

4. Disposable

表示可以取消/释放任务或资源。 dispose方法的调用是/应该是幂等的。

常见于Task对象(例如SchedulerTask)或Subscriber对象,提供可取消执行的功能

例如,通过状态机的方式,当调用dispose方法时将状态置为取消,同时调用Future.cancel试图取消任务

当在实际执行时,需要先检查状态机的状态是否可执行

/** * Cancel or dispose the underlying task or resource. * <p> * Implementations are required to make this method idempotent. */
void dispose();
/** * Optionally return {@literal true} when the resource or task is disposed. * <p> * Implementations are not required to track disposition and as such may never * return {@literal true} even when disposed. However, they MUST only return true * when there's a guarantee the resource or task is disposed. * * @return {@literal true} when there's a guarantee the resource or task is disposed. */
default boolean isDisposed() {
   
	return false;
}

void dispose();

取消或释放底层任务或资源。 需要实现使这个方法幂等。

default boolean isDisposed()

当资源或任务被释放时,可选地返回true。 实现不需要跟踪处理,因此即使处理也可能永远不会返回true。但是,只有在保证资源或任务被释放时,它们才必须返回true。

Disposables支持类

一个支持类,它为专用的可丢弃子接口(可丢弃的子接口)的实现提供工厂方法。复合,Disposable.Swap)。

  1. static final class ListCompositeDisposable implements Disposable.Composite, Scannable

使用List维护的复合容器

  1. static final class SwapDisposable implements Disposable.Swap

使用AtomicReferenceFieldUpdater进行原子更新的一次性容器

  1. static final class SimpleDisposable extends AtomicBoolean implements Disposable

简单可处置实例,在调用dispose时进行处理

  1. static final class AlwaysDisposable implements Disposable

始终被处理的容器,调用dispose()不做任何操作,isdispose()总是返回true

  1. static final class NeverDisposable implements Disposable

永远不被处理的实例

内部接口解析

Swap

interface Swap extends Disposable, Supplier<Disposable> {
   

	boolean update(@Nullable Disposable next);

	boolean replace(@Nullable Disposable next);
}

一种一次性容器,允许原子地更新/替换其内部一次性容器,同时允许处理容器本身。

常用于定时器执行,当符合时间条件时,使用新的Task替换原有的定时任务

Composite

interface Composite extends Disposable 

一次性容器本身就是一次性的。通过使用dispose()将一次性物品累积起来并一次性处理掉。使用add(Disposable)方法将所有权交给容器,容器现在负责处理这些方法。但是,您可以通过保留一个引用并使用remove(可丢弃的)来重新获得单个元素的所有权,这就把处理所述元素的责任交给了您。请注意,一旦丢弃,容器就不能再使用,您需要一个新的Disposable.Composite。

复合容器,用于执行一组DisposableTask,并在单个Task执行后从复合容器中移除