• 广播状态实现注意事项


    背景:

    日常我们事件流总要关联上其他的静态数据来组成一条完整的记录,例如事件流+规则表来组合出一条完整的记录流,这个时候规则表就要设置成广播状态的形式来支持快速流操作

    技术实现

    // 广播处理函数
            new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
    
                // 键值分区状态
                private final MapStateDescriptor<String, List<Item>> mapStateDesc =
                        new MapStateDescriptor<>(
                                "items",
                                BasicTypeInfo.STRING_TYPE_INFO,
                                new ListTypeInfo<>(Item.class));
    
                // 广播状态
                private final MapStateDescriptor<String, Rule> ruleStateDescriptor =
                        new MapStateDescriptor<>(
                                "RulesBroadcastState",
                                BasicTypeInfo.STRING_TYPE_INFO,
                                TypeInformation.of(new TypeHint<Rule>() {}));
    
                @Override
                public void processBroadcastElement(Rule value,
                        Context ctx,
                        Collector<String> out) throws Exception {
                    // 更新广播状态
                    ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
                    // 这里不能访问单个键值分区状态,因为广播元素没有对应的键值key,但是这里提供一个函数可以对所有的键值key进行处理
                    ctx.applyToKeyedState(mapStateDesc, new KeyedStateFunction(){
    
                        @Override
                        public void process(Object key, State state) throws Exception {
                            // key是键值, state是状态
                            Color colorKey = (Color) key;
                            final MapState<String, List<Item>> kvMapState = (MapState<String, List<Item>>) state);
                            // 可以对每个键值进行处理
                        }
                    });
                }
    
                @Override
                public void processElement(Item value,
                        ReadOnlyContext ctx,
                        Collector<String> out) throws Exception {
                    // 操作键值分区状态
                    final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
                    final Shape shape = value.getShape();
                    // 操作广播状态
                    for (Map.Entry<String, Rule> entry :
                            ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
                        final String ruleName = entry.getKey();
                        final Rule rule = entry.getValue();
    
                        List<Item> stored = state.get(ruleName);
                        if (stored == null) {
                            stored = new ArrayList<>();
                        }
    
                        if (shape == rule.second && !stored.isEmpty()) {
                            for (Item i : stored) {
                                out.collect("MATCH: " + i + " - " + value);
                            }
                            stored.clear();
                        }
    
                        // there is no else{} to cover if rule.first == rule.second
                        if (shape.equals(rule.first)) {
                            stored.add(value);
                        }
    
                        if (stored.isEmpty()) {
                            state.remove(ruleName);
                        } else {
                            state.put(ruleName, stored);
                        }
                    }
                }
    
                //可以注册定时器
                public void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<OUT> out)
                        throws Exception {
                    // the default implementation does nothing.
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80

    要点总结:

    1.在处理广播元素的方法processBroadcastElement中是没法访问单个键值分区状态的,因为广播元素并没有对应某个键值,但是在该方法中可以对所有的键值状态进行处理,也就是对键值状态进行统一的处理,此时你可以只处理对应的键值(不建议这么做)

    2.处理广播元素的方法processBroadcastElement中更新广播状态时不要依赖于广播元素到达的顺序,当上游算子的并行度大于1时,下游处理广播元素的算子收到的广播元素的顺序有可能不一样

    3.KeyedBroadcastProcessFunction也可以注册计时器,这个计时器是和对应的键值的key关联的

    4.广播状态也可以应用于DataStream,也就是使用BroadcastProcessFunction应用于普通的数据流,而不一定是KeyStreamed

    5.广播状态和代码中使用executor执行器定时更新内存记录的区别是广播状态可以持久化,而使用executor执行器定时更新内存记录可以不依赖于flink的状态管理,比如定时加载配置表到内存中也可以实现类似广播想要达到的效果

  • 相关阅读:
    期货-股票交易规则
    IDEA插件开发(22)--Status Bar Widgets
    英文网页批量翻译导出本地教程
    【JAVA】Java中Spring Boot如何设置全局的BusinessException
    消息队列缓存,以蓝牙消息服务为例
    数据结构--》解锁数据结构中树与二叉树的奥秘(一)
    2022年安全员-C证考试模拟100题模拟考试平台操作
    【电力系统】基于两阶段鲁棒优化算法的微网多电源容量配置附matlab代码
    对接抖音开发之售后消息实时通知订单部分退款
    基于Java+SpringBoot+Vue前后端分离人事管理系统设计和实现
  • 原文地址:https://blog.csdn.net/lixia0417mul2/article/details/133690771