本文目录简介

  • 介绍

  • 计划器

  • Flink与Blink流批环境

  • 支持连接资源

  • 创建数据源表

  • 创建数据结果表

  • 创建数据维表

  • 小案例

介绍

Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是我们熟知的 Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。

从1.9开始,Flink 提供了两个 Table Planner 实现来执行 Table API 和 SQL 程序:Blink Planner和Old Planner,Old Planner 在1.9之前就已经存在了 Planner 的作用主要是把关系型的操作翻译成可执行的、经过优化的 Flink 任务。两种 Planner 所使用的优化规则以及运行时类都不一样。它们在支持的功能上也有些差异。

两种计划器

  • Blink 将批处理作业视作流处理的一种特例。严格来说,Table 和 DataSet 之间不支持相互转换,并且批处理作业也不会转换成 DataSet 程序而是转换成 DataStream 程序,流处理作业也一样。

  • Blink 计划器不支持 BatchTableSource,而是使用有界的 StreamTableSource 来替代。

  • 旧计划器和 Blink 计划器中 FilterableTableSource 的实现是不兼容的。旧计划器会将 PlannerExpression 下推至 FilterableTableSource,而 Blink 计划器则是将 Expression 下推。

  • 基于字符串的键值配置选项仅在 Blink 计划器中使用。

  • PlannerConfig 在两种计划器中的实现(CalciteConfig)是不同的。

  • Blink 计划器会将多sink(multiple-sinks)优化成一张有向无环图(DAG),TableEnvironment 和 StreamTableEnvironment 都支持该特性。旧计划器总是将每个sink都优化成一个新的有向无环图,且所有图相互独立。

  • 旧计划器目前不支持 catalog 统计数据,而 Blink 支持。

1.13版本Maven资源

    <properties>
        <flink.version>1.13.0</flink.version>
    </properties>

    <dependencies>
         <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-format-changelog-json</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

环境:TableEnvironment

功能

  1. 在内部的 catalog 中注册 Table

  2. 注册外部的 catalog

  3. 加载可插拔模块

  4. 执行 SQL 查询

  5. 注册自定义函数 (scalar、table 或 aggregation)

  6. 将 DataStream 或 DataSet 转换成 Table

  7. 持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

Flink与Blink流批环境

流处理

  • Flink Stream Table Environment

  def flink_streaming_env(): StreamTableEnvironment ={
    val fsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
    val fsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val fsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(fsEnv, fsSettings)
    fsTableEnv
  }
  • Blink Stream Table Environment

  def blink_streaming_env(): StreamTableEnvironment ={
    val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
    bsTableEnv
  }

批处理

  • Flink Batch Table Environment

  def flink_batch_env(): BatchTableEnvironment ={
    val fbEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val fbTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(fbEnv)
    fbTableEnv
  }
  • Blink Batch Table Environment

  def blink_batch_env(): TableEnvironment ={
    val bbSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val bbTableEnv: TableEnvironment = TableEnvironment.create(bbSettings)
    bbTableEnv
  }

数据类型

时间格式的较多,当然也有自定义的数据类型

SQL
CHAR CHAR/CHAR(n)
VARCHAR / STRING VARCHAR/VARCHAR(n)/STRING
BINARY BINARY/BINARY(n)
VARBINARY / BYTES VARBINARY/VARBINARY(n)/BYTES
DECIMAL DECIMAL/DECIMAL(p)/DECIMAL(p, s)/DEC/DEC(p)/DEC(p, s)/NUMERIC/NUMERIC(p)/NUMERIC(p, s)
TINYINT TINYINT
INT INT/INTEGER
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE/DOUBLE PRECISION
DATE DATE
TIME TIME/TIME(p)
TIMESTAMP TIMESTAMP/TIMESTAMP(p)/TIMESTAMP WITHOUT TIME ZONE/TIMESTAMP(p) WITHOUT TIME ZONE
TIMESTAMP WITH TIME ZONE TIMESTAMP WITH TIME ZONE/TIMESTAMP(p) WITH TIME ZONE
TIMESTAMP_LTZ TIMESTAMP_LTZ/TIMESTAMP_LTZ(p)/TIMESTAMP WITH LOCAL TIME ZONE/TIMESTAMP(p) WITH LOCAL TIME ZONE
INTERVAL YEAR TO MONTH INTERVAL YEAR/INTERVAL YEAR(p)/INTERVAL YEAR(p) TO MONTH/INTERVAL MONTH
INTERVAL DAY TO SECOND INTERVAL DAY/INTERVAL DAY(p1)/INTERVAL DAY(p1) TO HOUR/INTERVAL DAY(p1) TO MINUTE/INTERVAL DAY(p1) TO SECOND(p2)/INTERVAL HOUR/INTERVAL HOUR TO MINUTE/INTERVAL HOUR TO SECOND(p2)/INTERVAL MINUTE/INTERVAL MINUTE TO SECOND(p2)/INTERVAL SECOND/INTERVAL SECOND(p2)
ARRAY ARRAY< t>/ t ARRAY
MAP MAP<kt, vt>
MULTISET MULTISET < t>/t MULTISET
ROW ROW<n0 t0, n1 t1, ...>
BOOLEAN BOOLEAN
NULL NULL

