前言

Flink SQL的窗口操作在straming的实现上增添了不少东西。是用起来越来越方便。本篇主要内容:

  • Watermark

  • 滚动窗口

  • 滑动窗口

  • 累积窗口函数

  • 窗口分组聚合GROUPING SETS

  • Clue幂集函数

  • Over函数

Watermark

引入

由于实时计算的输入数据是持续不断的,因此我们需要一个有效的进度指标,来帮助我们确定关闭时间窗口的正确时间点,保证关闭窗口后不会再有数据进入该窗口,可以安全输出这个窗口的聚合结果。

而Watermark就是一种衡量Event Time进展的有效机制。随着时间的推移,最早流入实时计算的数据会被处理完成,之后流入的数据处于正在处理状态。处于正在处理部分的和已处理部分的交界的时间戳,可以被定义为Watermark,代表在此之前的事件已经被处理完成并输出。

针对乱序的流,Watermark也至关重要,即使部分事件延迟到达,也不会影响窗口计算的正确性。此外,并行数据流中,当算子(Operator)有多个输入流时,算子的Event Time以最小流Event Time为准。

具体可参考Flink Straming的原理介绍:

Flink的窗口、时间语义,Watermark机制,多代码案例详解,Flink学习入门(三)

watermark策略

Flink SQL提供了几种常用的watermark策略。

  1. 严格意义上递增的时间戳,发出到目前为止已观察到的最大时间戳的水印。时间戳小于最大时间戳的行不会迟到。watermark for rowtime_column as rowtime_column

  2. 递增的时间戳,发出到目前为止已观察到的最大时间戳为负1的水印。时间戳等于或小于最大时间戳的行不会迟到。watermark for rowtime_column as rowtime_column - INTERVAL '1' SECOND.

  3. 有界时间戳(乱序)发出水印,它是观察到的最大时间戳减去指定的延迟, 例如,watermark for rowtime_column as rowtime_column - INTERVAL'5'SECOND是5秒的延迟水印策略。watermark for rowtime_column as rowtime_column - INTERVAL 'string' timeUnit.

现常用的语法

watermark [watermarkName] for <rowtime_field> as withOffset(<rowtime_field>, offset)

参数 是否必填 说明
watermarkName 标识Watermark的名字。
<rowtime_field> <rowtime_field>必须是表中已定义的一列(当前仅支持TIMESTAMP类型),基于该列生成Watermark,并且标识该列为Event Time列。您可以使用<rowtime_field>在作业代码中定义窗口。
withOffset Watermark的生成策略,根据<rowtime_field> - offset生成Watermark的值。withOffset的第一个参数必须是<rowtime_field>。
offset Watermark值与Event Time值的偏移量,单位为毫秒。

窗口函数

滚动窗口

滚动窗口(TUMBLE)将每个元素分配到一个指定大小的窗口中。通常,滚动窗口有一个固定的大小,并且不会出现重叠。例如,如果指定了一个5分钟大小的滚动窗口,无限流的数据会根据时间划分为[0:00, 0:05)、[0:05, 0:10)、[0:10, 0:15)等窗口。

语法

TUMBLE函数用在GROUP BY子句中,用来定义滚动窗口。TUMBLE(< time-attr>, < size-interval>) < size-interval>: INTERVAL 'string' timeUnit

< time-attr>参数必须是时间流中的一个合法的时间属性字段

标识函数

使用标识函数选出窗口的起始时间或者结束时间,窗口的时间属性用于下级Window的聚合

窗口标识函数 返回类型 描述
TUMBLE_START(time-attr, size-interval) TIMESTAMP 返回窗口的起始时间(包含边界)。例如[00:10,00:15)窗口,返回00:10
TUMBLE_END(time-attr, size-interval) TIMESTAMP 返回窗口的结束时间(包含边界)。例如[00:00, 00:15]窗口,返回00:15。
TUMBLE_ROWTIME(time-attr, size-interval) TIMESTAMP(rowtime-attr) 返回窗口的结束时间(不包含边界)。例如[00:00, 00:15]窗口,返回00:14:59.999。返回值是一个rowtime attribute,即可以基于该字段做时间属性的操作,例如,级联窗口只能用在基于Event Time的Window上,详情请参见级联窗口。
TUMBLE_PROCTIME(time-attr, size-interval) TIMESTAMP(rowtime-attr) 返回窗口的结束时间(不包含边界)。例如[00:00, 00:15]窗口,返回00:14:59.999。返回值是一个Proctime Attribute,即可以基于该字段做时间属性的操作。例如,级联窗口只能用在基于Processing Time的Window上,详情请参见级联窗口。

新版本语法(简洁)

