前言

在使用FlinkSQL考虑到做指标统计,心中疑问好多:

状态能恢复吗?

状态数据越来越大怎么办,怎么去清除之前数据?

我怎么按天去统计数据?

如果遇到主表要join很早的数据怎么办?......

心中想到这些问题,一万个xxx而过。所以需要做个热身准备,不然真的不敢上手开发业务。

  1. 验证状态数据恢复

  2. 验证状态的清除策略与时效

  3. 验证按天计算数据

  4. 遇到Join表怎么办?(其他方案替代)

  5. 「附带验证过程的」Bug记录

如果其他疑点需要验证或者调研的,可以评论或者留言给我。欢迎转发给需要的朋友......

Flink环境配置

def getTable(): StreamTableEnvironment ={
    val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    bsEnv.enableCheckpointing(10000,CheckpointingMode.EXACTLY_ONCE)
    bsEnv.getCheckpointConfig.setCheckpointTimeout(60000)
    bsEnv.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    bsEnv.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    bsEnv.setStateBackend(new EmbeddedRocksDBStateBackend)
    val dir ="file:///D:/hdfs/
    bsEnv.getCheckpointConfig.setCheckpointStorage(dir)
    val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
    bsTableEnv
  }

主体逻辑

从source kafka 到 sink kafka:做简单的pv和uv的

 val source =
      s"""
         |CREATE TABLE pageviews  (
         | user_id BIGINT,
         |  page_id BIGINT,
         |  user_region STRING
         |) WITH (
         |    'connector' = 'kafka',
         |    'topic' = 'testtopic',
         |    'properties.bootstrap.servers' = 'devcdh1:9092,devcdh2:9092,devcdh3:9092',
         |    'format' = 'json'
         |)
       """.stripMargin
       
    val sink =
      s"""
         |CREATE TABLE pageviews_per_region (
         | user_region STRING,
         |  pv BIGINT,
         |  uv BIGINT,
         |  PRIMARY KEY (user_region) NOT ENFORCED
         |) WITH (
         | 'connector' = 'upsert-kafka',
         | 'topic' = 'test',
         | 'properties.bootstrap.servers' = 'xxxx',
         | 'properties.group.id' = 'test',
         |  'key.format' = 'json',
         |  'value.format' = 'json'
         |)
       """.stripMargin
    tableEnv.executeSql(source)
    tableEnv.executeSql(sink)
    val insert =
      s"""
         |INSERT INTO pageviews_per_region
         |SELECT
         |  user_region,
         |  COUNT(*),
         |  COUNT(DISTINCT user_id)
         |FROM pageviews
         |GROUP BY user_region
       """.stripMargin
    tableEnv.executeSql(insert)
    

1. 验证状态数据恢复

往期对状态的详细解读,这里不再细说:

Flink的Checkpoint机制详解

Flink的一致性保证

source数据

>{"user_id":1002,"page_id":1002,"user_region":"U002"}
>{"user_id":1002,"page_id":1002,"user_region":"U002"}
>{"user_id":1002,"page_id":1002,"user_region":"U002"}
>{"user_id":1002,"page_id":1002,"user_region":"U002"}
>{"user_id":1002,"page_id":1002,"user_region":"U002"}
>{"user_id":1002,"page_id":1002,"user_region":"U002"}
>{"user_id":1002,"page_id":1002,"user_region":"U002"}
>{"user_id":1002,"page_id":1002,"user_region":"U002"}
>{"user_id":1002,"page_id":1002,"user_region":"U003"}
>{"user_id":1002,"page_id":1002,"user_region":"U003"}
>{"user_id":1002,"page_id":1002,"user_region":"U003"}
>{"user_id":1002,"page_id":1002,"user_region":"U003"}
>{"user_id":1002,"page_id":1002,"user_region":"U003"}
>{"user_id":1002,"page_id":1002,"user_region":"U002"}
>{"user_id":1002,"page_id":1002,"user_region":"U002"}
>{"user_id":1002,"page_id":1002,"user_region":"U002"}

