Flink常用API详解

概述:
    Flink根据抽象程度分层,提供了3种不同的API和库,每一种API在简洁性和表达力上有着不同的侧重,并且针对不同的应用场景.


Environment

Flink Job在提交执行计算时,需要首先建立和Flink框架之间的联系,也就指的是当前的flink运行环境,只有获取了环境信息,才能将task调度到不同的taskManager执行。而这个环境对象的获取方式相对比较简单

ProcessFunction

概述:
    ProcessFunctino是Flink所提供最底层接口. ProcessFunction可以处理一或两条输入数据流中的单个事件或者归入一个特定窗口
    内的多个事件,它提供了对于时间和状态的细粒度控制,开发者可以在其中任意地修改状态,也能够注册定时器用以在未来的某一时刻触发
    回调函数.因此你可以利用ProcessFunctino实现许多有状态的事件驱动应用所需要的基于单个事件的复杂业务逻辑.

DataStream API

概述: 为许多通用的流处理操作提供了处理原语
    这些操作包括窗口、逐条记录的的转换操作,在处理时间时进行外部数据库查询等. DataStream API支持JavaScala语言,预先定义了
    例如map()reduce()Aggregate()等函数,你可以通过扩展实现预定义接口或使用Java、SCala的lambda表达式实现自定义的函数


4. DataStream 转换算子

1. Map[DataStream->DataStream]
       调用用户自定义的MapFunction对DataStream[T]数据进行处理,形成新的DataStream[T],其中数据格式可能会发生变化,
        常用作对数据集内数据的清洗和转换,例如将输入数据集中的每个数值全部加1处理,并且将数据输出到下游数据集

2. FlatMap[DataStream->DataStream]
    该算子主要应用处理输入一个元素产生一个或者多个元素的计算场景,比较常见的是在经典例子WordCount中,将每一行的
    文本数据切割,生成单词序列

3. Filter[DataStream->DataStream]
   该算子将按照条件对输入数据集进行筛选操作,将符合条件的数据集输出,将不符合条件的数据过滤掉.
    3.1 // 通过通配符
        val filter:DataStream[Int] = dataStream.filter{_%2==0}
    3.2 // 或者指定运算表达式
        val filter:DataStream[Int] = dataStream.filter{x=> x%2==0}

4. KeyBy[DataStream->KeyedStream]
  该算子根据指定的key将输入的DataStream[T]数据格式转换为KeyedStream[T],也就是在数据集中执行Partition操作,将
    相同的Key值的数据放置在相同的分区中.
 例如WordCount-> 将数据集中第一个参数作为Key,对数据集进行KeyBy函数操作,形成根据Id分区的KeyedStream数据集.
    eg:
        val dataStream = env.fromElements((1,5),(2,2),(2,4),(1,3))
        //指定第一个字段为分区key
        val keyedStream: KeyedStream [(String,Int),Tuple] = dataStream.keyBy(0)

5. Reduce[KeyedStream->DataStream]
    该算子和MapReduce中Reduce原理基本一致,主要目的是将输入的KeyedStream通过传入的用户自定义地ReduceFunction
    滚动地进行数据聚合处理,其中定义ReduceFunction必须满足运算结合律和交换律,
    eg:
        对传入的KeyedStream数据集中相同key值的数据独立进行求和运算,得到每个key所对应的求和值.
        val dataStream = env.fromElements(("a",3),("d",4),("c",2),("a",5))
        //指定第一个字段为分区key`
        val keyedStream:KeyedStream[(String,Int),Tuple] = dataStream.keyBy(0)
        // 滚动对第二个字段进行reduce相加求和
        val reduceStream = keyedStream.reduce{(x1,x2)=>(x1._1,x1._2+x2._2)

6. Aggregations[KeyedStream->DataStream]
  Aggregations是KeyedDataStream接口提供的聚合算子,根据指定的字段进行聚合操作,滚动地产生一系列数据聚合结果.
    其实是将Reduce算子中的函数进行了封装,封装的聚合操作有sum、min、minBy、max、maxBy等,这样就不需要用户自己定义
    Reduce函数.
    eg:
        指定数据集中第一个字段作为key,用第二个字段作为累加字段,然后滚动地对第二个字段的数值进行累加并输出
        //指定第一个字段为分区key
        val keyedStream:KeyedStream[(Int,Int),Tuple] = dataStream.keyBy(0)
        // 对对第二个字段进行sum统计
        val sumStream:DataStream[(Int,Int)] = keyedStream.sum(1)
        // 输出计算结果
        sumStream.print()

7. Union[DataStream->DataStream]
 union算子主要是将两个或者多个输入的数据集合并成一个数据集,需要保证两个数据集的格式一致,输出的数据集的格式和
    输入的数据集格式保持一致,
    code:
        //获取flink实时流处理的环境
        val env = ExecutionEnvironment.getExecutionEnvironment
        // 创建不同的数据集
        val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("a", 5))
        val dataStream2 = env.fromElements(("a", 1), ("d", 1), ("c", 1), ("a", 1))
        // 调用union算子进行不同的数据集合并
        dataStream.union(dataStream2).print()

