Timestamps

Flink的时间一共分为:Processing Time(处理时间)、Event Time (事件时间)和 Ingestion Time(摄取时间)

  • Event Time 应用程序填装数据,塞入的时间。也就是数据本身携带的时间
  • Ingestion Time Flink数据源接收到每个事件源的当前时间作为时间戳
  • Processing Time 事件被处理时的机器时间
    图片说明

Event Time

事件产生的时间,该时间通常在它们进入 Flink 之前嵌入到记录中,并且可以从每个记录中提取事件时间戳。 (可以想象成它是数据本身的一个属性,它的值保存的是时间)

Event Time 中,时间值取决于数据,而不取决于系统时间。 Event Time 程序必须指定如何生成 Event Time 的 Watermark,这是表示 Event Time 进度的机制。

Ingestion Time

Ingestion Time 是事件进入Flink的时间。

在源操作处,每条记录都将源的当前时间作为时间戳记,并且基于时间的操作(例如时间窗口)引用该时间戳记。

Ingestion Time 从概念上讲介于事件时间和处理时间之间。

与 Processing Time 相比,它稍微贵一点(翻译的时候有点懵,应该是程序计算资源花费会增加,因为相比于前面两种类型,它会自动分配 Watermark),但结果却更可预测。

由于 Ingestion Time 使用稳定的时间戳(在源处分配了一次),因此对记录的不同窗口操作将引用相同的时间戳,而在 Processing Time 中,每个窗口的算子 Operator 都可以将记录分配给不同的窗口(基于本地系统时间和到达延误)。

与 Processing Time 相比,Ingestion Time 程序无法处理任何乱序事件或延迟数据,但程序无需指定如何生成 Watermark。

在内部,将 Ingestion Time 视为事件发生的时间,它具有自动分配时间戳和自动生成 Watermark 的功能。

Processing Time

Processing Time是指事件正在执行所在机器(部署应用服务器)的系统时间。

当流式程序在 Processing Time 上运行时,所有基于时间的操作(如时间窗口)都将使用运行相应算子 Operator 所在计算机的系统时钟。

每小时 Processing Time 窗口将包括系统时钟指示整小时的时间之间到达特定操作员的所有记录。例如,如果应用程序在 9:15 am开始运行,则第一个每小时处理 Processing Time 将包括在 9:15 am 和 10:00 am 之间处理的事件,下一个窗口将包括在 10:00 am 和 11:00 am 之间处理的事件,依此类推。

Processing Time 是最简单的时间概念,不需要流和机器之间的协调。它提供了最佳的性能和最低的延迟。但是,在分布式和异步环境中,Processing Time 不能提供确定性,因为它容易受到记录到达系统(例如从消息队列)到达系统的速度,记录在系统内部操作员之间流动的速度的影响,以及中断(计划的或其他方式)。

设置时间属性

Flink DataStream程序的第一部分通常设置基准时间特征。该设置定义了数据流源的行为方式(例如,是否分配时间戳),以及诸如的窗口操作应使用什么时间概念KeyedStream.timeWindow(Time.seconds(30))。

以下示例显示了一个Flink程序,该程序在每小时的时间窗口中汇总事件。窗口的行为与时间特征相适应。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));

stream
    .keyBy( (event) -> event.getUser() )
    .timeWindow(Time.hours(1))
    .reduce( (a, b) -> a.add(b) )
    .addSink(...);

请注意,为了在事件时间中运行上面示例程序,程序需要使用直接为数据定义 Processing Time 并自己设定 Watermark 的生成规则,或者程序必须在源之后注入 Timestamp Assigner & Watermark Generator 。这些功能描述了如何访问事件时间戳,以及事件流呈现出何种程度的乱序。

Watermarks

为了能够准确地表达事件时间的处理进度,就必须用到水印。Flink水印的本质是DataStream中的一种特殊元素,每个水印都携带有一个时间戳。当时间戳为T的水印出现时,表示事件时间t <= T的数据都已经到达,即水印后面应该只能流入事件时间t > T的数据。也就是说,水印是Flink判断迟到数据的标准,同时也是窗口触发的标记。
下面的图,是一个乱序的基于事件时间的数据流示例:
图片说明
图中的方框就是数据元素,其中的数字表示事件时间,W(x)就表示时间戳是x的水印,并有长度为4个时间单位的滚动窗口。假设时间单位为秒,可见事件时间为2、3、1s的元素都会进入区间为[1s, 4s]的窗口,而事件时间为7s的元素会进入区间为[5s, 8s]的窗口。当水印W(4)到达时,表示已经没有t <= 4s的元素了,[1s, 4s]窗口会被触发并计算。同理,水印W(9)到达时,[5s, 8s]窗口会被触发并计算,以此类推。

