​前言介绍

Flink的检查点和恢复机制定期的会保存应用程序状态的一致性检查点。在故障的情况下,应用程序的状态将会从最近一次完成的检查点恢复,并继续处理。尽管如此,可以使用检查点来重置应用程序的状态无法完全达到令人满意的一致性保证。相反,source和sink的连接器需要和Flink的检查点和恢复机制进行集成才能提供有意义的一致性保证。

为了给应用程序提供恰好处理一次语义的状态一致性保证,应用程序的source连接器需要能够将source的读位置重置到之前保存的检查点位置。当处理一次检查点时,source操作符将会把source的读位置持久化,并在恢复的时候从这些读位置开始重新读取。支持读位置的检查点的source连接器一般来说是基于文件的存储系统,如:文件流或者Kafka source(检查点会持久化某个正在消费的topic的读偏移量)。如果一个应用程序从一个无法存储和重置读位置的source连接器摄入数据,那么当任务出现故障的时候,数据就会丢失。也就是说我们只能提供at-most-once)的一致性保证。

Fink的检查点和恢复机制和可以重置读位置的source连接器结合使用,可以保证应用程序不会丢失任何数据。尽管如此,应用程序可能会发出两次计算结果,因为从上一次检查点恢复的应用程序所计算的结果将会被重新发送一次(一些结果已经发送出去了,这时任务故障,然后从上一次检查点恢复,这些结果将被重新计算一次然后发送出去)。所以,可重置读位置的source和Flink的恢复机制不足以提供端到端的恰好处理一次语义,即使应用程序的状态是恰好处理一次一致性级别。

Flink 中的一个大的特性就是exactly-once的特性,我们在一般的流处理程序中,会有三种处理语义

  • AT-MOST-ONCE(最多一次)当故障发生的时候,什么都不干。就是说每条消息就只消费一次。

  • AT-LEAST-ONCE(至少一次)为了确保数据不丢失,确保每个时间都得到处理,一些时间可能会被处理多次。

  • EXACTLY-ONCE(精确一次)每个时间都精确处理一次

端到端的保证:

  • 内部保证--- checkpoint

  • source端---可重设数据的读取位置

  • sink端---从故障恢复时,数据不会重复写入外部系统

Flink(checkpoint)和source端(Kafka)可以保证不出问题。但一个志在提供端到端恰好处理一次语义一致性的应用程序需要特殊的sink连接器。sink连接器可以在不同的情况下使用两种技术来达到恰好处理一次一致性语义:幂等性写入和事务性写入。

幂等与事务

1. 幂等性写入

一个幂等操作无论执行多少次都会返回同样的结果。例如,重复的向hashmap中插入同样的key-value对就是幂等操作,因为头一次插入操作之后所有的插入操作都不会改变这个hashmap,因为hashmap已经包含这个key-value对了。另一方面,append操作就不是幂等操作了,因为多次append同一个元素将会导致列表每次都会添加一个元素。在流处理程序中,幂等写入操作是很有意思的,因为幂等写入操作可以执行多次但不改变结果。所以它们可以在某种程度上缓和Flink检查点机制带来的重播计算结果的效应。

需要注意的是,依赖于幂等性sink来达到exactly-once语义的应用程序,必须保证在从检查点恢复以后,它将会覆盖之前已经写入的结果。例如,一个包含有sink操作的应用在sink到一个key-value存储时必须保证它能够确定的计算出将要更新的key值。同时,从Flink程序sink到的key-value存储中读取数据的应用,在Flink从检查点恢复的过程中,可能会看到不想看到的结果。当重播开始时,之前已经发出的计算结果可能会被更早的结果所覆盖(因为在恢复过程中)。所以,一个消费Flink程序输出数据的应用,可能会观察到时间回退,例如读到了比之前小的计数。也就是说,当流处理程序处于恢复过程中时,流处理程序的结果将处于不稳定的状态,因为一些结果被覆盖掉,而另一些结果还没有被覆盖。一旦重播完成,也就是说应用程序已经通过了之前出故障的点,结果将会继续保持一致性。

2. 事务性写入

实现端到端的恰好处理一次一致性语义的方法基于事务性写入。其思想是只将最近一次成功保存的检查点之前的计算结果写入到外部系统中去。这样就保证了在任务故障的情况下,端到端恰好处理一次语义。应用将被重置到最近一次的检查点,而在这个检查点之后并没有向外部系统发出任何计算结果。通过只有当检查点保存完成以后再写入数据这种方法,事务性的方法将不会遭受幂等性写入所遭受的重播不一致的问题。尽管如此,事务性写入却带来了延迟,因为只有在检查点完成以后,我们才能看到计算结果。

Flink提供了两种构建模块来实现事务性sink连接器:write-ahead-log(WAL,预写式日志)sink和两阶段提交sink。WAL式sink将会把所有计算结果写入到应用程序的状态中,等接到检查点完成的通知,才会将计算结果发送到sink系统。因为sink操作会把数据都缓存在状态后段,所以WAL可以使用在任何外部sink系统上。尽管如此,WAL还是无法提供刀枪不入的恰好处理一次语义的保证,再加上由于要缓存数据带来的状态后段的状态大小的问题,WAL模型并不十分完美。

与之形成对比的,2PC sink需要sink系统提供事务的支持或者可以模拟出事务特性的模块。对于每一个检查点,sink开始一个事务,然后将所有的接收到的数据都添加到事务中,并将这些数据写入到sink系统,但并没有提交(commit)它们。当事务接收到检查点完成的通知时,事务将被commit,数据将被真正的写入sink系统。这项机制主要依赖于一次sink可以在检查点完成之前开始事务,并在应用程序从一次故障中恢复以后再commit的能力。