创建表

临时表(Temporary Table)

临时表通常保存于内存中并且仅在创建它们的 Flink 会话持续期间存在。这些表对于其它会话是不可见的。它们不与任何 catalog 或者数据库绑定但可以在一个命名空间(namespace)中创建。即使它们对应的数据库被删除,临时表也不会被删除。

val sql=
      """
        |CREATE TEMPORARY TABLE users (
        |    id BIGINT,
        |    name STRING,
        |    age INT
        |) WITH (
        |    'connector' = 'xxxx'
        |)
        |""".stripMargin

永久表(Permanent Table)

永久表需要 catalog(例如 Hive Metastore)以维护表的元数据。一旦永久表被创建,它将对任何连接到 catalog 的 Flink 会话可见且持续存在,直至被明确删除

val sql=
      """
        |CREATE TABLE users (
        |    id BIGINT,
        |    name STRING,
        |    age INT
        |) WITH (
        |    'connector' = 'xxxx'
        |)
        |""".stripMargin

支持的连接资源一览

Name Version Source Sink
Filesystem Bounded and Unbounded Scan, Lookup Streaming Sink, Batch Sink
Elasticsearch 6.x & 7.x Not supported Streaming Sink, Batch Sink
Apache Kafka 0.10+ Unbounded Scan Streaming Sink, Batch Sink
Amazon Kinesis Data Streams Unbounded Scan Streaming Sink
JDBC Bounded Scan, Lookup Streaming Sink, Batch Sink
Apache HBase 1.4.x & 2.2.x Bounded Scan, Lookup Streaming Sink, Batch Sink
Apache Hive Supported Versions Unbounded Scan, Bounded Scan, Lookup Streaming Sink, Batch Sink

注意:数据资源如果没有开发需要的?别着急,可以自定义

创建数据源表

环境初始化

  // flink环境
  private var bsEnv: StreamExecutionEnvironment = _
  // 流处理table环境
  private var tableEnv: StreamTableEnvironment = _
  // 批处理table环境
  private var tableBatchEnv: TableEnvironment = _

  def init_environment(): Unit ={
    bsEnv=StreamExecutionEnvironment.getExecutionEnvironment
    // 流处理
    val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    tableEnv=StreamTableEnvironment.create(bsEnv, bsSettings)
    // 批处理
    val bbSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    tableBatchEnv=TableEnvironment.create(bbSettings)
  }

CSV

 def fromFilesystemWithCSV(): Unit ={
    val sql=
      """
        |CREATE TABLE users (
        |    id BIGINT,
        |    name STRING,
        |    age INT
        |) WITH (
        |    'connector' = 'filesystem',
        |    'path' = 'xxx\xx\something.csv',
        |    'format' = 'csv'
        |)
        |""".stripMargin

    tableBatchEnv.executeSql(sql)
    tableBatchEnv.executeSql("select * from users").print()
  }

Kafka

  • kafka基本

  // kafka基础
  def fromKafkaWithBasic(): Unit ={
    val kafka_sql=
      """
        |CREATE TABLE users (
        |  id BIGINT,
        |  name STRING,
        |  age INT
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'testtopic',
        |  'properties.bootstrap.servers' = 'xxxxxxxx',
        |  'format' = 'json'
        |)
        |""".stripMargin
    tableEnv.executeSql(kafka_sql)
    tableEnv.executeSql("select * from users").print()
  }
  • 可用的kafka元数据操作

  def fromKafkaWithMetadata(): Unit ={
   val kafka_with_metadata=
      """
        |CREATE TABLE users (
        |  event_time TIMESTAMP(3) METADATA FROM 'timestamp',
        |  partition BIGINT METADATA VIRTUAL,
        |  offset BIGINT METADATA VIRTUAL,
        |  id BIGINT,
        |  name STRING,
        |  age INT
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'testtopic',
        |  'properties.group.id' = 'testGroup',
        |  'properties.bootstrap.servers' = 'xxxxxxx',
        |  'format' = 'json'
        |)
        |""".stripMargin

    tableEnv.executeSql(kafka_with_metadata)
    tableEnv.executeSql("select partition,offset,id,age,name from users").print()
    }

元信息列

您可以在源表中定义元信息列,以获取Kafka消息的元信息。例如,当WITH参数中定义了多个topic时,如果在Kafka源表中定义了元信息列,那么Flink读取到的数据就会被标识是从哪个topic中读取的数据

