• Flink-常用算子、自定义函数以及分区策略


    5.4 转换算子

    5.4.1 map(映射)

    1. 静态内部类实现接口
    public class TransfromMapTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //从元素读取数据
            DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L),
                    new Event("Alice","./prod?id=100",3000L));
            //进行转换计算,提取user字段
            SingleOutputStreamOperator<String> result = stream.map(new MyMapper());
            result.print();
    ​
            env.execute();
        }
        //自定义MapFunction接口
        //1.使用自定义静态内部类实现接口
        public static class MyMapper implements MapFunction<Event,String>{@Override
            public String map(Event value) throws Exception {
                return value.user;
            }
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    1. 匿名类实现
            //2.使用匿名类实现MapFunction接口
            SingleOutputStreamOperator<String> result2 = stream.map(new MapFunction<Event, String>() {
                @Override
                public String map(Event value) throws Exception {
                    return value.user;
                }
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    1. lambda表达式

    一步到位

            //3.传入lambda表达式
            SingleOutputStreamOperator<String> result3 = stream.map(data -> data.user);
    
    • 1
    • 2

    5.4.2 过滤filter

    1. 静态内部类实现接口
    public class TransformFilterTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //从元素读取数据
            //1.使用自定义静态内部类实现接口
            DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L),
                    new Event("Alice","./prod?id=100",3000L));
    ​
    ​
            SingleOutputStreamOperator<Event> result1 = stream.filter(new MyFilter());
    ​
    ​
            });
    ​
            result1.print();
            env.execute();
        }public static class MyFilter implements FilterFunction<Event>{@Override
            public boolean filter(Event value) throws Exception {
                return value.user.equals("Mary");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    1. 匿名类实现
    SingleOutputStreamOperator<Event> result2 = stream.filter(new FilterFunction<Event>() {
                @Override
                public boolean filter(Event value) throws Exception {
                    return value.user.equals("Mary");
                }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. lambda表达式
            SingleOutputStreamOperator<Event> result3 = stream.filter(data -> data.user.equals("Mary"));
    
    • 1

    5.4.3 flatmap(扁平映射)

    一对多

    1. 静态内部类实现接口
    public class TransformFlatMapTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //从元素读取数据
            //1.使用自定义静态内部类实现接口
            DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L),
                    new Event("Alice","./prod?id=100",3000L));
    ​
            stream.flatMap(new MyFlatMap()).print();
            env.execute();}public static class MyFlatMap implements FlatMapFunction<Event,String>{@Override
            //返回是用户collector收集器的collect方法收集
            //如果不掉输出的,也可以实现filter的功能
            public void flatMap(Event value, Collector<String> out) throws Exception {
                out.collect(value.user);
                out.collect(value.url);
                out.collect(value.timestamp.toString());}
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    1. 匿名类实现

    1. lambda表达式
            stream.flatMap((Event value,Collector<String> out) -> {
                if(value.user.equals("Mary")){
                    out.collect(value.url);
                }else if(value.user.equals("Bob")){
                    out.collect(value.user);
                    out.collect(value.url);
                    out.collect(value.timestamp.toString());
                }
            }).returns(new TypeHint<String>() {})//需要泛型返回
                .print();
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    5.4.4 keyby逻辑分区

    1. 分析
      在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    严格来说不是算子,没有返回SingleOutputStreamOperator类(继承DataStream),而是仅仅追加了key的逻辑分区,返回的是keyedstream键控流,只有keyedstream类才有sum,max,reduce方法在进行聚合操作

    1. 简单聚合算子使用
    public class TransformSingleAggTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L),
                    new Event("Alice","./prod?id=100",3000L),
                    new Event("Bob","./prod?id=1",3300L),
                    new Event("Bob", "./home", 3500L),
                    new Event("Alice","./prod?id=200",3000L),
                    new Event("Bob","./prod?id=2",3800L),
                    new Event("Bob","./prod?id=3",4200L));//按键分组之后进行聚合,提取当前用户最近一次访问数据
            stream.keyBy(new KeySelector<Event, String>() {
                @Override
                public String getKey(Event value) throws Exception {
                    return value.user;
                }
            }).max("timestamp")
                    .print("max:");//maxby
            stream.keyBy(data->data.user)
                    .maxBy("timestamp")
                    .print("maxBy:");
    ​
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    max和maxby的区别,max仅仅针对我们定义的字段去截取最大值,其他的字段采用第一条,maxby是全部字段跟着最大值字段更改

    在这里插入图片描述

    1. 归约聚合reduce

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    把集合每一个数据拿出来,然后按照一定的规则不停的规约,最终得到一个唯一规约聚合后的结果

    • 访问量最多的用户
    public class TransformReduceTest{
        public static void main(String[] args) throws Exception  {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L),
                    new Event("Alice", "./prod?id=100", 3000L),
                    new Event("Bob", "./prod?id=1", 3300L),
                    new Event("Alice", "./prod?id=200", 3000L),
                    new Event("Bob", "./home", 3500L),
                    new Event("Bob", "./prod?id=2", 3800L),
                    new Event("Bob", "./prod?id=3", 4200L));//计算访问量最大的数据,先计算访问量,后求max
            //1.统计每个用户的访问频次
            SingleOutputStreamOperator<Tuple2<String, Long>> clicksByUser = stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> map(Event value) throws Exception {
                            return Tuple2.of(value.user, 1L);
                        }
                    }).keyBy(data -> data.f0)
                    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        //values是累计的数据,values2是1
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> values1, Tuple2<String, Long> values2) throws Exception {
                            return Tuple2.of(values1.f0, values1.f1 + values2.f1);
                        }
                    });//2.选取当前最活跃的用户
            //所有数据都分配到相同的一个key,也就会分配到一个分区
            SingleOutputStreamOperator<Tuple2<String, Long>> result = clicksByUser.keyBy(data -> "key")
                    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                            return value1.f1 > value2.f1 ? value1 : value2;
                        }
                    });
            result.print();
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 结果
    (Mary,1)
    (Bob,1)
    (Alice,1)
    (Bob,2)
    (Alice,2)
    (Bob,3)
    (Bob,4)
    (Bob,5)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    5.4.5 用户自定义函数

    1. 富函数类

    在这里插入图片描述

    RichMapFunction是一个抽象类,继承自AbstractRichFunction抽象富函数类,同时实现了MapFunction接口

    并且有抽象的map方法

    1. 代码
    public class TransformRichFunctionTest {
        public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L),
                    new Event("Alice", "./prod?id=100", 3000L),
                    new Event("Bob", "./prod?id=1", 3300L),
                    new Event("Alice", "./prod?id=200", 3000L),
                    new Event("Bob", "./home", 3500L),
                    new Event("Bob", "./prod?id=2", 3800L),
                    new Event("Bob", "./prod?id=3", 4200L));
    ​
            stream.map(new MyRichMapper()).setParallelism(2).print();
            env.execute();}//实现一个自定义的富函数类
        //RichMapFunction是抽象类
        public static class MyRichMapper extends RichMapFunction<Event,Integer>{//继承AbstractRichFunction抽象富函数类的声明周期方法
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //可以自定义状态,使得可以进行聚合功能的使用
                //这边获取子任务的索引号
                System.out.println("open声明周期被调用"+getRuntimeContext().getIndexOfThisSubtask()+"号任务启动");
            }//RichMapFunction抽象类的map
            @Override
            public Integer map(Event value) throws Exception {
                return value.url.length();
            }@Override
            public void close() throws Exception {
                super.close();
                System.out.println("close声明周期被调用"+getRuntimeContext().getIndexOfThisSubtask()+"号任务启动");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    1. 结果
    open声明周期被调用0号任务启动
    open声明周期被调用1号任务启动
    close声明周期被调用0号任务启动
    close声明周期被调用1号任务启动
    6
    13
    13
    11
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    5.4.6 物理分区

    与keyby不同的是,可以制定分区策略

    1. shuffle随机分区
    • 代码
    public class TransformPartitionTest {
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);DataStreamSource<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L),
                    new Event("Alice", "./prod?id=100", 3000L),
                    new Event("Bob", "./prod?id=1", 3300L),
                    new Event("Alice", "./prod?id=200", 3000L),
                    new Event("Bob", "./home", 3500L),
                    new Event("Bob", "./prod?id=2", 3800L),
                    new Event("Bob", "./prod?id=3", 4200L));
    ​
            stream.shuffle().print().setParallelism(4);
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 结果
    1> Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0}
    4> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
    2> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
    3> Event{user='Bob', url='./prod?id=1', timestamp=1970-01-01 08:00:03.3}
    2> Event{user='Bob', url='./home', timestamp=1970-01-01 08:00:03.5}
    4> Event{user='Bob', url='./prod?id=3', timestamp=1970-01-01 08:00:04.2}
    1> Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.0}
    2> Event{user='Bob', url='./prod?id=2', timestamp=1970-01-01 08:00:03.8}Process finished with exit code 0
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    1. 轮询分区
            //2.轮询分区
            stream.rebalance().print().setParallelism(4);
            env.execute();
    
    • 1
    • 2
    • 3
    • 结果
    2> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
    1> Event{user='Bob', url='./prod?id=1', timestamp=1970-01-01 08:00:03.3}
    3> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
    1> Event{user='Bob', url='./prod?id=3', timestamp=1970-01-01 08:00:04.2}
    4> Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0}
    3> Event{user='Bob', url='./home', timestamp=1970-01-01 08:00:03.5}
    2> Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.0}
    4> Event{user='Bob', url='./prod?id=2', timestamp=1970-01-01 08:00:03.8}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    1. rescale重缩放分区

    跟rebalance区别是按照自己小组分区

    目的是为了减少网络传输损耗,减少资源

    • 代码
        //3.rescale重缩放分区
            //DataSource并行度为1,因此重新创建自定义Source,运行上下文富函数类,可以获取运行时的上下文
            env.addSource(new RichParallelSourceFunction<Integer>() {@Override
                public void run(SourceContext<Integer> ctx) throws Exception {
                    for(int i =1 ;i<=8;i++){
                        //将奇数偶数发送到0号和1号并行分区
                        if(i%2== getRuntimeContext().getIndexOfThisSubtask()){
                            ctx.collect(i);
                        }
                    }
                }@Override
                public void cancel() {}
            }).setParallelism(2)
                    .rescale()
                    .print()
                    .setParallelism(4);
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 结果
    2> 4
    1> 2
    3> 1
    3> 5
    4> 3
    1> 6
    2> 8
    4> 7
    ​
    Process finished with exit code 0
    1,3,5,7对应输出的是3,4的分区
    2,4,6,8对应输出的是1,2分区
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 如果是rebalance的话,就是1,3,5,7是1,2,3,4完全轮询分区
            env.addSource(new RichParallelSourceFunction<Integer>() {@Override
                public void run(SourceContext<Integer> ctx) throws Exception {
                    for(int i =1 ;i<=8;i++){
                        //将奇数偶数发送到0号和1号并行分区
                        if(i%2== getRuntimeContext().getIndexOfThisSubtask()){
                            ctx.collect(i);
                        }
                    }
                }@Override
                public void cancel() {}
            }).setParallelism(2)
                    .rebalance()
                    .print()
                    .setParallelism(4);
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 结果
    3> 6
    1> 2
    2> 4
    4> 8
    3> 5
    4> 7
    2> 3
    1> 1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    1. 广播分区
    • 概念

    一个数据分发到所有并行子任务上

    • 代码
            stream.broadcast().print().setParallelism(4);
    
    • 1
    • 结果
    1> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
    4> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
    3> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
    2> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:01.0}
    3> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
    4> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
    1> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
    2> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
    ​
    一条数据四个分区全部并行输出
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    1. 全局分区
    • 概念

    把所有数据分配到一个子任务上

    • 代码
            //5.全局分区
            stream.global().print().setParallelism(4);
    
    • 1
    • 2
    • 注意

    会造成系统压力

    1. 自定义重分区
    • 代码
    //6.自定义重分区
            env.fromElements(1,2,3,4,5,6,7,8)
                            //传入分区器Partitioner和keyselector(类比keyby了)
                            .partitionCustom(new Partitioner<Integer>() {
                                @Override
                                //返回的int0或者1就表示了物理分区的索引号
                                //指定当前分区策略,只有第一个和第二个分区会输出信息 
                                public int partition(Integer key, int numPartitions) {
                                    return key%2;
                                }//Integer输入,Integer输出
                            }, new KeySelector<Integer, Integer>() {@Override
                                public Integer getKey(Integer value) throws Exception {
                                    //按照value数字分组
                                    return value;
                                }
                            })
                            .print().setParallelism(4);
    ​
    ​
            env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 结果
    1> 2
    1> 4
    2> 1
    1> 6
    2> 3
    1> 8
    2> 5
    2> 7Process finished with exit code 0
     //指定当前分区策略,只有第一个和第二个分区会输出信息 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
  • 相关阅读:
    你好,区块链操作系统上的完全去中心化国际象棋
    基于沙猫群优化算法的线性规划求解matlab程序
    包教包会:Mysql主从复制搭建
    Android12 启动页适配
    u盘文件突然不见了如何找回呢?
    股票魔法师第二阶段趋势模板选股公式,寻找上涨趋势
    【Linux进行时】环境变量and进程优先级
    ESP8266-Arduino编程实例-LM75温度传感器驱动
    利用 API 接口进行自动代码生成的最佳实践
    判断二叉树是否为满二叉树
  • 原文地址:https://blog.csdn.net/m0_46507516/article/details/127914313