flink支持4种类型的窗口计算函数。


跟之前reduce算子学习差不多。
对数据进行累计计算。
Flink学习20:算子介绍reduce_hzp666的博客-CSDN博客



重写3个方法:
createAccumulator(生成累加器)、 add(编辑累加逻辑)、getResult(编辑输出结果)、merge(编辑累加器合并逻辑)
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import scala.util.Random
object windowsAggregateFunc {
//create case class stock
case class stock(stockId:String,stockPrice:Double)
//create my ds
class myDataSource extends RichSourceFunction[stock]{
//for loop
var isRunning = true
//initialization the stock
//create the price list
var stockId = 0
var curPrice = 0.0
//create the random price
var priceList = List(1.0, 2.0, 3.0, 4.0)
//for random change price
val rand = new Random()
override def run(sourceContext: SourceFunction.SourceContext[stock]): Unit = {
while (isRunning){
//price change random
stockId = rand.nextInt(priceList.size)
curPrice = priceList(stockId)+rand.nextGaussian()*0.05
//update the new price list
priceList=priceList.updated(stockId,curPrice)
//put the data into data stream
sourceContext.collect(stock("stock_"+stockId, curPrice))
Thread.sleep(500)
}
}
override def cancel(): Unit = {
isRunning = false
}
}
//over write the aggregate func
class myAggregateFunc extends AggregateFunction[stock,(String,Double,Long),(String,Double)]{
//create the acc
override def createAccumulator(): (String, Double, Long) = ("",0D,0L)
//create the acc's func
override def add(in: stock, acc: (String, Double, Long)): (String, Double, Long) = {
//add every stock's price and count how many times the stock's price have changed, for get stock price's average
(in.stockId,acc._2+in.stockPrice,acc._3+1)
}
//this is to get the average of stock price
override def getResult(acc: (String, Double, Long)): (String, Double) = {
(acc._1,acc._2/acc._3)
}
//set the acc's merge func
override def merge(acc: (String, Double, Long), acc1: (String, Double, Long)): (String, Double, Long) = {
//to merge the stock in the acc, the price and the times of every stock's price have changed
(acc._1,acc._2+acc1._2,acc._3+acc1._3)
}
}
def main(args: Array[String]): Unit = {
//create env
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
//add my ds
val ds = env.addSource(new myDataSource)
ds.keyBy(s => s.stockId)
.timeWindow(Time.seconds(10))
.aggregate(new myAggregateFunc)
.print("outPUT")
env.execute()
}
}
输出结果:


FoldFunc不能用于会话窗口和可合并的窗口,
fold已经不能用了,必须用reduce来替代,这里就不多讲了。
