Window 窗口是什么
Windows 是处理无限流的核心。Windows 将流分成有限大小的“存储桶”,我们可以在其上应用计算。Flink 是一个优秀的流计算引擎,数据是源源不断的,它认为批处理 Batch 是一种特殊的流计算,在流中分割出一个个窗口,每个窗口相当于有限大小的空间,汇聚了待处理的数据。
窗口式 Flink 程序的一般结构如下所示。第一个片段指的是键控流,第二个片段指的是非键控流。可以看到,唯一的区别是对键控流的 keyBy(...) 调用和对非键控流的 window(...) 变为 windowAll(...)。
// Keyed Window
stream
.keyBy(...) <- 按照一个Key进行分组
.window(...) <- 将数据流中的元素分配到相应的窗口中
[.trigger(...)] <- 指定触发器Trigger(可选)
[.evictor(...)] <- 指定清除器Evictor(可选)
.reduce/aggregate/process() <- 窗口处理函数Window Function
// Non-Keyed Window
stream
.windowAll(...) <- 不分组,将数据流中的所有元素分配到相应的窗口中
[.trigger(...)] <- 指定触发器Trigger(可选)
[.evictor(...)] <- 指定清除器Evictor(可选)
.reduce/aggregate/process() <- 窗口处理函数Window Function
首先,我们要决定是否对一个DataStream按照Key进行分组,这一步必须在窗口计算之前进行。经过keyBy的数据流将形成多组数据,下游算子的多个实例可以并行计算。windowAll不对数据流进行分组,所有数据将发送到下游算子单个实例上。决定是否分组之后,窗口的后续操作基本相同,本文所涉及内容主要针对经过keyBy的窗口(Keyed Window),经过windowAll的算子是不分组的窗口(Non-Keyed Window),它们的原理和操作与Keyed Window类似,唯一的区别在于所有数据将发送给下游的单个实例,或者说下游算子的并行度为1。
Window 按驱动类型分类
Tumbling window 滚动窗口
//基于滚动窗口,以事件时间表达处理进度。使用水印,窗口触发的标记并且允许数据延迟五秒到达。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//如果不指定时间的话,默认是 ProcessingTime,但是如果指定为事件事件的话,需要事件中带有时间或者添加时间水印
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<WordEvent> data = env.addSource(new CustomSource())
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<WordEvent>() {
private long currentTimestamp = Long.MIN_VALUE;
private final long maxTimeLag = 5000;
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - maxTimeLag);
}
@Override
public long extractTimestamp(WordEvent element, long previousElementTimestamp) {
long timestamp = element.getTimestamp();
currentTimestamp = Math.max(timestamp, currentTimestamp);
return timestamp;
}
});
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssSSS");
data.keyBy(WordEvent::getWord)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply(new WindowFunction<WordEvent, WordEvent, String, TimeWindow>() {
@Override
public void apply(String s, TimeWindow window, Iterable<WordEvent> input, Collector<WordEvent> out) throws Exception {
System.out.println(format.format(window.getStart()) + "====" + format.format(window.getEnd() )+ " w"+( window.getEnd()-window.getStart()));
for (WordEvent word : input) {
out.collect(word);
}
}
})
.print("window计算结果:");
env.execute("flink window example");TimeCharacteristic.EventTime,需要与后面算子 window 操作的 TumblingEventTimeWindows.of(Time.seconds(10)) 保持一致
Sliding Windows 滑动窗口
有时候我们会需要每 5min 后,统计前面 60min 内事件的聚合值。
这时需要滑动窗口,保存上一次时间窗口的输入值,给下一个窗口使用重叠的部分。所以一个数据在滑动窗口中,可以出现在多个窗口中。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9000);
data.flatMap(new LineSplitter())
.keyBy(1)
.timeWindow(Time.seconds(60), Time.seconds(30))
.sum(0)
.print();Session window 会话窗口
Session Window 会话窗口分配器 Assigner 按活动会话对元素进行分组。
与 Tumbling 滚动窗口和 Sliding 滑动窗口相比,会话窗口不重叠且没有固定的开始和结束时间。
相反,当会话窗口在一定时间段内未接收到元素时,即在发生不活动间隙时,它将关闭。会话窗口分配器可以配置有静态会话间隔,也可以配置有会话间隔提取器功能,该功能定义不活动的时间长度。 当此时间段到期时,当前会话将关闭,随后的元素将分配给新的会话窗口。
//基于会话时间窗口,根据处理时间,统计五秒内接收到的数据,对接收到的数据的第二个参数进行分组,对第一个参数进行数据累加
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//如果不指定时间的话,默认是 ProcessingTime,但是如果指定为事件事件的话,需要事件中带有时间或者添加时间水印
DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9000);
data.flatMap(new LineSplitter())
.keyBy(1)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) //表示如果 5s 内没出现数据则认为超出会话时长,然后计算这个窗口的和
.sum(0)
.print();
env.execute("flink window example");
public class LineSplitter implements FlatMapFunction<String, Tuple2<Long, String>> {
@Override
public void flatMap(String s, Collector<Tuple2<Long, String>> collector) {
String[] tokens = s.split(" ");
if (tokens.length >= 2 && isValidLong(tokens[0])) {
collector.collect(new Tuple2<>(Long.valueOf(tokens[0]), tokens[1]));
}
}
private static boolean isValidLong(String str) {
try {
long _v = Long.parseLong(str);
return true;
} catch (NumberFormatException e) {
log.info("the str = {} is not a number", str);
return false;
}
}
}
CountWindows 计数窗口
Apache Flink 还具有计数窗口。滚动计数窗口为 100 时,将在一个窗口中收集 100 个事件,并在添加第 100 个元素时触发该窗口的计算。
在 DataStream API 中,滚动和滑动计数窗口的定义如下:
Tunmling Count Window
例如每次想要统计 100 个事件的合计值,只要从数据源处,累加满 100 个事件就触发计算,可以使用下面的 API
DataStream<WordWithCount> windowCounts = text
.flatMap(...)
.keyBy("word")
.countWindow(100)
.sum(1);Sliding Count Window
在一些场景下,有可能来了 5 个数据后,要统计前 100 个数据的合计值,每个数据可以被分配到多个窗口中,那么就可以使用滑动窗口实现。
DataStream<WordWithCount> windowCounts = text
.flatMap(...)
.keyBy("word")
.window(SlidingEventTimeWindows.of(5, 100)
.sum(1);Window 开放的三大核心 API
核心有三个组件:窗口分配器 Window Assigner 、触发器 Trigger、驱逐者 Evictor
Window Assigner
该组件主要功能是决定数据该分发到哪个窗口,它的作用可以类比于 Spring MVC 中的 Dispatcher。
上图左侧是 Flink 窗口实现的包结构,三大组件在对应的目录下,清晰明了。
底部是 Window Assigner 的继承类,在调用 WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) 这类方法时,可以传入上述的窗口分发器,在里面实现自定义的窗口分发逻辑。
Trigger
每个窗口都有一个触发器,该触发器决定何时评估或清除该窗口。
对于每个插入到窗口中的元素以及先前注册的计时器超时时,将触发该触发器。
Evictor
直译为 ‘驱逐者’,作用类似于过滤器 fliter,在 trigger 后执行,如果设定了 evictor,将会去除不符合条件的数据(默认是不设定的,不会驱逐)
通过这三大组件,可以实现自定义窗口逻辑,决定数据如何分配、何时触发计算以及哪些数据要被提前去除,详细使用例子可以参考官网的示例:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/windows.html
窗口计算什么条件下会触发呢
- Event Time < watermark时间(对于late element太多的数据而言)
- watermark时间 >= window_end_time(对于out-of-order以及正常的数据而言)&在[window_start_time,window_end_time)中有数据存在



京公网安备 11010502036488号