• Testing Flink's KeyedBroadcastProcessFunction


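    Background

    We often need to unit-test a KeyedBroadcastProcessFunction before it goes to production, to confirm that it behaves correctly, including its broadcast state and its keyed state.

    Testing the KeyedBroadcastProcessFunction class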

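        import java.util.Arrays;

        import org.apache.flink.api.common.state.BroadcastState;
        import org.apache.flink.api.common.state.MapStateDescriptor;
        import org.apache.flink.api.common.state.ValueState;
        import org.apache.flink.api.common.state.ValueStateDescriptor;
        import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
        import org.apache.flink.api.common.typeinfo.TypeInformation;
        import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
        import org.apache.flink.streaming.util.KeyedBroadcastOperatorTestHarness;
        import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
        import org.junit.Assert;
        import org.junit.Test;

        @Test
        public void testHarnessForKeyedBroadcastProcessFunction() throws Exception {
            KeyedBroadcastProcessFunction<String, String, String, String> function = new MyKeyedBroadcastProcessFunction();
    
            // Descriptor for the keyed state
            final ValueStateDescriptor<String> valueStateDescriptor =
                    new ValueStateDescriptor<>("item", BasicTypeInfo.STRING_TYPE_INFO);
            // Descriptor for the broadcast state
            final MapStateDescriptor<String, String> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    
            // The key selector x -> x uses each element as its own key
            KeyedBroadcastOperatorTestHarness<String, String, String, String> harness =
                    ProcessFunctionTestHarnesses.forKeyedBroadcastProcessFunction(function, x -> x,
                            TypeInformation.of(String.class), ruleStateDescriptor);
    
            // Two elements on the broadcast side, then one on the keyed side (value, timestamp)
            harness.processBroadcastElement("0", 1);
            harness.processBroadcastElement("000", 2);
            harness.processElement("1", 10);
    
            // Check the keyed state. This is best done under a single key only,
            // for example by using a constant key selector such as x -> "fixed-key".
            ValueState<String> valueState = function.getRuntimeContext().getState(valueStateDescriptor);
            Assert.assertEquals("1", valueState.value());
    
            // Check the broadcast state
            BroadcastState<String, String> broadcastState = harness.getBroadcastState(ruleStateDescriptor);
            Assert.assertTrue(broadcastState.contains("0"));
            Assert.assertTrue(broadcastState.contains("000"));
    
            // Check the emitted records
            Assert.assertEquals(Arrays.asList("0", "000", "1"), harness.extractOutputValues());
        }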
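
MyKeyedBroadcastProcessFunction itself is not shown in this post. Judging only from the assertions, it appears to store each broadcast element in the broadcast state, store each keyed element in the value state, and emit every element it receives. The sketch below is a plain-Java model of that assumed behavior with no Flink dependency. EchoFunctionModel and all of its members are hypothetical names used for illustration; this is not the author's implementation and not Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the behavior the test seems to assume; not Flink code.
class EchoFunctionModel {
    // Broadcast state: a single map shared by all keys.
    final Map<String, String> broadcastState = new HashMap<>();
    // Keyed state: one value slot per key.
    final Map<String, String> keyedState = new HashMap<>();
    // Emitted records, in emission order.
    final List<String> output = new ArrayList<>();

    // Assumed broadcast-side logic: remember the element and emit it.
    void processBroadcastElement(String rule) {
        broadcastState.put(rule, rule);
        output.add(rule);
    }

    // Assumed keyed-side logic: store the element under its key and emit it.
    void processElement(String key, String value) {
        keyedState.put(key, value);
        output.add(value);
    }

    // Replays the same inputs as the test, in the same order.
    static EchoFunctionModel replayTestInputs() {
        EchoFunctionModel model = new EchoFunctionModel();
        model.processBroadcastElement("0");
        model.processBroadcastElement("000");
        model.processElement("1", "1"); // identity key selector: the key equals the element
        return model;
    }
}
```

Replaying the test inputs through this model gives the output "0", "000", "1", the broadcast entries "0" and "000", and the value "1" under key "1", which is what the three groups of assertions in the test check.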
    

    Key code:
    1. Getting the keyed state

    ValueState<String> valueState = function.getRuntimeContext().getState(valueStateDescriptor);
    

    2. Getting the broadcast state:

    BroadcastState<String, String> broadcastState = harness.getBroadcastState(ruleStateDescriptor);
    

    3. The utility class

    public class ProcessFunctionTestHarnesses {
        public ProcessFunctionTestHarnesses() {
        }
    
        public static <IN, OUT> OneInputStreamOperatorTestHarness<IN, OUT> forProcessFunction(ProcessFunction<IN, OUT> function) throws Exception {
            OneInputStreamOperatorTestHarness<IN, OUT> testHarness = new OneInputStreamOperatorTestHarness(new ProcessOperator((ProcessFunction)Preconditions.checkNotNull(function)), 1, 1, 0);
            testHarness.setup();
            testHarness.open();
            return testHarness;
        }
    
        public static <K, IN, OUT> KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> forKeyedProcessFunction(KeyedProcessFunction<K, IN, OUT> function, KeySelector<IN, K> keySelector, TypeInformation<K> keyType) throws Exception {
            KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> testHarness = new KeyedOneInputStreamOperatorTestHarness(new KeyedProcessOperator((KeyedProcessFunction)Preconditions.checkNotNull(function)), keySelector, keyType, 1, 1, 0);
            testHarness.open();
            return testHarness;
        }
    
        public static <IN1, IN2, OUT> TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> forCoProcessFunction(CoProcessFunction<IN1, IN2, OUT> function) throws Exception {
            TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> testHarness = new TwoInputStreamOperatorTestHarness(new CoProcessOperator((CoProcessFunction)Preconditions.checkNotNull(function)), 1, 1, 0);
            testHarness.open();
            return testHarness;
        }
    
        public static <K, IN1, IN2, OUT> KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> forKeyedCoProcessFunction(KeyedCoProcessFunction<K, IN1, IN2, OUT> function, KeySelector<IN1, K> keySelector1, KeySelector<IN2, K> keySelector2, TypeInformation<K> keyType) throws Exception {
            KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> testHarness = new KeyedTwoInputStreamOperatorTestHarness(new KeyedCoProcessOperator((KeyedCoProcessFunction)Preconditions.checkNotNull(function)), keySelector1, keySelector2, keyType, 1, 1, 0);
            testHarness.open();
            return testHarness;
        }
    
        public static <IN1, IN2, OUT> BroadcastOperatorTestHarness<IN1, IN2, OUT> forBroadcastProcessFunction(BroadcastProcessFunction<IN1, IN2, OUT> function, MapStateDescriptor<?, ?>... descriptors) throws Exception {
            BroadcastOperatorTestHarness<IN1, IN2, OUT> testHarness = new BroadcastOperatorTestHarness(new CoBroadcastWithNonKeyedOperator((BroadcastProcessFunction)Preconditions.checkNotNull(function), Arrays.asList(descriptors)), 1, 1, 0);
            testHarness.open();
            return testHarness;
        }
    
        public static <K, IN1, IN2, OUT> KeyedBroadcastOperatorTestHarness<K, IN1, IN2, OUT> forKeyedBroadcastProcessFunction(KeyedBroadcastProcessFunction<K, IN1, IN2, OUT> function, KeySelector<IN1, K> keySelector, TypeInformation<K> keyType, MapStateDescriptor<?, ?>... descriptors) throws Exception {
            KeyedBroadcastOperatorTestHarness<K, IN1, IN2, OUT> testHarness = new KeyedBroadcastOperatorTestHarness(new CoBroadcastWithKeyedOperator((KeyedBroadcastProcessFunction)Preconditions.checkNotNull(function), Arrays.asList(descriptors)), keySelector, keyType, 1, 1, 0);
            testHarness.open();
            return testHarness;
        }
    }
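
The keySelector passed to forKeyedBroadcastProcessFunction decides which key each element is processed under, and keyed state is only visible for the key that is current at the time it is read. That is presumably why the test comment recommends a constant key selector. The sketch below models this scoping in plain Java, without Flink. KeyScopedValueModel and visibleAfter are hypothetical names, and the model assumes the current key stays set to the key of the last processed element.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy model of a value state that is scoped to a "current key"; not Flink code.
class KeyScopedValueModel {
    private final Map<String, String> valuesByKey = new HashMap<>();
    private String currentKey;

    void setCurrentKey(String key) { this.currentKey = key; }

    // Reads the slot of the current key only; other keys' values are not visible.
    String value() { return valuesByKey.get(currentKey); }

    void update(String newValue) { valuesByKey.put(currentKey, newValue); }

    // Processes the elements in order, appending each one to the value stored
    // under its key, then returns what a read after the last element would see.
    static String visibleAfter(Function<String, String> keySelector, String... elements) {
        KeyScopedValueModel state = new KeyScopedValueModel();
        for (String element : elements) {
            state.setCurrentKey(keySelector.apply(element));
            String old = state.value();
            state.update(old == null ? element : old + "," + element);
        }
        return state.value();
    }
}
```

With the identity selector, `visibleAfter(x -> x, "1", "2")` only sees the slot for key "2". With a constant selector, `visibleAfter(x -> "fixed-key", "1", "2")` sees everything that was written, because all elements share one key. In a test, the constant selector therefore lets a single read of the value state cover all processed elements.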
    
  • Original article: https://blog.csdn.net/lixia0417mul2/article/details/134300178