• Flink Table API & SQL


    示例:

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

        String filePath = "D:\\yly\\BaiduNetdiskDownload\\最全最新flink教程\\000.代码+环境\\00.flink-train-master\\flink-train\\data\\04\\sales.csv";

        DataSet csv = env.readCsvFile(filePath)

                .ignoreFirstLine().includeFields("1111").fieldDelimiter(",")

                .pojoType(Sales.class,"transactionId","customerId","itemId","amountPaid");

        //csv.print();

        Table sales = tableEnv.fromDataSet(csv);

        tableEnv.registerTable("sales", sales);

        Table resultTable = tableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId order by money");

        DataSet result = tableEnv.toDataSet(resultTable, Row.class);

        result.print();

    }

    public static class Sales{

        public String transactionId;

        public String customerId;

        public String itemId;

        public Double  amountPaid;

        @Override

        public String toString() {

            return "Sales{" +

                    "transactionId='" + transactionId + '\'' +

                    ", customerId='" + customerId + '\'' +

                    ", itemId='" + itemId + '\'' +

                    ", amountPaid=" + amountPaid +

                    '}';

        }

        public String getTransactionId() {

            return transactionId;

        }

        public void setTransactionId(String transactionId) {

            this.transactionId = transactionId;

        }

        public String getCustomerId() {

            return customerId;

        }

        public void setCustomerId(String customerId) {

            this.customerId = customerId;

        }

        public String getItemId() {

            return itemId;

        }

        public void setItemId(String itemId) {

            this.itemId = itemId;

        }

        public Double getAmountPaid() {

            return amountPaid;

        }

        public void setAmountPaid(Double amountPaid) {

            this.amountPaid = amountPaid;

        }

    }

  • 相关阅读:
    设计模式——享元模式(Flyweight Pattern)+ Spring相关源码
    无涯教程-JavaScript - OCT2HEX函数
    外贸线上推广引流的技巧
    零号培训平台课程-1、SQL注入基础
    doc转html后添加style和导航
    LINUX中的chmod修改文件权限命令
    LeetCode 面试题 08.04. 幂集
    国庆作业 9月30 消息队列实现进程间通信
    rust的struct
    限制容器中的进程
  • 原文地址:https://blog.csdn.net/daliyuan350649623/article/details/124981521