Key 数据类型 说明
topic STRING NOT NULL METADATA VIRTUAL Kafka消息所在的Topic名称。
partition INT NOT NULL METADATA VIRTUAL Kafka消息所在的Partition ID。
headers MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL Kafka消息的消息头(header)。
leader-epoch INT NOT NULL METADATA VIRTUAL Kafka消息的Leader epoch。
offset BIGINT NOT NULL METADATA VIRTUAL Kafka消息的偏移量(offest)。
timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL Kafka消息的时间戳。
timestamp-type STRING NOT NULL METADATA VIRTUAL Kafka消息的时间戳类型:NoTimestampType:消息中没有定义时间戳CreateTime:消息产生的时间。LogAppendTime:消息被添加到Kafka Broker的时间。

WITH参数

参数 说明 是否必选 数据类型
connector 源表类型。 String
topic topic名称。 String
topic-pattern 匹配读取topic名称的正则表达式。所有匹配该正则表达式的topic在作业运行时均会被订阅。 String
properties.bootstrap.servers Kafka Broker地址。 String
properties.group.id Kafka消费组ID。 String
properties.* Kafka配置。 String
format Flink Kafka Connector在反序列化来自Kafka的消息体(value)时使用的格式。 String
value.format Flink Kafka Connector在反序列化来自Kafka的消息体(value)时使用的格式。 String
key.format 反序列化Kafka消息键(key)时使用的格式。 String
key.fields Kafka消息键(key)解析出来的数据存放的字段。 String
key.fields-prefix 为所有Kafka消息键(Key)指定自定义前缀,以避免与消息体(Value)格式字段重名。 String
value.fields-include 在解析消息体时,是否要包含消息键字段。 String
scan.startup.mode Kafka读取数据的启动位点。 String
scan.startup.specific-offsets 在specific-offsets启动模式下,指定每个分区的启动偏移量。 String
scan.startup.timestamp-millis 在timestamp启动模式下,指定启动位点时间戳。 Long

以上以kafka和filesystem为例,其他可以参考官网

创建数据结果表

create table kafka_sink(  
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)        
) with (
  'connector' = 'kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'csv'
)

WITH参数

参数 说明 是否必选 数据类型
connector 结果表类型 STRING
topic 结果表对应的Topic STRING
properties.bootstrap.servers Kafka Broker地址 STRING
format Flink Kafka Connector在反序列化来自Kafka的消息时使用的格式。 STRING
sink.partitioner 从Flink分区到Kafka分区的映射模式。 STRING

小案例从csv文件读取落到mysql

def fromCSVToJDBC(): Unit ={

    val sql_csv=
      """
        |CREATE TABLE person (
        |    id BIGINT,
        |    name STRING,
        |    age INT
        |) WITH (
        |    'connector' = 'filesystem',
        |    'path' = 'xxx\xxx\something.csv',
        |    'format' = 'csv'
        |)
        |""".stripMargin

    tableBatchEnv.executeSql(sql_csv)

    val sql=
      """
        |CREATE TABLE users (
        |  id BIGINT,
        |  name STRING,
        |  age INT,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'username' = 'root',
        |   'password' = 'root',
        |   'url' = 'jdbc:mysql://localhost:3306/test?useUnicode=yes&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC',
        |   'driver'='com.mysql.cj.jdbc.Driver',
        |   'table-name' = 'users'
        |)
        |""".stripMargin

    tableBatchEnv.executeSql(sql)
    tableBatchEnv.executeSql("insert into users select id,name,age from person")
  }

创建数据维表

CREATE TABLE filesystem_dim (
  id STRING,
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'csv/json/avro/parquet/orc/raw',
  'format' = 'CSV' 
)

WITH 参数

参数 说明 是否必选 备注
connector 维表类型 固定值为filesystem。
path 文件路径 URI格式,例如:oss://my_path/my_file。
format 文件格式 参数取值如下:CSV/JSON/AVRO/PARQUET/ORC/RAW
lookup.join.cache.ttl 重新读取数据的TTL时间 默认值为60分钟,即每隔60分钟重新读取数据。

代码示例

--创建event源表
CREATE TABLE event (
  id STRING, 
  data STRING
) WITH (
  'connector' = 'datahub'
   ...
);

--创建white_list维表。
CREATE TABLE white_list (
  id STRING,
  name STRING,
) WITH (
  'connector' = 'filesystem',
  'path' = '${remote_path_uri}',
  'format' = '${format}'
);

--关联event源表和white_list维表。
SELECT e.*, w.*
FROM event AS e
JOIN white_list FOR SYSTEM_TIME AS OF proctime() AS w
ON e.id = w.id;

本篇总结

本篇向大家介绍了 Flink SQL的入门简介,并没有细究原理与实现逻辑。就是先让大家对Flink SQL先有一个直观的体验。先有兴趣才有深究的动力,未完待续......

 

大数据左右手

技术如同手中的水有了生命似的,汇聚在了一起。作为大数据开发工作者,致力于大数据技术的学习与工作,分享大数据原理、架构、实时、离线、面试与总结,分享生活思考与读书见解。总有适合你的那一篇。

关注公众号!!!

和我联系吧,加群交流大数据知识,一起成长~~~