本文目录简介
-
介绍
-
计划器
-
Flink与Blink流批环境
-
表
-
支持连接资源
-
创建数据源表
-
创建数据结果表
-
创建数据维表
-
小案例
介绍
Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。
自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是我们熟知的 Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。
从1.9开始,Flink 提供了两个 Table Planner 实现来执行 Table API 和 SQL 程序:Blink Planner和Old Planner,Old Planner 在1.9之前就已经存在了 Planner 的作用主要是把关系型的操作翻译成可执行的、经过优化的 Flink 任务。两种 Planner 所使用的优化规则以及运行时类都不一样。它们在支持的功能上也有些差异。
两种计划器
-
Blink 将批处理作业视作流处理的一种特例。严格来说,Table 和 DataSet 之间不支持相互转换,并且批处理作业也不会转换成 DataSet 程序而是转换成 DataStream 程序,流处理作业也一样。
-
Blink 计划器不支持 BatchTableSource,而是使用有界的 StreamTableSource 来替代。
-
旧计划器和 Blink 计划器中 FilterableTableSource 的实现是不兼容的。旧计划器会将 PlannerExpression 下推至 FilterableTableSource,而 Blink 计划器则是将 Expression 下推。
-
基于字符串的键值配置选项仅在 Blink 计划器中使用。
-
PlannerConfig 在两种计划器中的实现(CalciteConfig)是不同的。
-
Blink 计划器会将多sink(multiple-sinks)优化成一张有向无环图(DAG),TableEnvironment 和 StreamTableEnvironment 都支持该特性。旧计划器总是将每个sink都优化成一个新的有向无环图,且所有图相互独立。
-
旧计划器目前不支持 catalog 统计数据,而 Blink 支持。
1.13版本Maven资源
<properties>
<flink.version>1.13.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-format-changelog-json</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
环境:TableEnvironment
功能
-
在内部的 catalog 中注册 Table
-
注册外部的 catalog
-
加载可插拔模块
-
执行 SQL 查询
-
注册自定义函数 (scalar、table 或 aggregation)
-
将 DataStream 或 DataSet 转换成 Table
-
持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
Flink与Blink流批环境
流处理
-
Flink Stream Table Environment
def flink_streaming_env(): StreamTableEnvironment ={
val fsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val fsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val fsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(fsEnv, fsSettings)
fsTableEnv
}
-
Blink Stream Table Environment
def blink_streaming_env(): StreamTableEnvironment ={
val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
bsTableEnv
}
批处理
-
Flink Batch Table Environment
def flink_batch_env(): BatchTableEnvironment ={
val fbEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val fbTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(fbEnv)
fbTableEnv
}
-
Blink Batch Table Environment
def blink_batch_env(): TableEnvironment ={
val bbSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv: TableEnvironment = TableEnvironment.create(bbSettings)
bbTableEnv
}
数据类型
时间格式的较多,当然也有自定义的数据类型
SQL | |
---|---|
CHAR | CHAR/CHAR(n) |
VARCHAR / STRING | VARCHAR/VARCHAR(n)/STRING |
BINARY | BINARY/BINARY(n) |
VARBINARY / BYTES | VARBINARY/VARBINARY(n)/BYTES |
DECIMAL | DECIMAL/DECIMAL(p)/DECIMAL(p, s)/DEC/DEC(p)/DEC(p, s)/NUMERIC/NUMERIC(p)/NUMERIC(p, s) |
TINYINT | TINYINT |
INT | INT/INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE/DOUBLE PRECISION |
DATE | DATE |
TIME | TIME/TIME(p) |
TIMESTAMP | TIMESTAMP/TIMESTAMP(p)/TIMESTAMP WITHOUT TIME ZONE/TIMESTAMP(p) WITHOUT TIME ZONE |
TIMESTAMP WITH TIME ZONE | TIMESTAMP WITH TIME ZONE/TIMESTAMP(p) WITH TIME ZONE |
TIMESTAMP_LTZ | TIMESTAMP_LTZ/TIMESTAMP_LTZ(p)/TIMESTAMP WITH LOCAL TIME ZONE/TIMESTAMP(p) WITH LOCAL TIME ZONE |
INTERVAL YEAR TO MONTH | INTERVAL YEAR/INTERVAL YEAR(p)/INTERVAL YEAR(p) TO MONTH/INTERVAL MONTH |
INTERVAL DAY TO SECOND | INTERVAL DAY/INTERVAL DAY(p1)/INTERVAL DAY(p1) TO HOUR/INTERVAL DAY(p1) TO MINUTE/INTERVAL DAY(p1) TO SECOND(p2)/INTERVAL HOUR/INTERVAL HOUR TO MINUTE/INTERVAL HOUR TO SECOND(p2)/INTERVAL MINUTE/INTERVAL MINUTE TO SECOND(p2)/INTERVAL SECOND/INTERVAL SECOND(p2) |
ARRAY | ARRAY< t>/ t ARRAY |
MAP | MAP<kt, vt> |
MULTISET | MULTISET < t>/t MULTISET |
ROW | ROW<n0 t0, n1 t1, ...> |
BOOLEAN | BOOLEAN |
NULL | NULL |
创建表
临时表(Temporary Table)
临时表通常保存于内存中并且仅在创建它们的 Flink 会话持续期间存在。这些表对于其它会话是不可见的。它们不与任何 catalog 或者数据库绑定但可以在一个命名空间(namespace)中创建。即使它们对应的数据库被删除,临时表也不会被删除。
val sql=
"""
|CREATE TEMPORARY TABLE users (
| id BIGINT,
| name STRING,
| age INT
|) WITH (
| 'connector' = 'xxxx'
|)
|""".stripMargin
永久表(Permanent Table)
永久表需要 catalog(例如 Hive Metastore)以维护表的元数据。一旦永久表被创建,它将对任何连接到 catalog 的 Flink 会话可见且持续存在,直至被明确删除
val sql=
"""
|CREATE TABLE users (
| id BIGINT,
| name STRING,
| age INT
|) WITH (
| 'connector' = 'xxxx'
|)
|""".stripMargin
支持的连接资源一览
Name | Version | Source | Sink |
---|---|---|---|
Filesystem | Bounded and Unbounded Scan, Lookup Streaming Sink, Batch Sink | ||
Elasticsearch | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink |
Apache Kafka | 0.10+ | Unbounded Scan | Streaming Sink, Batch Sink |
Amazon Kinesis Data Streams | Unbounded Scan | Streaming Sink | |
JDBC | Bounded Scan, Lookup | Streaming Sink, Batch Sink | |
Apache HBase | 1.4.x & 2.2.x Bounded Scan, Lookup | Streaming Sink, Batch Sink | |
Apache Hive | Supported Versions | Unbounded Scan, Bounded Scan, Lookup | Streaming Sink, Batch Sink |
注意:数据资源如果没有开发需要的?别着急,可以自定义
创建数据源表
环境初始化
// flink环境
private var bsEnv: StreamExecutionEnvironment = _
// 流处理table环境
private var tableEnv: StreamTableEnvironment = _
// 批处理table环境
private var tableBatchEnv: TableEnvironment = _
def init_environment(): Unit ={
bsEnv=StreamExecutionEnvironment.getExecutionEnvironment
// 流处理
val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
tableEnv=StreamTableEnvironment.create(bsEnv, bsSettings)
// 批处理
val bbSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
tableBatchEnv=TableEnvironment.create(bbSettings)
}
CSV
def fromFilesystemWithCSV(): Unit ={
val sql=
"""
|CREATE TABLE users (
| id BIGINT,
| name STRING,
| age INT
|) WITH (
| 'connector' = 'filesystem',
| 'path' = 'xxx\xx\something.csv',
| 'format' = 'csv'
|)
|""".stripMargin
tableBatchEnv.executeSql(sql)
tableBatchEnv.executeSql("select * from users").print()
}
Kafka
-
kafka基本
// kafka基础
def fromKafkaWithBasic(): Unit ={
val kafka_sql=
"""
|CREATE TABLE users (
| id BIGINT,
| name STRING,
| age INT
|) WITH (
| 'connector' = 'kafka',
| 'topic' = 'testtopic',
| 'properties.bootstrap.servers' = 'xxxxxxxx',
| 'format' = 'json'
|)
|""".stripMargin
tableEnv.executeSql(kafka_sql)
tableEnv.executeSql("select * from users").print()
}
-
可用的kafka元数据操作
def fromKafkaWithMetadata(): Unit ={
val kafka_with_metadata=
"""
|CREATE TABLE users (
| event_time TIMESTAMP(3) METADATA FROM 'timestamp',
| partition BIGINT METADATA VIRTUAL,
| offset BIGINT METADATA VIRTUAL,
| id BIGINT,
| name STRING,
| age INT
|) WITH (
| 'connector' = 'kafka',
| 'topic' = 'testtopic',
| 'properties.group.id' = 'testGroup',
| 'properties.bootstrap.servers' = 'xxxxxxx',
| 'format' = 'json'
|)
|""".stripMargin
tableEnv.executeSql(kafka_with_metadata)
tableEnv.executeSql("select partition,offset,id,age,name from users").print()
}
元信息列
您可以在源表中定义元信息列,以获取Kafka消息的元信息。例如,当WITH参数中定义了多个topic时,如果在Kafka源表中定义了元信息列,那么Flink读取到的数据就会被标识是从哪个topic中读取的数据
Key | 数据类型 | 说明 |
---|---|---|
topic | STRING NOT NULL METADATA VIRTUAL | Kafka消息所在的Topic名称。 |
partition | INT NOT NULL METADATA VIRTUAL | Kafka消息所在的Partition ID。 |
headers | MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL | Kafka消息的消息头(header)。 |
leader-epoch | INT NOT NULL METADATA VIRTUAL | Kafka消息的Leader epoch。 |
offset | BIGINT NOT NULL METADATA VIRTUAL | Kafka消息的偏移量(offest)。 |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | Kafka消息的时间戳。 |
timestamp-type | STRING NOT NULL METADATA VIRTUAL | Kafka消息的时间戳类型:NoTimestampType:消息中没有定义时间戳CreateTime:消息产生的时间。LogAppendTime:消息被添加到Kafka Broker的时间。 |
WITH参数
参数 | 说明 | 是否必选 | 数据类型 |
---|---|---|---|
connector | 源表类型。 | 是 | String |
topic | topic名称。 | 是 | String |
topic-pattern | 匹配读取topic名称的正则表达式。所有匹配该正则表达式的topic在作业运行时均会被订阅。 | 否 | String |
properties.bootstrap.servers | Kafka Broker地址。 | 是 | String |
properties.group.id | Kafka消费组ID。 | 是 | String |
properties.* | Kafka配置。 | 否 | String |
format | Flink Kafka Connector在反序列化来自Kafka的消息体(value)时使用的格式。 | 是 | String |
value.format | Flink Kafka Connector在反序列化来自Kafka的消息体(value)时使用的格式。 | 是 | String |
key.format | 反序列化Kafka消息键(key)时使用的格式。 | 否 | String |
key.fields | Kafka消息键(key)解析出来的数据存放的字段。 | 否 | String |
key.fields-prefix | 为所有Kafka消息键(Key)指定自定义前缀,以避免与消息体(Value)格式字段重名。 | 否 | String |
value.fields-include | 在解析消息体时,是否要包含消息键字段。 | 否 | String |
scan.startup.mode | Kafka读取数据的启动位点。 | 否 | String |
scan.startup.specific-offsets | 在specific-offsets启动模式下,指定每个分区的启动偏移量。 | 否 | String |
scan.startup.timestamp-millis | 在timestamp启动模式下,指定启动位点时间戳。 | 否 | Long |
以上以kafka和filesystem为例,其他可以参考官网
创建数据结果表
create table kafka_sink(
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) with (
'connector' = 'kafka',
'topic' = '<yourTopicName>',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'format' = 'csv'
)
WITH参数
参数 | 说明 | 是否必选 | 数据类型 |
---|---|---|---|
connector | 结果表类型 | 是 | STRING |
topic | 结果表对应的Topic | 是 | STRING |
properties.bootstrap.servers | Kafka Broker地址 | 是 | STRING |
format | Flink Kafka Connector在反序列化来自Kafka的消息时使用的格式。 | 是 | STRING |
sink.partitioner | 从Flink分区到Kafka分区的映射模式。 | 否 | STRING |
小案例从csv文件读取落到mysql
def fromCSVToJDBC(): Unit ={
val sql_csv=
"""
|CREATE TABLE person (
| id BIGINT,
| name STRING,
| age INT
|) WITH (
| 'connector' = 'filesystem',
| 'path' = 'xxx\xxx\something.csv',
| 'format' = 'csv'
|)
|""".stripMargin
tableBatchEnv.executeSql(sql_csv)
val sql=
"""
|CREATE TABLE users (
| id BIGINT,
| name STRING,
| age INT,
| PRIMARY KEY (id) NOT ENFORCED
|) WITH (
| 'connector' = 'jdbc',
| 'username' = 'root',
| 'password' = 'root',
| 'url' = 'jdbc:mysql://localhost:3306/test?useUnicode=yes&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC',
| 'driver'='com.mysql.cj.jdbc.Driver',
| 'table-name' = 'users'
|)
|""".stripMargin
tableBatchEnv.executeSql(sql)
tableBatchEnv.executeSql("insert into users select id,name,age from person")
}
创建数据维表
CREATE TABLE filesystem_dim (
id STRING,
name STRING
) WITH (
'connector' = 'filesystem',
'path' = 'csv/json/avro/parquet/orc/raw',
'format' = 'CSV'
)
WITH 参数
参数 | 说明 | 是否必选 | 备注 |
---|---|---|---|
connector | 维表类型 | 是 | 固定值为filesystem。 |
path | 文件路径 | 是 | URI格式,例如:oss://my_path/my_file。 |
format | 文件格式 | 是 | 参数取值如下:CSV/JSON/AVRO/PARQUET/ORC/RAW |
lookup.join.cache.ttl | 重新读取数据的TTL时间 | 否 | 默认值为60分钟,即每隔60分钟重新读取数据。 |
代码示例
--创建event源表
CREATE TABLE event (
id STRING,
data STRING
) WITH (
'connector' = 'datahub'
...
);
--创建white_list维表。
CREATE TABLE white_list (
id STRING,
name STRING,
) WITH (
'connector' = 'filesystem',
'path' = '${remote_path_uri}',
'format' = '${format}'
);
--关联event源表和white_list维表。
SELECT e.*, w.*
FROM event AS e
JOIN white_list FOR SYSTEM_TIME AS OF proctime() AS w
ON e.id = w.id;
本篇总结
本篇向大家介绍了 Flink SQL的入门简介,并没有细究原理与实现逻辑。就是先让大家对Flink SQL先有一个直观的体验。先有兴趣才有深究的动力,未完待续......
大数据左右手
技术如同手中的水有了生命似的,汇聚在了一起。作为大数据开发工作者,致力于大数据技术的学习与工作,分享大数据原理、架构、实时、离线、面试与总结,分享生活思考与读书见解。总有适合你的那一篇。
关注公众号!!!
和我联系吧,加群交流大数据知识,一起成长~~~