状态 State 是 Flink 中用于构建复杂逻辑的重要组件。本文将从源码角度来分析 Flink 的状态管理是如何实现的。状态 (State)需要配合检查点(Checkpoint)机制来保证 Flink 作业失败后能正确地进行错误恢复。由于Flink的 State 和 Checkpoint 机制相对比较复杂,这篇文章主要关注状态的管理,下一篇文章再结合 checkpoint 进行分析。

State 概述
关于 State 的使用,Flink 的官方文档提供了详细的使用指导,也可以参考我之前的文章Flink 的状态管理和检查点机制。这里简单地做一下概括性的介绍。

Keyed State 和 Operator State
Flink 中的状态分为两类,Keyed State 和 Operator State 。 Keyed State 是和具体的 Key 相绑定的,只能在 KeyedStream 上的函数和算子中使用。 Opeartor State 则是和 Operator 的一个特定的并行实例相绑定的,例如 Kafka Connector 中,每一个并行的 Kafka Consumer 都在 Operator State 中维护当前 Consumer 订阅的 partiton 和 offset。由于 Flink 中的 keyBy 操作保证了每一个键相关联的所有消息都会送给下游算子的同一个并行实例处理,因此 Keyed State 也可以看作是 Operator State 的一种分区(partitioned)形式,每一个 key 都关联一个状态分区(state-partition)。

从另一个角度来看,无论 Operator State 还是 Keyed State,都有两种形式,Managed State 和 Raw State。 Managed State 的数据结构由 Flink 进行托管,而 Raw State 的数据结构对 Flink 是透明的。 Flink 的建议是尽量使用 Managed State, 这样 Flink 可以在并行度改变等情况下重新分布状态,并且可以更好地进行内存管理。

StateBackend 定义了状态是如何存储的,不同的 State Backedn 会采用不同的方式来存储状态,目前 Flink 提供了三种不同形式的存储后端,分别是 MemoryStateBackend, FsStateBackend 和 RocksDBStateBackend。

使用方法
在自定义函数或算子中使用状态,大致有一下几种方式:

CheckpointedFunction 接口
CheckpointedFunction 接口是一个较为通用的接口,既可以管理 Operator State, 也可以管理 Keyed State。

 //在创建检查点的时候调用
void snapshotState(FunctionSnapshotContext context) throws Exception;

//在初始化的时候调用 (在从检查点恢复状态的时候也会先调用该方法)
//通过 FunctionInitializationContext 可以访问到 OperatorStateStore 和 KeyedStateStore,
// 通过 OperatorStateStore 获取 Operator State
// 通过 KeyedStateStore 获取 Keyed State
void initializeState(FunctionInitializationContext context) throws Exception;

RuntimeContext
对于 Keyed State,通常都是通过 RuntimeContext 实例来获取,这通常需要在 rich functions 中才可以做到。 注意,使用 Keyed State 一定要在 KeyedStream 上进行操作。RuntimeContext 提供的获取状态的方法包括:

ValueState<T> getState(ValueStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
ListState<T> getListState(ListStateDescriptor<T>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);

ListCheckpointed 接口
使用 Operator State 的另一种更方便的形式是实现 ListCheckpointed 接口,该接口只能管理 List-Style 的状态,并且在状态恢复的时候会在 Operator 不同的并行实例之间均匀地分配状态。

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;

StateBackend 接口
State Backend 决定了作业的状态及检查点是如何存储的。不同的状态存储后端会采用不同的方式来处理状态和检查点。例如,对于 Flink 内置的三种不同类型的状态存储后端,MemoryStateBackend 会将工作状态存储在 TaskManager 的内存中,将检查点存储在 JobManager 的内存中;FsStateBackend 会将工作状态存储在 TaskManager 的内存中,将检查点存储在文件系统中(通常是分布式文件系统);RocksDBStateBackend 则会把状态存储在 RocksDB 中,将检查点存储在文件系统中(类似 FsStateBackend)。

StateBackend 还负责创建 OperatorStateBackend 和 AbstractKeyedStateBackend, 分别负责存储 Operator State 和 Keyed State,以及在需要的时候生成对应的 Checkpoint。所以,实际上 StateBackend 可以看作是一个 Factory,由它创建的具体的 OperatorStateBackend 和 AbstractKeyedStateBackend 才负责实际的状态存储和检查点生成的工作。

StateBackend 的另一个主要作用是和检查点相关,负责为作业创建检查点的存储(检查点写入)以及根据一个检查点的 pointer 获得检查点的存储位置(检查点读取)。

interface StateBackend {
    //解析检查点的存储位置
    CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

    //创建检查点存储
    CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException;

    //创建AbstractKeyedStateBackend,负责 keyed state 的存储和检查点
    <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup,
        @Nonnull Collection<KeyedStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry) throws Exception;

    //OperatorStateBackend,负责 operator state 的存储和检查点
    OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier,
        @Nonnull Collection<OperatorStateHandle> stateHandles,
        CloseableRegistry cancelStreamRegistry) throws Exception;
}

