开发的过程也是探索和学习的过程,一个问题可以揪出背后众多知识。回头想想,这些问题早在官网给出了答案。有时候会想,始终是一种由发现问题到寻求解决问题之路的一种驱动,而不是一种系统学习后把控全局地去实现或者开发。所以学习的某一种过程是与bug并行,与解决问题为驱动的一种学习方式。不管哪一种过程,都是你的成长与经验的积累。你觉得呢?可评论或者留言给我~~~

问题的发现

  1. Connector Kafka创建的表

  2. Connector jdbc创建的表

  3. 1表与2表 join 可以正常join,但是checkpoint失败

「Kafka创建的表」

 s"""
       |CREATE TABLE car(
       |`id`   bigint ,
       |`user_id` bigint 
       |)
       |WITH (
       |    'connector' = 'kafka',
       |    'topic' = '$topic',
       |    'scan.startup.mode' = 'latest-offset',
       |    'properties.bootstrap.servers' = '$KAFKA_SERVICE',
       |    'properties.group.id' = 'indicator',
       |    'format' = 'canal-json'
       |)
       |""".stripMargin

「Connector jdbc创建的表」

s"""
        |CREATE TABLE users(
        |id int,
        |name string,
        |PRIMARY KEY (id) NOT ENFORCED
        |)
        |WITH (
        |'connector' = 'jdbc',
        |'url' = 'xxxx',
        |'driver'='$DRIVER_CLASS_NAME',
        |'table-name'='$tableName',
        |'lookup.cache.max-rows'='100',
        |'lookup.cache.ttl'='30s'
        |)
        |""".stripMargin

「JOIN」

        SELECT
            mc.user_id user_id,
            count(1) AS `value`
        FROM car mc
            inner join users u on mc.user_id=s.id
        group by mc.user_id

「正常结果可以出,但是checkpoint失败,意味着项目重启数据恢复不了」

Failed to trigger checkpoint for job f1d6ef37a56dd54a49d5f3961787c198 

since some tasks of job f1d6ef37a56dd54a49d5f3961787c198 has been finished,

abort the checkpoint Failure reason: Not all required tasks are currently running

「以此来引起话题Flink SQL维表Join」

维表Joins

由于维表是一张不断变化的表(静态表只是动态表的一种特例)。那如何 JOIN 一张不断变化的表呢?如果用传统的 JOIN 语法来表达维表 JOIN,是不完整的。因为维表是一直在更新变化的,如果用这个语法那么关联上的是哪个时刻的维表呢?我们是不知道的,结果是不确定的。所以 Flink SQL 的维表 JOIN 语法引入了Temporal Table 的标准语法,用来声明关联的是维表哪个时刻的快照。

「Temporal Joins」

普通关联会一直保留关联双侧的数据,数据也就会一直膨胀,直到撑爆内存导致任务失败,Temporal Join则可以定期清理过期数据,在合理的内存配置下即可避免内存溢出。

语法

SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1

Event Time Temporal Join

「注:」 基于事件时间的Temporal Join,join key必须为维度表的主键,在运行时flink需要根据主键来找寻对应数据的版本。

例如,假设我们有一个订单表,每个订单都有不同货币的价格。为了将此表正确地规范化为单一货币(如USD),每个订单都需要与下订单时的适当货币兑换率相结合。

-- Create a table of orders. This is a standard
-- append-only dynamic table.
CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,
    order_time  TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
) WITH (/* ... */);


CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL
    WATERMARK FOR update_time AS update_time
) WITH (
   'connector' = 'upsert-kafka',
   /* ... */
);

SELECT 
     order_id,
     price,
     currency,
     conversion_rate,
     order_time,
FROM orders
LEFT JOIN currency_rates FOR SYSTEM TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency

Processing Time Temporal Join

处理时间时态表连接使用处理时间属性将行与外部版本表中键的最新版本相关联。

根据定义,使用processing-time属性,连接将始终返回给定键的最新值。可以将查找表看作是一个简单的HashMap,它存储来自构建端的所有记录。这种连接的强大之处在于,当在Flink中无法将表具体化为动态表时,它允许Flink直接针对外部系统工作。

CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,
    order_time  TIMESTAMP(3),
    `proctime` as PROCTIME()
) WITH (/* ... */);



SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

「查找维表join在我理解有称为 Lookup Join」

Lookup Join

JDBC 连接器可以用在时态表关联中作为一个可 lookup 的 source (又称为维表)。用到的语法是 Temporal Joins 的语法。