sink结果

kill前的
{"user_region":"U002","pv":1,"uv":1}
{"user_region":"U002","pv":2,"uv":1}
{"user_region":"U002","pv":3,"uv":1}
{"user_region":"U002","pv":1,"uv":1}
{"user_region":"U002","pv":2,"uv":1}
{"user_region":"U003","pv":1,"uv":1}
{"user_region":"U003","pv":2,"uv":1}
{"user_region":"U003","pv":3,"uv":1}
{"user_region":"U003","pv":4,"uv":1}
{"user_region":"U003","pv":5,"uv":1}
{"user_region":"U002","pv":3,"uv":1}

恢复后的

{"user_region":"U002","pv":4,"uv":1}
{"user_region":"U002","pv":5,"uv":1}

操作

第一次启动
flink run  -m yarn-cluster  -ys 1 -p 1 -ynm flink-sql /data/flink_jars/indicator-sql.jar

kill后的再启动

flink run -s hdfs://nameservice//user/flink/checkpoint/8d80fe1615666f65342b2348d3c39044/chk-18/_metadata -m yarn-cluster  -ys 1 -p 1 -ynm flink-sql /data/flink_jars/indicator-sql.jar

2. 验证状态的清除(如何限制state)

「说明」:实时计算不断地去充实内存保存状态,达到一定量的时候肯定撑不住,现需要定期去清除状态。

TableEnvironmentTableConfig tConfig = tableEnv.getConfig();

tConfig.setIdleStateRetentionTime(Time.hours(5), Time.hours(10));

对setIdleStateRetentionTime两个参数的理解

这两个参数的含义:

minIdleStateRetentionTime: key被移除前state最少的空闲时间;

maxIdleStateRetentionTime:key被移除前state最长的空闲时间。


第一次:我们假设在时间为0的时候来了一条数据,那么会根据这个数据的key以及时当前系统时间为其注册一个Timer,这个Timer是用来清除state的,那么注册的Timer的时间是什么呢?

long cleanupTime = currentTime + maxRetentionTime;

其实Timer的时间就是 0 + 10 =10,也就是在10min后会回调相应的方法,这个方法主要就是用来清除state的。


第二次:在时间为2的时候来了第二条数据,这个数据的key与第一个一样,那么会操作同一个state,那么这个数据是否会去注册Timer吗?

答案是肯定会去注册Timer,只不过引入了一个if条件,只有满足这个条件才会去把之前的Timer删掉,然后重新注册一个Timer,否则维持之前的Timer不变。

(currentTime + minRetentionTime) > curCleanupTime

currentTime = 2

minRetentionTime = 5

curCleanupTime = 10

curCleanupTime我们可以理解为当前这个key对应的Timer的时间,也就是第一条数据注册的Timer的时间 10,因为2 + 5 > 10 条件不成立,所以不会对这个key的Timer做任何操作。


第三次:在时间为10的时候来了第三条数据,并且key与前两个数据一样,我们看一下此时条件是否成立吗?

currentTime = 10

minRetentionTime = 5

curCleanupTime = 10

那么10 +5  > 10 成立,会对这个key的Timer进行更新,把之前的10的Timer删掉,然后重新注册一个Timer时间是

currentTime = 10

maxRetentionTime = 10

所以最新的Timer时间 = 10 + 10= 20


第四次:继续,第四条数据在当前时间是12到来,12 + 5  > 20不成立,不去做任和Timer的更新操作。


第五次:继续,第五条相同key的数据到来,此时时间是16,但是在20的时候这个key的Timer执行了回调,进行了state的清除操作,所以这条数据已经找不到之前的state了,相当于重新开始,创建state,然后去注册Timer,Timer的时间是20 + 10 = 30。


对上面几次小结:通过上面可以发现,第3条数据到来时间是10,第四条数据到来时间是12,然后20min时候state清除,所以这个state的空闲时间是 20 - 12 =8,我们可以说这个state空闲了8,然后被删除,完全符合最少5,最长10的定义。