8.Connect,CoMap,CoFlatMap[DataStream->ConnectedStream->DataStream](只能在Stream才可以用)
    connect算子主要是为了合并两种或者多种不同数据类型的数据集,合并后会保留原来原来数据集的数据类型.
    例如:  dataStream1数据集为(String,Int) 元组类型,dataStream2数据集为Int类型,通过connect连接算子将两个不同数据
    类型的流结合在一起,形成格式为ConnectedStreams的数据集,其内部数据为[(String,Int),Int]的混合数据类型,保留了两个
    原始数据集的数据类型
    eg:
        //获取flink实时流处理的环境
            val env = StreamExecutionEnvironment.getExecutionEnvironment
            // 创建不同的数据集
            val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("a", 5))
            val dataStream2 = env.fromElements(1, 2, 4, 5)
            // 连接两个DataStream数据集
            val connectedStream = dataStream.connect(dataStream2)
            val result = connectedStream.map(
              //第一个处理函数
              t1 => {
                (t1._1, t1._2)
              },
              //第二个处理函数
              t2 => {
                (t2, 0)
              })
            result.print()
            env.execute("h")


    注意:Union和Connect区别
        1. Union之间两个流的类型必须是一样,Connect可以不一样,在之后的coMap中在去调整成为一样的.
        2. Connect只能操作两个流,Union可以操作多个

9. Split 和 select [DataStream -> SplitStream->DataStream]
    Split算子是将一个DataStream数据集按照条件进行拆分,形成两个数据集的过程,也是Union算子的逆向实现,每个接入的数据
    都会被路由到一个或者多个输出数据集中,
    在使用Splict函数中,需要定义split函数中的切分逻辑,通过调用split函数,然后指定条件判断函数,
    例如: 如下代码所示,将根据第二个字段的奇偶性将数据集标记出来,如果是偶数则标记为even,如果是奇数则标记为odd,然后通过集合将标记返回,最终生成格式SplitStream的数据集
    code:
        
        //获取flink实时流处理的环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // 导入Flink隐式转换
        import org.apache.flink.streaming.api.scala._
        // 创建不同的数据集
        val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("a", 5))
        val splitedStream = dataStream.split(t => if (t._2 % 2 == 0) Seq("even") else Seq("odd"))
    
        // Split函数本身只是对输入数据集进行标记,并没有将数据集真正的实现拆分,因此需要借助Select函数根据标记将数据切分成不同的数据集,
        //筛选出偶数数据集
        val evenStream = splitedStream.select("even").print()
        //筛选出偶数数据集
        val oddStream = splitedStream.select("odd")
    
        env.execute("l ")

函数类和富函数类

    概述:
    前面学过的所有算子集合都可以自定义一个函数类,富函数类作为参数,因为Flink暴露了这两种函数类的接口,常见的函数接口:
    1. MapFunction
    2. FlatMapFunction
    3. ReduceFunction
    富函数接口它与其他常规函数接口的不同在于:可以获取运行环境的上下文,在上下文环境中可以管理状态(State),并拥有一些生命周期方法,
    所以可以实现更复杂的功能.富函数的接口有:
    1. RichMapFunction
    2. RichFlatMapFunction
    3. RichFilterFunction

