在流计算中,时间属性承担了一个极其重要的作用,所有基于时间的操作,例如窗口操作,都需要正确获取时间信息。我们曾经在Flink 源码阅读笔记(12)- 时间、定时器和窗口这篇文章中分析过 Flink 内部时间属性、水位线等机制的具体实现。在这篇文章中,我们将介绍在 SQL 和 Table API 中时间属性相关的一些细节。
在 Flink SQL 中,表可以提供逻辑上的时间属性用于获取时间信息,时间属性可以是处理时间也可以是事件时间。在声明一张表的时候,时间属性可以在表的 schema 中定义。有些特定的操作,如窗口关联和窗口聚合操作必须基于时间属性字段,因而时间属性可以被看作一种特殊的字段类型;但是时间属性可以当作常规的时间戳字段来使用,一旦需要在计算中使用到时间属性,就需要“物化”(materialized)时间属性,时间属性字段就会被转换成一个常规的时间戳类型。被物化后的时间属性不再与 Flink 的时间系统和水位线相关联,因而也就不可以再应用在基于时间的操作中。
public enum TimestampKind { REGULAR, //常规的时间戳类型 ROWTIME, //事件时间 PROCTIME //处理时间 }
由于 Flink SQL 使用 Calcite 完成查询计划的优化,Flink 的所有逻辑类型在 Calcite 中都有对应的 RelDataType,并且为时间属性单独创建了一种新的 RelDataType,即 TimeIndicatorRelDataType:
class TimeIndicatorRelDataType( typeSystem: RelDataTypeSystem, originalType: BasicSqlType, val isEventTime: Boolean) //通过 isEventTime 区分是事件时间还是处理时间 extends BasicSqlType( typeSystem, originalType.getSqlTypeName, originalType.getPrecision) { }
DataStream 转换为 Table
在将 DataStream 转换成一个 Table 的过程中,可以用特殊的表达式来声明时间属性对应的列://处理时间 val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime) //事件时间 val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.rowtime)
其中 'field.proctime 或这 'field.rowtime 即声明时间属性,表达式会被转换为 PROCTIME 和 ROWTIME 这两个内置函数的调用 UnresolvedCallExpression:
// Time definition /** * Declares a field as the rowtime attribute for indicating, accessing, and working in * Flink's event time. */ def rowtime: Expression = unresolvedCall(ROWTIME, expr) /** * Declares a field as the proctime attribute for indicating, accessing, and working in * Flink's processing time. */ def proctime: Expression = unresolvedCall(PROCTIME, expr)
非 Scala 环境下则通过 ExpressionParser 完成表达式的解析。
使用 TableSource
如果要在 TableSource 中定义时间属性,则需要 TableSource 实现 DefinedProctimeAttribute 或者 DefinedRowAttribute 接口,并且引用的时间属性必须出现在 TableSchema 中,类型为 timestamp 类型。如果要同时使用处理时间和事件时间,对应的 TableSource 需要同时实现这两个接口:
public interface DefinedProctimeAttribute { @Nullable String getProctimeAttribute(); } public interface DefinedRowtimeAttributes { List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors(); }
其中 RowtimeAttributeDescriptor 是对事件时间的描述,包括如何提取事件时间,以及 watermark 的生成策略等:
public final class RowtimeAttributeDescriptor { private final String attributeName; //时间属性名称 private final TimestampExtractor timestampExtractor; //如何提取事件时间 private final WatermarkStrategy watermarkStrategy; //如何生成 watermark }
尽管返回值是 List<RowtimeAttributeDescriptor>,但目前 Flink SQL 只支持单个事件时间属性。
SQL 引擎中时间属性的转换
更新 TableSchema
在将 DataStream 转换成一个 Table 的过程中,首先需要生成表结构。Table 的底层对应的是一个 QueryOperation,在这里就是 ScalaDataStreamQueryOperation (或者 JavaDataStreamQueryOperation,对应 Java API)。QueryOperation 提供了 TableSchema 和字段映射关系:
public class ScalaDataStreamQueryOperation<E> implements QueryOperation { private final DataStream<E> dataStream; private final int[] fieldIndices; //字段索引映射关系 private final TableSchema tableSchema; //表结构 }
获得 TableSchema 的逻辑主要被封装在 FieldInfoUtils.getFieldsInfo 方法中,主要是通过解析 Expression 获得表结构中每一列对应的字段在 DataStream 中元素的索引,并得到对应字段的类型:
/** * Utility methods for extracting names and indices of fields from different {@link TypeInformation}s. */ public class FieldInfoUtils { private static class ExprToFieldInfo extends ApiExpressionDefaultVisitor<FieldInfo> { @Override public FieldInfo visit(UnresolvedReferenceExpression unresolvedReference) { return createFieldInfo(unresolvedReference, null); } @Override public FieldInfo visit(UnresolvedCallExpression unresolvedCall) { if (unresolvedCall.getFunctionDefinition() == BuiltInFunctionDefinitions.AS) { return visitAlias(unresolvedCall); } else if (isRowTimeExpression(unresolvedCall)) { return createRowtimeFieldInfo(unresolvedCall, null); } else if (isProcTimeExpression(unresolvedCall)) { return createProctimeFieldInfo(unresolvedCall, null); } return defaultMethod(unresolvedCall); } } private static boolean isRowTimeExpression(Expression origExpr) { return origExpr instanceof UnresolvedCallExpression && ((UnresolvedCallExpression) origExpr).getFunctionDefinition() == BuiltInFunctionDefinitions.ROWTIME; } private static boolean isProcTimeExpression(Expression origExpr) { return origExpr instanceof UnresolvedCallExpression && ((UnresolvedCallExpression) origExpr).getFunctionDefinition() == BuiltInFunctionDefinitions.PROCTIME; } private static FieldInfo createTimeAttributeField( UnresolvedReferenceExpression reference, TimestampKind kind, //这里的Kind是TimestampKind.PROCTIME或TimestampKind.ROWTIME @Nullable String alias) { final int idx; //对于时间属性,没有对应的索引,用特殊的标识 if (kind == TimestampKind.PROCTIME) { idx = TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER; } else { idx = TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER; } String originalName = reference.getName(); return new FieldInfo( alias != null ? alias : originalName, idx, createTimeIndicatorType(kind)); } }
时间属性并不对应 DataStream 中元素真实的字段,因此会用特殊的标识来作为索引:
public class TimeIndicatorTypeInfo extends SqlTimeTypeInfo<Timestamp> { public static final int ROWTIME_STREAM_MARKER = -1; public static final int PROCTIME_STREAM_MARKER = -2; }
如果通过 TableSource 注册一张表,首先会通过 TableSourceValidation.validateTableSource() 验证表结构、时间属性等信息,然后会被封装为 ConnectorCatalogTable,在这里会完成 TableSchema 的更新:
public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable { public static <T1> ConnectorCatalogTable source(TableSource<T1> source, boolean isBatch) { //更新 TableSchema final TableSchema tableSchema = calculateSourceSchema(source, isBatch); return new ConnectorCatalogTable<>(source, null, tableSchema, isBatch); } private static <T1> TableSchema calculateSourceSchema(TableSource<T1> source, boolean isBatch) { TableSchema tableSchema = source.getTableSchema(); if (isBatch) { return tableSchema; } DataType[] types = Arrays.copyOf(tableSchema.getFieldDataTypes(), tableSchema.getFieldCount()); String[] fieldNames = tableSchema.getFieldNames(); //检查是否实现了 DefinedRowtimeAttributes 接口 if (source instanceof DefinedRowtimeAttributes) { updateRowtimeIndicators((DefinedRowtimeAttributes) source, fieldNames, types); } //检查是否实现了 DefinedProctimeAttribute 接口 if (source instanceof DefinedProctimeAttribute) { updateProctimeIndicator((DefinedProctimeAttribute) source, fieldNames, types); } return TableSchema.builder().fields(fieldNames, types).build(); } private static void updateRowtimeIndicators( DefinedRowtimeAttributes source, String[] fieldNames, DataType[] types) { List<String> rowtimeAttributes = source.getRowtimeAttributeDescriptors() .stream() .map(RowtimeAttributeDescriptor::getAttributeName) .collect(Collectors.toList()); for (int i = 0; i < fieldNames.length; i++) { if (rowtimeAttributes.contains(fieldNames[i])) { // bridged to timestamp for compatible flink-planner types[i] = new AtomicDataType(new TimestampType(true, TimestampKind.ROWTIME, 3)) .bridgedTo(java.sql.Timestamp.class); } } } }
经过更新后的 TableSchema,时间属性列的 LogicalType 就是用特殊 TimestampKind 表征的 TimestampType。
转换到 Calcite
在 DatabaseCalciteSchema 中, Flink SQL 中注册的表被转换成 Calcite 中使用的表:
class DatabaseCalciteSchema extends FlinkSchema { @Override public Table getTable(String tableName) { ObjectPath tablePath = new ObjectPath(databaseName, tableName); try { if (!catalog.tableExists(tablePath)) { return null; } CatalogBaseTable table = catalog.getTable(tablePath); //将 Flink Catalog 中注册的表转换为 Calcite 中的 Table if (table instanceof QueryOperationCatalogView) { QueryOperationCatalogView view = (QueryOperationCatalogView) table; QueryOperation operation = view.getQueryOperation(); if (operation instanceof DataStreamQueryOperation) { List<String> qualifiedName = Arrays.asList(catalogName, databaseName, tableName); ((DataStreamQueryOperation) operation).setQualifiedName(qualifiedName); } else if (operation instanceof RichTableSourceQueryOperation) { List<String> qualifiedName = Arrays.asList(catalogName, databaseName, tableName); ((RichTableSourceQueryOperation) operation).setQualifiedName(qualifiedName); } return QueryOperationCatalogViewTable.createCalciteTable(view); } else if (table instanceof ConnectorCatalogTable) { return convertConnectorTable((ConnectorCatalogTable<?, ?>) table, tablePath); } else if (table instanceof CatalogTable) { return convertCatalogTable(tablePath, (CatalogTable) table); } else { throw new TableException("Unsupported table type: " + table); } } catch (TableNotExistException | CatalogException e) { // TableNotExistException should never happen, because we are checking it exists // via catalog.tableExists throw new TableException(format( "A failure occurred when accessing table. Table path [%s, %s, %s]", catalogName, databaseName, tableName), e); } } }
在 Flink SQL 中定义的表结构 TableSchema 也会经过 FlinkTypeFactory 转换,LogicalType也会被转换成 Calcite 内部使用的 RelDataType:
/** * Flink specific type factory that represents the interface between Flink's [[LogicalType]] * and Calcite's [[RelDataType]]. */ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) { def createFieldTypeFromLogicalType(t: LogicalType): RelDataType = { ...... val relType = t.getTypeRoot match { case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE => val timestampType = t.asInstanceOf[TimestampType] timestampType.getKind match { //时间戳类型识别出常规的时间戳和时间属性 case TimestampKind.PROCTIME => createProctimeIndicatorType(true) case TimestampKind.ROWTIME => createRowtimeIndicatorType(true) case TimestampKind.REGULAR => createSqlType(TIMESTAMP) } case _ => seenTypes.get(t) match { case Some(retType: RelDataType) => retType case None => val refType = newRelDataType() seenTypes.put(t, refType) refType } } ..... } }
经过这一步转换,时间属性被转换成 TimeIndicatorRelDataType 类型。
在 Calcite 优化查询计划是,会识别特殊语句中的时间属性,并转换成对应的 RelNode。例如,在 FlinkLogicalJoin 中,如果关联条件中包含了时间窗口,就会被转换为 StreamExecWindowJoin;而不包含时间窗口的 FlinkLogicalJoin 则会被转换为 StreamExecJoin。其区别就在于对时间属性的识别:
/** * Rule that converts non-SEMI/ANTI [[FlinkLogicalJoin]] with window bounds in join condition * to [[StreamExecWindowJoin]]. */ class StreamExecWindowJoinRule extends ConverterRule( classOf[FlinkLogicalJoin], FlinkConventions.LOGICAL, FlinkConventions.STREAM_PHYSICAL, "StreamExecWindowJoinRule") { override def matches(call: RelOptRuleCall): Boolean = { ...... val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(join) //提取时间窗口的边界 val (windowBounds, _) = WindowJoinUtil.extractWindowBoundsFromPredicate( join.getCondition, join.getLeft.getRowType.getFieldCount, joinRowType, join.getCluster.getRexBuilder, tableConfig) if (windowBounds.isDefined) { //如果识别到时间窗口,该规则匹配 if (windowBounds.get.isEventTime) { true } else { // Check that no event-time attributes are in the input because the processing time window // join does not correctly hold back watermarks. // We rely on projection pushdown to remove unused attributes before the join. !joinRowType.getFieldList.exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) } } else { // the given join does not have valid window bounds. We cannot translate it. false } } }
窗口聚合也是同理,只是规则匹配中是对 HOP, TUMBLE 等窗口函数的识别。
时间属性是一个逻辑上的列,因为它并不真实对应底层 DataStream 元素中具体的字段。因此,如果在计算操作中要用到时间属性列的值,就需要“物化”(materialized)时间属性。这一部分的逻辑主要是在 RelTimeIndicatorConverter 中。
对于 PROCTIME 时间属性,就是将其转换为对内置函数 FlinkSqlOperatorTable.PROCTIME_MATERIALIZE 的调用;而对于 ROWTIME 时间属性,则是对其进行一次强制的类型转换,转换为常规的时间戳。
/** * Helper class for shared logic of materializing time attributes in [[RelNode]] and [[RexNode]]. */ class RexTimeIndicatorMaterializerUtils(rexBuilder: RexBuilder) { def materialize(expr: RexNode): RexNode = { if (isTimeIndicatorType(expr.getType)) { if (isRowtimeIndicatorType(expr.getType)) { // cast rowtime indicator to regular timestamp rexBuilder.makeAbstractCast(timestamp(expr.getType.isNullable), expr) } else { // generate proctime access rexBuilder.makeCall(FlinkSqlOperatorTable.PROCTIME_MATERIALIZE, expr) } } else { expr } } }
SQL 的查询计划最终需要被转换为 Flink 的算子,即生成物理执行计划。对于一个查询计划来说,首先需要进行转换的就是 Scan 操作,在里将底层的 POJO、Tuple 等对象映射为 Table 中使用的 Row。在这里就需要考虑到时间属性的映射。
首先,我们来看下在 StreamExecDataStreamScan 中是如何完成字段映射的转换的:
class StreamExecDataStreamScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, outputRowType: RelDataType) extends TableScan(cluster, traitSet, table) { override protected def translateToPlanInternal( planner: StreamPlanner): Transformation[BaseRow] = { val config = planner.getTableConfig val inputDataStream: DataStream[Any] = dataStreamTable.dataStream val transform = inputDataStream.getTransformation val rowtimeExpr = getRowtimeExpression(planner.getRelBuilder) // when there is row time extraction expression, we need internal conversion // when the physical type of the input date stream is not BaseRow, we need internal conversion. if (rowtimeExpr.isDefined || ScanUtil.needsConversion(dataStreamTable.dataType)) { // extract time if the index is -1&nbs***bsp;-2. val (extractElement, resetElement) = if (ScanUtil.hasTimeAttributeField(dataStreamTable.fieldIndexes)) { (s"ctx.$ELEMENT = $ELEMENT;", s"ctx.$ELEMENT = null;") } else { ("", "") } val ctx = CodeGeneratorContext(config).setOperatorBaseClass( classOf[AbstractProcessStreamOperator[BaseRow]]) //生成字段映射的代码 ScanUtil.convertToInternalRow( ctx, transform, dataStreamTable.fieldIndexes, //字段索引 dataStreamTable.dataType, getRowType, getTable.getQualifiedName, config, rowtimeExpr, //RowTime 表达式 beforeConvert = extractElement, afterConvert = resetElement) } else { transform.asInstanceOf[Transformation[BaseRow]] } } //获取 RowTime 时间属性的表达式 private def getRowtimeExpression(relBuilder: FlinkRelBuilder): Option[RexNode] = { val fieldIdxs = dataStreamTable.fieldIndexes if (!fieldIdxs.contains(ROWTIME_STREAM_MARKER)) { None } else { //根据字段映射中的特殊marker来查找 rowtime 字段 val rowtimeField = dataStreamTable.fieldNames(fieldIdxs.indexOf(ROWTIME_STREAM_MARKER)) // get expression to extract timestamp fromDataTypeToLogicalType(dataStreamTable.dataType) match { //如果这个datastream本身已经完成了 rowtime 时间属性的提取 case dataType: RowType if dataType.getFieldNames.contains(rowtimeField) && TypeCheckUtils.isRowTime(dataType.getTypeAt(dataType.getFieldIndex(rowtimeField))) => // if rowtimeField already existed in the data stream, use the default rowtime None case _ => // 用内置函数 StreamRecordTimestampSqlFunction 来提取 rowtime 时间属性 // extract timestamp from StreamRecord Some( relBuilder.cast( relBuilder.call(new StreamRecordTimestampSqlFunction), relBuilder.getTypeFactory.createFieldTypeFromLogicalType( new TimestampType(true, TimestampKind.ROWTIME, 3)).getSqlTypeName)) } } } }
在进行字段映射的时候,一个关键的处理就是时间属性要怎么映射。ROWTIME 时间属性被转换为 StreamRecordTimestampSqlFunction 的调用。在前面物化时间属性的阶段,我们已经看到,PROCTIME 时间属性已经被转换为内置函数 FlinkSqlOperatorTable.PROCTIME_MATERIALIZE 的调用,而对于 ROWTIME 时间属性,只是进行了一次强制的类型转换。这主要是因为,PROCTIME 在流处理中,在不同的算子中,每一次调用都应该获取当前的系统时间;而对于 ROWTIME 而言,它的取值是固定的,因此只需要在最开始完成一次转换即可。RPOCTIME 和 ROWTIME 的取值最终都被转换成对内置函数的调用。
这个函数调用的代码生成就比较简单了,RPOCTIME 转换成 context.timerService().currentProcessingTime(),ROWTIME 转换成 context.timestamp(),具体的代码生成可以参考 GenerateUtils。
由于 PROCTIME 并不需要在 Scan 节点进行物化,因此在这里直接用 null 值替代,在后续需要的时候重新进行计算。
而对于从 TableSource 注册而来的表,由于它不像 DataStream 那样,在定义 ROWTIME 时间属性之前已经完成了 timestamp 和 watermark 的指定。因此在转换 TableSource 的过程中,如果定义了 ROWTIME 时间属性,除了需要提取 ROWTIME 时间属性的值以外,还需要指定 watermark。
RowtimeAttributeDescriptor 是对 ROWTIME 时间属性的描述,包括如何提取事件时间的 TimestampExtractor:
/** * The {@link FieldComputer} interface returns an expression to compute the field of the table * schema of a {@link TableSource} from one&nbs***bsp;more fields of the {@link TableSource}'s return type. * * @param <T> The result type of the provided expression. */ public interface FieldComputer<T> { String[] getArgumentFields(); TypeInformation<T> getReturnType(); void validateArgumentFields(TypeInformation<?>[] argumentFieldTypes); /** * Returns the {@link Expression} that computes the value of the field. * * @param fieldAccesses Field access expressions for the argument fields. * @return The expression to extract the timestamp from the {@link TableSource} return type. */ Expression getExpression(ResolvedFieldReference[] fieldAccesses); } //Provides an expression to extract the timestamp for a rowtime attribute. public abstract class TimestampExtractor implements FieldComputer<Long>, Serializable, Descriptor { @Override public TypeInformation<Long> getReturnType() { return Types.LONG; //返回类型是 Long } }
通常提取 ROWTIME 的方式是根据已有的字段来生成,即:
public final class ExistingField extends TimestampExtractor { private String field; /** * Returns an {@link Expression} that casts a {@link Long}, {@link Timestamp},&nbs***bsp;* timestamp formatted {@link String} field (e.g., "2018-05-28 12:34:56.000") * into a rowtime attribute. */ @Override public Expression getExpression(ResolvedFieldReference[] fieldAccesses) { ResolvedFieldReference fieldAccess = fieldAccesses[0]; DataType type = fromLegacyInfoToDataType(fieldAccess.resultType()); //字段引用的表达式 FieldReferenceExpression fieldReferenceExpr = new FieldReferenceExpression( fieldAccess.name(), type, 0, fieldAccess.fieldIndex()); //支持的输入字段类型,包括 BIGINT、TIMESTAMP_WITHOUT_TIME_ZONE 和 VARCHAR switch (type.getLogicalType().getTypeRoot()) { case BIGINT: case TIMESTAMP_WITHOUT_TIME_ZONE: //直接引用相应的字段即可 return fieldReferenceExpr; case VARCHAR: //进行一次类型转换 DataType outputType = TIMESTAMP(3).bridgedTo(Timestamp.class); return new CallExpression( CAST, Arrays.asList(fieldReferenceExpr, typeLiteral(outputType)), outputType); default: throw new RuntimeException("Unsupport type: " + type); } } }
TimestampExtractor 提供的提取时间属性的 Expression, 要先被转换为 RexNode 之后才可以被用在代码生成中,通过 TableSourceUtil.getRowtimeExtractionExpression 完成。如同前面提到对 StreamRecordTimestampSqlFunction 调用,这两种获得时间属性的的方式本质上是一样的,只是调用的函数不一致。
在完成了字段映射和时间属性的提取后,还需要指定 watermark,这正是 RowtimeAttributeDescriptor 中 WatermarkStrategy 的用处:
class StreamExecTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, relOptTable: FlinkRelOptTable) extends PhysicalTableSourceScan(cluster, traitSet, relOptTable) { override protected def translateToPlanInternal( planner: StreamPlanner): Transformation[BaseRow] = { val config = planner.getTableConfig val inputTransform = getSourceTransformation(planner.getExecEnv) //1. 字段映射和时间提取 ...... //2. 指定watermark val withWatermarks = if (rowtimeDesc.isDefined) { val rowtimeFieldIdx = getRowType.getFieldNames.indexOf(rowtimeDesc.get.getAttributeName) val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy watermarkStrategy match { case p: PeriodicWatermarkAssigner => val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p) ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator) case p: PunctuatedWatermarkAssigner => val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p, producedDataType) ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator) case _: PreserveWatermarks => // The watermarks have already been provided by the underlying DataStream. ingestedTable } } else { // No need to generate watermarks if no rowtime attribute is specified. ingestedTable } withWatermarks.getTransformation } }
到这里,就完成了 ROWTIME 时间属性的提取的 watermark 的生成。 PROCTIME 在使用时按需要进行物化。
时间属性是广泛应用在窗口操作中,是流式 SQL 处理中非常重要的概念。本文对 Flink SQL 中时间属性的使用方法和具体实现进行了介绍。