我们假设第4条数据没来过,那么第3条数据到来时间是10,然后20时候state被清除,期间一直无这个key的数据,所以state存活时间是10,同样完全符合我们定义的最小时间5以及最大时间10的范围。


「源码」

public void setIdleStateRetentionTime(Time minTime, Time maxTime) {
    if (maxTime.toMilliseconds() - minTime.toMilliseconds() >= 300000L || maxTime.toMilliseconds() == 0L && minTime.toMilliseconds() == 0L) {
        this.minIdleStateRetentionTime = minTime.toMilliseconds();
        this.maxIdleStateRetentionTime = maxTime.toMilliseconds();
    } else {
        throw new IllegalArgumentException("Difference between minTime: " + minTime.toString() + " and maxTime: " + maxTime.toString() + "shoud be at least 5 minutes.");
    }
}


public interface CleanupState {
    default void registerProcessingCleanupTimer(
            ValueState<Long> cleanupTimeState,
            long currentTime,
            long minRetentionTime,
            long maxRetentionTime,
            TimerService timerService)
            throws Exception {
        // last registered timer
        Long curCleanupTime = cleanupTimeState.value();
 
        // check if a cleanup timer is registered and
        // that the current cleanup timer won't delete state we need to keep
        if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) {
            // we need to register a new (later) timer
            long cleanupTime = currentTime + maxRetentionTime;
            // register timer and remember clean-up time
            timerService.registerProcessingTimeTimer(cleanupTime);
            // delete expired timer
            if (curCleanupTime != null) {
                timerService.deleteProcessingTimeTimer(curCleanupTime);
            }
            cleanupTimeState.update(cleanupTime);
        }
    }
}

「总结」

对于上面,最好的理解是:

「最长体现在」: 如果在0的时间进来一条数据,然后10内没有数据进来,这个时候会触发清除操作,也就是所谓的key被移除前state最长的空闲时间。

「最短体现在」:第一次进来数据时间是5.1(刚过5),5+5.1>10成立,Timer的时间更新为15.1,这个时候闲置5min+,即5.1+5+5+ >15.1 然后被清除,也就是所谓的key被移除前state最短的空闲时间。


验证

setIdleStateRetentionTime 方法在此版本已经过时,新版本已经没有maxIdleStateRetentionTime,变为取决于minIdleStateRetentionTime

    @Deprecated
    public void setIdleStateRetentionTime(Time minTime, Time maxTime) {
        if (maxTime.toMilliseconds() - minTime.toMilliseconds() >= 300000L || maxTime.toMilliseconds() == 0L && minTime.toMilliseconds() == 0L) {
            this.setIdleStateRetention(Duration.ofMillis(minTime.toMilliseconds()));
        } else {
            throw new IllegalArgumentException("Difference between minTime: " + minTime.toString() + " and maxTime: " + maxTime.toString() + " should be at least 5 minutes.");
        }
    }
    
    @Deprecated
    public long getMaxIdleStateRetentionTime() {
        return this.getMinIdleStateRetentionTime() * 3L / 2L;
    }
    
    public void setIdleStateRetention(Duration duration) {
        this.configuration.set(ExecutionConfigOptions.IDLE_STATE_RETENTION, duration);
    }
   

所以,用setIdleStateRetention方法即可,原理和上面说的一样

    bsTableEnv
        .getConfig
        .setIdleStateRetention(
            Duration.ofMillis( 
                Time.minutes(1L).toMilliseconds
                )
        )

「首次输入一批数据」

{"user_id":1002,"page_id":1001,"user_region":"U001"}

{"user_id":1001,"page_id":1002,"user_region":"U001"}

{"user_id":1002,"page_id":1001,"user_region":"U002"}

{"user_id":1001,"page_id":1002,"user_region":"U001"}

{"user_id":1002,"page_id":1002,"user_region":"U001"}

