• About-Flink


    About-Flink

    一、Flink简介

    1.1、flink特点

    • 低延迟,高吞吐、结果精准,良好的容错

      ? 支持事件时间(event-time)和处理时间(processing-time)语义
      ? 精确一次(exactly-once)的状态一致性保证
      ? 低延迟,每秒处理数百万个事件,毫秒级延迟
      ? 与众多常用存储系统的连接
      ? 高可用,动态扩展,实现7*24小时全天候运行

    1.2、分层Api

    	越顶层越抽象,表达含义越简明,使用越方便
    	越底层越具体,表达能力越丰富,使用越灵活
    
    • 1
    • 2

    在这里插入图片描述

    1.3、Flink vs Spark Streaming

    ?数据模型
    -spark采用RDD模型,spark streaming的DStream实际上也就是一组组小批数据RDD的集合
    -flink基秀数据模型是数据流,以及事件(Event)序列
    
    ?运行时架构
    -spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个
    -flink是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节
    点进行处理
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    二、Flink批处理应用

    2.1、依赖的引入

    在这里插入图片描述

    2.2、准备批处理文件

    在这里插入图片描述

    2.3、wordCount编码

    在这里插入图片描述

    2.4、自定义类

    在这里插入图片描述

    • 结果输出
      在这里插入图片描述

    三、Flink流处理应用

    3.1、wordCount编码

    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述

    3.2、设置并行度-默认为4

    在这里插入图片描述

    3.2、数据来源socket

    • nc lk -7777
      在这里插入图片描述

    3.3、配置文件参数提取

    在这里插入图片描述
    在这里插入图片描述

    四、Standlone环境运行job

    4.1、Standlone环境的搭建

    • 下载包 解压
    • flink-1.10.1-bin.scala_2.12.tgz

    4.2、配置文件说明

    在这里插入图片描述

    1、通常jobmanager 的配置比 taskmanager,因为干活的是taskmanager
    2、并行度不一定比slots小,一定比集群总的slots小
    
    • 1
    • 2
    • 启动一个jobmanage和一个taskmanager
      在这里插入图片描述
      在这里插入图片描述
    • 配置参考
      在这里插入图片描述

    4.3、提交jar包入口

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    • 配置文件4个槽,只占用了2个槽。以并行度最高来群定soilt的使用个数。个数不够,超时后会报超时。
      在这里插入图片描述
    • 运行结果
      在这里插入图片描述

    4.4、命令行提交Job

    在这里插入图片描述

    • 查看运行的job列表
      在这里插入图片描述
    • 命令行取消job
      在这里插入图片描述
    • 查看运行的和取消的所有列表
      在这里插入图片描述

    五、Flink On Yarn

    • flink提供了两种yarn上运行模式,分别为session-Cluster和per-Job-cluster的模式
    • 以Yarn模式部署Flink任务时,要求Flink是有Haddop支持的版本,1.7以上版本,需要将整合hadoop支持的依赖放入Flink 的 lib下。

    5.1、session-Cluster模式

    在这里插入图片描述
    在这里插入图片描述

    • Flink bin 目录下启动 -n 可不指定
      在这里插入图片描述
    • 执行提交job命令 有Session 找 Session集群 没session找 Standlone
      在这里插入图片描述
      在这里插入图片描述

    5.2、Per-Job-cluster模式

    在这里插入图片描述
    在这里插入图片描述

    • 基本操作
      在这里插入图片描述

    六、Flink 四大组件

    6.1、Flink运行时的组件

    在这里插入图片描述

    • 作业管理器 JobManager作用

      1、控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。
      2、JobManager会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow Graph)和打包了所有的类、库和其它资源的JAR包。
      3、JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做"执行图"(ExecutionGraph),包含了所有可以并发执行的任务。
      4、JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(兀skManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的hskManager上。而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
      
      • 1
      • 2
      • 3
      • 4
    • 任务管理器 TaskManager

      1、Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots),插槽的数量限制了TaskManager能够执行的任务数量。
      2、启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。
      3、在执行过程中,TgskManager可以跟其它运行同一应用程序的TaskManager交换数据。
      
      • 1
      • 2
      • 3
    • 资源管理器 ResourceManager

      1、主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定义的处理资源单元。
      2、Flink为不同的环境和资源管理工具提供了不同资源管理器,比如SRN、Mesos、K8s, 以及Standalone部署。
      3、当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分酉已给JobManager。如果ResourceManager没有有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动心TaskManager进程的容器。
      
      • 1
      • 2
      • 3
    • 分发器

      1、可以跨作业运行,它为应用提交提供了REST接口。
      2、当一个应用被提交执行时,分发器就会启动并将应用移交给一个
      
      • 1
      • 2

      JobManager
      3、Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执
      行的信息。
      4、Dispatcher在架构中可能并不是必需的,这取决于应用提交运行
      的方式。

    6.2、任务提交流程

    在这里插入图片描述
    在这里插入图片描述

    6.3、任务调度原理

    在这里插入图片描述

    6.4、并行度

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    6.5、程序和数据流

    在这里插入图片描述
    在这里插入图片描述

    6.6、数据的传输形式

    在这里插入图片描述

    6.6、任务链

    在这里插入图片描述
    在这里插入图片描述

    七、F流处理PAI

    7.1、Environment

    7.1.1、getExcuteionEnvironment

    在这里插入图片描述

    7.1.2、createLocalEnvironment

    在这里插入图片描述

    7.1.3、createRemoteEnvironment

    在这里插入图片描述

    7.2、Source

    7.2.1、List

    import com.tan.flink.bean.SensorReading;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import java.util.Arrays;
    
    public class SourceFromCollection {
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream inputDataStream = env.fromCollection(Arrays.asList(
                    new SensorReading("sensor_1", 1547718199L, 35.8),
                    new SensorReading("sensor_6", 1547718201L, 15.4),
                    new SensorReading("sensor_7", 1547718202L, 6.7),
                    new SensorReading("sensor_10", 1547718205L, 38.1)
            ));
            inputDataStream.print();
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    7.2.2、source from file

    env.readTextFile(path);
    
    • 1

    7.2.3、source from kafka

    • pom 依赖

      	
              org.apache.flink
              flink-connector-kafka-0.11_2.12
              1.10.1
          
      
      • 1
      • 2
      • 3
      • 4
      • 5

      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
      import java.util.Properties;

      public class SourceFromKafka {
      public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

          // kafka 配置
          Properties properties = new Properties();
          properties.setProperty("bootstrap.servers", "192.168.200.102:9092,192.168.200.102:9092,192.168.200.104:9092");
          properties.setProperty("group.id", "flink-kafka");
          properties.setProperty("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
          properties.setProperty("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
          properties.setProperty("auto.offset.reset", "latest");
      
          DataStreamSource inputDataStream = env.addSource(new FlinkKafkaConsumer011(
                  "sensor",
                  new SimpleStringSchema(),
                  properties
          ));
      
          inputDataStream.print();
          env.execute();
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19

      }

    7.2.4、自定义source

    • 需要实现SourceFunction 或者继承SourceFunction的富函数RichSourceFunction

      import com.tan.flink.bean.SensorReading;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.source.SourceFunction;
      import java.util.Random;
      import java.util.UUID;
      
      public class SourceFromCustom {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              DataStreamSource inputDataStream = env.addSource(new CustomSource());
              inputDataStream.print();
              env.execute();
          }
      
          public static class CustomSource implements SourceFunction {
              boolean running = true;
              @Override
              public void run(SourceContext sourceContext) throws Exception {
      
                  Random random = new Random();
                  while (running) {
                      // 每隔 100 秒数据
                      for (int i = 0; i < 5; i++) {
                          String id = UUID.randomUUID().toString().substring(0, 8);
                          long timestamp = System.currentTimeMillis();
                          double temperature = 60 + random.nextGaussian() * 20;
                          sourceContext.collect(new SensorReading(id, timestamp, temperature));
      
                          Thread.sleep(100L);
                      }
      
                      Thread.sleep(1000L);
                  }
              }
      
              @Override
              public void cancel() {
                  running = false;
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42

    7.3、Treansform

    7.3.1、map

    7.3.2、FlatMap

    7.3.3、Fliter

    7.3.4、Keyby

    7.3.5、滚动聚合

    7.3.6、Reduce

    7.3.7、Split和select

  • 相关阅读:
    【NLP】第14章 解释黑盒Transformer模型
    YOLO V7源码解析
    国际腾讯云自主拼装直播 URL教程!!!
    聊天机器人语料在开发中的重要性
    R语言mgcv包广义可加模型对分类曲线进行拟合
    PGL图学习之图神经网络GraphSAGE、GIN图采样算法[系列七]
    linux 安装Docker
    每日刷题-3
    Github 星标 57.9K!阿里巴巴 Java 面试突击汇总(全彩版)首次公开
    手记:把代码上传到Gitee等远程仓库的过程记录及常见问题
  • 原文地址:https://blog.csdn.net/m0_67390379/article/details/126325876