TUMBLE(TABLE data, DESCRIPTOR(timecol), size)

  • data: 是一个表参数。

  • timecol: 是一个列描述符指示应该映射到哪个时间的属性列。

  • size: 是一个持续时间指定窗口的宽度。

滑动窗口

滑动窗口(HOP),也被称作Sliding Window。不同于滚动窗口,滑动窗口的窗口可以重叠。

通常,大部分元素符合多个窗口情景,窗口是重叠的。因此,滑动窗口在计算移动平均数(moving averages)时很实用。例如,计算过去5分钟数据的平均值,每10秒钟更新一次,可以设置slide为10秒,size为5分钟。下图为您展示间隔为30秒,窗口大小为1分钟的滑动窗口。

语法

下面不再介绍之前版本的写法,按1.13版本来 HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])

  • data: 是一个表参数,数据表。

  • timecol: 是一个列描述符指示应该映射到哪个时间的属性列。

  • slide: 滑动时间。

  • size: 是一个持续时间指定窗口的宽度。

累积窗口函数

累积窗口在某些情况下非常有用,例如在固定的窗口间隔内早期触发的滚动窗口。例如,每日从00:00到每分钟累计计算UV值,10:00的UV值表示00:00到10:00的UV总数。这可以通过累积窗口轻松有效地实现。

累积函数将元素分配给窗口,这些窗口在初始步长间隔内覆盖行,并在每一步扩展到一个更多的步长(保持窗口开始固定),直到最大窗口大小。你可以把累积函数看作是先应用具有最大窗口大小的滚动窗口,然后把每个滚动窗口分成几个窗口,每个窗口的开始和结束都有相同的步长差。所以累积窗口确实有重叠,而且没有固定的大小

例如,你可以对1小时的步长和1天的最大长度有一个累积窗口,你就会得到窗口 [00:00, 01:00), [00:00, 02:00), [00:00, 03:00), …, [00:00, 24:00)

语法

CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)

  • data: 是一个表参数,数据表。

  • timecol: 是一个列描述符指示应该映射到哪个时间的属性列。

  • step: 步长

  • size: 是一个持续时间指定窗口的宽度!

以此为例:数据:goods

