• Example: a Flink job that uses both BroadcastProcessFunction and KeyedBroadcastProcessFunction


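    Background:

    Broadcast state can be used to update a rule table or configuration table in real time. This article uses a fraud-detection Flink job as an example to show how BroadcastProcessFunction and KeyedBroadcastProcessFunction are used.

    Using BroadcastProcessFunction and KeyedBroadcastProcessFunction

    1. Start with the main pipeline. It uses broadcast state in two places, and the two broadcast states are independent of each other.

        // The rules broadcast state is used twice here: once in DynamicKeyFunction and once in DynamicAlertFunction.
        // The broadcast state in these two functions is independent: each function must maintain its own copy, and it cannot be shared.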
        // Processing pipeline setup
        DataStream<Alert> alerts =
            transactions
                .connect(rulesStream)
                .process(new DynamicKeyFunction())
                .uid("DynamicKeyFunction")
                .name("Dynamic Partitioning Function")
                .keyBy((keyed) -> keyed.getKey())
                .connect(rulesStream)
                .process(new DynamicAlertFunction())
                .uid("DynamicAlertFunction")
                .name("Dynamic Rule Evaluation Function");
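
The independence of the two broadcast states can be illustrated without Flink. The following is a minimal plain-Java sketch, not Flink code: `RuleHoldingOperator` and its methods are hypothetical names, and a `HashMap` stands in for each operator's broadcast state. It only shows that every operator applies the same broadcast update to its own copy, and that a change made in one copy is invisible to the other.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for one operator that owns its own copy of the rules. */
final class RuleHoldingOperator {
  // Each operator instance has its own map; nothing is shared between operators.
  private final Map<Integer, String> rules = new HashMap<>();

  /** Plays the role of processBroadcastElement: update this operator's own copy. */
  void onBroadcast(int ruleId, String ruleBody) {
    rules.put(ruleId, ruleBody);
  }

  Map<Integer, String> rules() {
    return rules;
  }
}

final class IndependentBroadcastStateDemo {
  public static void main(String[] args) {
    RuleHoldingOperator keyFunction = new RuleHoldingOperator();
    RuleHoldingOperator alertFunction = new RuleHoldingOperator();

    // The same broadcast element is delivered to both operators.
    keyFunction.onBroadcast(1, "rule-1");
    alertFunction.onBroadcast(1, "rule-1");

    // A change applied in only one operator does not reach the other one.
    keyFunction.rules().remove(1);
    System.out.println(keyFunction.rules().containsKey(1));   // false
    System.out.println(alertFunction.rules().containsKey(1)); // true
  }
}
```

This is why both functions in the job contain their own `processBroadcastElement` logic for the same rules stream.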
    

    2. BroadcastProcessFunction processing. This function maintains the operator's own broadcast state and fans every event out to the next operator.

    public class DynamicKeyFunction
        extends BroadcastProcessFunction<Transaction, Rule, Keyed<Transaction, String, Integer>> {
    
      @Override
      public void open(Configuration parameters) {
      }
    
      // Combine each event with every rule in broadcast state, producing N records (one per rule) that flow to the next operator
      @Override
      public void processElement(
          Transaction event, ReadOnlyContext ctx, Collector<Keyed<Transaction, String, Integer>> out)
          throws Exception {
        ReadOnlyBroadcastState<Integer, Rule> rulesState =
            ctx.getBroadcastState(Descriptors.rulesDescriptor);
        forkEventForEachGroupingKey(event, rulesState, out);
      }
    
      // Maintain this operator's own broadcast state: entries can be added, removed, or cleared here
      @Override
      public void processBroadcastElement(
          Rule rule, Context ctx, Collector<Keyed<Transaction, String, Integer>> out) throws Exception {
        log.info("{}", rule);
        BroadcastState<Integer, Rule> broadcastState =
            ctx.getBroadcastState(Descriptors.rulesDescriptor);
        handleRuleBroadcast(rule, broadcastState);
        if (rule.getRuleState() == RuleState.CONTROL) {
          handleControlCommand(rule.getControlType(), broadcastState);
        }
      }
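    }

    // Excerpt from ProcessingUtils: the static helper that both process functions call,
    // each passing its own broadcast state.
    class ProcessingUtils {
      static void handleRuleBroadcast(Rule rule, BroadcastState<Integer, Rule> broadcastState)
          throws Exception {
        switch (rule.getRuleState()) {
          case ACTIVE:
          case PAUSE:
            broadcastState.put(rule.getRuleId(), rule);
            break;
          case DELETE:
            broadcastState.remove(rule.getRuleId());
            break;
        }
      }
    }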
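
The listing does not show `forkEventForEachGroupingKey`, so here is a minimal plain-Java sketch of the fan-out it is described as performing. Everything in it is an assumption made for illustration: events are modeled as field maps, a rule is modeled as the list of field names that form its grouping key, and `KeyedEvent`, `fork`, and the field names `payerId` and `beneficiaryId` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ForkPerRuleSketch {

  /** Hypothetical stand-in for Keyed<Transaction, String, Integer>. */
  static final class KeyedEvent {
    final Map<String, String> event; // the wrapped event
    final String key;                // grouping key derived from the rule
    final int ruleId;                // rule this copy of the event belongs to

    KeyedEvent(Map<String, String> event, String key, int ruleId) {
      this.event = event;
      this.key = key;
      this.ruleId = ruleId;
    }
  }

  /** Emits one keyed copy of the event per rule; rules maps ruleId to grouping field names. */
  static List<KeyedEvent> fork(Map<String, String> event, Map<Integer, List<String>> rules) {
    List<KeyedEvent> out = new ArrayList<>();
    for (Map.Entry<Integer, List<String>> rule : rules.entrySet()) {
      StringBuilder key = new StringBuilder();
      for (String field : rule.getValue()) {
        if (key.length() > 0) {
          key.append(';');
        }
        key.append(field).append('=').append(event.get(field));
      }
      out.add(new KeyedEvent(event, key.toString(), rule.getKey()));
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, String> tx = new LinkedHashMap<>();
    tx.put("payerId", "p1");
    tx.put("beneficiaryId", "b7");

    Map<Integer, List<String>> rules = new LinkedHashMap<>();
    rules.put(1, Arrays.asList("payerId"));
    rules.put(2, Arrays.asList("payerId", "beneficiaryId"));

    // Two rules, so the single event is emitted twice with different keys.
    for (KeyedEvent keyed : fork(tx, rules)) {
      System.out.println(keyed.ruleId + " -> " + keyed.key);
    }
  }
}
```

Because each emitted copy carries both the key and the rule ID, the downstream `keyBy` can partition by the key while the next function looks up the matching rule by ID.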
    

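    3. KeyedBroadcastProcessFunction processing. This function also maintains its own copy of the broadcast state, and in addition it has keyed state. Note in particular that while processing a broadcast element you can call applyToKeyedState to apply a function to the keyed state of every key. The onTimer method can still access both keyed state and broadcast state.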

    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package com.ververica.field.dynamicrules.functions;
    
    import static com.ververica.field.dynamicrules.functions.ProcessingUtils.addToStateValuesSet;
    import static com.ververica.field.dynamicrules.functions.ProcessingUtils.handleRuleBroadcast;
    
    import com.ververica.field.dynamicrules.Alert;
    import com.ververica.field.dynamicrules.FieldsExtractor;
    import com.ververica.field.dynamicrules.Keyed;
    import com.ververica.field.dynamicrules.Rule;
    import com.ververica.field.dynamicrules.Rule.ControlType;
    import com.ververica.field.dynamicrules.Rule.RuleState;
    import com.ververica.field.dynamicrules.RuleHelper;
    import com.ververica.field.dynamicrules.RulesEvaluator.Descriptors;
    import com.ververica.field.dynamicrules.Transaction;
    import java.math.BigDecimal;
    import java.util.*;
    import java.util.Map.Entry;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.flink.api.common.accumulators.SimpleAccumulator;
    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Meter;
    import org.apache.flink.metrics.MeterView;
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    import org.apache.flink.util.Collector;
    
    /** Implements main rule evaluation and alerting logic. */
    @Slf4j
    public class DynamicAlertFunction
        extends KeyedBroadcastProcessFunction<
            String, Keyed<Transaction, String, Integer>, Rule, Alert> {
    
      private static final String COUNT = "COUNT_FLINK";
      private static final String COUNT_WITH_RESET = "COUNT_WITH_RESET_FLINK";
    
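      // Reserved keys in the rules broadcast state; declared final because they are constants
      private static final int WIDEST_RULE_KEY = Integer.MIN_VALUE;
      private static final int CLEAR_STATE_COMMAND_KEY = Integer.MIN_VALUE + 1;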
    
      private transient MapState<Long, Set<Transaction>> windowState;
      private Meter alertMeter;
    
      private MapStateDescriptor<Long, Set<Transaction>> windowStateDescriptor =
          new MapStateDescriptor<>(
              "windowState",
              BasicTypeInfo.LONG_TYPE_INFO,
              TypeInformation.of(new TypeHint<Set<Transaction>>() {}));
    
      @Override
      public void open(Configuration parameters) {
    
        windowState = getRuntimeContext().getMapState(windowStateDescriptor);
    
        alertMeter = new MeterView(60);
        getRuntimeContext().getMetricGroup().meter("alertsPerSecond", alertMeter);
      }
    
      // Keyed state and broadcast state are used together: keyed state can be updated here, while broadcast state is read-only
      @Override
      public void processElement(
          Keyed<Transaction, String, Integer> value, ReadOnlyContext ctx, Collector<Alert> out)
          throws Exception {
    
        long currentEventTime = value.getWrapped().getEventTime();
    
        addToStateValuesSet(windowState, currentEventTime, value.getWrapped());
    
        long ingestionTime = value.getWrapped().getIngestionTimestamp();
        ctx.output(Descriptors.latencySinkTag, System.currentTimeMillis() - ingestionTime);
    
        Rule rule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(value.getId());
    
        if (noRuleAvailable(rule)) {
          log.error("Rule with ID {} does not exist", value.getId());
          return;
        }
    
        if (rule.getRuleState() == Rule.RuleState.ACTIVE) {
          Long windowStartForEvent = rule.getWindowStartFor(currentEventTime);
    
          long cleanupTime = (currentEventTime / 1000) * 1000;
          ctx.timerService().registerEventTimeTimer(cleanupTime);
    
          SimpleAccumulator<BigDecimal> aggregator = RuleHelper.getAggregator(rule);
          for (Long stateEventTime : windowState.keys()) {
            if (isStateValueInWindow(stateEventTime, windowStartForEvent, currentEventTime)) {
              aggregateValuesInState(stateEventTime, aggregator, rule);
            }
          }
          BigDecimal aggregateResult = aggregator.getLocalValue();
          boolean ruleResult = rule.apply(aggregateResult);
    
          ctx.output(
              Descriptors.demoSinkTag,
              "Rule "
                  + rule.getRuleId()
                  + " | "
                  + value.getKey()
                  + " : "
                  + aggregateResult.toString()
                  + " -> "
                  + ruleResult);
    
          if (ruleResult) {
            if (COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {
              evictAllStateElements();
            }
            alertMeter.markEvent();
            out.collect(
                new Alert<>(
                    rule.getRuleId(), rule, value.getKey(), value.getWrapped(), aggregateResult));
          }
        }
      }
    
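      // Maintain broadcast state: add or remove entries, or clear it entirely. Note that while processing a
      // broadcast element you can apply a function to the keyed state of every key. Here, when a broadcast
      // message carrying a control command arrives, applyToKeyedState is used to clear all keyed state.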
      @Override
      public void processBroadcastElement(Rule rule, Context ctx, Collector<Alert> out)
          throws Exception {
        log.info("{}", rule);
        BroadcastState<Integer, Rule> broadcastState =
            ctx.getBroadcastState(Descriptors.rulesDescriptor);
        handleRuleBroadcast(rule, broadcastState);
        updateWidestWindowRule(rule, broadcastState);
        if (rule.getRuleState() == RuleState.CONTROL) {
          handleControlCommand(rule, broadcastState, ctx);
        }
      }
    
      private void handleControlCommand(
          Rule command, BroadcastState<Integer, Rule> rulesState, Context ctx) throws Exception {
        ControlType controlType = command.getControlType();
        switch (controlType) {
          case EXPORT_RULES_CURRENT:
            for (Map.Entry<Integer, Rule> entry : rulesState.entries()) {
              ctx.output(Descriptors.currentRulesSinkTag, entry.getValue());
            }
            break;
          case CLEAR_STATE_ALL:
            ctx.applyToKeyedState(windowStateDescriptor, (key, state) -> state.clear());
            break;
          case CLEAR_STATE_ALL_STOP:
            rulesState.remove(CLEAR_STATE_COMMAND_KEY);
            break;
          case DELETE_RULES_ALL:
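            // Remove through the iterator: calling rulesState.remove() while iterating can throw
            // ConcurrentModificationException when the broadcast state is backed by a HashMap.
            Iterator<Entry<Integer, Rule>> entriesIterator = rulesState.iterator();
            while (entriesIterator.hasNext()) {
              Entry<Integer, Rule> ruleEntry = entriesIterator.next();
              Rule removedRule = ruleEntry.getValue();
              entriesIterator.remove();
              log.info("Removed Rule {}", removedRule);
            }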
            break;
        }
      }
    
      private boolean isStateValueInWindow(
          Long stateEventTime, Long windowStartForEvent, long currentEventTime) {
        return stateEventTime >= windowStartForEvent && stateEventTime <= currentEventTime;
      }
    
      private void aggregateValuesInState(
          Long stateEventTime, SimpleAccumulator<BigDecimal> aggregator, Rule rule) throws Exception {
        Set<Transaction> inWindow = windowState.get(stateEventTime);
        if (COUNT.equals(rule.getAggregateFieldName())
            || COUNT_WITH_RESET.equals(rule.getAggregateFieldName())) {
          for (Transaction event : inWindow) {
            aggregator.add(BigDecimal.ONE);
          }
        } else {
          for (Transaction event : inWindow) {
            BigDecimal aggregatedValue =
                FieldsExtractor.getBigDecimalByName(rule.getAggregateFieldName(), event);
            aggregator.add(aggregatedValue);
          }
        }
      }
    
      private boolean noRuleAvailable(Rule rule) {
        // This could happen if the BroadcastState in this CoProcessFunction was updated after it was
        // updated and used in `DynamicKeyFunction`
        if (rule == null) {
          return true;
        }
        return false;
      }
    
      private void updateWidestWindowRule(Rule rule, BroadcastState<Integer, Rule> broadcastState)
          throws Exception {
        Rule widestWindowRule = broadcastState.get(WIDEST_RULE_KEY);
    
        if (rule.getRuleState() != Rule.RuleState.ACTIVE) {
          return;
        }
    
        if (widestWindowRule == null) {
          broadcastState.put(WIDEST_RULE_KEY, rule);
          return;
        }
    
        if (widestWindowRule.getWindowMillis() < rule.getWindowMillis()) {
          broadcastState.put(WIDEST_RULE_KEY, rule);
        }
      }
    
      // onTimer can read and update keyed state and can read broadcast state. onTimer, processElement, and
      // processBroadcastElement are invoked synchronously, so concurrent access is not a concern.
      @Override
      public void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<Alert> out)
          throws Exception {
    
        Rule widestWindowRule = ctx.getBroadcastState(Descriptors.rulesDescriptor).get(WIDEST_RULE_KEY);
    
        Optional<Long> cleanupEventTimeWindow =
            Optional.ofNullable(widestWindowRule).map(Rule::getWindowMillis);
        Optional<Long> cleanupEventTimeThreshold =
            cleanupEventTimeWindow.map(window -> timestamp - window);
    
        cleanupEventTimeThreshold.ifPresent(this::evictAgedElementsFromWindow);
      }
    
      private void evictAgedElementsFromWindow(Long threshold) {
        try {
          Iterator<Long> keys = windowState.keys().iterator();
          while (keys.hasNext()) {
            Long stateEventTime = keys.next();
            if (stateEventTime < threshold) {
              keys.remove();
            }
          }
        } catch (Exception ex) {
          throw new RuntimeException(ex);
        }
      }
    
      private void evictAllStateElements() {
        try {
          Iterator<Long> keys = windowState.keys().iterator();
          while (keys.hasNext()) {
            keys.next();
            keys.remove();
          }
        } catch (Exception ex) {
          throw new RuntimeException(ex);
        }
      }
    }
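
To make the two state operations above easier to picture, here is a minimal plain-Java model of keyed state. It is not Flink code: `KeyedStateModel` and its methods are hypothetical, a nested map stands in for the per-key `windowState`, and counts replace the sets of transactions. `applyToAllKeys` mimics what `ctx.applyToKeyedState(descriptor, function)` does in the listing (run a function against the state of every key), and `evictOlderThan` mimics the cleanup done in `onTimer` for the current key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

/** Hypothetical model of keyed state: one window map (event time -> count) per key. */
final class KeyedStateModel {
  private final Map<String, TreeMap<Long, Integer>> statePerKey = new HashMap<>();

  /** Like processElement: only the state of the current key is touched. */
  void add(String key, long eventTime) {
    statePerKey.computeIfAbsent(key, k -> new TreeMap<>()).merge(eventTime, 1, Integer::sum);
  }

  /** Like applyToKeyedState: the function runs once for every key that has state. */
  void applyToAllKeys(BiConsumer<String, TreeMap<Long, Integer>> function) {
    statePerKey.forEach(function);
  }

  /** Like the onTimer cleanup: drop entries of one key that are older than the threshold. */
  void evictOlderThan(String key, long threshold) {
    TreeMap<Long, Integer> window = statePerKey.get(key);
    if (window != null) {
      window.headMap(threshold, false).clear();
    }
  }

  int size(String key) {
    TreeMap<Long, Integer> window = statePerKey.get(key);
    return window == null ? 0 : window.size();
  }

  public static void main(String[] args) {
    KeyedStateModel model = new KeyedStateModel();
    model.add("payer-1", 1000L);
    model.add("payer-1", 5000L);
    model.add("payer-2", 2000L);

    model.evictOlderThan("payer-1", 3000L);
    System.out.println(model.size("payer-1")); // 1, only the entry at 5000 remains

    // A control command clears the state of every key, not just the current one.
    model.applyToAllKeys((key, state) -> state.clear());
    System.out.println(model.size("payer-1") + model.size("payer-2")); // 0
  }
}
```

The contrast is the point: element and timer callbacks see one key at a time, while the broadcast side is the only place where all keys can be visited in one call.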
    
    

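    P.S. onTimer and processElement are invoked synchronously, so they never run concurrently and you do not need to worry about thread safety when both of them update keyed state.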
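
Why sequential invocation removes the need for locking can be sketched in plain Java. This is only an illustration of the idea, not Flink's runtime: `SingleThreadedCallbacksSketch`, its queue, and its method names are hypothetical. Element and timer callbacks are queued and then executed one after another by a single loop, so they can share an unsynchronized `HashMap`.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

final class SingleThreadedCallbacksSketch {
  // Pending callbacks, executed strictly one at a time by runAll().
  private final Queue<Runnable> pending = new ArrayDeque<>();
  // Plain, unsynchronized map standing in for the window state of one key.
  private final Map<Long, Integer> windowState = new HashMap<>();

  /** Queues an "element" callback that records one event at the given time. */
  void element(long eventTime) {
    pending.add(() -> windowState.merge(eventTime, 1, Integer::sum));
  }

  /** Queues a "timer" callback that evicts entries older than the threshold. */
  void timer(long threshold) {
    pending.add(() -> windowState.keySet().removeIf(time -> time < threshold));
  }

  /** Runs every queued callback to completion before starting the next one. */
  void runAll() {
    Runnable callback;
    while ((callback = pending.poll()) != null) {
      callback.run();
    }
  }

  Map<Long, Integer> state() {
    return windowState;
  }

  public static void main(String[] args) {
    SingleThreadedCallbacksSketch task = new SingleThreadedCallbacksSketch();
    task.element(1000L);
    task.element(4000L);
    task.timer(3000L);   // evicts the entry at 1000
    task.element(1000L); // arrives after the timer, so it is kept
    task.runAll();
    System.out.println(task.state().size()); // 2
  }
}
```

Since no two callbacks ever overlap, the order in which they are queued fully determines the final state.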

    Reference:
    https://flink.apache.org/2020/01/15/advanced-flink-application-patterns-vol.1-case-study-of-a-fraud-detection-system/

  • Original article: https://blog.csdn.net/lixia0417mul2/article/details/134191767