「首次结果」

{"user_region":"U001","pv":1,"uv":1}
{"user_region":"U001","pv":2,"uv":1}
{"user_region":"U002","pv":1,"uv":1}
{"user_region":"U001","pv":3,"uv":2}
{"user_region":"U001","pv":4,"uv":2}

「一分钟后再次输入数据」

{"user_id":1001,"page_id":1002,"user_region":"U001"}
{"user_id":1002,"page_id":1002,"user_region":"U001"}

「一分钟后再次输入数据后的结果(从头计算)」

{"user_region":"U001","pv":1,"uv":1}
{"user_region":"U001","pv":2,"uv":2}

「验证状态的清除成功」


3. 验证按天计算数据

「逻辑SQL」

 val source =
      s"""
         |CREATE TABLE pageviews  (
         |  user_id BIGINT,
         |  page_id BIGINT,
         |  user_region STRING,
         |  app_time BIGINT,
         |  dt AS FROM_UNIXTIME(app_time / 1000, 'yyyy-MM-dd')
         |) WITH (
         |    'connector' = 'kafka',
         |    'topic' = 'testtopic',
         |    'properties.bootstrap.servers' = 'xxxx',
         |    'format' = 'json'
         |)
       """.stripMargin

    val sink =
      s"""
         |CREATE TABLE pageviews_per_region (
         |  dt STRING,
         |  user_region STRING,
         |  pv BIGINT,
         |  uv BIGINT,
         |  PRIMARY KEY (user_region) NOT ENFORCED
         |) WITH (
         | 'connector' = 'upsert-kafka',
         | 'topic' = 'test',
         | 'properties.bootstrap.servers' = 'xxxx',
         | 'properties.group.id' = 'test',
         |  'key.format' = 'json',
         |  'value.format' = 'json'
         |)
       """.stripMargin
    tableEnv.executeSql(source)
    tableEnv.executeSql(sink)

    val insert =
      s"""
         |INSERT INTO pageviews_per_region
         |SELECT
         |  dt,
         |  user_region,
         |  COUNT(*),
         |  COUNT(DISTINCT user_id)
         |FROM pageviews
         |GROUP BY dt,user_region
       """.stripMargin
    tableEnv.executeSql(insert)

「测试数据」

2021-08-04数据

>{"user_id":1002,"page_id":1002,"user_region":"U001","app_time":1628061421071}

>{"user_id":1002,"page_id":1002,"user_region":"U001","app_time":1628061421071}

>{"user_id":1002,"page_id":1002,"user_region":"U001","app_time":1628006400000}

2021-08-03数据

>{"user_id":1002,"page_id":1002,"user_region":"U001","app_time":1627920000000}

>{"user_id":1002,"page_id":1002,"user_region":"U001","app_time":1627920000000}

「测试结果」

{"dt":"2021-08-04","user_region":"U001","pv":1,"uv":1}

{"dt":"2021-08-04","user_region":"U001","pv":2,"uv":1}

{"dt":"2021-08-04","user_region":"U001","pv":3,"uv":1}


{"dt":"2021-08-03","user_region":"U001","pv":1,"uv":1}

{"dt":"2021-08-03","user_region":"U001","pv":2,"uv":1}


4. 替代join方式

「方案一 UDF」

先实现一个简单的

class JoinFunction extends ScalarFunction {
 def eval(str:String): String ={
   str+"_hello"
 }

}
tableEnv.createTemporarySystemFunction("test_function",new JoinFunction())

val sql =
      s"""

         |SELECT
         |  dt,
         |  test_function(user_region),
         |  COUNT(*),
         |  COUNT(DISTINCT user_id)
         |FROM pageviews
         |GROUP BY dt,user_region
       """.stripMargin  
tableEnv.executeSql(sql).print()
+----+--------------------------------+--------------------------------+----------------------+----------------------+
| op |                             dt |                         EXPR$1 |               EXPR$2 |               EXPR$3 |
+----+--------------------------------+--------------------------------+----------------------+----------------------+
| +I |                     2021-08-03 |                       U001_hello |                    1 |                    1 |