状态的注册与获取
前面介绍如何使用状态的时候提到,通过 CheckpointedFunction 接口既可以获取 Operator State,也可以获取 Keyed State,这两类状态分别通过 OperatorStateStore 和 KeyedStateStore 这两个接口作为桥梁来进行管理。我们暂时先不关注 checkpoint 相关的功能,只关注状态的存储和获取。下面介绍它们的具体实现。

OperatorStateStore
OperatorStateStore 定义了用于创建和管理托管状态的方法,分别对应 ListState,union ListState 以及 BroadcastState 。 其中 ListState 和 Union ListState 的底层存储是一致的,只是在状态恢复的时候状态的分配模式不一致。

interface OperatorStateStore {
    <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception;

    <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

    <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

    //......
}

OperatorStateBackend 接口继承了 OperatorStateStore 接口,其唯一的具体实现类为 DefaultOperatorStateBackend。

在 DefaultOperatorStateBackend 中,使用两个 Map 来存储已经注册的状态名和状态之间的映射关系,分别对应 ListState 和 BroadcastState,

class DefaultOperatorStateBackend {
    /**
     * Map for all registered operator states. Maps state name -> state
     */
    private final Map<String, PartitionableListState<?>> registeredOperatorStates;

    /**
     * Map for all registered operator broadcast states. Maps state name -> state
     */
    private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
}

先来看下 ListState 的注册和获取:

class DefaultOperatorStateBackend {
    @Override
    public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
    }

    @Override
    public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return getListState(stateDescriptor, OperatorStateHandle.Mode.UNION);
    }

    private <S> ListState<S> getListState(
            ListStateDescriptor<S> stateDescriptor,
            OperatorStateHandle.Mode mode) throws StateMigrationException {
        //...... cache related
        //获取状态
        PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredOperatorStates.get(name);

        if (null == partitionableListState) {
            //状态不存在,创建一个新的状态
            partitionableListState = new PartitionableListState<>(
                new RegisteredOperatorStateBackendMetaInfo<>(
                    name,
                    partitionStateSerializer,
                    mode));

            registeredOperatorStates.put(name, partitionableListState);
        } else {
            // has restored state; check compatibility of new state access
            checkStateNameAndMode(
                    partitionableListState.getStateMetaInfo().getName(),
                    name,
                    partitionableListState.getStateMetaInfo().getAssignmentMode(),
                    mode);

            RegisteredOperatorStateBackendMetaInfo<S> restoredPartitionableListStateMetaInfo =
                partitionableListState.getStateMetaInfo();

            // 状态已经存在,检查是否兼容
            // check compatibility to determine if new serializers are incompatible
            TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();

            TypeSerializerSchemaCompatibility<S> stateCompatibility =
                restoredPartitionableListStateMetaInfo.updatePartitionStateSerializer(newPartitionStateSerializer);
            if (stateCompatibility.isIncompatible()) {
                throw new StateMigrationException("The new state typeSerializer for operator state must not be incompatible.");
            }

            partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);
        }

        accessedStatesByName.put(name, partitionableListState);
        return partitionableListState;
    }
}

去除掉缓存相关的代码,这里的逻辑非常清晰,就是对 Map<String, PartitionableListState<?>> 的插入和获取操作,PartitionableListState 是 ListState 的具体实现。Union ListState 和普通 ListState 在底层实现上的区别就在于元信息的不同。

