Window 窗口是什么

Windows 是处理无限流的核心。Windows 将流分成有限大小的“存储桶”,我们可以在其上应用计算。Flink 是一个优秀的流计算引擎,数据是源源不断的,它认为批处理 Batch 是一种特殊的流计算,在流中分割出一个个窗口,每个窗口相当于有限大小的空间,汇聚了待处理的数据。

窗口式 Flink 程序的一般结构如下所示。第一个片段指的是键控流,第二个片段指的是非键控流。可以看到,唯一的区别是对键控流的 keyBy(...) 调用和对非键控流的 window(...) 变为 windowAll(...)。

// Keyed Window
stream
       .keyBy(...)               <-  按照一个Key进行分组
       .window(...)              <-  将数据流中的元素分配到相应的窗口中
      [.trigger(...)]            <-  指定触发器Trigger(可选)
      [.evictor(...)]            <-  指定清除器Evictor(可选)
       .reduce/aggregate/process()      <-  窗口处理函数Window Function

// Non-Keyed Window
stream
       .windowAll(...)           <-  不分组,将数据流中的所有元素分配到相应的窗口中
      [.trigger(...)]            <-  指定触发器Trigger(可选)
      [.evictor(...)]            <-  指定清除器Evictor(可选)
       .reduce/aggregate/process()      <-  窗口处理函数Window Function


首先,我们要决定是否对一个DataStream按照Key进行分组,这一步必须在窗口计算之前进行。经过keyBy的数据流将形成多组数据,下游算子的多个实例可以并行计算。windowAll不对数据流进行分组,所有数据将发送到下游算子单个实例上。决定是否分组之后,窗口的后续操作基本相同,本文所涉及内容主要针对经过keyBy的窗口(Keyed Window),经过windowAll的算子是不分组的窗口(Non-Keyed Window),它们的原理和操作与Keyed Window类似,唯一的区别在于所有数据将发送给下游的单个实例,或者说下游算子的并行度为1。

Window 按驱动类型分类

Tumbling window 滚动窗口

图片说明

//基于滚动窗口,以事件时间表达处理进度。使用水印,窗口触发的标记并且允许数据延迟五秒到达。
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //如果不指定时间的话,默认是 ProcessingTime,但是如果指定为事件事件的话,需要事件中带有时间或者添加时间水印
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStream<WordEvent> data = env.addSource(new CustomSource())
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<WordEvent>() {
                    private long currentTimestamp = Long.MIN_VALUE;

                    private final long maxTimeLag = 5000;

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - maxTimeLag);
                    }

                    @Override
                    public long extractTimestamp(WordEvent element, long previousElementTimestamp) {
                        long timestamp = element.getTimestamp();
                        currentTimestamp = Math.max(timestamp, currentTimestamp);
                        return timestamp;
                    }
                });
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssSSS");
        data.keyBy(WordEvent::getWord)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new WindowFunction<WordEvent, WordEvent, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow window, Iterable<WordEvent> input, Collector<WordEvent> out) throws Exception {
                        System.out.println(format.format(window.getStart()) + "====" + format.format(window.getEnd() )+ " w"+( window.getEnd()-window.getStart()));
                        for (WordEvent word : input) {
                            out.collect(word);
                        }
                    }
                })
                .print("window计算结果:");

        env.execute("flink window example");

TimeCharacteristic.EventTime,需要与后面算子 window 操作的 TumblingEventTimeWindows.of(Time.seconds(10)) 保持一致

Sliding Windows 滑动窗口

有时候我们会需要每 5min 后,统计前面 60min 内事件的聚合值。

这时需要滑动窗口,保存上一次时间窗口的输入值,给下一个窗口使用重叠的部分。所以一个数据在滑动窗口中,可以出现在多个窗口中。

图片说明

 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9000);
 data.flatMap(new LineSplitter())
                .keyBy(1)
                .timeWindow(Time.seconds(60), Time.seconds(30))
                .sum(0)
                .print();

Session window 会话窗口

Session Window 会话窗口分配器 Assigner 按活动会话对元素进行分组。

