• 52、Flink 使用 Parametertool 获取应用参数代码示例


    1、获取配置参数-1

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.io.IOException;
    import java.util.Map;
    
    public class _01_ParameterToolReadArgs {
        public static void main(String[] args) throws IOException {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 配置值来自 .properties 文件
    //        String propertiesFilePath = "*";
    //        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(propertiesFilePath);
    
    //        File file = new File("*");
    //        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(file);
    
    //        FileInputStream fileInputStream = new FileInputStream(new File("*"));
    //        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fileInputStream);
    
            // 配置值来自命令行
            // 输入
            // --input hdfs:///mydata
            // --elements 42
            // 输出
            // input=hdfs:///mydata
            // elements=42
    //        ParameterTool parameterTool = ParameterTool.fromArgs(args);
    
            // 配置值来自系统属性,VmOptions
            // 输入
            // -Dinput=hdfs:///mydata
            // 输出
            // input=hdfs:///mydata
            ParameterTool parameterTool = ParameterTool.fromSystemProperties();
    
            for (Map.Entry<String, String> entry : parameterTool.toMap().entrySet()) {
                if ("input".equals(entry.getKey())) {
                    System.out.println(entry.getKey() + "=" + entry.getValue());
                }
            }
        }
    }
    
    

    2、获取配置参数-2

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.Map;
    
    /**
     * 输入 --input myGlobalParamsInput
     * 输出 myGlobalParamsInput
     */
    public class _02_ParameterToolGlobalParams {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            ParameterTool parameters = ParameterTool.fromArgs(args);
            env.getConfig().setGlobalJobParameters(parameters);
    
            DataStreamSource<Integer> source = env.fromData(1, 2, 3);
            source.map(new MyRichMapFunc()).print();
    
            env.execute();
        }
    }
    
    class MyRichMapFunc extends RichMapFunction<Integer, String> {
    
        @Override
        public String map(Integer value) throws Exception {
            Map<String, String> globalJobParameters = getRuntimeContext().getGlobalJobParameters();
            return globalJobParameters.get("input");
        }
    }
    
  • 相关阅读:
    Vue Router的安装
    Jmeter分布式压测
    低代码平台前端的设计与实现(四)组件大纲树的构建设计
    多商家AI智能名片商城系统(开源版)——构建高效数字化商业新生态
    正态分布核函数
    智能制造时代下,MES管理系统需要解决哪些问题
    5000+封号血泪史,造就这16条WhatsApp防死号秘籍
    哈希函数2:用于哈希表的存储和扩容
    学习C++语言可以适用于哪些方面
    Hadoop(HDFS)
  • 原文地址:https://blog.csdn.net/m0_50186249/article/details/140058323