SQL 和关系代数在设计之初就针对的是静态的数据。静态数据是有界的,因此可以很容易地和表(关系)进行映射。但是对于一个不断变化的实时数据流而言,数据是无边界不断更新的,在将 SQL 应用在流上的时候,势必需要考虑数据的更新问题。例如,对与聚合操作而言,随着新数据源源不断地到达,聚合的结果必然是需要不断更新的。在这种情况下,目前包括 Flink、Calcite、Beam、 Kafka 等社区的开发人员一起在推动 Streaming SQL 的标准化,流和动态表的是这些工作的基础。

流和动态表

我们知道,从数据库的角度来看,一张表可以看作是一系列 Change log(INSERT、UPDATE、DELETE)聚合的结果。如果每一条 Change log 对应实时数据流中的一条消息,那么一张表和一个 Change log 的数据流就是可以互相转换的:

  • The aggregation of a stream of updates over time yields a table
  • The observation of changes to a table over time yield a stream

在这样一个基本概念的指导下,将 SQL 应用到流上就是完全可能的,其难点就在于如何处理计算结果的更新。为此,社区提出了动态表的概念,通过将计算结果转换为 INSERT、UPDATE、DELETE 这三种类型的消息来完成到动态表的映射。关于动态表的更细致的介绍,可以查看Flink官方的文档和这个演讲Foundations of Streaming SQL [Qcon London 2018],以获得更深入的了解。

动态表的实现

节点的输出类型

Flink SQL 在 StreamPhysicalRel 接口中,定义了一个关系表达式节点产生的消息类型:

trait StreamPhysicalRel extends FlinkPhysicalRel {
  /**
    * Whether the [[StreamPhysicalRel]] produces update and delete changes.
    */
  def producesUpdates: Boolean

  /**
    * Whether the [[StreamPhysicalRel]] produces retraction messages.
    */
  def producesRetractions: Boolean

  /**
    * Whether the [[StreamPhysicalRel]] requires retraction messages&nbs***bsp;not.
    */
  def needsUpdatesAsRetraction(input: RelNode): Boolean

  /**
    * Whether the [[StreamPhysicalRel]] consumes retraction messages instead of forwarding them.
    * The node might&nbs***bsp;might not produce new retraction messages.
    */
  def consumesRetractions: Boolean
}

其中,producesUpdates() 和 producesRetractions() 方法用于确定当前节点是否会产生 UPDATE 或 RETRACT 类型的消息,这两个方法比较容易理解。例如,对于 StreamExecJoin ,Join 操作在 outer join 的情况下会产生 RETRACT 和 UPDATE 消息;而 StreamExecGroupAggregate 只会产生 UPDATE 消息,但聚合操作不会产生 RETRACT 消息(即便这样,也并不意味着这个节点不会生成 DELETE 消息,因为 UPDATE 可能会被分解为 DELETE 和 INSERT,见下面的描述)。

needsUpdatesAsRetraction(input: RelNode) 这个方法是针对当前节点的子节点(上游节点)而言,这个方法的输入参数就是其子节点,其含义是当前节点是否需要其上游节点将 UPDATE 消息用 RETRACT 的形式发送(即,强制要求将 UPDATE 消息分解为 DELETE 和 INSERT 两条消息)。例如,对于 StreamExecGroupAggregate 聚合操作,就需要上游节点以 RETRACT 的形式发送消息,这样在上游更正计算结果时,当前已经聚合的结果才可以正常地进行更正(先从聚合结果中移除错误的记录,然后重新聚合);对于 StreamExecJoin 来说,情况要复杂一点,涉及到对唯一键的判断,下文再详述。

consumesRetractions() 则表明当前节点是否需要处理上游节点产生的 RETRACT 消息,而非简单地向下游进行转发。这和节点自身是否生成 RETRACT 消息无关。StreamExecGroupAggregate 聚合操作就需要处理 RETRACT 消息;而 StreamExecJoin 的该方法返回值则是 false,这表明 Join 操作本身并不需要处理 RETRACT,只单纯向下游转发 RETRACT 消息即可。

UpdateAsRetractionTrait 和 AccModeTrait

Calcite 中用 RelTrait 描述一个 RelNode 节点的性质,Flink SQL 中用两类特质来描述输出类型相关的性质,分别是 AccModeTrait 和 UpdateAsRetractionTrait。顾名思义,AccModeTrait 定义了一个节点的输出模式,UpdateAsRetractionTrait 决定了一个节点是否要将 UPDATE 以 RETRACT 的形式输出。

class UpdateAsRetractionTrait(updateAsRetraction: Boolean) extends RelTrait {
}

object UpdateAsRetractionTrait {
  def apply(updateAsRetraction: Boolean): UpdateAsRetractionTrait = {
    new UpdateAsRetractionTrait(updateAsRetraction)
  }