BroadcastState 在 BroadcastStream 中使用,它的注册和获取流程同 ListState 基本一致,是在 Map<String, BackendWritableBroadcastState> 上进行的操作, BackendWritableBroadcastState 是 BroadcastState 的具体实现。 具体流程不再赘述。

KeyedStateStore
KeyedStateStore 定义了用于创建和管理托管 keyed state 的方法,分别对应 ValueState, ListState,ReducingState 以及 AggregatingState 以及 MapState。相比于 operator state, Keyed state 的管理要更复杂一些 KeyedStateStore 接口的具体实现是 DefaultKeyedStateStore,DefaultKeyedStateStore 拥有 KeyedStateBackend 的引用,所有的状态获取的方法实际上都由 KeyedStateBackend 来完成。

//DefaultKeyedStateStore
class DefaultKeyedStateStore implements KeyedStateStore {
    protected  <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
        return keyedStateBackend.getPartitionedState(
                VoidNamespace.INSTANCE,
                VoidNamespaceSerializer.INSTANCE,
                stateDescriptor);
    }
}

KeyedStateBackend 继承了 KeyedStateFactory 和 PriorityQueueSetFactory 接口。和 OperatorStateBackend 不同,KeyedStateBackend 有不同的实现,分别对应不同的状态存储后端。AbstractKeyedStateBackend 为 KeyedStateBackend 提供了基础的实现,是所有 KeyedStateBackend 的抽象父类。KeyedStateBackend 和 AbstractKeyedStateBackend 中一些重要的成员变量和方法如下:

interface KeyedStateBackend<K>
    extends KeyedStateFactory, PriorityQueueSetFactory, Disposable {
    void setCurrentKey(K newKey);
    K getCurrentKey();
    <N, S extends State, T> S getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor) throws Exception;
    <N, S extends State> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception;
}

abstract class AbstractKeyedStateBackend<K> implements
    KeyedStateBackend<K>,
    SnapshotStrategy<SnapshotResult<KeyedStateHandle>>,
    Closeable,
    CheckpointListener {
    /** So that we can give out state when the user uses the same key. */
    private final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;

    /** Range of key-groups for which this backend is responsible. */
    protected final KeyGroupRange keyGroupRange;

    /** The key context for this backend. */
    protected final InternalKeyContext<K> keyContext;

    //......

    @Override
    public void setCurrentKey(K newKey) {
        notifyKeySelected(newKey);
        this.keyContext.setCurrentKey(newKey);
        this.keyContext.setCurrentKeyGroupIndex(KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups));
    }

    @Override
    @SuppressWarnings("unchecked")
    public <N, S extends State, V> S getOrCreateKeyedState(
            final TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, V> stateDescriptor) throws Exception {
        checkNotNull(namespaceSerializer, "Namespace serializer");
        checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
                "This operation cannot use partitioned state.");

        InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
        if (kvState == null) {
            if (!stateDescriptor.isSerializerInitialized()) {
                stateDescriptor.initializeSerializerUnlessSet(executionConfig);
            }
            kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
                namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
            keyValueStatesByName.put(stateDescriptor.getName(), kvState);
            publishQueryableStateIfEnabled(stateDescriptor, kvState);
        }
        return (S) kvState;
    }

    @Override
    public <N, S extends State> S getPartitionedState(
            final N namespace,
            final TypeSerializer<N> namespaceSerializer,
            final StateDescriptor<S, ?> stateDescriptor) throws Exception {

        checkNotNull(namespace, "Namespace");

        if (lastName != null && lastName.equals(stateDescriptor.getName())) {
            lastState.setCurrentNamespace(namespace);
            return (S) lastState;
        }

        InternalKvState<K, ?, ?> previous = keyValueStatesByName.get(stateDescriptor.getName());
        if (previous != null) {
            lastState = previous;
            lastState.setCurrentNamespace(namespace);
            lastName = stateDescriptor.getName();
            return (S) previous;
        }

        final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
        final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;

        lastName = stateDescriptor.getName();
        lastState = kvState;
        kvState.setCurrentNamespace(namespace);

        return state;
    }
    // .....
}

可以看出来,在没有开启 TTL 设置的情况下, 状态的创建最终还是在 KeyedStateBackend#createInternalState 方法中,这个方法在 AbstractKeyedStateBackend 中没有提供实现,而是交由不同的状态存储后端自行实现。

