• Flink之输出算子Redis Sink


    Redis Sink

    在新版Flink的文档中,并没有发现Redis Sink的具体使用,但可通过 Apache Bahir了解到其具体使用

    Redis具有其极高的写入读取性能,因此也是经常使用的Sink之一。可以使用Java Redis客户端Jedis手动实现,也可以使用Flink和Bahir提供的实现来实现。

    开源实现的Redis Connector使用非常方便,但是无法使用一些Jedis中的高级功能,如设置过期时间等

    jedis实现

    添加依赖

    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>5.0.0</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    自定义Redis Sink

    自定义一个 RedisSink 函数,继承 RichSinkFunction,重写其中的open、invoke和close方法

    open:用于新建Redis客户端
    
    invoke:将数据存储到Redis中,这里将数据以字符串的形式存储到Redis中
    
    close:使用完毕后关闭Redis客户端
    
    • 1
    • 2
    • 3
    • 4
    • 5
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import redis.clients.jedis.Jedis;
    
    public class RedisSink extends RichSinkFunction {
    
        private transient Jedis jedis;
        
    	@Override
        public void open(Configuration config) {
            jedis = new Jedis("localhost", 6379);
        }
    
        @Override
        public void invoke(Object value, Context context) throws Exception {
            Tuple2<String, String> val = (Tuple2<String, String>) value;
            if (!jedis.isConnected()) {
                jedis.connect();
            }
            jedis.set(val.f0, val.f1);
        }
    
        @Override
        public void close() throws Exception {
            jedis.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    使用Sink

        public static void main(String[] args) throws Exception {
            // 创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 创建data source
            DataStreamSource<String> dataStreamSource = env.fromElements("Flink", "Spark", "Storm");
            // 应用转换算子
            SingleOutputStreamOperator<Tuple2<String, String>> streamOperator = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map(String s) throws Exception {
                    return new Tuple2<>(s, s);
                }
            });
    
            // 关联sink:将给定接收器添加到此数据流
            streamOperator.addSink(new RedisSink());
    
            // 执行
            env.execute("redis sink");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    验证

    在Redis的控制台中查询数据

    本地:0>keys *
    1) "Flink"
    2) "Storm"
    3) "Spark"
    
    • 1
    • 2
    • 3
    • 4

    开源 Redis Connector

    可以使用Flink和Bahir提供的实现,其内部都是使用Java Redis客户端Jedis实现Redis Sink。

    Flink:https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis

    Bahir:https://bahir.apache.org/#home

    添加依赖

    Flink与Bahir的使用方法类似,这里以使用Flink提供的依赖Jar使用为例,引入flink-connector-redis。

    Flink提供的依赖包

    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.12</artifactId>
        <version>1.1.0</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    Bahir提供的依赖包

    <dependency>
       <groupId>org.apache.bahir</groupId>
       <artifactId>flink-connector-redis_2.11</artifactId>
       <version>1.0</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    自定义Redis Sink

    可以通过实现 RedisMapper 来自定义Redis Sink

    自定义的MyRedisSink实现了RedisMapper并覆写了其中的getCommandDescription、getKeyFromData、getValueFromData。

    getCommandDescription:定义存储到Redis中的数据格式,这里定义RedisCommandSET,将数据以字符串的形式存储
    
    getKeyFromData:定义SETKey
    
    getValueFromData:定义SET的值
    
    • 1
    • 2
    • 3
    • 4
    • 5

    RedisCommand

    Redis的所有可用命令,每个命令属于RedisDataType。

    public enum RedisCommand {
    
        /**
         * 将指定值插入到存储在键上的列表的头部。
         * 如果键不存在,则在执行推送操作之前将其创建为空列表。
         */
        LPUSH(RedisDataType.LIST),
        /**
         * 将指定值插入到存储在键上的列表的尾部。
         * 如果键不存在,则在执行推送操作之前将其创建为空列表。
         */
        RPUSH(RedisDataType.LIST),
        /**
         * 将指定成员添加到存储在键上的集合中。
         * 忽略已经是该集合成员的指定成员。
         */
        SADD(RedisDataType.SET),
        /**
         * 设置键的字符串值。如果键已经持有一个值,
         * 则无论其类型如何,都会被覆盖。
         */
        SET(RedisDataType.STRING),
        /**
         * 设置键的字符串值,并设置生存时间(TTL)。
         * 如果键已经持有一个值,则无论其类型如何,都会被覆盖。
         */
        SETEX(RedisDataType.STRING),
        /**
         * 将元素添加到以第一个参数指定的变量名存储的HyperLogLog数据结构中。
         */
        PFADD(RedisDataType.HYPER_LOG_LOG),
        /**
         * 将消息发布到给定的频道。
         */
        PUBLISH(RedisDataType.PUBSUB),
        /**
         * 将具有指定分数的指定成员添加到存储在键上的有序集合中。
         */
        ZADD(RedisDataType.SORTED_SET),
        ZINCRBY(RedisDataType.SORTED_SET),
    
        /**
         * 从存储在键上的有序集合中删除指定的成员。
         */
        ZREM(RedisDataType.SORTED_SET),
        /**
         * 在键上的哈希中设置字段的值。如果键不存在,
         * 则创建一个持有哈希的新键。如果字段已经存在于哈希中,则会覆盖它。
         */
        HSET(RedisDataType.HASH),
        HINCRBY(RedisDataType.HINCRBY),
    
        /**
         * 为指定的键进行加法操作。
         */
        INCRBY(RedisDataType.STRING),
        /**
         * 为指定的键进行加法操作,并在固定时间后过期。
         */
        INCRBY_EX(RedisDataType.STRING),
        /**
         * 为指定的键进行减法操作。
         */
        DECRBY(RedisDataType.STRING),
        /**
         * 为指定的键进行减法操作,并在固定时间后过期。
         */
        DESCRBY_EX(RedisDataType.STRING);
        /**
         * 此命令所属的{@link RedisDataType}。
         */
        private RedisDataType redisDataType;
    
        RedisCommand(RedisDataType redisDataType) {
            this.redisDataType = redisDataType;
        }
    
        /**
         * 获取此命令所属的{@link RedisDataType}。
         *
         * @return {@link RedisDataType}
         */
        public RedisDataType getRedisDataType() {
            return redisDataType;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    String数据类型示例

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
    
    public class RedisStringSink implements RedisMapper<Tuple2<String, String>> {
    
        /**
         * 设置redis数据类型
         */
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.SET);
        }
    
        /**
         * 设置Key
         */
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }
    
        /**
         * 设置value
         */
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    Hash数据类型示例

    创建一个Order对象

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import java.math.BigDecimal;
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class Order {
    
        private Integer id;
        private String name;
        private BigDecimal price;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    编码示例:

    import com.alibaba.fastjson.JSON;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
    
    public class RedisHashSink implements RedisMapper<Order> {
    
        /**
         * 设置redis数据类型
         */
        @Override
        public RedisCommandDescription getCommandDescription() {
            /**
             * 第二个参数是Hash数据类型, 第二个参数是外面的key
             *
             * @redisCommand: Hash数据类型
             * @additionalKey: 哈希和排序集数据类型时使用(RedisDataType.HASH组或RedisDataType.SORTED_SET), 其他类型忽略additionalKey
             */
            return new RedisCommandDescription(RedisCommand.HSET, "redis");
        }
    
        /**
         * 设置Key
         */
        @Override
        public String getKeyFromData(Order oder) {
            return oder.getId() + "";
        }
    
        /**
         * 设置value
         */
        @Override
        public String getValueFromData(Order oder) {
            // 从数据中获取Value: Hash的value
            return JSON.toJSONString(oder);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    使用Sink

    RedisStringSink

        public static void main(String[] args) throws Exception {
            // 创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 创建data source
            DataStreamSource<String> dataStreamSource = env.fromElements("Flink", "Spark", "Storm");
            // 应用转换算子
            SingleOutputStreamOperator<Tuple2<String, String>> streamOperator = dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> map(String s) throws Exception {
                    return new Tuple2<>(s, s);
                }
            });
    
            // Jedis池配置
            FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();
            // 关联sink:将给定接收器添加到此数据流
            streamOperator.addSink(new RedisSink<>(conf, new RedisStringSink()));
    
            // 执行
            env.execute("redis sink");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    RedisHashSink

    public static void main(String[] args) throws Exception {
            // 创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 创建data source
            DataStreamSource<Order> dataStreamSource = env.fromElements(
                    new Order(1,"Flink", BigDecimal.valueOf(100)),
                    new Order(2,"Spark", BigDecimal.valueOf(200)),
                    new Order(3,"Storm", BigDecimal.valueOf(300))
            );
    
    
            // Jedis池配置
            FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().
                    setHost("localhost")
                    .setPort(6379)
                    // 池可分配的最大对象数,默认值为8
                    .setMaxTotal(100)
                    // 设置超时,默认值为2000
                    .setTimeout(1000 * 10)
                    .build();
            // 关联sink:将给定接收器添加到此数据流
            dataStreamSource.addSink(new RedisSink<>(conf, new RedisHashSink()));
    
            // 执行
            env.execute("redis sink");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    验证

    在Redis的控制台中查询数据

    查看String数据类型

    本地:0>get Flink
    "Flink"
    本地:0>get Spark
    "Spark"
    
    • 1
    • 2
    • 3
    • 4

    查看Hash数据类型

    Redis:0>keys *
    1) "redis"
    Redis:0>HGETALL redis
    1) "3"
    2) "{"id":3,"name":"Storm","price":300}"
    3) "2"
    4) "{"id":2,"name":"Spark","price":200}"
    5) "1"
    6) "{"id":1,"name":"Flink","price":100}"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
  • 相关阅读:
    亚马逊化学物质和重金属限制物质清单
    [CISCN 2023 初赛]go_session
    基于C#的学生选课管理系统
    vscode远程linux安装codelldb
    Python爬虫实战:图片爬取与保存
    Java异步注解@Async详解
    BeautifulSoup的使用
    二分算法(蓝桥杯 C++ 题目 代码 注解)
    这些并发测试知识点,你掌握了吗?
    Docker安装Mycat和Mysql进行水平分库分表实战【图文教学】
  • 原文地址:https://blog.csdn.net/qq_38628046/article/details/133811880