• Flink Learning 25: Window Functions


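    1. Introduction to window functions

    Flink supports four types of window functions: ReduceFunction, AggregateFunction, FoldFunction, and ProcessWindowFunction.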

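    2. ReduceFunction

    This works much like the reduce operator covered in an earlier post: it combines the elements of a window incrementally into a single accumulated value.

    See: Flink Learning 20: Introducing the reduce operator (hzp666's blog, CSDN)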
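
    The accumulation step can be sketched in plain Scala, without a Flink dependency. The `Tick` case class and the `sumPrices` and `reduceWindow` names below are hypothetical, chosen only for illustration; in a real job the same two-argument function would be passed to `reduce` on a windowed stream.

    ```scala
    object ReduceSketch {
      // Hypothetical element type, modeled on the stock example in section 3
      final case class Tick(stockId: String, price: Double)

      // A reduce function takes two elements of the same type and returns one
      def sumPrices(a: Tick, b: Tick): Tick = Tick(a.stockId, a.price + b.price)

      // Stand-in for one window's contents; a window without elements yields no result
      def reduceWindow(window: Seq[Tick]): Option[Tick] = window.reduceOption(sumPrices)
    }
    ```

    Input and output share one type here, which is the main restriction of reduce compared with AggregateFunction.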

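    3. Window function: AggregateFunction

    3.1 Custom AggregateFunction

    Override four methods:

    createAccumulator (creates the accumulator), add (folds one input element into the accumulator), getResult (produces the output from the accumulator), and merge (combines two accumulators)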
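
    How the four methods cooperate can be sketched in plain Scala. `SimpleAggregate` below is a hypothetical stand-in that only mirrors the method names of Flink's AggregateFunction; it is not the Flink interface itself, and `accumulate` merely imitates what a window operator does element by element.

    ```scala
    object AggregateSketch {
      // Hypothetical trait mirroring the four methods named above
      trait SimpleAggregate[IN, ACC, OUT] {
        def createAccumulator(): ACC
        def add(value: IN, acc: ACC): ACC
        def getResult(acc: ACC): OUT
        def merge(a: ACC, b: ACC): ACC
      }

      // Average of doubles; the accumulator is (sum, count)
      object Average extends SimpleAggregate[Double, (Double, Long), Double] {
        def createAccumulator(): (Double, Long) = (0.0, 0L)
        def add(value: Double, acc: (Double, Long)): (Double, Long) = (acc._1 + value, acc._2 + 1)
        def getResult(acc: (Double, Long)): Double = acc._1 / acc._2
        def merge(a: (Double, Long), b: (Double, Long)): (Double, Long) = (a._1 + b._1, a._2 + b._2)
      }

      // Start from an empty accumulator and add each value in turn
      def accumulate(values: Seq[Double]): (Double, Long) =
        values.foldLeft(Average.createAccumulator())((acc, v) => Average.add(v, acc))
    }
    ```

    Merging two partial accumulators and then calling getResult gives the same average as accumulating all values in one pass, which is the property merge is expected to preserve.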

    3.2 Example (computing the average of random stock prices):

    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.windowing.time.Time
    
    import scala.util.Random
    
    object windowsAggregateFunc {
      // A stock record: id and current price
      case class stock(stockId:String,stockPrice:Double)
    
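      // Custom source that emits random stock price changes
      class myDataSource extends RichSourceFunction[stock]{
    
        // Loop flag; cancel() sets it to false from another thread
        @volatile var isRunning = true
    
        // Index and price of the stock updated most recently
        var stockId = 0
        var curPrice = 0.0
    
        // Current price of each of the four stocks
        var priceList = List(1.0, 2.0, 3.0, 4.0)
    
        // Random generator for picking a stock and perturbing its price
        val rand = new Random()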
    
    
        override def run(sourceContext: SourceFunction.SourceContext[stock]): Unit = {
    
          while (isRunning){
    
            // Pick a random stock and nudge its price with Gaussian noise
            stockId = rand.nextInt(priceList.size)
            curPrice = priceList(stockId)+rand.nextGaussian()*0.05
    
            // Store the new price back into the price list
            priceList=priceList.updated(stockId,curPrice)
    
            // Emit the updated stock into the data stream
            sourceContext.collect(stock("stock_"+stockId, curPrice))
    
            Thread.sleep(500)
            
    
          }
    
        }
    
        override def cancel(): Unit = {
    
          isRunning = false
        }
      }
    
      // Custom AggregateFunction: IN = stock, ACC = (stockId, priceSum, count), OUT = (stockId, averagePrice)
    
      class myAggregateFunc extends AggregateFunction[stock,(String,Double,Long),(String,Double)]{
    
        // Start with an empty accumulator
        override def createAccumulator(): (String, Double, Long) = ("",0D,0L)
    
        // Fold one incoming stock record into the accumulator
        override def add(in: stock, acc: (String, Double, Long)): (String, Double, Long) = {
    
          // Add the price to the running sum and count the update; both are needed for the average
           (in.stockId,acc._2+in.stockPrice,acc._3+1)
        }
    
        // Average price = sum of prices / number of updates
        override def getResult(acc: (String, Double, Long)): (String, Double) = {
          (acc._1,acc._2/acc._3)
        }
    
        // Combine two partial accumulators
        override def merge(acc: (String, Double, Long), acc1: (String, Double, Long)): (String, Double, Long) = {
    
          // Keep the stock id and add up the price sums and the update counts
          (acc._1,acc._2+acc1._2,acc._3+acc1._3)
        }
      }
    
    
      def main(args: Array[String]): Unit = {
    
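        // Create the execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        // Use processing time (setStreamTimeCharacteristic is deprecated since Flink 1.12)
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    
        // Attach the custom source
        val ds = env.addSource(new myDataSource)
    
        // Key by stock id, then average the prices in 10-second tumbling windows.
        // timeWindow is deprecated since Flink 1.12; newer code passes a window assigner such as
        // TumblingProcessingTimeWindows.of(Time.seconds(10)) to window(...) instead.
        ds.keyBy(s => s.stockId)
          .timeWindow(Time.seconds(10))
          .aggregate(new myAggregateFunc)
          .print("outPUT")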
    
    
        env.execute()
        
    
      }
    
    }
    

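    Output: every 10 seconds the job prints one (stockId, averagePrice) tuple for each stock that received updates in the window, prefixed with outPUT.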

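    4. Window function: FoldFunction

    (fold is no longer available in Flink 1.13.0; reduce has to be used instead.)

    FoldFunction cannot be used with session windows or other mergeable windows.

    Since fold can no longer be used and must be replaced with reduce, it is not covered further here.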
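
    As a rough illustration of that replacement, the plain-Scala sketch below computes the same count once in fold style and once in map-then-reduce style. The `Tick` type and both function names are hypothetical; collections stand in for a window's contents and no Flink API is used.

    ```scala
    object FoldToReduceSketch {
      // Hypothetical element type, modeled on the stock example in section 3
      final case class Tick(stockId: String, price: Double)

      // Fold style: start from an initial value and consume one element at a time
      def countWithFold(window: Seq[Tick]): Long =
        window.foldLeft(0L)((count, _) => count + 1)

      // Reduce style: convert each element to the output type first, then combine pairs
      def countWithReduce(window: Seq[Tick]): Long =
        window.map(_ => 1L).reduceOption(_ + _).getOrElse(0L)
    }
    ```

    The reduce version combines two values of the same type, so two partial results can themselves be combined. A fold's (accumulator, element) signature offers no such step, which is presumably why it does not work with mergeable windows.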

    5. Window function: ProcessWindowFunction

     

  • Original article: https://blog.csdn.net/hzp666/article/details/126723626