1. 普通函数类举例:
    按照指定的时间格式输出每个通话的拨号时间和结束时间
    code:
        import java.text.SimpleDateFormat
        import java.util.Date
        import org.apache.flink.api.common.functions.MapFunction
        import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
        object FunctionClassTransformation {
          case class StationLog(sid: String, var callOut: String, var callInt: String, callType: String, callTime: Long, duration: Long)
          def main(args: Array[String]): Unit = {
            //获取flink实时流处理的环境
            val env = StreamExecutionEnvironment.getExecutionEnvironment
            env.setParallelism(1)
            // 导入Flink隐式转换
            import org.apache.flink.streaming.api.scala._
            val data = env.readTextFile(getClass.getResource("station.log").getPath)
              .map { line =>
                var arr = line.split(",")
                StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
              }
            // 定义时间输出格式
            val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
            // 过滤那些通话成功的
            data.filter(_.callType.equals("success"))
              .map(new CallMapFunction(format))
              .print()
        
            env.execute("l ")
        
          }
        
          class CallMapFunction(format: SimpleDateFormat) extends
            MapFunction[StationLog, String] {
            override def map(t: StationLog): String = {
              var startTime = t.callTime
              val endTime = t.callTime + t.duration * 1000
              "主叫号码: " + t.callOut + " , 被叫号码: " + t.callInt + ", 呼叫起始时间: " + format.format(new Date(startTime)) + ",呼叫结束时间: " + format.format(new Date(endTime))
            }
          }}

    result:
        主叫号码: 18600003186 , 被叫号码: 18900002113, 呼叫起始时间: 2019-12-23 13:54:13,呼叫结束时间: 2019-12-23 13:54:45
        主叫号码: 18600003794 , 被叫号码: 18900009608, 呼叫起始时间: 2019-12-23 13:54:13,呼叫结束时间: 2019-12-23 13:54:17
        主叫号码: 18600000005 , 被叫号码: 18900007729, 呼叫起始时间: 2019-12-23 13:56:43,呼叫结束时间: 2019-12-23 14:02:32
        主叫号码: 18600005404 , 被叫号码: 18900000558, 呼叫起始时间: 2019-12-23 13:54:17,呼叫结束时间: 2019-12-23 13:54:22
        主叫号码: 18600003532 , 被叫号码: 18900008128, 呼叫起始时间: 2019-12-23 13:54:19,呼叫结束时间: 2019-12-23 13:54:29
        主叫号码: 18600003532 , 被叫号码: 18900008128, 呼叫起始时间: 2019-12-23 13:54:26,呼叫结束时间: 2019-12-23 13:54:41
        主叫号码: 18600003502 , 被叫号码: 18900009859, 呼叫起始时间: 2019-12-23 13:54:28,呼叫结束时间: 2019-12-23 13:54:28
        主叫号码: 18600003502 , 被叫号码: 18900009859, 呼叫起始时间: 2019-12-23 13:54:28,呼叫结束时间: 2019-12-23 13:54:28   

2.  富函数类举例:
        把呼叫成功的通话信息转换成真实的用户姓名,通话用户对应的用户表(在MySql数据库中),
        由于需要从数据库中查询数据,就需要创建连接,创建连接的代码必须写在生命周期的open方法中,所以需要使用富函数类.
    Rich Function有一个生命周期的概念,典型的生命周期方法有:
        open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用.
        clsose()方法是生命周期中的最后一个调用的方法,做一些清理工作
        getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态
    code:
        package com.bjsxt.flink.transformation
        
        import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
        
        import com.bjsxt.flink.source.StationLog
        import org.apache.flink.api.common.functions.RichMapFunction
        import org.apache.flink.configuration.Configuration
        import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
        
        object TestRichFunctionClass {
        
          /**
           * 把通话成功的电话号码转换成真是用户姓名,用户姓名保存在Mysql表中
           * @param args
           */
          def main(args: Array[String]): Unit = {
            val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
            import org.apache.flink.streaming.api.scala._
        
            //读取数据源
            var filePath =getClass.getResource("/station.log").getPath
            val stream: DataStream[StationLog] = streamEnv.readTextFile(filePath)
              .map(line=>{
                var arr=line.split(",")
                new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)
              })
        
            //计算:把电话号码变成用户姓名
            val result: DataStream[StationLog] = stream.filter(_.callType.equals("success"))
              .map(new MyRichMapFunction)
            result.print()
        
            streamEnv.execute()
          }
        
          //自定义一个富函数类
          class MyRichMapFunction extends RichMapFunction[StationLog,StationLog]{
            var conn:Connection=_
            var pst:PreparedStatement=_
            override def open(parameters: Configuration): Unit = {
              conn =DriverManager.getConnection("jdbc:mysql://localhost/test","root","123123")
              pst =conn.prepareStatement("select name from t_phone where phone_number=?")
            }
        
            override def close(): Unit = {
              pst.close()
              conn.close()
            }
        
            override def map(value: StationLog): StationLog = {
              println(getRuntimeContext.getTaskNameWithSubtasks)
              //查询主叫号码对应的姓名
              pst.setString(1,value.callOut)
              val result: ResultSet = pst.executeQuery()
              if(result.next()){
                value.callOut=result.getString(1)
              }
              //查询被叫号码对应的姓名
              pst.setString(1,value.callInt)
              val result2: ResultSet = pst.executeQuery()
              if(result2.next()){
                value.callInt=result2.getString(1)
              }
              value
            }
          }
        }


