在使用 Flink 处理实时数据流时,经常需要和外部系统进行交互。例如,在构建实时数据仓库的时候,通常需要将消息和外部维表进行关联,以获得额外的维度数据。由于外部系统的响应时间和网络延迟可能会很高,如果采用同步调用的方式,那么外部调用的高延迟势必会影响到系统的吞吐量,进而成为系统的瓶颈。这种情况下,我们需要采用异步调用的方式。异步调用相比于同步调用,不同请求的等待时间可以重叠,从而提升了吞吐率。
Async I/O 的使用方式
/** * An implementation of the 'AsyncFunction' that sends requests and sets the callback. */ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] { /** The database specific client that can issue concurrent requests with callbacks */ lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials) /** The context used for the future callbacks */ implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor()) override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = { // issue the asynchronous request, receive a future for the result // 发起异步请求,返回结果是一个 Future val resultFutureRequested: Future[String] = client.query(str) // set the callback to be executed once the request by the client is complete // the callback simply forwards the result to the result future // 请求完成时的回调,将结果交给 ResultFuture resultFutureRequested.onSuccess { case result: String => resultFuture.complete(Iterable((str, result))) } } } // create the original stream val stream: DataStream[String] = ... // 应用 async I/O 转换,设置等待模式、超时时间、以及进行中的异步请求的最大数量 val resultStream: DataStream[(String, String)] = AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)
AsyncDataStream 提供了两种调用方法,分别是 orderedWait 和 unorderedWait,这分别对应了有序和无序两种输出模式。之所以会提供两种输出模式,是因为异步请求的完成时间是不确定的,先发出的请求的完成时间可能会晚于后发出的请求。在“有序”的输出模式下,所有计算结果的提交完全和消息的到达顺序一致;而在“无序”的输出模式下,计算结果的提交则是和请求的完成顺序相关的,先处理完成的请求的计算结果会先提交。值得注意的是,在使用“事件时间”的情况下,“无序”输出模式仍然可以保证 watermark 的正常处理,即在两个 watermark 之间的消息的异步请求结果可能是异步提交的,但在 watermark 之后的消息不能先于该 watermark 之前的消息提交。
由于异步请求的完成时间不确定,需要设置请求的超时时间,并配置同时进行中的异步请求的最大数量。
Async I/O 的实现
AsyncDataStream 在运行时被转换为 AsyncWaitOperator 算子,它是 AbstractUdfStreamOperator 的子类。下面我们来看看 AsyncWaitOperator 的实现原理。
基本原理
AsyncWaitOperator 算子相比于其它算子的最大不同在于,它的输入和输出并不是同步的。因此,在 AsyncWaitOperator 内部采用了一种 “生产者-消费者” 模型,基于一个队列解耦异步计算和计算结果的提交。StreamElementQueue 提供了一种队列的抽象,一个“消费者”线程 Emitter 从中取出已完成的计算结果,并提交给下游算子,而异步请求则充当了队列“生产者”的角色。基本的处理逻辑如下图所示。
public class AsyncWaitOperator<IN, OUT> extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT>, OperatorActions { /** Queue to store the currently in-flight stream elements into. */ private transient StreamElementQueue queue; /** Pending stream element which could not yet added to the queue. */ private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry; private transient ExecutorService executor; /** Emitter for the completed stream element queue entries. */ private transient Emitter<OUT> emitter; /** Thread running the emitter. */ private transient Thread emitterThread; @Override public void processElement(StreamRecord<IN> element) throws Exception { final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element); //注册一个定时器,在超时时调用 timeout 方法 if (timeout > 0L) { // register a timeout for this AsyncStreamRecordBufferEntry long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime(); final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer( timeoutTimestamp, new ProcessingTimeCallback() { @Override public void onProcessingTime(long timestamp) throws Exception { userFunction.timeout(element.getValue(), streamRecordBufferEntry); } }); // Cancel the timer once we've completed the stream record buffer entry. This will remove // the register trigger task streamRecordBufferEntry.onComplete( (StreamElementQueueEntry<Collection<OUT>> value) -> { timerFuture.cancel(true); }, executor); } //加入队列 addAsyncBufferEntry(streamRecordBufferEntry); //发送异步请求 userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry); } //尝试将待完成的请求加入队列,如果队列已满(到达异步请求的上限),会阻塞 private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException { assert(Thread.holdsLock(checkpointingLock)); pendingStreamElementQueueEntry = streamElementQueueEntry; while (!queue.tryPut(streamElementQueueEntry)) { // we wait for the emitter to notify us if the queue has space left again checkpointingLock.wait(); } pendingStreamElementQueueEntry = null; } } public class Emitter<OUT> implements Runnable { @Override public void run() { try { while (running) { //从队列阻塞地获取元素 AsyncResult streamElementEntry = streamElementQueue.peekBlockingly(); output(streamElementEntry); } } } }
AsyncWaitOperator 可以工作在两种模式下,即 ORDERED 和 UNORDERED。Flink 通过 StreamElementQueue 的不同实现实现了这两种模式。
“有序”模式
在“有序”模式下,所有异步请求的结果必须按照消息的到达顺序提交到下游算子。在这种模式下,StreamElementQueue 的具体是实现是 OrderedStreamElementQueue。OrderedStreamElementQueue 的底层是一个有界的队列,异步请求的计算结果按顺序加入到队列中,只有队列头部的异步请求完成后才可以从队列中获取计算结果。
public class OrderedStreamElementQueue implements StreamElementQueue { /** Capacity of this queue. */ private final int capacity; /** Queue for the inserted StreamElementQueueEntries. */ private final ArrayDeque<StreamElementQueueEntry<?>> queue; @Override public AsyncResult peekBlockingly() throws InterruptedException { lock.lockInterruptibly(); try { //只有队列头部的请求完成后才解除阻塞状态 while (queue.isEmpty() || !queue.peek().isDone()) { headIsCompleted.await(); } return queue.peek(); } finally { lock.unlock(); } } @Override public AsyncResult poll() throws InterruptedException { lock.lockInterruptibly(); try { while (queue.isEmpty() || !queue.peek().isDone()) { headIsCompleted.await(); } notFull.signalAll(); return queue.poll(); } finally { lock.unlock(); } } @Override public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException { lock.lockInterruptibly(); try { if (queue.size() < capacity) { //未达容量上限 addEntry(streamElementQueueEntry); return true; } else { return false; } } finally { lock.unlock(); } } }
“无序”模式
在“无序”模式下,异步计算结果的提交不是由消息到达的顺序确定的,而是取决于异步请求的完成顺序。当然,在使用“事件时间”的情况下,要保证 watermark 语义的正确性。在使用“处理时间”的情况下,由于不存在 Watermark,因此可以看作一种特殊的情况。在 UnorderedStreamElementQueue 中巧妙地实现了这两种情况。
从上图中可以看出,在 UnorderedStreamElementQueue 内部使用了两个队列,ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue 中保存未完成的异步请求计算结果,而 completedQueue 中保存已完成的异步请求计算结果。注意,ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue 这个队列中的元素是异步请求计算结果的散列集合,从图中也可以看出, watermarkSet 作为一种特殊的集合,其内部只有一个元素,即 Watermark,充当了不同散列集合之间的分界。这样就保证了在一个 Watermark 之后的异步请求的计算结果不会先于该 Watermark 之前进行提交。firstSet 中完成异步请求的计算结果会被转移到 completedQueue 队列中,firstSet 内部的所有异步请求的计算结果都是可以乱序提交的。
如果不使用“事件时间”,那么没有 Watermark 产生,所有的异步请求都会进入 firstSet 中,因而所有的结果都是乱序提交的。
具体代码实现逻辑如下,结合上面的示意图应该不难理解。
public class UnorderedStreamElementQueue implements StreamElementQueue { /** Queue of uncompleted stream element queue entries segmented by watermarks. */ private final ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue; /** Queue of completed stream element queue entries. */ private final ArrayDeque<StreamElementQueueEntry<?>> completedQueue; /** First (chronologically oldest) uncompleted set of stream element queue entries. */ private Set<StreamElementQueueEntry<?>> firstSet; // Last (chronologically youngest) uncompleted set of stream element queue entries. New // stream element queue entries are inserted into this set. private Set<StreamElementQueueEntry<?>> lastSet; @Override public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException { lock.lockInterruptibly(); try { if (numberEntries < capacity) { addEntry(streamElementQueueEntry); return true; } else { return false; } } finally { lock.unlock(); } } @Override public AsyncResult poll() throws InterruptedException { lock.lockInterruptibly(); try { //等待 completedQueue 中的元素 while (completedQueue.isEmpty()) { hasCompletedEntries.await(); } numberEntries--; notFull.signalAll(); return completedQueue.poll(); } finally { lock.unlock(); } } //异步请求完成的回调 public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException { lock.lockInterruptibly(); try { //如果完成的异步请求在 firstSet 中,那么就将 firstSet 中已完成的异步请求转移到 completedQueue 中 if (firstSet.remove(streamElementQueueEntry)) { completedQueue.offer(streamElementQueueEntry); while (firstSet.isEmpty() && firstSet != lastSet) { //如果firset中所有的异步请求都完成了,那么就从 uncompletedQueue 获取下一个集合作为 firstSet firstSet = uncompletedQueue.poll(); Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator(); while (it.hasNext()) { StreamElementQueueEntry<?> bufferEntry = it.next(); if (bufferEntry.isDone()) { completedQueue.offer(bufferEntry); it.remove(); } } } hasCompletedEntries.signalAll(); } } finally { lock.unlock(); } } private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) { assert(lock.isHeldByCurrentThread()); if (streamElementQueueEntry.isWatermark()) { //如果是watermark,就要构造一个只包含这个 watermark 的 set 加入到 uncompletedQueue 队列中 lastSet = new HashSet<>(capacity); if (firstSet.isEmpty()) { firstSet.add(streamElementQueueEntry); } else { Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1); watermarkSet.add(streamElementQueueEntry); uncompletedQueue.offer(watermarkSet); } uncompletedQueue.offer(lastSet); } else { //正常记录,加入lastSet中 lastSet.add(streamElementQueueEntry); } //设置异步请求完成后的回调 streamElementQueueEntry.onComplete( (StreamElementQueueEntry<T> value) -> { try { onCompleteHandler(value); } catch (InterruptedException e) { } catch (Throwable t) { operatorActions.failOperator(new Exception("Could not complete the " + "stream element queue entry: " + value + '.', t)); } }, executor); numberEntries++; } }
容错
在异步调用模式下,可能会同时有很多个请求正在处理中。因而在进行快照的时候,需要将异步调用尚未完成,以及结果尚未提交给下游的消息加入到状态中。在恢复的时候,从状态总取出这些消息,再重新处理一遍。为了保证 exactly-once 特性,对于异步调用已经完成,且结果已经由 emitter 提交给下游的消息就无需保存在快照中。
public class AsyncWaitOperator<IN, OUT> extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT>, OperatorActions { /** Recovered input stream elements. */ private transient ListState<StreamElement> recoveredStreamElements; @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); recoveredStreamElements = context .getOperatorStateStore() .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer)); } @Override public void open() throws Exception { super.open(); //...... // 状态恢复的时候,从状态中取出所有为完成的消息,重新处理一遍 if (recoveredStreamElements != null) { for (StreamElement element : recoveredStreamElements.get()) { if (element.isRecord()) { processElement(element.<IN>asRecord()); } else if (element.isWatermark()) { processWatermark(element.asWatermark()); } else if (element.isLatencyMarker()) { processLatencyMarker(element.asLatencyMarker()); } else { throw new IllegalStateException("Unknown record type " + element.getClass() + " encountered while opening the operator."); } } recoveredStreamElements = null; } } @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); //先清除状态 ListState<StreamElement> partitionableState = getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer)); partitionableState.clear(); //将所有未完成处理请求对应的消息加入状态中 Collection<StreamElementQueueEntry<?>> values = queue.values(); try { for (StreamElementQueueEntry<?> value : values) { partitionableState.add(value.getStreamElement()); } // add the pending stream element queue entry if the stream element queue is currently full if (pendingStreamElementQueueEntry != null) { partitionableState.add(pendingStreamElementQueueEntry.getStreamElement()); } } catch (Exception e) { partitionableState.clear(); throw new Exception("Could not add stream element queue entries to operator state " + "backend of operator " + getOperatorName() + '.', e); } } }
小结
在需要和外部系统进行交互的场景下,Flink 的 Async I/O 机制可以有效地降低延迟并提高吞吐率。本文对 Async I/O 的基本实现原理进行了介绍。
参考
小结
在需要和外部系统进行交互的场景下,Flink 的 Async I/O 机制可以有效地降低延迟并提高吞吐率。本文对 Async I/O 的基本实现原理进行了介绍。