2PC协议依赖于Flink的检查点机制。检查点屏障是开始一个新的事务的通知,所有操作符自己的检查点成功的通知是它们可以commit的投票,而作业管理器通知一个检查点成功的消息是commit事务的指令。于WAL sink形成对比的是,2PC sinks依赖于sink系统和sink本身的实现可以实现恰好处理一次语义。更多的,2PC sink不断的将数据写入到sink系统中,而WAL写模型就会有之前所述的问题。

事务写的方式能提供端到端的Exactly-Once一致性,它的代价也是非常明显的,就是牺牲了延迟。输出数据不再是实时写入到外部系统,而是分批次地提交。目前来说,没有完美的故障恢复和Exactly-Once保障机制,对于开发者来说,需要在不同需求之间权衡

开发理解

3. 开发过程的理解

3.1 Flink如何管理Kafka consumer offsets

checkpoint是Flink的内部机制,可以从故障中恢复。通俗的理解是checkpoint是Flink应用程序状态的一致性副本,包括输入的读取位置(offset)。如果发生故障,Flink将通过从checkpoint加载状态后端并从恢复的读取位置继续恢复应用程序,可以做到所谓的断点续传。

checkpoint使Flink具有容错能力,并确保在发生故障时具有容错的能力。应用程序可以定期触发检查点。

Flink中的Kafka消费者将Flink的检查点机制与有状态运算符集成在一起,其状态是所有Kafka分区中的读取偏移量。触发checkpoint时,每个分区的偏移量都存储在checkpoint中。Flink的checkpoint机制确保所有操作员任务的存储状态是一致的,即它们基于相同的输入数据。当所有操作员任务成功存储其状态时,检查点完成。因此,当从潜在的系统故障重新启动时,系统提供一次性状态更新保证。

3.2 两阶段提交

在分布式系统中,为了让每个节点都能够感知到其他节点的事务执行状况,需要引入一个中心节点来统一处理所有节点的执行逻辑,这个中心节点叫做协调者(coordinator),被中心节点调度的其他业务节点叫做参与者(participant)。

接下来正式介绍2PC。顾名思义,2PC将分布式事务分成了两个阶段,两个阶段分别为提交请求和提交。协调者根据参与者的响应来决定是否需要真正地执行事务

提交请求阶段

  • 协调者向所有参与者发送prepare请求与事务内容,询问是否可以准备事务提交,并等待参与者的响应。

  • 参与者执行事务中包含的操作,并记录undo日志(用于回滚)和redo日志(用于重放),但不真正提交。

  • 参与者向协调者返回事务操作的执行结果,执行成功返回yes,否则返回no

提交执行阶段

分为成功与失败两种情况。

若所有参与者都返回yes,说明事务可以提交:

  • 协调者向所有参与者发送commit请求。

  • 参与者收到commit请求后,将事务真正地提交上去,并释放占用的事务资源,并向协调者返回ack。

  • 协调者收到所有参与者的ack消息,事务成功完成。

若有参与者返回no或者超时未返回,说明事务中断,需要回滚:

  • 协调者向所有参与者发送rollback请求

  • 参与者收到rollback请求后,根据undo日志回滚到事务执行前的状态,释放占用的事务资源,并向协调者返回ack

  • 协调者收到所有参与者的ack消息,事务回滚完成

3.3图解

通俗的理解过程:

  • 从kafka获取数据源:(1,2,3)这个时候数据1会经过source,热后进行到算子阶段,jobmanager触发checkpoint操作,这个时候offset会记录source消费的位置

  • 后续数据会继续跟进,1的数据进行算子操作(比如sum),这个时候数据继续往下走进行sink,jobmanager也会触发记录算子的状态

  • 数据1走进sink阶段,然后sink值写到一个临时文件中去,这个时候也会记录sink的状态,然后发送到kafka去(预提交),这个时候不会正真地去消费使用,因为不确定两边是否同步,这个时候会告诉flink这边已经有了,这个时候会正在的进行提交(第二阶段提交)

  • 这个时候整个checkpoint过程才算完整,如果某个过程失败,也即checkpoint过程不完整(jobmanager会丢弃),flink重启后会记录offset继续消费即可

按照术语来讲的话,这样理解:每当需要做checkpoint时,JobManager就在数据流中打入一个屏障(barrier),作为检查点的界限。屏障随着算子链向下游传递,每到达一个算子都会触发将状态快照写入状态后端(state BackEnd)的动作。当屏障到达Kafka sink后,触发preCommit(实际上是KafkaProducer.flush())方法刷写消息数据,但还未真正提交。接下来还是需要通过检查点来触发提交阶段。

最后总结

  • souce:使用执行ExactlyOnce的数据源,比如kafka等

  • 内部使用FlinkKafakConsumer,并开启CheckPoint,偏移量会保存到StateBackend中,并且默认会将偏移量写入到topic中去,即_consumer_offsets Flink设置CheckepointingModel.EXACTLY_ONCE

  • sink:

    存储系统支持覆盖也即幂等性:如Redis,Hbase,ES等

    存储系统不支持覆:需要支持事务(预写式日志或者两阶段提交),两阶段提交可参考Flink集成的kafka sink的实现

微信公众号:大数据左右手

人要去的地方,除了远方,还有未来
欢迎关注我,一起学习,一起进步!