• flink keyby指定key方式详解


    1.keyby算子

    keyby是flink中非常常见的操作。其作用为在逻辑上将流划分为不相交的分区,而具有相同key的数据都分配到同一个分区。这种操作在各种大数据计算引擎中都非常常见,比如最早的mapreduce,从map阶段到reduce阶段,就是通过shuffle操作将具有相同key的数据分配到同一个reduce端进行处理。在flink内部,keyby是通过哈希分区来实现的,并且自带有多种指定key的方式。

    2.源码分析

    我们先通过源码,来看看keyby指定key的几种不同方式,flink版本1.7.2

    	/**
    	 * Partitions the operator state of a {@link DataStream} by the given key positions.
    	 *
    	 * @param fields
    	 *            The position of the fields on which the {@link DataStream}
    	 *            will be grouped.
    	 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
    	 */
    	public KeyedStream keyBy(int... fields) {
    		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
    			return keyBy(KeySelectorUtil.getSelectorForArray(fields, getType()));
    		} else {
    			return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
    		}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    第一种方式,通过指定字段的位置来进行分组,输入参数为一个或多个整数,整数即代表字段对应位置。

    	/**
    	 * Partitions the operator state of a {@link DataStream} using field expressions.
    	 * A field expression is either the name of a public field or a getter method with parentheses
    	 * of the {@link DataStream}'s underlying type. A dot can be used to drill
    	 * down into objects, as in {@code "field1.getInnerField2()" }.
    	 *
    	 * @param fields
    	 *            One or more field expressions on which the state of the {@link DataStream} operators will be
    	 *            partitioned.
    	 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
    	 **/
    	public KeyedStream keyBy(String... fields) {
    		return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    第二种方式,通过指定字段名来指定key。这个字段名是有一定要求的,后面我们再详细解释。

    	/**
    	 * It creates a new {@link KeyedStream} that uses the provided key for partitioning
    	 * its operator states.
    	 *
    	 * @param key
    	 *            The KeySelector to be used for extracting the key for partitioning
    	 * @return The {@link DataStream} with partitioned state (i.e. KeyedStream)
    	 */
    	public  KeyedStream keyBy(KeySelector key) {
    		Preconditions.checkNotNull(key);
    		return new KeyedStream<>(this, clean(key));
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    第三种方式,通过KeySelector的方式指定。

    而KeySelector是一个接口,里面只有一个方法getKey,我们使用的时候实现getKey方法即可。

    @FunctionalInterface
    public interface KeySelector extends Function, Serializable {
    	KEY getKey(IN value) throws Exception;
    }
    
    • 1
    • 2
    • 3
    • 4

    3.通过字段号指定key

    通过字段号指定key相对比较简单,直接看一个wordcount例子即可。

        public static void baseVersion() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStream stream = env.fromElements("java python c python python c");
            DataStream> flatstream = stream.flatMap(new FlatMapFunction>() {
                @Override
                public void flatMap(String value, Collector> out) throws Exception {
                    for(String word: value.split("\\W+")) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                }
            })
                    .keyBy(0)
                    .sum(1);
    
            flatstream.print();
            env.execute("keyby base version");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    keyBy(0)表示对第一个字段,即word进行分区,而sum(1)则表示对第二个字段即count进行求和。

    4.通过字段名指定key

    通过字段号指定key使用比较简单方便,但是如果是比较复杂的场景,就不好搞定了。比如如果数据是个比较复杂的嵌套结构Tuple2, Integer>,如果我们想对内部嵌套的Tuple2的第一个字段进行keyby操作,就无法通过字段号来操作,这个时候我们可以通过字段名的方式来进行代替。
    字段名的方式相对来说复杂一些,下面我们来进行示范。

    还是先以简单的wordcount为例。

    先定义个内部静态类,静态类包含有两个字段,分别为word与count。

        public static final class WC {
            public String word;
            public int count;
            public WC() {}
            public WC(String word, int count) {
                this.word = word;
                this.count = count;
            }
            public String getWord() {
                return word;
            }
            public int getCount() {
                return count;
            }
            public void setWord(String word) {
                this.word = word;
            }
            public void setCount(int count) {
                this.count = count;
            }
    
            @Override
            public String toString() {
                return this.word + ": " + this.count;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    该POLO类中的两个字段word与count,可以传到keyby算子中。

    然后再进行flink相关代码的编写。

        public static void nameVersion() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);`在这里插入代码片`
            WC wc1 = new WC("java", 1);
            WC wc2 = new WC("python", 2);
            WC wc3 = new WC("c", 3);
            WC wc4 = new WC("c", 4);
            WC wc5 = new WC("java", 5);
    
            DataStream stream = env.fromElements(wc1, wc2, wc3, wc4, wc5);
            stream = stream.keyBy("word").sum("count");
            stream.print();
    
            env.execute("keyed base");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    上面代码输出为

    java: 1
    python: 2
    c: 3
    c: 7
    java: 6
    
    • 1
    • 2
    • 3
    • 4
    • 5

    注意上面的WC pojo类是有要求的
    1.keyby中的字段名必须与pojo类的字段名一致。
    2.pojo类一定要提供默认的构造函数,否则代码会报如下错误。

    Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType) cannot be used as key.
    	at org.apache.flink.api.common.operators.Keys$ExpressionKeys.(Keys.java:330)
    	at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:337)
    	...
    
    • 1
    • 2
    • 3
    • 4

    3.字段需要提供get/set方法。(但是在1.7.2版本测试,如果对字段不提供get/set方法,wordcount代码也可以正常运行)

    5.通过嵌套字段名指定key

    接下来我们看嵌套的字段名如何在keyby中被指定。

        public static final class WC {
            public int count;
            public InnerClass inner;
            public WC() {}
            public WC(InnerClass inner, int count) {
                this.inner = inner;
                this.count = count;
            }
    
            public int getCount() {
                return count;
            }
            public WC setCount(int count) {
                this.count = count;
                return this;
            }
    
            @Override
            public String toString() {
                return this.inner.name + ": " + this.count;
            }
        }
        public static final class InnerClass {
            public String name;
            public String department;
    
            public InnerClass() {}
            public InnerClass(String name) {
                this.name = name;
            }
            public String getName() {
                return name;
            }
            public InnerClass setName(String name) {
                this.name = name;
                return this;
            }
            public String getDepartment() {
                return department;
            }
            public InnerClass setDepartment(String department) {
                this.department = department;
                return this;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    首先我们定义了两个pojo类,一个是WC类,包含有count字段以及InnerClass对象。而InnerClass有name与department两个字段。
    有同学可能会问,搞这么复杂干嘛,直接将所有字段定义到WC类中不就好了。同学们,我们这里是演示嵌套字段的用法…

    接下来,我们想将name字段指定为keyby中的key

        public static void run() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            InnerClass inn1 = new InnerClass("jojo"); InnerClass inn2 = new InnerClass("jojo");
            InnerClass inn3 = new InnerClass("lili"); InnerClass inn4 = new InnerClass("lili");
            WC wc1 = new WC(inn1, 1);
            WC wc2 = new WC(inn2, 2);
            WC wc3 = new WC(inn3, 3);
            WC wc4 = new WC(inn4, 4);
    
            DataStream stream = env.fromElements(wc1, wc2, wc3, wc4)
                    .keyBy("inner.name")
                    .sum("count");
            stream.print();
            env.execute("keyby complex version");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    上面的例子中
    count指的是WC中的count字段
    inner.word指的是InnerClass中的word字段,inner则表示WC类中的inner属性。

    这样就达到了指定复杂嵌套结构中key的目的。

    6.通过KeySelector的方式指定key

    看一个例子,就能明白上述方式的用法。

        public static void keyselect() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStream stream = env.fromElements("java python c python python c");
            DataStream> flatstream = stream.flatMap(new FlatMapFunction>() {
                @Override
                public void flatMap(String value, Collector> out) throws Exception {
                    for(String word: value.split("\\W+")) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                }
            })
                    .keyBy((KeySelector, Object>) value -> value.f0)
                    .sum(1);
    
            flatstream.print();
            env.execute("key select version");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
  • 相关阅读:
    (十九)STM32——输入捕获
    【Vue2.x源码系列04】依赖收集原理(Dep、Watcher、Observer)
    智能安防监控如何助力汽车4S店信息化精细化管理?最大程度做到降本增效?
    大学刚毕业,如何才能拿到月薪10K+的offer?
    对doccano自动标注使用的默认UIE模型进行微调以提高特定领域的实体识别能力,提高标注速度
    【室内设计作品】关于崂山酒店,我有一个新设计
    C++ Qt / VS2019 +opencv + onnxruntime 部署语义分割模型【经验】
    MYSQLg高级------批量插入百万级数据量
    【leetcode 力扣刷题】栈—波兰式///逆波兰式相关知识和题目
    NodeJS爬取墨刀上的设计图片
  • 原文地址:https://blog.csdn.net/bitcarmanlee/article/details/126902923