• 【API篇】一、执行环境API


    0、认识

    DataStream API是Flink的核心层API。一个Flink程序,其实就是对数据源DataStream的各种转换。具体来说,代码基本上都由以下几部分构成:

    在这里插入图片描述

    后面章节,分别对每一个环节的API做整理。

    1、创建执行环境

    Flink的程序可以在各种上下文环境中运行,比如本地JVM中执行,也可以提交到远程集群中运行,分别对应着不同的Flink的运行环境,获取这个执行环境,也就是StreamExecutionEnvironment类的对象。

    获取方式一:

    StreamExecutionEnvironment.getExecutionEnvironment()
    
    • 1

    调用静态方法getExecutionEnvironment,会根据当前运行的上下文直接得到正确的结果:

    • 如果程序是独立运行的,就返回一个本地执行环境
    • 如果是命令行+jar包提交到集群执行,就返回集群的执行环境

    重点:这个静态方法根据当前运行方式,自行决定并返回一个适配的运行环境。 getExecutionEnvironment方法还可以传一个flink包下的Confiruration对象

    StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
    
    • 1

    此时,就可以访问web控制台了。localhost:8081。此外,通过Configuraiton对象,可以改一些默认的配置,比如端口8081

    Configuration conf = new Configuration();
    
    conf.set(RestOptions.BIND_PORT, "8082");
    
    
    • 1
    • 2
    • 3
    • 4

    此时,控制台就该访问localhost:8082

    获取方式二:

    StreamExecutionEnvironment.createLocalEnvironment()
    
    • 1

    返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数。

    //并行度为3
    StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment(3);  
    
    • 1
    • 2

    获取方式三:

    这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。

    StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("host",1234,"path/to/jarFile.jar");
    // 参数一:JobManager主机名
    // 参数二:JobManager进程端口号
    // 参数三:提交给JobManager的JAR包
    
    • 1
    • 2
    • 3
    • 4

    最后,不管用哪个方法,拿到执行环境对象后,还可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制等

    env.setParallelism(2);
    
    • 1

    2、执行模式

    从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理,不再使用DataSet API。

    // 流处理环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    • 1
    • 2

    DataStream API执行模式包括:

    • 流执行模式(Streaming):用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式
    • 批执行模式(Batch):专门用于批处理的执行模式
    • 自动模式(AutoMatic):由程序根据输入数据源是否有界,来自动选择是流处理还是批处理执行

    执行模式选择,可以通过命令行方式配置:

    //BATCH
    bin/flink run -Dexecution.runtime-mode=BATCH ...
    
    • 1
    • 2

    也可以通过代码配置:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    
    • 1
    • 2
    • 3

    前者用的更多,不推荐硬编码。

    3、触发程序执行

    写完输出(sink)操作并不代表程序已经结束,因为当main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中,这时并没有真正处理数据,因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为延迟执行懒执行

    env.execute();
    
    • 1

    所以我们需要显式地调用执行环境的execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后才返回一个执行结果(JobExecutionResult)

    4、关于executeAsync方法

    • 正常情况下,一个execute方法执行,触发一个Flink Job。

    • 一个main方法里也可以调用多个execute,但没意义,因为execute()方法将一直等待作业完成,肯定阻塞后面的

    • env.executeAsync方法,可以异步触发,不会阻塞后面的任务

    public static void main(String[] args){
    
    	...
    	executeAsync();   //触发第一个Job
    	
    	//job2...(一般不这么写)
    	
    	....
    	executeAsync();  //第二个Job产生
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 一个main方法里executeAsync方法的调用次数,等于生成的flink job的个数
    • 同样的,再Yarn-Application集群,提交一次,当调用n次executeAsync,就有n个Job,对应在JobManager里,就有n个JobMaster
  • 相关阅读:
    ORACLE Redo Log Buffer 重做日志缓冲区机制的设计
    Java堆外缓存(一个很有意思的应用)
    【算法】【二叉树模块】求一个二叉树是否包含另一个二叉树的拓扑结构
    图像压缩(4)《数字图像处理》第八章 8.3节 数字图像水印
    优步让一切人工智能化
    Linux 驱动开发 五十六:Buildroot 笔记
    培养现货黄金投资的盈利能力
    探索光模块的MSA多源协议
    C++小游戏视频及资料集(一)
    动态内存管理(C语言)
  • 原文地址:https://blog.csdn.net/llg___/article/details/133798713