所以,对于开篇,使用话改动一下SQL可以解决问题

s"""
       |CREATE TABLE car(
       |`id`   bigint ,
       |`user_id` bigint,
       |`proctime` as PROCTIME()
       |)
       |WITH (
       |    'connector' = 'kafka',
       |    'topic' = '$topic',
       |    'scan.startup.mode' = 'latest-offset',
       |    'properties.bootstrap.servers' = '$KAFKA_SERVICE',
       |    'properties.group.id' = 'indicator',
       |    'format' = 'canal-json'
       |)
       |""".stripMargin
    SELECT
        mc.user_id user_id,
        count(1) AS `value`
    FROM car mc
        inner join users FOR SYSTEM_TIME AS OF mc.proctime as u on mc.user_id=s.id
    group by mc.user_id

开发过程中遇到的一些疑问概念与参数理解

缓存机制

每处理一条流里的消息,都要到数据库里查询维表,而维表一般存在第三方数据库,这就导致每次都要有远程请求,特别是数据流大的情况下,频繁的维表查询,也会对外部数据库造成很大压力、降低整体吞吐,所以对维度表进行缓存不失为一个好的策略。但是用缓存也有个潜在的风险:缓存里的数据有可能不是最新的,这要在性能和正确性之间做权衡。

「如果两个参数任意一个设置成-1,那就会禁用查询缓存」

connector.lookup.cache.max-rows:缓存最大的记录条数,默认-1

connector.lookup.cache.ttl:缓存失效时间,默认-1

标量函数

标量函数用于对传递给它的一个或者多个参数值进行处理和计算,并返回一个单一的值。

就是所谓的标量函数(UDF)

package com.lcb.mars.flinksql.scala.function

import org.apache.flink.table.functions.ScalarFunction

/**
 * @Desc 寻找字符串是否在set中
 * @Date 2021/8/18 16:31
 */
class FindInSetFunction extends ScalarFunction{
  def eval(str:String,strlist:String): Boolean ={
    if (strlist.isEmpty || str.isEmpty) {
      return false
    }
    val strings: Array[String] = strlist.split(",")
    if(strings.length>0){
      return strings.contains(str)
    }
    false
  }
}

Upsert Kafka

Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。

作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。

作为 sink,upsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。

「容错的参数」

flag whether to fail if a field is missing&nbs***bsp;not,'false' by default

'value.json.fail-on-missing-field' = 'true', 
'value.json.ignore-parse-errors' = 'true'

  
skip fields and rows with parse errors instead of failing;

'key.format' = 'json',
'key.json.ignore-parse-errors' = 'true',

'value.format' = 'json',
'value.json.fail-on-missing-field' = 'false', 

其他参数

value.fields-include

可选,默认为ALL。控制key字段是否出现在 value 中。当取ALL时,表示消息的 value 部分将包含 schema

中所有的字段,包括定义为主键的字段。当取EXCEPT_KEY时,表示记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外


key.fields-prefix

可选。为了避免与value字段命名冲突,为key字段添加一个自定义前缀。默认前缀为

空。一旦指定了key字段的前缀,必须在DDL中指明前缀的名称,但是在构建key的序列化数据类型时,将移除该前缀。见下面的示例。

在需要注意的是:使用该配置属性,value.fields-include的值必须为EXCEPT_KEY



PRIMARY KEY (id) NOT ENFORCED

flink在upsert操作的时候, 是需要指定主键的, 否则会出现 requires that Table has a full primary keys if it is updated.

Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
 at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)

「意思是声明id为主键, 但是不做强检验」

PRIMARY KEY (id) NOT ENFORCED

UpsertStreamTableSink 需要上游的 Query 有完整的 Primary Key 信息,不然就直接抛异常。这个现象涉及到 Flink 的 UpsertStreamTableSink 机制。顾名思义,它是一个更新的 Sink,需要按 Key 来更新,所以必须要有 Key 信息。

之前是通过Query中自动推算出来PK,现在呢?而是完全依赖 Create table 语法。比如 Create 一个 jdbc_table,需要在定义中显式地写好 Primary Key(后面 NOT ENFORCED 的意思是不强校验,因为 Connector 也许没有具备 PK 的强校验的能力)。当指定了 PK,就相当于就告诉框架这个 Sink 会按照对应的 Key 来进行更新。

CREATE TABLE sink_table (    id BIGINT,
    ...
    PRIMARY KEY (id) NOT ENFORCED
    )