与 Tumbling 滚动窗口和 Sliding 滑动窗口相比,会话窗口不重叠且没有固定的开始和结束时间。

相反,当会话窗口在一定时间段内未接收到元素时,即在发生不活动间隙时,它将关闭。会话窗口分配器可以配置有静态会话间隔,也可以配置有会话间隔提取器功能,该功能定义不活动的时间长度。 当此时间段到期时,当前会话将关闭,随后的元素将分配给新的会话窗口。
图片说明

//基于会话时间窗口,根据处理时间,统计五秒内接收到的数据,对接收到的数据的第二个参数进行分组,对第一个参数进行数据累加
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//如果不指定时间的话,默认是 ProcessingTime,但是如果指定为事件事件的话,需要事件中带有时间或者添加时间水印
DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9000);
data.flatMap(new LineSplitter())
                .keyBy(1)
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) //表示如果 5s 内没出现数据则认为超出会话时长,然后计算这个窗口的和
                .sum(0)
                .print();
        env.execute("flink window example");

public class LineSplitter implements FlatMapFunction<String, Tuple2<Long, String>> {
    @Override
    public void flatMap(String s, Collector<Tuple2<Long, String>> collector) {
        String[] tokens = s.split(" ");

        if (tokens.length >= 2 && isValidLong(tokens[0])) {
            collector.collect(new Tuple2<>(Long.valueOf(tokens[0]), tokens[1]));
        }
    }

    private static boolean isValidLong(String str) {
        try {
            long _v = Long.parseLong(str);
            return true;
        } catch (NumberFormatException e) {
            log.info("the str = {} is not a number", str);
            return false;
        }
    }
}

图片说明

图片说明

CountWindows 计数窗口

Apache Flink 还具有计数窗口。滚动计数窗口为 100 时,将在一个窗口中收集 100 个事件,并在添加第 100 个元素时触发该窗口的计算。

在 DataStream API 中,滚动和滑动计数窗口的定义如下:

Tunmling Count Window

例如每次想要统计 100 个事件的合计值,只要从数据源处,累加满 100 个事件就触发计算,可以使用下面的 API

 DataStream<WordWithCount> windowCounts = text
    .flatMap(...)
    .keyBy("word")
    .countWindow(100)
    .sum(1);

Sliding Count Window

在一些场景下,有可能来了 5 个数据后,要统计前 100 个数据的合计值,每个数据可以被分配到多个窗口中,那么就可以使用滑动窗口实现。

 DataStream<WordWithCount> windowCounts = text
    .flatMap(...)
    .keyBy("word")
    .window(SlidingEventTimeWindows.of(5, 100)
    .sum(1);

Window 开放的三大核心 API

核心有三个组件:窗口分配器 Window Assigner 、触发器 Trigger、驱逐者 Evictor

Window Assigner

该组件主要功能是决定数据该分发到哪个窗口,它的作用可以类比于 Spring MVC 中的 Dispatcher。
图片说明

上图左侧是 Flink 窗口实现的包结构,三大组件在对应的目录下,清晰明了。

底部是 Window Assigner 的继承类,在调用 WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) 这类方法时,可以传入上述的窗口分发器,在里面实现自定义的窗口分发逻辑。

Trigger

每个窗口都有一个触发器,该触发器决定何时评估或清除该窗口。

对于每个插入到窗口中的元素以及先前注册的计时器超时时,将触发该触发器。

Evictor

直译为 ‘驱逐者’,作用类似于过滤器 fliter,在 trigger 后执行,如果设定了 evictor,将会去除不符合条件的数据(默认是不设定的,不会驱逐)
通过这三大组件,可以实现自定义窗口逻辑,决定数据如何分配、何时触发计算以及哪些数据要被提前去除,详细使用例子可以参考官网的示例:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/windows.html

窗口计算什么条件下会触发呢

  • Event Time < watermark时间(对于late element太多的数据而言)
  • watermark时间 >= window_end_time(对于out-of-order以及正常的数据而言)&在[window_start_time,window_end_time)中有数据存在