如果事件时间为6的元素出现在W(9)后面,就算是迟到了。后面会有迟到数据的处理。

上面的示例只有一个并行度,在有多个并行度的情况下,就会有多个流产生水印,窗口触发时该采用哪个水印呢?答案是所有流入水印中时间戳最小的那个。

来自官方文档的图能够说明问题。
图片说明
如果所有流入水印中时间戳最小的那个都已经达到或超过了窗口的结束时间,那么所有流的数据肯定已经全部收齐,就可以安全地触发窗口计算了。

提取事件时间、产生水印

那么事件时间是如何从数据中提取的,水印又是如何产生的呢?Flink提供了统一的DataStream.assignTimestampsAndWatermarks()方法来提取事件时间并同时产生水印。
assignTimestampsAndWatermarks()方法接受的参数类型有AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks两种,分别对应周期性水印和打点(即由事件本身的属性触发)水印。

周期性水印

使用AssignerWithPeriodicWatermarks时,水印是周期性产生的。默认ProcessingTime时生成水印的间隔为0
EventTime和IngestionTime默认生成水印间隔为200ms,也能通过ExecutionConfig.setAutoWatermarkInterval()方法来指定新的周期。
我们需要通过实现extractTimestamp()方法来提取事件时间,实现getCurrentWatermark()方法产生水印。但好在Flink已经提供了2种内置的实现类,所以我们直接用就可以了。

  • BoundedOutOfOrdernessGenerator

    /**
    * This generator generates watermarks assuming that elements arrive out of order,
    * but only to a certain degree. The latest elements for a certain timestamp t will arrive
    * at most n milliseconds after the earliest elements for timestamp t.
    */
    public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
    
      private final long maxOutOfOrderness = 3500; // 3.5 seconds
    
      private long currentMaxTimestamp;
    
      @Override
      public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
          long timestamp = element.getCreationTime();
          currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
          return timestamp;
      }
    
      @Override
      public Watermark getCurrentWatermark() {
          // return the watermark as current highest timestamp minus the out-of-orderness bound
          return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
      }
    }
  • TimeLagWatermarkGenerator

    /**
    * This generator generates watermarks that are lagging behind processing time by a fixed amount.
    * It assumes that elements arrive in Flink after a bounded delay.
    */
    public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
    
      private final long maxTimeLag = 5000; // 5 seconds
    
      @Override
      public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
          return element.getCreationTime();
      }
    
      @Override
      public Watermark getCurrentWatermark() {
          // return the watermark as current time minus the maximum time lag
          return new Watermark(System.currentTimeMillis() - maxTimeLag);
      }
    }

带标点的水印

要在特定事件表明可能会生成新的水印时生成水印,请使用 AssignerWithPunctuatedWatermarks。对于此类,Flink将首先调用该extractTimestamp(...)方法为元素分配时间戳,然后立即checkAndGetNextWatermark(...)在该元素上调用该 方法。

该checkAndGetNextWatermark(...)方***传递该方法中分配的时间戳extractTimestamp(...) ,并可以决定是否要生成水印。每当该checkAndGetNextWatermark(...) 方法返回一个非空水印,并且该水印大于最新的先前水印时,就会发出新的水印。

public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}
注意:可以在每个事件上生成水印。但是,由于每个水印都会在下游引起一些计算,因此过多的水印会降低性能。

总结:水印的作用是解决事件的乱序问题,他的原理可以看成以下的时间轴。Watermark有单独的时间轴,他允许设置延迟时间,触发计算是以Watermark时间为准的。而时间窗口的划分是以实际处理时间划分的。

图片说明

迟到事件

虽说水位线表明着早于它的事件不应该再出现,但是上如上文所讲,接收到水位线以前的的消息是不可避免的,这就是所谓的迟到事件。实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。

迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有3种:

重新激活已经关闭的窗口并重新计算以修正结果。
将迟到事件收集起来另外处理。
将迟到事件视为错误消息并丢弃。
Flink 默认的处理方式是第3种直接丢弃,其他两种方式分别使用Side Output和Allowed Lateness。

  • Side Output机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。

  • Allowed Lateness机制允许用户设置一个允许的最大迟到时长。Flink 会再窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。

参考

https://ci.apache.org/projects/flink
https://blog.csdn.net/Dax1n/article/details/77975935