6. 底层 ProcessFunctionAPI
    概述:
        ProcessFunction是一个低层次的流处理操作,允许所有返回Stream的基础构建模块:
        访问Event本身数据(比如:Event的时间, Event的当前Key)
        管理状态 State(仅在keyed Stream中)
        管理定时器Timer( 包括: 注册定时器,删除定时器等)
    总而言之,ProcessFunction是Flink最底层的API,也是功能最强大的.
    例如:
        监控每一个手机,如果在5s内呼叫它的通话都是失败的,发出警告信息.
    注意:
        这个案例中会使用到状态编程,请同学们只要知道状态的意思,不需要掌握,
    code:
        package com.bjsxt.flink.transformation
        
        import com.bjsxt.flink.source.StationLog
        import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
        import org.apache.flink.streaming.api.functions.KeyedProcessFunction
        import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
        import org.apache.flink.util.Collector
        
        object TestProcessFunction {
        
          //监控每一个手机号码,如果这个号码在5秒内,所有呼叫它的日志都是失败的,则发出告警信息
          //如果在5秒内只要有一个呼叫不是fail则不用告警
          def main(args: Array[String]): Unit = {
        
            val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
            import org.apache.flink.streaming.api.scala._
        
            //读取数据源
            val stream: DataStream[StationLog] = streamEnv.socketTextStream("hadoop101",8888)
              .map(line=>{
                var arr=line.split(",")
                new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)
              })
        
            //计算
            val result: DataStream[String] = stream.keyBy(_.callInt)
              .process(new MonitorCallFail)
            result.print()
        
            streamEnv.execute()
          }
        
          //自定义一个底层的类
          class MonitorCallFail extends KeyedProcessFunction[String,StationLog,String]{
            //使用一个状态对象记录时间
            lazy val timeState :ValueState[Long] =getRuntimeContext.getState(new ValueStateDescriptor[Long]("time",classOf[Long]))
        
            override def processElement(value: StationLog, ctx: KeyedProcessFunction[String, StationLog, String]#Context, out: Collector[String]): Unit = {
              //从状态中取得时间
              var time =timeState.value()
              if(time==0 && value.callType.equals("fail") ){ //表示第一次发现呼叫失败,记录当前的时间
                //获取当前系统时间,并注册定时器
                var nowTime =ctx.timerService().currentProcessingTime()
                //定时器在5秒后触发
                var onTime =nowTime+8*1000L
                ctx.timerService().registerProcessingTimeTimer(onTime)
                //把触发时间保存到状态中
                timeState.update(onTime)
              }
              if (time!=0 && !value.callType.equals("fail")){ //表示有一次成功的呼叫,必须要删除定时器
                ctx.timerService().deleteProcessingTimeTimer(time)
                timeState.clear() //清空状态中的时间
              }
            }
        
            //时间到了,定时器执行,
            override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, StationLog, String]#OnTimerContext, out: Collector[String]): Unit = {
              var warnStr="触发的时间:"+timestamp +" 手机号 :"+ctx.getCurrentKey
              out.collect(warnStr)
              timeState.clear()
            }
          }
        }

7. 侧输出流 Side Output

概述:
    在Flink处理数据流时,我们经常会遇到这样的情况: 在处理一个数据源时,往往需要将该源中的不同类型的数据做分割处理,如果使用filter
    算子对数据源进行筛选分割的话,势必会造成数据流的多次复制,造成不必要的性能浪费;flink中的侧输出就是将数据流进行分割,而不对流
    进行复制的一种分流机制,
    Flink的侧输出的另一个作用就是对延时迟到的数据进行处理,这样就可以不必丢弃迟到的数据.

案例: (根据基站的日志,请把呼叫成功的Stream(主流)和不成功的Stream(侧流分别输出)
code:
    package FlinkDemo.functions
    9
    import FlinkDemo.functions.FunctionClassTransformation.StationLog
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.util.Collector
    
    object TestSideOutpurStream {
      // 导入Flink隐式转换
      import org.apache.flink.streaming.api.scala._
    
      // 侧输出流首先需要定义一个流的标签
      val notSuccessTag = new OutputTag[StationLog]("not_success")
    
      def main(args: Array[String]): Unit = {
        //获取flink实时流处理的环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
    
        val data = env.readTextFile(getClass.getResource("station.log").getPath)
          .map { line =>
            var arr = line.split(",")
            StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
          }
        val mainStream = data
          .process(new CreateSideOutputStream(notSuccessTag))
        // 得到测流
        val sideStream = mainStream.getSideOutput(notSuccessTag)
        mainStream.print("main")
        sideStream.print("sideOutput")
        env.execute()
      }
    
      class CreateSideOutputStream(tag: OutputTag[StationLog]) extends ProcessFunction[StationLog, StationLog] {
        override def processElement(value: StationLog, context: ProcessFunction[StationLog, StationLog]#Context, collector: Collector[StationLog]): Unit = {
          //输出主流
          if (value.callType.equals("success")) {
            collector.collect(value)
          }
          else {
            //输出侧流
            context.output(tag, value)}}}}

            对kafka的数据通过值可以分别放入侧输出流中,实现分流



行动算子


SQL& Table API:

        

复杂事件处理-CEP


DataSet API