注意到,在 KeyedStateBackend#getPartitionedState 方法中,除了 StateDescriptor 参数以外,还有两个参数分别为 namespace 和 namespace 类型的序列化器,而在 DefaultKeyedStateBackend 创建对象的时候,这两个值分别被设置为常量 VoidNamespace.INSTANCE 和 VoidNamespaceSerializer.INSTANCE。这个 namespace 的作用是什么呢?

实际上,通过引入 namespace,Flink 可以确保在不同的 namespace 下存在相同名称的状态,但它们的值确不用相同。也就是说,状态实际上是和(namespace, name)这两个值相对应的。它的主要应用场景是在窗口中,比如说,假如我需要在窗口中使用状态,这个状态是和具体的窗口相关联的,假如没有 namespace 的存在,我们要如何获取窗口间互相独立的状态呢?有了 namespace,把窗口作为namespace,这个问题自然迎刃而解了。注意,只有无法合并的窗口才可以这样使用,如果窗口可以合并(如session window),无法保证 namespace 的不变性,自然不能这样使用。

public abstract class AbstractPerWindowStateStore extends DefaultKeyedStateStore {

        // we have this in the base class even though it's not used in MergingKeyStore so that
        // we can always set it and ignore what actual implementation we have
        protected W window;

        public AbstractPerWindowStateStore(KeyedStateBackend<?> keyedStateBackend, ExecutionConfig executionConfig) {
            super(keyedStateBackend, executionConfig);
        }
}

public class PerWindowStateStore extends AbstractPerWindowStateStore {

        public PerWindowStateStore(KeyedStateBackend<?> keyedStateBackend, ExecutionConfig executionConfig) {
            super(keyedStateBackend, executionConfig);
        }

        @Override
        protected  <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
            //用窗口作为namespace
            return keyedStateBackend.getPartitionedState(
                window,
                windowSerializer,
                stateDescriptor);
        }
}

public class MergingWindowStateStore extends AbstractPerWindowStateStore {

        public MergingWindowStateStore(KeyedStateBackend<?> keyedStateBackend, ExecutionConfig executionConfig) {
            super(keyedStateBackend, executionConfig);
        }

        @Override
        public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
            throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
        }
        //.....
}

状态的具体实现
Operator State
对于 Operator State 而言,ListState 的具体实现是 PartitionableListState。Union ListState 和普通 ListState 在底层实现上的区别就在于元信息的不同。

每个 State 都有一个关联的元信息,RegisteredStateMetaInfoBase 是所有状态元信息的抽象父类,元信息中保存了状态的名称,状态的序列化器等信息。其中, RegisteredOperatorStateBackendMetaInfo 和 RegisteredBroadcastStateBackendMetaInfo 分别对应了这里 ListState 和 BroadcastState 的元信息,它们都有一个成员变量,OperatorStateHandle.Mode assignmentMode;,即任务恢复时状态的分配模式。对 ListState,其分配模式为 SPLIT_DISTRIBUTE;对 Union ListState,其分配模式为 UNION;对 BroadCastState,其分配模式为 BROADCAST。

先来看一看 PartitionableListState 的实现:

public final class PartitionableListState<S> implements ListState<S> {

    /**
     * Meta information of the state, including state name, assignment mode, and typeSerializer
     */
    private RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo;

    /**
     * The internal list the holds the elements of the state
     */
    private final ArrayList<S> internalList;

    /**
     * A typeSerializer that allows to perform deep copies of internalList
     */
    private final ArrayListSerializer<S> internalListCopySerializer;
}

从它的成员变量可以看出来,对于 Operator ListState, 其内部就是一个 ArrayList。

同样,再来看看 HeapBroadcastState的实现:

public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K, V> {
    /**
     * Meta information of the state, including state name, assignment mode, and serializer.
     */
    private RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo;

    /**
     * The internal map the holds the elements of the state.
     */
    private final Map<K, V> backingMap;

    /**
     * A serializer that allows to perform deep copies of internal map state.
     */
    private final MapSerializer<K, V> internalMapCopySerializer;

}

所以从上面的分析可以看出来,对于所有的 Operator State,都是存储在 TaskManager 的堆内存中的,底层的实现分别对应了 ArrayList 和 HashMap。

