• 【Flink实战】玩转Flink里面核心的Sink Operator实战


    🚀 作者 :“大数据小禅”

    🚀 文章简介 :玩转Flink里面核心的Sink Operator实战

    🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬


    Flink Sink Operator简介

    • 在Flink中,Sink Operator(也称为Sink Function或Sink)是指负责将DataStream或DataSet的数据发送到外部存储或外部系统的操作符。Sink Operator是Flink的数据输出端,它的作用是将处理过的数据写入目标位置,如数据库、文件系统、消息队列等。

    • Sink Operator通过将数据传输到外部系统来完成最终的数据存储、展示或其他类型的处理。它可以将数据单个地或批量地发送到目标系统,具体取决于Sink操作符的实现。例如,可以将数据写入关系型数据库、NoSQL数据库、消息队列、文件系统等。

    • 在Flink中,可以使用预定义的Sink操作符,如addSink()方法,或自定义Sink函数来实现数据的输出。预定义的Sink操作符可以满足一般的输出需求,而自定义Sink函数可以根据具体的业务逻辑实现特定的输出操作。

    • 自定义Sink函数需要实现SinkFunction接口或RichSinkFunction抽象类,并重写其中的方法。这些方法包括open()、invoke()和close()等,用于初始化和管理连接,以及处理数据发送等操作。

    使用Sink Operator时,需要考虑以下几个方面:

    • 目标系统的可用性和容错性:保证目标系统的可用性,并确保在故障发生时能够进行重试或恢复。
    • 写入的一致性:根据需求选择适当的写入一致性级别,如精确一次(exactly-once)或最少一次(at-least-once)语义。
    • 并行度和性能:根据目标系统的特性和可用资源,设置合适的并行度以提高任务并行处理和整体性能。

    Flink 核心知识 Sink Operator速览

    • Flink编程模型
      在这里插入图片描述
    • Sink 输出源
      • 预定义
        • print
        • writeAsText (过期)
      • 自定义
        • SinkFunction
        • RichSinkFunction
          • Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
      • flink官方提供 Bundle Connector
        • kafka、ES 等
      • Apache Bahir
        • kafka、ES、Redis等

    Flink 自定义的Sink 连接Mysql存储商品订单案例实战

    • 自定义

      • SinkFunction
      • RichSinkFunction
        • Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
    • Flink连接mysql的几种方式(都需要加jdbc驱动)

      • 方式一:自带flink-connector-jdbc 需要加依赖包
      <dependency>
          <groupId>org.apache.flinkgroupId>
          <artifactId>flink-connector-jdbc_2.12artifactId>
          <version>1.12.0version>
      dependency>
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 方式二:自定义sink
    • 保存视频订单到Mysql

      CREATE TABLE `video_order` (
        `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
        `user_id` int(11) DEFAULT NULL,
        `money` int(11) DEFAULT NULL,
        `title` varchar(32) DEFAULT NULL,
        `trade_no` varchar(64) DEFAULT NULL,
        `create_time` date DEFAULT NULL,
        PRIMARY KEY (`id`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 添加jdbc依赖
      <dependency>
                  <groupId>mysqlgroupId>
                  <artifactId>mysql-connector-javaartifactId>
                  <version>8.0.25version>
      dependency>
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 编码
      public class MysqlSink extends RichSinkFunction {
      
          private Connection conn = null;
          private PreparedStatement ps = null;
      
          @Override
          public void open(Configuration parameters) throws Exception {
              conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/xd_order?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai", "root", "xdclass.net");   //url user passwd
              String sql = "INSERT INTO `video_order` (`user_id`, `money`, `title`, `trade_no`, `create_time`) VALUES(?,?,?,?,?);";
              ps = conn.prepareStatement(sql);
          }
      
          @Override
          public void close() throws Exception {
              if (conn != null) {
                  conn.close();
              }
              if (ps != null) {
                  ps.close();
              }
          }
      
          @Override
          public void invoke(VideoOrder videoOrder, Context context) throws Exception {
              //给ps中的?设置具体值
              ps.setInt(1,videoOrder.getUserId());
              ps.setInt(2,videoOrder.getMoney());
              ps.setString(3,videoOrder.getTitle());
              ps.setString(4,videoOrder.getTradeNo());
              ps.setDate(5,new Date(videoOrder.getCreateTime().getTime()));
      
              ps.executeUpdate();
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34

      在这里插入图片描述
      在这里插入图片描述

  • 相关阅读:
    【SpringCloud-Seata分布式事物】
    Questions Per Chapter
    用大模型实现PPT可视化几种思路
    ORACLE创建用户
    TSINGSEE视频AI智能分析技术:水泥厂安全生产智能监管解决方案
    docker-redis
    Linux journal日志文件维护
    《软件质量保证与测试》第 7 章——验收测试 重点部分总结
    Ubuntu 安装Kafka
    自学Python 57 多线程开发(七)使用 Connection对象和共享对象 Shared
  • 原文地址:https://blog.csdn.net/weixin_45574790/article/details/132857273