  //默认不需要作为 RETRACT 输出
  val DEFAULT = new UpdateAsRetractionTrait(false)
}

class AccModeTrait(accMode: AccMode) extends RelTrait {
}

/**
  * The [[AccMode]] determines how insert, update, and delete changes of tables are encoded
  * by the messages that an operator emits.
  */
object AccMode extends Enumeration {
    /**
    * unknown acc mode
    */
  val UNKNOWN = Value

  /**
    * An operator in [[Acc]] mode emits change messages as
    * [[org.apache.flink.table.dataformat.BaseRow]] which encode a data row with header info,
    * logically equivalent to (boolean, row).
    *
    * An operator in [[Acc]] mode may only produce update and delete messages, if the table has
    * a unique key and all key attributes are contained in the Row.
    *
    * Changes are encoded as follows:
    * - insert: (true, NewRow)
    * - update: (true, NewRow) // the Row includes the full unique key to identify the row to update
    * - delete: (false, OldRow) // the Row includes the full unique key to identify the row to delete
    */
  val Acc = Value

  /**
    * * An operator in [[AccRetract]] mode emits change messages as
    * [[org.apache.flink.table.dataformat.BaseRow]] which encode a data row with header info,
    * logically equivalent to (boolean, row).
    *
    * Changes are encoded as follows:
    * - insert: (true, NewRow)
    * - update: (false, OldRow), (true, NewRow) // updates are encoded in two messages!
    * - delete: (false, OldRow)
    */
  val AccRetract = Value
}

可以看到,在 AccMode.Acc 和 AccMode.AccRetract 这两种模式下,区别在于对 UPDATE 消息的编码不同。

在 StreamExecRetractionRules 中定义了三种规则用来生成 StreamPhysicalRel 中 retraction 相关的特质。其中 AssignDefaultRetractionRule 用来分配默认的特质,SetUpdatesAsRetractionRule 用来更新 UpdateAsRetractionTrait,SetAccModeRule 用来更新 AccModeTrait。

在 SetUpdatesAsRetractionRule 更新 UpdateAsRetractionTrait 的时候,要依赖在 StreamPhysicalRel 中定义的几个方法。一个子节点在以下两种情况下需要产生 RETRACT 消息:

  • 父节点要求子节点必须将 UPDATE 作为 RETRACT 发送,例如父节点是 StreamExecGroupAggregate
  • 通过传递性的,父节点的父节点要求将 UPDATE 作为 RETRACT 发送

而在 SetAccModeRule 中更新 UpdateAsRetractionTrait 为 AccMode.AccRetract,则按照这样的规则:

  • 当前节点已经包含了 SetUpdatesAsRetractionRule(true) 的特质
  • 当前节点的子节点是 AccMode.AccRetract,且当前节点转发 RETRACT 消息(consumesRetractions 方法返回 false)

按顺序应用上面的三条规则,就完成了一个 StreamPhysicalRel 中 retraction 相关的特质的填充。

判断唯一键

唯一键(UniqueKey)字段可以用来区分一个节点输出的消息是否具有唯一性,可以由多个字段组合而成。当然,一个节点也可能不包含唯一键。唯一键在流 SQL 中的意义在于,如果能够确定唯一键,那么在一些特定条件下,输出的时候可以有减少消息的数量(例如,可以直接生成 UPDATE 消息,而不需要先 DELETE 再 INSERT),或是有针对性地优化中间状态的存储(例如,流 Join 状态的存储,后续分析 Join 实现的时候会详细介绍)。

Flink SQL 中将唯一键作为元数据进行处理,主要是提供了一个 MetadataHandler<UniqueKeys> 的实现,即 FlinkRelMdUniqueKeys。 FlinkRelMdUniqueKeys 提供了获取一个 RelNode 的唯一键的方法,在需要查询唯一键时,通过 RelOptCluster.getMetadataQuery.getUniqueKeys(relNode) 进行查询即可。唯一键的判断在有些条件下比较复杂,目前 FlinkRelMdUniqueKeys 在参考 Calcite 自身提供的 RelMdUniqueKeys 的基础上进行了扩展,但应该也没有覆盖到全部的情况,我们简单地看一下。

对于 Aggregate 操作,唯一键的判断比较简单,就是 Group By 分组对应的字段:

class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.UniqueKeys] {
  def getUniqueKeys(
      rel: Aggregate,
      mq: RelMetadataQuery,
      ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
    getUniqueKeysOnAggregate(rel.getGroupSet.toArray, mq, ignoreNulls)
  }

  def getUniqueKeys(
      rel: StreamExecGroupAggregate,
      mq: RelMetadataQuery,
      ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
    getUniqueKeysOnAggregate(rel.grouping, mq, ignoreNulls)
  }

