• Flink将数据写入MySQL(JDBC)


    一、写在前面

    在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink的数据实时数据写入MySQL。

    二、代码示例

    2.1 版本说明

            <flink.version>1.14.6</flink.version>
            <spark.version>2.4.3</spark.version>
            <hadoop.version>2.8.5</hadoop.version>
            <hbase.version>1.4.9</hbase.version>
            <hive.version>2.3.5</hive.version>
            <java.version>1.8</java.version>
            <scala.version>2.11.8</scala.version>
            <mysql.version>8.0.22</mysql.version>
            <scala.binary.version>2.11</scala.binary.version>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2.2 导入相关依赖

     <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!--mysql连接器依赖-->
    <dependency>
       <groupId>mysql</groupId>
       <artifactId>mysql-connector-java</artifactId>
       <version>8.0.22</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    2.3 连接数据库,创建表

    mysql> CREATE TABLE `ws` ( 
          `id` varchar(100) NOT NULL
          ,`ts` bigint(20) DEFAULT NULL
          ,`vc` int(11) DEFAULT NULL, PRIMARY KEY (`id`) 
     ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.4 创建POJO类

    package com.flink.POJOs;
    
    
    import java.util.Objects;
    
    /**
     * TODO POJO类的特点
     * 类是公有(public)的
     * 有一个无参的构造方法
     * 所有属性都是公有(public)的
     * 所有属性的类型都是可以序列化的
     */
    public class WaterSensor {
        //类的公共属性
        public String id;
        public Long ts;
        public Integer vc;
    
        //无参构造方法
        public WaterSensor() {
            //System.out.println("调用了无参数的构造方法");
        }
    
        public WaterSensor(String id, Long ts, Integer vc) {
            this.id = id;
            this.ts = ts;
            this.vc = vc;
        }
    
        //生成get和set方法
        public void setId(String id) {
            this.id = id;
        }
    
        public void setTs(Long ts) {
            this.ts = ts;
        }
    
        public void setVc(Integer vc) {
            this.vc = vc;
        }
    
        public String getId() {
            return id;
        }
    
        public Long getTs() {
            return ts;
        }
    
        public Integer getVc() {
            return vc;
        }
    
        //重写toString方法
        @Override
        public String toString() {
            return "WaterSensor{" +
                    "id='" + id + '\'' +
                    ", ts=" + ts +
                    ", vc=" + vc +
                    '}';
        }
    
        //重写equals和hasCode方法
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            WaterSensor that = (WaterSensor) o;
            return id.equals(that.id) && ts.equals(that.ts) && vc.equals(that.vc);
        }
    
        @Override
        public int hashCode() {
            return Objects.hash(id, ts, vc);
        }
    }
    //scala的case类?
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    2.5 自定义map函数

    package com.flink.POJOs;
    
    import org.apache.flink.api.common.functions.MapFunction;
    
    public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {
        @Override
        public WaterSensor map(String value) throws Exception {
            String[] datas = value.split(",");
            return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    2.5 Flink2MySQL

    package com.flink.DataStream.Sink;
    
    import com.flink.POJOs.WaterSensor;
    import com.flink.POJOs.WaterSensorMapFunction;
    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;
    import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    
    /**
     * Flink 输出到 MySQL(JDBC)
     */
    public class flinkSinkJdbc {
        public static void main(String[] args) throws Exception {
            //TODO 创建Flink上下文执行环境
            StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            streamExecutionEnvironment.setParallelism(1);
            //TODO Source
            DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);
            //TODO Transfer
            SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = dataStreamSource.map(new WaterSensorMapFunction());
            /**TODO 写入 mysql
             * 1、只能用老的 sink 写法
             * 2、JDBCSink 的 4 个参数:
             *   第一个参数: 执行的 sql,一般就是 insert into
             *   第二个参数: 预编译 sql, 对占位符填充值
             *   第三个参数: 执行选项 ---->攒批、重试
             *   第四个参数: 连接选项---->url、用户名、密码
             */
            SinkFunction<WaterSensor> sinkFunction = JdbcSink.sink("insert into ws values(?,?,?)",
                    new JdbcStatementBuilder<WaterSensor>() {
                        @Override
                        public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                            preparedStatement.setString(1, waterSensor.getId());
                            preparedStatement.setLong(2, waterSensor.getTs());
                            preparedStatement.setInt(3, waterSensor.getVc());
                            System.out.println("数据写入成功:"+'('+waterSensor.getId()+","+waterSensor.getTs()+","+waterSensor.getVc()+")");
                        }
                    }
                    , JdbcExecutionOptions
                            .builder()
                            .withMaxRetries(3)         // 重试次数
                            .withBatchSize(100)        // 批次的大小:条数
                            .withBatchIntervalMs(3000) // 批次的时间
                            .build(),
                    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            .withUrl("jdbc:mysql://localhost:3306/dw?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                            .withUsername("root")
                            .withPassword("********")
                            .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                            .build()
            );
            //TODO 写入到Mysql
            waterSensorSingleOutputStreamOperator.addSink(sinkFunction);
    
            streamExecutionEnvironment.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    2.6 启动necat、Flink,观察数据库写入情况

    nc -lk 9999 #启动necat、并监听8888端口,写入数据
    
    • 1

    在这里插入图片描述
    启动Flink程序
    在这里插入图片描述
    查看数据库写入是否正常
    在这里插入图片描述

  • 相关阅读:
    AD - 将修改后的 PCB 封装更新到当前 PCB 中
    python 获取apk信息
    【Linux基础】3.1任务调度
    华为政企管理软件产品集
    一座“城池”:泡泡玛特主题乐园背后,IP梦想照亮现实
    【游戏开发算法每日一记】使用随机prime算法生成错综复杂效果的迷宫(C#,C++和Unity版)
    聊聊电商系统架构演进
    Tcpdump命令详解
    开源社区ECE:Elastic认证考试复盘总结134贴
    57、服务攻防——应用协议&Rsync&SSH&RDP&漏洞批扫&口令猜解
  • 原文地址:https://blog.csdn.net/dgssd/article/details/134065171