大家好,我是小寒~
原文链接: Flink的状态管理
在上一篇文章,我们分享了 FLink 中时间和窗口的相关技术细节,今天我们来分享一下 FLink 中的状态管理相关的内容。
状态管理
状态在 FLink 中叫作 State,用来保存中间计算结果或者缓存数据。
对于流计算而言,事件待续不断地产生,如果每次计算都是相互独立的,不依赖于上下游的事件,则是无状态计算。 如果计算常要依赖于之前或者后续的事件,则是有状态的计算。 State 是实现有状态计算下的 Exactly-Once 的基础。
Flink 中的 State 提供了对状态的操作接口,向上对接 Flink DataStream Api,让用户在开发 Flink 应用的时候,可以将临时数据保存在 State 中 和 从 State 中读取数据。在运行的时候,与算子、Function 体系融合,自动对 State 进行备份(CheckPoint),一旦出现异常,能够从保存的 State 中恢复状态,实现 Exactly-Once。
状态的类型
按照数据结构的不同,Flink 中定义了多种 State ,应用于不同的场景。
- ValueState < T >
即类型为 T 的单值状态。这个状态与对应的 key 绑定,是最简单的状态了。它可以通过 update 方法更新状态值,通过 value() 方法获取状态值。
- ListState < T >
即 key 上的状态值为一个列表。可以通过 add 方法往列表中添加值;也可以通过 get() 方法返回一个 Iterable 来遍历状态值。
- ReducingState < T >
这种状态通过用户传入的 reduceFunction,每次调用 add 方法添加值的时候,会调用 reduceFunction,最后合并到一个单一的状态值。
- MapState<UK, UV>
使用 Map 存储 Key- Value 类型的数据 ,通过 put() 或 putAll() 方法添加元素,通过 get() 方法来获取元素。
- AggregatingState < IN,OUT>
聚合State,和 ReducingState 不同的是,这里聚合的类型可以是不同的元素类型,使用 add( IN ) 来加入元素,并使用 AggregateFunction 函数计算聚合结果。
KeyedState 和 OperatorState
State 按照是否有 Key 划分为 KeyedState 和 OperatorState 两种。
1、KeyedState
-
状态跟特定的 key 绑定。
-
keyStream 流上的每一个 key 对应一个 state 对象。若一个 operator 实例处理多个 key,访问相应的多个 State。
-
keyedState 保存在 StateBackend 中
-
通过 RuntimeContext 访问,实现 RichFunction 接口。
-
支持多种数据结构:ValueState、ListState、ReducingState、AggregatingState、MapState.
2、OperatorState
-
和 KeyedState 不同,OperatorState 和一个特定的算子绑定。整个算子只对应一个 state。
-
OperatorState 目前只支持 ListState 数据结构。
原始状态和托管状态
按照由 Flink 管理还是用户自行管理,状态可以分为原始状态 ( Raw State ) 和托管状态 (Managed State)。
1、原始状态
原始状态,即用户自定义的 State。Flink 在做快照的时候,把整个 State 当做一个整体,需要开发者自己管理,使用 byte 数组来读写状态内容。
2、托管状态
托管状态是由 Flink 框架管理的 State,如 ValueState、ListState 等,其序列化和反序列化由 Flink 框架提供支持,无需用户感知、干预。
通常在 DataStream 上的状态,推荐使用托管状态,一般情况下,在实现自定义算子时,才会使用到原始状态。
广播状态
广播状态在 Flink 中叫作 BroadcastState,在广播状态模式中使用。
所谓的广播状态模式,就是来自一个流的数据需要被广播到所有下游任务,在算子本地存储,在处理另一个流的时候依赖于广播的数据。下面以一个示例来说明广播状态模式,如图所示。
如图所示,业务数据流是一个普通的数据流,规则数据流是广播数据流,这样就可以满足实时性、规则更新的要求。
规则算子将规则缓存在本地内存中,在业务数据流记录到来时,能够使用规则处理数据。
广播 State 必须是 MapState 类型,广播状态模式需要使用广播函数进行处理,广播函数提供了处理广播数据流和普通数据流的接口。
状态储存
Flink 无论是哪种类型的 State,都需要被持久化到可靠存储中,才具备应用级的容错能力。State 的存储在 Flink 中叫作 StateBackend。StateBackend 需要具备如下两种能力。
-
在计算过程中提供访问 State 的能力,开发者在编写业务逻辑中能够使用 StateBackend 的接口读写数据。
-
能够将 State 持久化到外部存储,提供容错能力。
根据使用场景的不同, Flink 内置了 3 种 StateBackend 。其体系结构如下图所示。
- 纯内存:MemoryStateBackend,适用于验证、测试,不推荐生产环境。
- 内存+文件:FsStateBackend,适用于长周期大规模的数据。
- RocksDB:RocksDBStateBackend,适用于长周期大规模的数据。
上面提到的 StateBackend 是面向用户的,那么在 Flink 内部 3 种 State 的关系如下图所示。
在运行时,MemoryStateBackend 和 FsStateBackend 本地的 State 都保存在 TaskManager 的内存中,所以其底层依赖于 HeapKeyedStateBackend。HeapKeyedStateBackend 面向 Flink 引擎内部,使用者无须感知。
内存型和文件型状态存储
1、内存型状态存储
MemoryStateBackend 运行时所需要的 State 数据保存在 TaskManager JVM 堆上内存中,KV 类型的State,窗口算子的 State 使用 HashTable 来保存数据、触发器等。执行检查点的时候,会把 State 的快照数据保存到 JobManager 进程的内存中。 MemoryStateBackend 可以使用异步的方式进行快照,(也可以同步),推荐异步,避免阻塞算子处理数据。
基于内存的 StateBackend 在生产环境下不建议使用,可以在本地开发调试测试 。
注意点如下 :
- State 存储在 JobManager 的内存中,受限于 JobManager 的内存大小。
- 每个 State 默认 5MB,可通过 MemoryStateBackend 构造函数调整。
- 每个 State 不能超过 Akka Frame 大小。
2、文件型状态存储
文件型状态存储 FSStateBackend,运行时所需的 State 数据全部保存在 TaskManager 的内存中, 执行检查点的时候,会把 State 的快照数据保存到配置的文件系统中,如使用 HDFS 的路径为 “hdfs://namenode:40010/flink/checkpoints”,使用本地文件系统的路径为:“file:///data/flink/checkpoints”。
FSStateBackend 适用于处理大状态、长窗口,或大键值状态的有状态处理任务。
下面有几点需要注意一下
- State 数据首先会被存在 TaskManager 内存中。
- State 大小不能超过 TaskManager 内存。
- TaskManager 异步将 State 数据写入外部存储结构。
3、内存型和文件型状态存储
内存型和文件型 StateBackend 都依赖与 HeapKeyedStateBackend,HeapKeyedStateBackend 使用 StateTable 存储数据。
StateTable 体系如下图所示。
NestedMapsStateTable 使用两层嵌套的 HashMap 保存状态数据,支持同步快照。CopyOnWriteStateTable 使用 CopyOnWriteStateMap 来保存状态数据,支持异步快照,可以避免在保存快照的过程中持续写入导致不一致的问题。
基于 RocksDB 的 StateBackend
RocksDBStateBackend 跟内存型和文件型 StateBackend 不同,其使用嵌入式的本地数据库 RocksDB 将流计算数据状态存储在本地磁盘中,不会受限于 TaskManager 的内存大小,在执行检查点的时候,再将整个 RocksDB 中保存的 State 数据全量或者增量持久化到配置的文件系统中,在 JobManager 内存中会存储少量的检查点元数据。RocksDB 克服了 State 受内存限制的问题,同时又能够持久化到远端文件系统中,比较适合在生产中使用。
但是 RocksDBStateBackend 相比基于内存的 StateBackcnd ,访问 State 的成本高很多,可能导致数据流的吞吐量剧烈下降,甚至可能降低为原来的 1/10。
- 适用场景:
- 最适合用于处理大状态、长窗口,或大键值状态的有状态处理任务。
- RocksDBStateBackend 非常适合用于高可用方案。
- RocksDBStateBackend 是目前唯一支持增量检查点的后端,增量检查点非常适用于超大状态的场景。
- 注意点
- 总 State 大小仅限于磁盘大小,不受内存限制。
- RocksDBStateBackend 也需要配置外部文件系统,集中保存 State。
- RocksDB的 JNI API 基于 byte数组,单 key 和单 Value 的大小不能超过 231 字节。
- 对于使用具有合并操作状态的应用程序,如 ListState ,随着时间可能会累积到超过 231 字节大小,这将会导致在接下来的查询中失败。
状态持久化
StateBackend 中的数据最终需要持久化到第三方存储中,确保集群故障或者作业故障能够恢复。
HeapSnapshotStrategy 策略对应于 HeapKeyedStateBackend,RocksDBStateBackend 的持久化策略有两种:全量持久化策略(RocksFullSnapshotStrategy)和 增量持久化策略 (RocksIncementalSnapshotStrategy)。
-
全量持久化策略
全盘持久化,也就是说每次把全量的 Slate 写人到状态存储中 (如 HDFS)。内存型、文件型、 RocksDB 类型的 StatcBackend 支持全量持久化策略。
在执行持久化策略的时候,使用异步机制,每个算子启动 1 个独立的线程,将自身的状态写入分布式存储中。在做持久化的过程中,状态可能会被持续修改,基于内存的状态后端使用 CopyOnWriteStateTable 来保证线程安全,RocksDBStateBackend 则使用 RocksDB 的快照机制,使用快照来保证线程安全。
-
增量持久化策略
增量持久化就是每次持久化增量的 State,只有 RocksDBStateBackend 支持增量持久化。Flink 增量式的检查点以 RocksDB 为基础, RocksDB 是一个基于 LSM-Tree 的 KV 存储。新的数据保存在内存中, 称为 memtable。如果 Key 相同,后到的数据将覆盖之前的数据,一旦 memtable 写满了,RocksDB 就会将数据压缩并写入磁盘。memtable 的数据持久化到磁盘后,就变成了不可变的 sstable。
因为 sstable 是不可变的,Flink 对比前一个检查点创建和删除的 RocksDB sstable 文件就可以计算出状态有哪些发生改变。
为了确保 sstable 是不可变的,**Flink 会在 RocksDB 触发刷新操作,强制将 memtable 刷新到磁盘上 。在 Flink 执行检查点时,会将新的 sstable 持久化到 HDFS 中,同时保留引用。这个过程中 Flink 并不会持久化本地所有的 sstable,因为本地的一部分历史 sstable 在之前的检查点中已经持久化到存储中了,只需增加对 sstable 文件的引用次数就可以。
RocksDB 会在后台合并 sstable 并删除其中重复的数据。然后在 RocksDB 删除原来的 sstable,替换成新合成的 sstable.。新的 sstable 包含了被删除的 sstable中的信息,通过合并历史的 sstable 会合并成一个新的 sstable,并删除这些历史sstable。可以减少检查点的历史文件,避免大量小文件的产生。
状态重分布
在实际的生产环绕中,作业预先设置的并行度很多时候并不合理,太多则浪费资源,太少则资源不足,可能导致数据积压延迟变大或者处理时间太长,所以在运维过程中,需要根据作业的运行监控数据调整其并行度。调整并行度的关键是处理 State。回想一下前文中的内容,State 位于算子内,改变了并行度,则意味着算子个数改变了,需要将 State 重新分配给算子。下面从 OperatorState 和 KeyedState 两种 State 角度,介绍如何将 State 重新分配给算子。
OperatorState 重分布
1、ListState
并行度在改变的时候,会将并发上的每个 List 都取出,然后把这些 List 合并到一个新的 List,根据元素的个数均匀分配给新的 Task。
2、UnionListState
比 ListState 更加灵活, 把划分的方式交给用户去做,当改变并发的时候,会将原来的 List 拼接起来,然后不做划分,直接交给用户。
3、BroadcastState
操作 BroadcastState 的 UDF 需要保证不可变性,所以各个算子的同一个 BroadcastState 完全一样。在改变并发的时候,把这些数据分发到新的 Task 即可。
KeyedState 重分布
基于 Key-Group ,每个 Key 隶属于唯一的 Key-Group。Key Group 分配给 Task 实例,每个 Task 至少有 一个 Key-Group 。
Key-Group 数量取决于最大并行度 (MaxParallism) 。 KeyedStream 并发的上限是 Key-Group 的数量,等于最大并行度。
状态过期
1、DataStream 中状态过期
可以对 每一个 State 设置 清理策略 StateTtlConfig,可以设置的内容如下:
- 过期时间:超过多长时间未访问,视为 State 过期,类似于缓存。
- 过期时间更新策略:创建和写时更新、读取和写时更新。
- State 可见性:未清理可用,超时则不可用。
2、Flink SQL 中状态过期
Flink SQL 在流 Join、聚合类的场景中,使用了 State,如果 State 不定时清理。 则可能会导致 State 过多,内存溢出。
为了稳妥起见,最好为每个 FLink SQL 作业提供 State 清理的策略。如果定时清理 State,则存在可能因为 State 被清理而导致计算结果不完全准确的风险。FLink 的 Table API 和 SQL 接口中提供了参数设置选项,能够让使用者在精确和资源消耗做折中。
StreamQueryConfig qConfig = ...
//设置过期时间为 min = 12 小时 ,max = 24 小时
qConfig.withIdleStateRetentionTime(Time.hours(12),Time.hours(24));