Window 窗口是什么
Windows 是处理无限流的核心。Windows 将流分成有限大小的“存储桶”,我们可以在其上应用计算。Flink 是一个优秀的流计算引擎,数据是源源不断的,它认为批处理 Batch 是一种特殊的流计算,在流中分割出一个个窗口,每个窗口相当于有限大小的空间,汇聚了待处理的数据。
窗口式 Flink 程序的一般结构如下所示。第一个片段指的是键控流,第二个片段指的是非键控流。可以看到,唯一的区别是对键控流的 keyBy(...) 调用和对非键控流的 window(...) 变为 windowAll(...)。
// Keyed Window stream .keyBy(...) <- 按照一个Key进行分组 .window(...) <- 将数据流中的元素分配到相应的窗口中 [.trigger(...)] <- 指定触发器Trigger(可选) [.evictor(...)] <- 指定清除器Evictor(可选) .reduce/aggregate/process() <- 窗口处理函数Window Function // Non-Keyed Window stream .windowAll(...) <- 不分组,将数据流中的所有元素分配到相应的窗口中 [.trigger(...)] <- 指定触发器Trigger(可选) [.evictor(...)] <- 指定清除器Evictor(可选) .reduce/aggregate/process() <- 窗口处理函数Window Function 首先,我们要决定是否对一个DataStream按照Key进行分组,这一步必须在窗口计算之前进行。经过keyBy的数据流将形成多组数据,下游算子的多个实例可以并行计算。windowAll不对数据流进行分组,所有数据将发送到下游算子单个实例上。决定是否分组之后,窗口的后续操作基本相同,本文所涉及内容主要针对经过keyBy的窗口(Keyed Window),经过windowAll的算子是不分组的窗口(Non-Keyed Window),它们的原理和操作与Keyed Window类似,唯一的区别在于所有数据将发送给下游的单个实例,或者说下游算子的并行度为1。
Window 按驱动类型分类
Tumbling window 滚动窗口
//基于滚动窗口,以事件时间表达处理进度。使用水印,窗口触发的标记并且允许数据延迟五秒到达。 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //如果不指定时间的话,默认是 ProcessingTime,但是如果指定为事件事件的话,需要事件中带有时间或者添加时间水印 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); DataStream<WordEvent> data = env.addSource(new CustomSource()) .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<WordEvent>() { private long currentTimestamp = Long.MIN_VALUE; private final long maxTimeLag = 5000; @Nullable @Override public Watermark getCurrentWatermark() { return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - maxTimeLag); } @Override public long extractTimestamp(WordEvent element, long previousElementTimestamp) { long timestamp = element.getTimestamp(); currentTimestamp = Math.max(timestamp, currentTimestamp); return timestamp; } }); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssSSS"); data.keyBy(WordEvent::getWord) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .apply(new WindowFunction<WordEvent, WordEvent, String, TimeWindow>() { @Override public void apply(String s, TimeWindow window, Iterable<WordEvent> input, Collector<WordEvent> out) throws Exception { System.out.println(format.format(window.getStart()) + "====" + format.format(window.getEnd() )+ " w"+( window.getEnd()-window.getStart())); for (WordEvent word : input) { out.collect(word); } } }) .print("window计算结果:"); env.execute("flink window example");
TimeCharacteristic.EventTime,需要与后面算子 window 操作的 TumblingEventTimeWindows.of(Time.seconds(10)) 保持一致
Sliding Windows 滑动窗口
有时候我们会需要每 5min 后,统计前面 60min 内事件的聚合值。
这时需要滑动窗口,保存上一次时间窗口的输入值,给下一个窗口使用重叠的部分。所以一个数据在滑动窗口中,可以出现在多个窗口中。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9000); data.flatMap(new LineSplitter()) .keyBy(1) .timeWindow(Time.seconds(60), Time.seconds(30)) .sum(0) .print();
Session window 会话窗口
Session Window 会话窗口分配器 Assigner 按活动会话对元素进行分组。
与 Tumbling 滚动窗口和 Sliding 滑动窗口相比,会话窗口不重叠且没有固定的开始和结束时间。
相反,当会话窗口在一定时间段内未接收到元素时,即在发生不活动间隙时,它将关闭。会话窗口分配器可以配置有静态会话间隔,也可以配置有会话间隔提取器功能,该功能定义不活动的时间长度。 当此时间段到期时,当前会话将关闭,随后的元素将分配给新的会话窗口。
//基于会话时间窗口,根据处理时间,统计五秒内接收到的数据,对接收到的数据的第二个参数进行分组,对第一个参数进行数据累加 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //如果不指定时间的话,默认是 ProcessingTime,但是如果指定为事件事件的话,需要事件中带有时间或者添加时间水印 DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9000); data.flatMap(new LineSplitter()) .keyBy(1) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) //表示如果 5s 内没出现数据则认为超出会话时长,然后计算这个窗口的和 .sum(0) .print(); env.execute("flink window example"); public class LineSplitter implements FlatMapFunction<String, Tuple2<Long, String>> { @Override public void flatMap(String s, Collector<Tuple2<Long, String>> collector) { String[] tokens = s.split(" "); if (tokens.length >= 2 && isValidLong(tokens[0])) { collector.collect(new Tuple2<>(Long.valueOf(tokens[0]), tokens[1])); } } private static boolean isValidLong(String str) { try { long _v = Long.parseLong(str); return true; } catch (NumberFormatException e) { log.info("the str = {} is not a number", str); return false; } } }
CountWindows 计数窗口
Apache Flink 还具有计数窗口。滚动计数窗口为 100 时,将在一个窗口中收集 100 个事件,并在添加第 100 个元素时触发该窗口的计算。
在 DataStream API 中,滚动和滑动计数窗口的定义如下:
Tunmling Count Window
例如每次想要统计 100 个事件的合计值,只要从数据源处,累加满 100 个事件就触发计算,可以使用下面的 API
DataStream<WordWithCount> windowCounts = text .flatMap(...) .keyBy("word") .countWindow(100) .sum(1);
Sliding Count Window
在一些场景下,有可能来了 5 个数据后,要统计前 100 个数据的合计值,每个数据可以被分配到多个窗口中,那么就可以使用滑动窗口实现。
DataStream<WordWithCount> windowCounts = text .flatMap(...) .keyBy("word") .window(SlidingEventTimeWindows.of(5, 100) .sum(1);
Window 开放的三大核心 API
核心有三个组件:窗口分配器 Window Assigner 、触发器 Trigger、驱逐者 Evictor
Window Assigner
该组件主要功能是决定数据该分发到哪个窗口,它的作用可以类比于 Spring MVC 中的 Dispatcher。
上图左侧是 Flink 窗口实现的包结构,三大组件在对应的目录下,清晰明了。
底部是 Window Assigner 的继承类,在调用 WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) 这类方法时,可以传入上述的窗口分发器,在里面实现自定义的窗口分发逻辑。
Trigger
每个窗口都有一个触发器,该触发器决定何时评估或清除该窗口。
对于每个插入到窗口中的元素以及先前注册的计时器超时时,将触发该触发器。
Evictor
直译为 ‘驱逐者’,作用类似于过滤器 fliter,在 trigger 后执行,如果设定了 evictor,将会去除不符合条件的数据(默认是不设定的,不会驱逐)
通过这三大组件,可以实现自定义窗口逻辑,决定数据如何分配、何时触发计算以及哪些数据要被提前去除,详细使用例子可以参考官网的示例:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/windows.html
窗口计算什么条件下会触发呢
- Event Time < watermark时间(对于late element太多的数据而言)
- watermark时间 >= window_end_time(对于out-of-order以及正常的数据而言)&在[window_start_time,window_end_time)中有数据存在