  def getUniqueKeysOnAggregate(
      grouping: Array[Int],
      mq: RelMetadataQuery,
      ignoreNulls: Boolean): util.Set[ImmutableBitSet] = {
    // group by keys form a unique key
    ImmutableSet.of(ImmutableBitSet.of(grouping.indices: _*))
  }
}

对于 Join 操作,则需要判断连接的左侧和右侧节点的唯一键情况,然后对左右的唯一键进行组合:

class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.UniqueKeys] {
  def getUniqueKeys(
      join: Join,
      mq: RelMetadataQuery,
      ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
    join.getJoinType match {
      case JoinRelType.SEMI | JoinRelType.ANTI =>
        // only return the unique keys from the LHS since a SEMI/ANTI join only
        // returns the LHS
        mq.getUniqueKeys(join.getLeft, ignoreNulls)
      case _ =>
        getJoinUniqueKeys(
          join.analyzeCondition(), join.getJoinType, join.getLeft, join.getRight, mq, ignoreNulls)
    }
  }

  private def getJoinUniqueKeys(
      joinInfo: JoinInfo,
      joinRelType: JoinRelType,
      left: RelNode,
      right: RelNode,
      mq: RelMetadataQuery,
      ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
    val leftUniqueKeys = mq.getUniqueKeys(left, ignoreNulls)
    val rightUniqueKeys = mq.getUniqueKeys(right, ignoreNulls)
    getJoinUniqueKeys(
      joinInfo, joinRelType, left.getRowType, leftUniqueKeys, rightUniqueKeys,
      mq.areColumnsUnique(left, joinInfo.leftSet, ignoreNulls),
      mq.areColumnsUnique(right, joinInfo.rightSet, ignoreNulls),
      mq)
  }
}

对于 Project 或 Calc 操作,则首先需要从子节点中提取唯一键,然后确保所有唯一键都在输出列表中,且在这些字段上不能进行任何计算操作(只作为 RexInputRef 或 AS,因为无法确定其它映射的结果是否能保留唯一性)。

对于其它的情况,这里不再赘述,感兴趣的可以参考原始代码。

三类 Sink

在 Flink SQL 中,流表 Sink 对应的有三个接口 AppendStreamTableSink,RetractStreamTableSink 和 UpsertStreamTableSink。 AppendStreamTableSink 适用于查询结果只存在 APPEND 结果的情况,即不存在更新和撤回的消息;UpsertStreamTableSink 则适用于查询结果存在唯一键的情况,在这种情况下 UPDATE 结果不会被转换为 DELETE 和 INSERT 两条消息;RetractStreamTableSink 则是一种更通用的形式,但是 UPDATE 会被转换为 DELETE 和 INSERT 两条消息。需要注意的是,UpsertStreamTableSink 同样有可能接收到 DELETE 消息,例如在 Outer Join 的情况下,仍然会有 DELETE 消息产生,但是对于 Aggregate,则可以有效减少撤回的消息数量;UpsertStreamTableSink 要求查询结果必须存在唯一键,这个也不难理解,只有存在唯一键,才能确定是对已有结果和更新还是新插入的结果。

在 StreamExecSink 中,needsUpdatesAsRetraction(input: RelNode) 是根据 TableSink 的类型来确定的:


class StreamExecSink[T] {
  //如果是RetractStreamTableSink,则要求子节点必须以 RETRACT 形式发送消息
  override def needsUpdatesAsRetraction(input: RelNode): Boolean =
    sink.isInstanceOf[RetractStreamTableSink[_]]
}

在生成物理执行计划的过程中,会根据 Sink 类型进行检验和转换:

  • AppendStreamTableSink 要求查询结果是 APPEND 的,通过 UpdatingPlanChecker.isAppendOnly() 进行校验(遍历逻辑树)
  • UpsertStreamTableSink 要求查询结果存在唯一键
  • RetractStreamTableSink 是通用的,不需要校验

Table 转换为 Stream

在将 Table 转换为 Stream 的过程中,同样有三种形式,分别对应 StreamTableEnvironment.toAppendStream, StreamTableEnvironment.toRetractStream 和 StreamTableEnvironment.toUpsertStream 三个方法。

转换成 Stream 的操作会被转换为 OutputConversionModifyOperation,其包含三种模式,分别对应上面的三个方法:

public class OutputConversionModifyOperation implements ModifyOperation {
  /**
	 * Should the output type contain the change flag, and what should the
	 * flag represent (retraction&nbs***bsp;deletion).
	 */
	public enum UpdateMode {
		APPEND,
		RETRACT,
		UPSERT
	}
}

OutputConversionModifyOperation 被转换为 DataStreamTableSink,通过两个属性 updatesAsRetraction 和 withChangeFlag 来表征上面的三种模式:

class DataStreamTableSink[T](
    queryOperation: QueryOperation,
    outputType: TypeInformation[T],
    val updatesAsRetraction: Boolean, 
    val withChangeFlag: Boolean
  ) extends TableSink[T] {
}
对应的映射关系为:
abstract class PlannerBase {
  private[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode = {
    modifyOperation match {
      case outputConversion: OutputConversionModifyOperation =>
        val input = getRelBuilder.queryOperation(outputConversion.getChild).build()
        val (updatesAsRetraction, withChangeFlag) = outputConversion.getUpdateMode match {
          case UpdateMode.RETRACT => (true, true)
          case UpdateMode.APPEND => (false, false)
          case UpdateMode.UPSERT => (false, true)
        }
        val typeInfo = LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(outputConversion.getType)
        val tableSink = new DataStreamTableSink(
          outputConversion.getChild, typeInfo, updatesAsRetraction, withChangeFlag)
        LogicalSink.create(input, tableSink, "DataStreamTableSink")
    }
  }
}

UpdateAsRetractionTrait 的初始化

在 AssignDefaultRetractionRule 中,如果节点的 UpdateAsRetractionTrait 没有设置,那么会被设置为默认的 UpdateAsRetractionTrait(false),即不以 Retract 模式输出。如果用户要求将结果输出为 Retract Stream 或输出到 RetractTableSink,就需要初始化根节点为 UpdateAsRetractionTrait(true)。

在 FlinkUpdateAsRetractionTraitInitProgram 中会根据 StreamOptimizeContext.updateAsRetraction 来确定是否将根节点初始化为 UpdateAsRetractionTrait(true):


class FlinkUpdateAsRetractionTraitInitProgram extends FlinkOptimizeProgram[StreamOptimizeContext] {

  override def optimize(root: RelNode, context: StreamOptimizeContext): RelNode = {
    if (context.updateAsRetraction) {
      val newTraitSet = root.getTraitSet.plus(UpdateAsRetractionTrait(true))
      root.copy(newTraitSet, root.getInputs)
    } else {
      root
    }
  }
}

在 StreamCommonSubGraphBasedOptimizer 中会根据输出的要求来获取输出的模式,并进一步初始化 StreamOptimizeContext:

class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
  extends CommonSubGraphBasedOptimizer {
  
  override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {
    val config = planner.getTableConfig
    // build RelNodeBlock plan
    val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, config)
    // infer updateAsRetraction property for sink block
    sinkBlocks.foreach { sinkBlock =>
      val retractionFromRoot = sinkBlock.outputNode match {
        case n: Sink =>
          n.sink match {
            case _: RetractStreamTableSink[_] => true
            case s: DataStreamTableSink[_] => s.updatesAsRetraction
            case _ => false
          }
        case o =>
          o.getTraitSet.getTrait(UpdateAsRetractionTraitDef.INSTANCE).sendsUpdatesAsRetractions
      }
      sinkBlock.setUpdateAsRetraction(retractionFromRoot)

      .......
    }
  }
}

物理执行计划

在生成的物理执行计划中,每个节点生成的记录以 BaseRow 的形式封装,可以根据 BaseRow 中封装的 header 信息获取一条记录对应的变更类型:

public final class BaseRowUtil {
	/**
	 * Indicates the row as an accumulate message.
	 */
	public static final byte ACCUMULATE_MSG = 0;

	/**
	 * Indicates the row as a retraction message.
	 */
	public static final byte RETRACT_MSG = 1;

	public static boolean isAccumulateMsg(BaseRow baseRow) {
		return baseRow.getHeader() == ACCUMULATE_MSG;
	}

	public static boolean isRetractMsg(BaseRow baseRow) {
		return baseRow.getHeader() == RETRACT_MSG;
	}
}

这样,每个节点可以根据从上游接收的记录的变更类型确定需要执行的操作;同样地,每个节点也可以设置对应的变更类型通知下游节点。在 StreamingJoinOperator 和 GroupAggFunction 中可以看到比较详细的对变更消息的处理过程,后面分析到 Join 操作和 Aggregate 的详细实现时会进行更细致的分析。

同时,一些操作也会根据 AccModeTrait 来决定生成的操作类型和提交的记录类型,例如在 StreamExecGroupAggregate 中会根据子节点的 AccModeTrait 确定是否需要为聚合函数生成 retract 方法,也会根据当前节点的 AccModeTrait 来决定在聚合结果发生更新是是否提交 RETRACT_MSG。具体的可参考 StreamExecGroupAggregate 的代码。

小结

本文简单介绍了流和动态表的映射关系,并对 Flink SQL 中动态表相关的一些实现做了介绍。动态表主要涉及到对计算结果和撤回和更新,这和具体的 SQL 操作有很大的关系,后面在介绍 Join、Aggregate 等算子的具体实现时还会进一步加以分析。

参考