Keyed State
Keyed State 根据底层存储的不同,对应了不同的数据结构和物理存储。 和 State 接口相对应,有一个 InternalKvState 接口对应状态的内部实现:State 接口及其子类对应公共 API,供用户代码调用;InternalKvState 及其子类对应内部的具体实现,由内部代码调用。

前面提到,内部状态创建的入口在 KeyedStateBackend#createInternalState 方法,这个方法在 AbstractKeyedStateBackend 中没有提供实现,而是交由不同的状态存储后端自行实现。我们就由该方法作为入口,一窥内部 State 的具体实现。

Heap Keyed State
HeapKeyedStateBackend 将 State 存放在 TaskManager 的堆内存中。

HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {

    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
        Stream.of(
            Tuple2.of(ValueStateDescriptor.class, (StateFactory) HeapValueState::create),
            Tuple2.of(ListStateDescriptor.class, (StateFactory) HeapListState::create),
            Tuple2.of(MapStateDescriptor.class, (StateFactory) HeapMapState::create),
            Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) HeapAggregatingState::create),
            Tuple2.of(ReducingStateDescriptor.class, (StateFactory) HeapReducingState::create),
            Tuple2.of(FoldingStateDescriptor.class, (StateFactory) HeapFoldingState::create)
        ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));

    private final Map<String, StateTable<K, ?, ?>> registeredKVStates;

        public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
        @Nonnull TypeSerializer<N> namespaceSerializer,
        @Nonnull StateDescriptor<S, SV> stateDesc,
        @Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
        StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
        if (stateFactory == null) {
            String message = String.format("State %s is not supported by %s",
                stateDesc.getClass(), this.getClass());
            throw new FlinkRuntimeException(message);
        }
        StateTable<K, N, SV> stateTable = tryRegisterStateTable(
            namespaceSerializer, stateDesc, getStateSnapshotTransformFactory(stateDesc, snapshotTransformFactory));
        return stateFactory.createState(stateDesc, stateTable, getKeySerializer());
    }

}

从上面的代码可以看出来,要创建一个 InternalKVState, 首先需要获得一个 StateTable, 然后通过 StateFactory 接口创建 InternalKVState。具体的状态的创建分别对应 HeapValueState::create, HeapListState::create, HeapMapState::create, HeapAggregatingState::create, HeapReducingState::create。

StateTable 和 InternalKVState 是什么关系呢?在 HeapXXXState 的公共父类 AbstractHeapState 中可以看出来,对于每一个基于堆内存的 State 的,其底层实际上就是一个 StateTable。

public abstract class StateTable<K, N, S> implements StateSnapshotRestore {
    protected final InternalKeyContext<K> keyContext;
    protected RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo;
    //......
}

StateTable 有三个类型参数,分别对应 key 的类型, namespace 的类型,以及 value 的类型。可以简单地把 StateTable 理解成 (key, namespace) -> Value 这样的存储形式。 StateTable 有两个具体的实现类,非别为 NestedMapsStateTable 和 CopyOnWriteStateTable。这两个类分别对应同步和异步模式checkpiont 的情况。其中 NestedMapsStateTable 对应同步 checkpoint 的情况,不支持异步快照;而 CopyOnWriteStateTable 对应异步 checkpoint 的情况。顾名思义,它的底层提供了 copy-on-write 结构,因而可以支持异步并发的操作。以较为简单的 NestedMapsStateTable 为例,可以看出来它是采用两层嵌套的 map 的结构,也就是我们所说的提供了 (key, namespace) -> Value 的映射关系。只有CopyOnWriteStateTable,由于提供了 copy-on-write 的支持,实现相对复杂一些,感兴趣的话可以仔细看下它的具体细节,这里就再不分析了。

public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
    private final Map<N, Map<K, S>>[] state;

    private final int keyGroupOffset;
}

接着我们看下 HeapKeyedStateBackend#tryRegisterStateTable 的逻辑:

class HeapKeyedStateBackend {
    private <N, V> StateTable<K, N, V> tryRegisterStateTable(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<?, V> stateDesc,
        @Nonnull StateSnapshotTransformFactory<V> snapshotTransformFactory) throws StateMigrationException {

        @SuppressWarnings("unchecked")
        StateTable<K, N, V> stateTable = (StateTable<K, N, V>) registeredKVStates.get(stateDesc.getName());

        TypeSerializer<V> newStateSerializer = stateDesc.getSerializer();

        if (stateTable != null) {
            RegisteredKeyValueStateBackendMetaInfo<N, V> restoredKvMetaInfo = stateTable.getMetaInfo();

            restoredKvMetaInfo.updateSnapshotTransformFactory(snapshotTransformFactory);

            TypeSerializerSchemaCompatibility<N> namespaceCompatibility =
                restoredKvMetaInfo.updateNamespaceSerializer(namespaceSerializer);
            if (namespaceCompatibility.isCompatibleAfterMigration() || namespaceCompatibility.isIncompatible()) {
                throw new StateMigrationException("For heap backends, the new namespace serializer must be compatible.");
            }

            restoredKvMetaInfo.checkStateMetaInfo(stateDesc);

            TypeSerializerSchemaCompatibility<V> stateCompatibility =
                restoredKvMetaInfo.updateStateSerializer(newStateSerializer);

            if (stateCompatibility.isIncompatible()) {
                throw new StateMigrationException("For heap backends, the new state serializer must not be incompatible.");
            }

            stateTable.setMetaInfo(restoredKvMetaInfo);
        } else {
            RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
                stateDesc.getType(),
                stateDesc.getName(),
                namespaceSerializer,
                newStateSerializer,
                snapshotTransformFactory);

            stateTable = snapshotStrategy.newStateTable(keyContext, newMetaInfo, keySerializer);
            registeredKVStates.put(stateDesc.getName(), stateTable);
        }

        return stateTable;
    }
}

这一部分的代码逻辑主要还是基于 Map<String, StateTable<K, ?, ?>> registeredKVStates 的 get 和 put 操作来实现,根据 checkpoint 同步或异步配置,创建的 StateTable 分别为 NestedMapsStateTable 和 CopyOnWriteStateTable。

接着来看看 HeapXXXState 的具体实现。对于 HeapValueState<K, N, V>,很明显,其对应在 StataTable 中存储的就是状态的具体的值;对于 HeapListState<K, N, V>,对应的则是则是 List;对于 HeapListState<K, N, V>,对应的是 Map<v>。</v>

以上几种状态都只是简单的存储和获取操作,但是 HeapReducingState<K, N, V> 和 HeapAggregatingState<K, N, IN, ACC, OUT> 需要在以前的状态的基础上进行 reduce 和聚合操作。

class HeapReducingState<K, N, V>
    extends AbstractHeapMergingState<K, N, V, V, V>
    implements InternalReducingState<K, N, V> {
    private final ReduceTransformation<V> reduceTransformation;

    @Override
    public void add(V value) throws IOException {

        if (value == null) {
            clear();
            return;
        }

        try {
            stateTable.transform(currentNamespace, value, reduceTransformation);
        } catch (Exception e) {
            throw new IOException("Exception while applying ReduceFunction in reducing state", e);
        }
    }

    @Override
    protected V mergeState(V a, V b) throws Exception {
        return reduceTransformation.apply(a, b);
    }

    static final class ReduceTransformation<V> implements StateTransformationFunction<V, V> {

        private final ReduceFunction<V> reduceFunction;

        ReduceTransformation(ReduceFunction<V> reduceFunction) {
            this.reduceFunction = Preconditions.checkNotNull(reduceFunction);
        }

        @Override
        public V apply(V previousState, V value) throws Exception {
            return previousState != null ? reduceFunction.reduce(previousState, value) : value;
        }
    }

}

在上面 HeapReducingState 中,在加入新的值的时候,会调用 ReduceTransformation#apply 方法进行 reduce 操作。在 HeapAggregatingState 中也有类似的逻辑。

在 ReducingStateDescriptor 和 AggregatingStateDescriptor 中提供了对应的 ReduceFunction 和 AggregateFunction。

RocksDB keyed State
//TODO 对于 ROCKSDB 了解不多,后续再加以补充

小结
本文对 Flink 状态管理相关的代码逻辑进行简单的梳理。Flink 中涉及状态管理相关的代码量比较多,而且和 Checkpoint 相关的逻辑杂糅在一起,读起来比较费力。这篇文章主要关注的还是状态的管理,下一篇文章将侧重于 checkpoint 相关的逻辑。

转发自:https://blog.jrwang.me/2019/flink-source-code-state/