去库里查找(类join取值)

class JoinFunction extends ScalarFunction {
 def eval(column:String): String ={
   println(s"进来的参数:column:${column}")
   val sql=s"select app_code from default.test_flink where order_number='$column'"
   val maps: util.List[util.Map[String, AnyRef]] = ImpalaDevUtil.executeQuery(sql, null)
   println(maps)
   println(!maps.isEmpty && maps.size()>0)
   if (!maps.isEmpty && maps.size()>0) {
     val app_code: String = maps.get(0).get("app_code").toString
     println(s"获取的值:$app_code")
     return app_code
   }
   null
 }

}

注册函数与逻辑

tableEnv.createTemporarySystemFunction("join_function",new JoinFunction())
 """
     |SELECT
     |  dt,
     |  join_function(user_region) appcode,
     |  COUNT(*),
     |  COUNT(DISTINCT user_id)
     |FROM pageviews
     |GROUP BY dt,user_region
""".stripMargin

数据

{"user_id":1002,"page_id":1002,"user_region":"1556666","app_time":1627920000000}

结果

+----+--------------------------------+--------------------------------+----------------------+----------------------+
| op |                             dt |                        appcode |               EXPR$2 |               EXPR$3 |
+----+--------------------------------+--------------------------------+----------------------+----------------------+
| +I |                     2021-08-03 |                       25656660 |                    1 |                    1 |

方案2:实现其他库的connectors(可以用Join)

JDBC 现支持Derby,MySQL,Postgres,对于不同的数据源需要自定义:比如Impala

1. ImpalaRowConverter 修改converterName即可

public class ImpalaRowConverter extends AbstractJdbcRowConverter {
    private static final long serialVersionUID = 1L;

    public String converterName() {
        return "Impala";
    }

    public ImpalaRowConverter(RowType rowType) {
        super(rowType);
    }
}

2. 实现 ImpalaDialect

并重写getUpsertStatement 或者直接复制MySQLDialect微修改就可以

ImpalaDialect extends AbstractDialect

3. 增添Impala

public final class JdbcDialects {

 private static final List<JdbcDialect> DIALECTS = Arrays.asList(new DerbyDialect(), new MySQLDialect(), new PostgresDialect());
}

4. 重新打包flink-connector-jdbc替换Flink集群/lib下面原生的jar包


5. Bug记录

FlinkSQL 找不到 kafka connector

Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='kafka'
        at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467)
        at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:133)
        ... 53 more

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem

解决:下载到lib包下

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_2.11</artifactId>
    <version>1.13.1</version>
    <scope>provided</scope>
</dependency>

kafka目前只能写入append的数据

TableSink doesn't support consuming update changes which is produced by node GroupAggregate

解决:这个是正常现象。如果你用了普通的group by的话,那么它的结果就是有更新的,所以需要sink支持写入update的结果, 但是kafka目前只能写入append的数据,所以会报上面的错误。

  • 使用upsert-kafka

WITH (
'connector' = 'upsert-kafka',
'topic' = 'test',
'properties.bootstrap.servers' = 'xxxx',
'properties.group.id' = 'test',
 'key.format' = 'json',
 'value.format' = 'json'
)
  • 或者使用window group


总结

通过以上验证,最起码前期准备工作已经做完,可以用SQL去开发业务了。其他性能问题或者业务复杂度问题根据情况去解决。

如果其他疑点需要验证或者调研的,可以评论或者留言给我......


 

 

大数据左右手

技术如同手中的水有了生命似的,汇聚在了一起。作为大数据开发工作者,致力于大数据技术的学习与工作,分享大数据原理、架构、实时、离线、面试与总结,分享生活思考与读书见解。总有适合你的那一篇。

关注公众号!!!

和我联系吧,加群交流大数据知识,一起成长~~~