• flink的AggregateFunction,merge方法作用范围


    背景

    AggregateFunction接口是我们经常用的窗口聚合函数,其中有一个merge方法,我们一般情况下也是实现了的,但是你知道吗,其实这个方法只有在你使用会话窗口需要进行窗口合并的时候才需要实现

    AggregateFunction.merge方法调用时机

    AggregateFunction.merge方法其实只有在使用会话窗口进行窗口合并的时候才会用到,如下所示
    在这里插入图片描述

    对应的源码首先查看WindowOperator.processElement方法对要合并的窗口的状态进行合并

    public void processElement(StreamRecord<IN> element) throws Exception {
            final Collection<W> elementWindows =
                    windowAssigner.assignWindows(
                            element.getValue(), element.getTimestamp(), windowAssignerContext);
     
            // if element is handled by none of assigned elementWindows
            boolean isSkippedElement = true;
     
            final K key = this.<K>getKeyedStateBackend().getCurrentKey();
     
            if (windowAssigner instanceof MergingWindowAssigner) {
                MergingWindowSet<W> mergingWindows = getMergingWindowSet();
     
                for (W window : elementWindows) {
     
                    // adding the new window might result in a merge, in that case the actualWindow
                    // is the merged window and we work with that. If we don't merge then
                    // actualWindow == window
                    W actualWindow =
                            mergingWindows.addWindow(
                                    window,
                                    new MergingWindowSet.MergeFunction<W>() {
                                        @Override
                                        public void merge(
                                                W mergeResult,
                                                Collection<W> mergedWindows,
                                                W stateWindowResult,
                                                Collection<W> mergedStateWindows)
                                                throws Exception {
     
                                            triggerContext.key = key;
                                            triggerContext.window = mergeResult;
     
                                            triggerContext.onMerge(mergedWindows);
     
                                            for (W m : mergedWindows) {
                                                triggerContext.window = m;
                                                triggerContext.clear();
                                                deleteCleanupTimer(m);
                                            }
     
                                            // 合并窗口的状态
                                            windowMergingState.mergeNamespaces(
                                                    stateWindowResult, mergedStateWindows);
                                        }
                                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    继续查看AbstractHeapMergingState.mergeNamespaces方法,

    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
        if (sources == null || sources.isEmpty()) {
            return; // nothing to do
        }
     
        final StateTable<K, N, SV> map = stateTable;
     
        SV merged = null;
     
        // merge the sources
        for (N source : sources) {
     
            // get and remove the next source per namespace/key
            SV sourceState = map.removeAndGetOld(source);
     
            if (merged != null && sourceState != null) {
                //此处合并状态并调用AggregateFunction.merge方法
                merged = mergeState(merged, sourceState);
            } else if (merged == null) {
                merged = sourceState;
            }
        }
     
        // merge into the target, if needed
        if (merged != null) {
            map.transform(target, merged, mergeTransformation);
        }
    }
     
    //真正调用AggregateFunction.merge方法合并自定义的状态
    @Override
    protected ACC mergeState(ACC a, ACC b) {
        return aggregateTransformation.aggFunction.merge(a, b);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    这样AggregateFunction.merge的调用过程就清楚了,实际应用中,我们只需要在使用会话窗口时才需要实现这个方法,其他的基于时间窗口的方式不需要实现这个方法,当然实现了也不会有错

  • 相关阅读:
    matlab图像的运算有点运算、代数运算、逻辑运算和几何运算
    Linux系统编程系列之线程池
    点燃市场热情,让产品风靡全球——实用推广策略大揭秘!
    用flex实现grid布局
    Java 反射系列 —— 学习笔记
    高能数造电池3D打印智能制造小试线,开启全固态电池数字化新时代
    知识图谱顶会论文(KDD-2022) kgTransformer:复杂逻辑查询的预训练知识图谱Transformer
    完美掌握MySQL登录方法]MySQL登录教程
    DataBinding 基础用法
    回溯法求解n个元素的集合的幂集
  • 原文地址:https://blog.csdn.net/lixia0417mul2/article/details/134277215