time price item
2020-04-15 08:05 4.00 C
2020-04-15 08:07 2.00 A
2020-04-15 08:09 5.00 D
2020-04-15 08:11 3.00 B
2020-04-15 08:13 1.00 E
2020-04-15 08:17 6.00 F
SELECT window_start, window_end, SUM(price)
  FROM TABLE(
    CUMULATE(TABLE goods, DESCRIPTOR(time), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end
window_start window_end price
2020-04-15 08:00 2020-04-15 08:06 4.00
2020-04-15 08:00 2020-04-15 08:08 6.00
2020-04-15 08:00 2020-04-15 08:10 11.00
2020-04-15 08:10 2020-04-15 08:12 3.00
2020-04-15 08:10 2020-04-15 08:14 4.00
2020-04-15 08:10 2020-04-15 08:16 4.00
2020-04-15 08:10 2020-04-15 08:18 10.00
2020-04-15 08:10 2020-04-15 08:20 10.00

窗口分组聚合GROUPING SETS

窗口聚合也支持分组集语法。分组集允许更复杂的分组标准比描述所GROUP by操作。指定行分别分组,每个分组集和聚合计算每组一样简单的group by子句 与下面相同 ROLLUP 汇总是一种速记符号用于指定一个常见类型的分组集。它代表给定的表达式和所有前缀列表的列表,包括空列表。      窗口与汇总需要聚合window_start和window_end列必须在GROUP BY子句,但不是在ROLLUP中条款。

val kafka_sql=
      """
        |CREATE TABLE goods (
        |  item_id VARCHAR,
        |  item_type VARCHAR,
        |  event_time varchar,
        |  on_sell_time AS TO_TIMESTAMP(event_time),
        |  price DOUBLE,
        |  WATERMARK FOR on_sell_time AS on_sell_time - INTERVAL '1' SECOND
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'testtopic',
        |  'properties.bootstrap.servers' = 'xxxxxx',
        |  'format' = 'json'
        |)
        |""".stripMargin
    tableEnv.executeSql(kafka_sql)

    val query=
      """
        |SELECT window_start, window_end, item_type, SUM(price) as price
        |  FROM TABLE(
        |    TUMBLE(TABLE goods, DESCRIPTOR(on_sell_time), INTERVAL '5' MINUTES))
        |  GROUP BY window_start, window_end, GROUPING SETS ((item_type), ())
        |""".stripMargin
        
        或者
       
      val query=
      """
        |SELECT window_start, window_end, item_type, SUM(price) as price
        |  FROM TABLE(
        |    TUMBLE(TABLE goods, DESCRIPTOR(on_sell_time), INTERVAL '5' MINUTES))
        |  GROUP BY window_start, window_end, ROLLUP (item_type)
        |""".stripMargin


tableEnv.executeSql(query).print()
        

数据源write(item)方法后发送kafka的消息

def test_over_window(): Array[AnyRef] ={
    Array(
      Map(
        "item_id"->"ITEM001",
        "item_type"->"apple",
        "event_time"->"2020-04-15 10:01:00",
        "price"->20
      ),
      Map(
        "item_id"->"ITEM002",
        "item_type"->"apple",
        "event_time"->"2020-04-15 10:02:00",
        "price"->50
      ),
      Map(
        "item_id"->"ITEM003",
        "item_type"->"apple",
        "event_time"->"2020-04-15 10:03:00",
        "price"->30
      ),
      Map(
        "item_id"->"ITEM004",
        "item_type"->"apple",
        "event_time"->"2020-04-15 10:04:00",
        "price"->60
      ),
      Map(
        "item_id"->"ITEM005",
        "item_type"->"apple",
        "event_time"->"2020-04-15 10:05:00",
        "price"->40
      ),
      Map(
        "item_id"->"ITEM006",
        "item_type"->"apple",
        "event_time"->"2020-04-15 10:06:00",
        "price"->20
      ),
      Map(
        "item_id"->"ITEM007",
        "item_type"->"apple",
        "event_time"->"2020-04-15 10:07:00",
        "price"->10
      ),
      Map(
        "item_id"->"ITEM008",
        "item_type"->"orange",
        "event_time"->"2020-04-15 10:08:00",
        "price"->20
      ),
      Map(
        "item_id"->"ITEM009",
        "item_type"->"orange",
        "event_time"->"2020-04-15 10:09:00",
        "price"->21
      ),
      Map(
        "item_id"->"ITEM010",
        "item_type"->"apple",
        "event_time"->"2020-04-15 10:10:00",
        "price"->22
      ),
      Map(
        "item_id"->"ITEM011",
        "item_type"->"orange",
        "event_time"->"2020-04-15 10:11:00",
        "price"->26
      )
    )
  }

结果:

window_start window_end item_type price
2020-04-15 10:00:00.000 2020-04-15 10:05:00.000 apple 160.0
2020-04-15 10:00:00.000 2020-04-15 10:05:00.000 (NULL) 160.0
2020-04-15 10:05:00.000 2020-04-15 10:10:00.000 apple 70.0
2020-04-15 10:05:00.000 2020-04-15 10:10:00.000 orange 41.0
2020-04-15 10:05:00.000 2020-04-15 10:10:00.000 (NULL) 111.0

CUBE

CUBE是一种用于指定公共分组集类型的简写符号。它表示给定的列表及其所有可能的子集——幂集

def cube(): Unit ={
    val kafka_sql=
      """
        |CREATE TABLE goods (
        |  item_id VARCHAR,
        |  item_type VARCHAR,
        |  event_time varchar,
        |  on_sell_time AS TO_TIMESTAMP(event_time),
        |  price DOUBLE,
        |  WATERMARK FOR on_sell_time AS on_sell_time - INTERVAL '1' SECOND
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'testtopic',
        |  'properties.bootstrap.servers' = 'xxxxx',
        |  'format' = 'json'
        |)
        |""".stripMargin
    tableEnv.executeSql(kafka_sql)

    val query=
      """
        |SELECT window_start, window_end,item_id, item_type, SUM(price) as price
        |  FROM TABLE(
        |    TUMBLE(TABLE goods, DESCRIPTOR(on_sell_time), INTERVAL '5' MINUTES))
        |  GROUP BY window_start, window_end, CUBE(item_id,item_type)
        |""".stripMargin

    tableEnv.executeSql(query).print()
    
    }

数据源write(item)方法后发送kafka的消息

def test_clue(): Array[AnyRef] ={
    Array(
      Map(
        "item_id"->"ITEM001",
        "item_type"->"apple",
        "event_time"->"2020-04-15 10:01:00",
        "price"->20
      ),
      Map(
        "item_id"->"ITEM001",
        "item_type"->"apple",
        "event_time"->"2020-04-15 10:02:00",
        "price"->50
      ),
      Map(
        "item_id"->"ITEM001",
        "item_type"->"apple",
        "event_time"->"2020-04-15 10:03:00",
        "price"->30
      ),
      Map(
        "item_id"->"ITEM001",
        "item_type"->"apple",
        "event_time"->"2020-04-15 10:04:00",
        "price"->60
      ),
      Map(
        "item_id"->"ITEM001",
        "item_type"->"apple",
        "event_time"->"2020-04-15 10:05:00",
        "price"->40
      ),
      Map(
        "item_id"->"ITEM002",
        "item_type"->"apple",
        "event_time"->"2020-04-15 10:06:00",
        "price"->20
      ),
      Map(
        "item_id"->"ITEM002",
        "item_type"->"apple",
        "event_time"->"2020-04-15 10:07:00",
        "price"->10
      ),
      Map(
        "item_id"->"ITEM002",
        "item_type"->"orange",
        "event_time"->"2020-04-15 10:08:00",
        "price"->20
      ),
      Map(
        "item_id"->"ITEM002",
        "item_type"->"orange",
        "event_time"->"2020-04-15 10:09:00",
        "price"->21
      ),
      Map(
        "item_id"->"ITEM02",
        "item_type"->"apple",
        "event_time"->"2020-04-15 10:10:00",
        "price"->22
      ),
      Map(
        "item_id"->"ITEM02",
        "item_type"->"orange",
        "event_time"->"2020-04-15 10:11:00",
        "price"->26
      )
    )
  }

结果:

window_start window_end item_id item_type price
2020-04-15 10:00:00.000 2020-04-15 10:05:00.000 (NULL) apple 140.0
2020-04-15 10:00:00.000 2020-04-15 10:05:00.000 ITEM001 apple 140.0
2020-04-15 10:00:00.000 2020-04-15 10:05:00.000 ITEM001 (NULL) 140.0
2020-04-15 10:00:00.000 2020-04-15 10:05:00.000 (NULL) (NULL) 140.0
2020-04-15 10:05:00.000 2020-04-15 10:10:00.000 (NULL) apple 70.0
2020-04-15 10:05:00.000 2020-04-15 10:10:00.000 ITEM001 apple 40.0
2020-04-15 10:05:00.000 2020-04-15 10:10:00.000 ITEM001 (NULL) 40.0
2020-04-15 10:05:00.000 2020-04-15 10:10:00.000 ITEM002 orange 41.0
2020-04-15 10:05:00.000 2020-04-15 10:10:00.000 (NULL) orange 41.0
2020-04-15 10:05:00.000 2020-04-15 10:10:00.000 ITEM002 (NULL) 71.0
2020-04-15 10:05:00.000 2020-04-15 10:10:00.000 (NULL) (NULL) 111.0
2020-04-15 10:05:00.000 2020-04-15 10:10:00.000 ITEM002 apple 30.0

over函数

以窗口Top N为例子

 def top_n: Unit ={
    val kafka_sql=
      """
        |CREATE TABLE goods (
        |  item_id VARCHAR,
        |  item_type VARCHAR,
        |  event_time varchar,
        |  on_sell_time AS TO_TIMESTAMP(event_time),
        |  price DOUBLE,
        |  WATERMARK FOR on_sell_time AS on_sell_time - INTERVAL '1' SECOND
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'testtopic',
        |  'properties.bootstrap.servers' = 'xxxxx',
        |  'format' = 'json'
        |)
        |""".stripMargin
    tableEnv.executeSql(kafka_sql)

    val query=
      """
        |SELECT *
        |  FROM (
        |    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
        |    FROM (
        |      SELECT window_start, window_end, item_type, SUM(price) as price, COUNT(*) as cnt
        |      FROM TABLE(
        |        TUMBLE(TABLE goods, DESCRIPTOR(on_sell_time), INTERVAL '5' MINUTES))
        |      GROUP BY window_start, window_end, item_type
        |    )
        |  ) WHERE rownum <= 2
        |""".stripMargin
    tableEnv.executeSql(query).print()
    
    }

数据源也就是上面的test_over_window通过write(item)方法后发送kafka的消息

结果:

window_start window_end item_type price cnt rownum
2020-04-15 10:00:00.000 2020-04-15 10:05:00.000 apple 160.0 4 1
2020-04-15 10:05:00.000 2020-04-15 10:10:00.000 apple 70.0 3 1
2020-04-15 10:05:00.000 2020-04-15 10:10:00.000 orange 41.0 2 2

总结:

在Flink SQL窗口中:累积窗口函数, 窗口分组聚合GROUPING SETS,Clue幂集函数, Over函数这些给人眼前一亮,的确是减少了开发工作量。但是需要开发工作者灵活运行才会发挥价值。知识无止境,人生也无止境。

  

大数据左右手

技术如同手中的水有了生命似的,汇聚在了一起。作为大数据开发工作者,致力于大数据技术的学习与工作,分享大数据原理、架构、实时、离线、面试与总结,分享生活思考与读书见解。总有适合你的那一篇。

关注公众号!!!

和我联系吧,加群交流大数据知识,一起成长~~~