• Flink Java Table API & SQL 之 wordcount


    Table API

    package com.daidai.table;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    import java.util.Arrays;
    
    import static org.apache.flink.table.api.Expressions.$;
    
    /**
     * Word count over a small bounded stream, written with the Flink Table API.
     *
     * <p>Each word is paired with the value 1. The pairs are exposed as a table with columns
     * {@code word} and {@code sum`}. The table is then grouped by word and {@code sum} is summed.
     * The continuously updated result is converted to a retract stream and printed.
     */
    public class WordCountTable {

        /** Maps a single word to the pair {@code (word, 1)}. */
        private static final class ToWordAndOne implements MapFunction<String, Tuple2<String, Integer>> {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        }

        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // "java" occurs twice, so its running total is updated once (1 -> 2).
            final String[] words = {"hello", "word", "java", "scala", "java"};
            final DataStreamSource<String> wordStream = env.fromCollection(Arrays.asList(words));
            final SingleOutputStreamOperator<Tuple2<String, Integer>> pairs = wordStream.map(new ToWordAndOne());

            // Expose tuple fields f0/f1 under the column names "word" and "sum".
            final Table pairTable = tableEnv.fromDataStream(pairs, $("word"), $("sum"));
            final Table counts = pairTable
                    .groupBy($("word"))
                    .select($("word"), $("sum").sum().as("count"));

            // A grouped aggregate produces updates, so it is emitted as a retract stream.
            // Each element carries a Boolean flag: true = add row, false = retract row.
            tableEnv.toRetractStream(counts, Row.class).print();

            env.execute();
        }
    }
    

    SQL

    package com.daidai.table;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    import java.util.Arrays;
    
    import static org.apache.flink.table.api.Expressions.$;
    
    /**
     * Word count over a small bounded stream, written in Flink SQL.
     *
     * <p>Each word is paired with the value 1, and the pairs are registered as the temporary view
     * {@code wc(word, sum)}. A {@code GROUP BY} query then sums the ones per word. The result
     * column is explicitly aliased to {@code count}, so the output schema is {@code (word, count)}
     * like the Table API variant. Without the alias, the column would get a generated name.
     */
    public class WordCountSQL {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // "java" occurs twice, so its aggregate is updated once (1 -> 2).
            DataStreamSource<String> source = env.fromCollection(Arrays.asList("hello", "word", "java", "scala", "java"));
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = source.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    return Tuple2.of(value, 1);
                }
            });

            // Expose tuple fields f0/f1 as columns "word" and "sum", visible to SQL as view "wc".
            Table table = tableEnv.fromDataStream(wordAndOne, $("word"), $("sum"));
            tableEnv.createTemporaryView("wc", table);

            // SUM and COUNT are reserved words in Flink SQL, so both identifiers are quoted
            // with backticks. The alias gives the aggregate the same name as the Table API version.
            Table result = tableEnv.sqlQuery("select word, sum(`sum`) as `count` from wc group by word");

            // Grouped aggregates emit updates, so the result is a retract stream.
            // Each element is a Boolean flag (true = add, false = retract) plus the row.
            tableEnv.toRetractStream(result, Row.class).print();

            env.execute();
        }
    }
    
  • 相关阅读:
    【测试开发】用例篇 · 熟悉黑盒测试用例设计方法(2)· 正交表 · 场景设计 · 常见案例练习
    算法学习笔记(18): 平衡树(一)
    【Maven】SpringBoot多模块项目利用reversion占位符,进行版本管理.打包时版本号不能识别问题
    Learning How to Ask: Querying LMs with Mixtures of Soft Prompts
    docker常见命令
    C++17:variant
    Mybatis面试题(三十三道)
    最优乘车——最短路
    SpringCloud 学习笔记(3 / 3)
    vue使用ant design Vue中的a-select组件实现下拉分页加载数据
  • 原文地址:https://blog.csdn.net/weixin_46376562/article/details/125635530