• flink中维表Join几种常见方式总结


    flink中维表Join

    需求如下:
    一个主流中数据是用户信息,字段包括用户姓名、城市id;
    维表是城市数据,字段包括城市ID、城市名称。
    要求用户表与城市表关联,输出为:用户名称、城市ID、城市名称。

    (1)预加载维表信息

    通过定义一个类实现RichMapFunction,在open()中读取维表数据加载到内存中,在probe流map()方法中与维表数据进行关联。

    RichMapFunction中open方法里加载维表数据到内存的方式特点如下:
    + 优点:实现简单.
    + 缺点:因为数据存于内存,所以只适合小数据量并且维表数据更新频率不高的情况下。虽然可以在open中定义一个定时器定时更新维表,但是还是存在维表更新不及时的情况。

    代码实现:

    public class _01_DimJoin {
    
        public static void main(String[] args) throws Exception {
    
            // 获取的执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 获取主流数据
            DataStreamSource<String> sourceStream = env.socketTextStream("hadoop01", 9999);
    
            // 原始数据的格式为user,1000(用户姓名、城市id)
            SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = sourceStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String line) throws Exception {
                    String[] arr = line.split(",");
                    return Tuple2.of(arr[0], Integer.valueOf(arr[1]));
                }
            });
    
            // 预加载维表
            SingleOutputStreamOperator<Tuple3<String, Integer, String>> resStream = mapStream.map(new _01_MyMapFunction());
    
            // 打印数据
            resStream.print();
            env.execute("_01_DimJoin");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    package com.yyds.flink.functions;
    
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 预加载维表方式 与  主流数据进行join
     */
    public class _01_MyMapFunction extends RichMapFunction<Tuple2<String,Integer>, Tuple3<String,Integer,String>> {
    
        //定义一个变量,用于保存维表数据在内存
        Map<Integer, String> dim;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            //在open方法中读取维表数据,可以从数据库中读取、文件中读取、接口中读取等等。
            dim = new HashMap<>();
            dim.put(1001,"北京");
            dim.put(1002,"上海");
            dim.put(1003,"广州");
            dim.put(1004,"深圳");
            // open中可以定时更新维表
            // 创建线程池
            ScheduledExecutorService executors = Executors.newSingleThreadScheduledExecutor();
            // 每隔5 min 更新一次
            executors.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    reload();
                }
            },5,5, TimeUnit.MINUTES);
        }
    
        // 加载
        private void reload() {
            dim.put(1004,"深圳_" + System.currentTimeMillis());
        }
    
        @Override
        public Tuple3<String, Integer, String> map(Tuple2<String, Integer> tp2) throws Exception {
            // 在map 方法中进行主流  和 维表的关联
            String cityName = "";
            if(dim.containsKey(tp2.f1)){
                cityName = dim.get(tp2.f1);
            }
            return new Tuple3<>(tp2.f0,tp2.f1,cityName);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    (2)热存储维表

    这种方式是将维表数据存储在Redis、HBase、MySQL等外部存储中,实时流在关联维表数据的时候实时去外部存储中查询,这种方式特点如下:

    • 优点:维度数据量不受内存限制,可以存储很大的数据量。
    • 缺点:因为维表数据在外部存储中,读取速度受制于外部存储的读取速度;另外维表的同步也有延迟。

    1) 使用cache来减轻访问压力

     可以使用缓存来存储一部分常访问的维表数据,以减少访问外部系统的次数,比如使用guava Cache。
    
    • 1

    代码实现:

    public class _02_DimJoin {
        public static void main(String[] args) throws Exception{
    
            // 获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
    
            DataStreamSource<String> streamSource = env.socketTextStream("hadoop01", 9999);
    
            // 原始数据的格式为user,1000(用户姓名、城市id)
            SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = streamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String line) throws Exception {
                    String[] arr = line.split(",");
                    return Tuple2.of(arr[0], Integer.valueOf(arr[1]));
                }
            });
    
            // 预加载维表(使用缓存cache)
            SingleOutputStreamOperator<Tuple3<String, Integer, String>> resStream = mapStream.map(new _02_MyMapFunction());
    
            // 打印数据
            resStream.print();
            env.execute("_02_DimJoin");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    /**
     * 热存储维表: 使用cache来减轻访问压力
     */
    public class _02_MyMapFunction extends RichMapFunction<Tuple2<String,Integer>, Tuple3<String,Integer,String>> {
    
        LoadingCache<Integer,String> dim;
    
        private String readFromHbase(Integer cityId) {
            //读取hbase
            //这里写死,模拟从hbase读取数据
            Map<Integer, String> temp = new HashMap<>();
            temp.put(1001,"北京");
            temp.put(1002,"上海");
            temp.put(1003,"广州");
            temp.put(1004,"深圳");
            String cityName = "";
            if (temp.containsKey(cityId)) {
                cityName = temp.get(cityId);
            }
    
            return cityName;
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            // 使用google LoadingCache进行缓存
           dim =  CacheBuilder.newBuilder()
                    // 最多缓存的个数,超过了就根据最近最少使用的算法来移除缓存
                    .maximumSize(1000)
                    // 更新后的指定时间后回收
                    .expireAfterWrite(10,TimeUnit.MINUTES)
                    // 指定移除通知
                    .removalListener(new RemovalListener<Integer, String>() {
                        @Override
                        public void onRemoval(RemovalNotification<Integer, String> notification) {
                            System.out.println(notification.getKey() + "被移除了,值为:" + notification.getValue());
                        }
                    }).build(
                            // 指定加载缓存的逻辑
                    new CacheLoader<Integer, String>() {
                        @Override
                        public String load(Integer cityId) throws Exception {
                            String cityName = readFromHbase(cityId);
                            return cityName;
                        }
                    }
            );
    
            System.out.println("open方法中:" + dim.asMap().toString());
    
    
    
        }
    
        @Override
        public Tuple3<String, Integer, String> map(Tuple2<String, Integer> tp2) throws Exception {
            System.out.println("map方法中:" +  dim.asMap().toString());
            // 主流 和 维表进行关联
            String cityName = "";
            if(dim.get(tp2.f1) != null){
                cityName = dim.get(tp2.f1);
            }
            return new Tuple3<>(tp2.f0,tp2.f1,cityName);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    (2) 使用异步IO

    Flink与外部存储系统进行读写操作的时候可以使用同步方式,也就是发送一个请求后等待外部系统响应,然后再发送第二个读写请求,
    这样的方式吞吐量比较低,可以用提高并行度的方式来提高吞吐量,但是并行度多了也就导致了进程数量多了,占用了大量的资源。

    Flink中可以使用异步IO来读写外部系统,这要求外部系统客户端支持异步IO,不过目前很多系统都支持异步IO客户端。但是如果使用异步就要涉及到三个问题:

    • 超时:如果查询超时那么就认为是读写失败,需要按失败处理;

    • 并发数量:如果并发数量太多,就要触发Flink的反压机制来抑制上游的写入;

    • 返回顺序错乱:顺序错乱了要根据实际情况来处理,Flink支持两种方式:允许乱序、保证顺序。

    public class _03_DimJoin {
        public static void main(String[] args) throws Exception{
    
            // 获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<String> streamSource = env.socketTextStream("hadoop01", 9999);
    
            // 原始数据的格式为user,1000(用户姓名、城市id)
            SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = streamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String line) throws Exception {
                    String[] arr = line.split(",");
                    return Tuple2.of(arr[0], Integer.valueOf(arr[1]));
                }
            });
    
            // 预加载维表(使用异步IO)
            // 保证顺序:异步返回的结果保证顺序
            SingleOutputStreamOperator<Tuple3<String, Integer, String>> resStream = AsyncDataStream.orderedWait(
                    mapStream,
                    new _03_MyRichAsyncFunction(),
                    1000L, // 超时时间为1S
                    TimeUnit.MILLISECONDS,
                    2
            ).setParallelism(1);;
    
            SingleOutputStreamOperator<Tuple3<String,Integer, String>> unorderedResult = AsyncDataStream
                    //允许乱序:异步返回的结果允许乱序,超时时间1秒,最大容量2,超出容量触发反压
                    .unorderedWait(mapStream, new _03_MyRichAsyncFunction(), 1000L, TimeUnit.MILLISECONDS, 2)
                    .setParallelism(1);
    
            // 打印数据
            resStream.print();
            env.execute("_02_DimJoin");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    
    
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * 热存储维表—使用异步IO(从mysql中读取数据)
     */
    public class _03_MyRichAsyncFunction extends RichAsyncFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
    
    
        // 链接
        private static String jdbcUrl = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false";
        private static String username = "root";
        private static String password = "zsd123456";
        private static String driverName = "com.mysql.jdbc.Driver";
        java.sql.Connection conn;
        PreparedStatement ps;
    
    
        // 从mysql中读取数据
        @Override
        public void open(Configuration parameters) throws Exception {
            Class.forName(driverName);
            conn = DriverManager.getConnection(jdbcUrl, username, password);
            ps = conn.prepareStatement("select city_name from city_info where id = ?");
        }
    
        // 异步方法
        @Override
        public void asyncInvoke(Tuple2<String, Integer> tp2, ResultFuture<Tuple3<String, Integer, String>> resultFuture) throws Exception {
            ps.setInt(1,tp2.f1);
            ResultSet rs = this.ps.executeQuery();
            String cityName = "";
            if(rs.next()){
                cityName = rs.getString(1);
            }
    
            List list = new ArrayList<Tuple3<String, Integer, String>>();
            list.add(new Tuple3<>(tp2.f0,tp2.f1,cityName));
            resultFuture.complete(list);
        }
    
        // 超时处理
        @Override
        public void timeout(Tuple2<String, Integer> tp2, ResultFuture<Tuple3<String, Integer, String>> resultFuture) throws Exception {
    
            List list = new ArrayList<Tuple2<Integer, String>>();
            list.add(new Tuple3<>(tp2.f0,tp2.f1, ""));
            resultFuture.complete(list);
        }
    
        @Override
        public void close() throws Exception {
            // 关闭连接
            conn.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    (3)广播

    利用Flink的Broadcast State将维度数据流广播到下游做join操作。

    • 优点:维度数据变更后可以即时更新到结果中。
    • 缺点:数据保存在内存中,支持的维度数据量比较小。
    public class _04_BroadCastDimJoin {
        public static void main(String[] args) throws Exception {
    
            // 获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<String> streamSource = env.socketTextStream("hadoop01", 9999);
    
            // 原始数据的格式为user,1000(用户姓名、城市id)
            SingleOutputStreamOperator<Tuple2<String, Integer>> mainStream = streamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String line) throws Exception {
                    String[] arr = line.split(",");
                    return Tuple2.of(arr[0], Integer.valueOf(arr[1]));
                }
            });
    
    
            //定义城市流(维度数据流)
            SingleOutputStreamOperator<Tuple2<Integer, String>> cityStream = env.socketTextStream("hadoop01", 8888, "\n")
                    .map(p -> {
                        //输入格式为:城市ID,城市名称
                        String[] list = p.split(",");
                        return new Tuple2<Integer, String>(Integer.valueOf(list[0]), list[1]);
                    }).returns(new TypeHint<Tuple2<Integer, String>>() {
                    });
    
            // 将城市流定义为广播流
            MapStateDescriptor<Integer, String> broadCastDes = new MapStateDescriptor<Integer, String>("broad",Integer.class,String.class);
    
            BroadcastStream<Tuple2<Integer, String>> broadcastStream = cityStream.broadcast(broadCastDes);
    
            // 主流 和 广播流 进行连接
            SingleOutputStreamOperator<Tuple3<String, Integer, String>> resStream = mainStream.connect(broadcastStream)
                    .process(new BroadcastProcessFunction<Tuple2<String, Integer>, Tuple2<Integer, String>, Tuple3<String, Integer, String>>() {
    
                        MapStateDescriptor<Integer, String> broadCastDes = new MapStateDescriptor<>("broad", Integer.class, String.class);
    
                        @Override
                        public void processBroadcastElement(Tuple2<Integer, String> value, Context context, Collector<Tuple3<String, Integer, String>> collector) throws Exception {
                            System.out.println("收到广播流: " + value);
                            context.getBroadcastState(broadCastDes).put(value.f0, value.f1);
                        }
    
    
                        @Override
                        public void processElement(Tuple2<String, Integer> value, ReadOnlyContext readOnlyContext, Collector<Tuple3<String, Integer, String>> collector) throws Exception {
                            // 处理非广播流,关联维度
                            ReadOnlyBroadcastState<Integer, String> broadcastState = readOnlyContext.getBroadcastState(broadCastDes);
    
                            String cityName = "";
                            if (broadcastState.contains(value.f1)) {
                                cityName = broadcastState.get(value.f1);
                            }
                            collector.collect(new Tuple3<>(value.f0, value.f1, cityName));
                        }
    
    
                    });
            resStream.print();
    
            env.execute("_04_BroadCastDimJoin");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    (4) Lookup join(flinksql 维表join)

    可以参考:https://blog.csdn.net/qq_44665283/article/details/125908709

  • 相关阅读:
    前端异步+token下载文件
    Nginx基于Basic Auth实现静态资源的访问权限控制
    Java synchronized 关键字
    python项目-飞机大战
    华为新设备升级示例
    【数据结构】二叉树
    Python图像处理【1】图像与视频处理
    第八章:最新版零基础学习 PYTHON 教程—Python 初学者项目(Python 3 和 C 中的猜数字游戏)
    sql基础语法
    Linux学习第42天:Linux RS232/485/GPS 驱动实验:天外来客
  • 原文地址:https://blog.csdn.net/qq_44665283/article/details/125909562