• Flink--- 批处理 / 流处理


    目录

    Flink的主要特点

    Flink 和 Spark Streaming

    搭建maven工程 FlinkTutorial

    添加Scala框架 和 Scala文件夹

     Flink-批处理wordcount

     Flink---流处理wordcount


    Flink 是一个框架和分布式的处理引擎,用于对无界和有界数据流进行状态计算。

    传统数据处理架构

    事务处理

    分析处理

    :将数据从业务数据库复制到数仓,再进行分析和查询

     流处理的演变

    lambda架构

    :用两套系统,同时保证低延迟和结果准确

    流处理的演变

    Flink的主要特点

    1、事件驱动

    2、基于流的世界观

    在Flink的世界观中,一切都是由流组成的,离线数据是有界的流;实时数据是一个没有界限的流 :这就是所谓的有节流和无界流

    3、分层API

    越顶层越抽象,表达含义越简明,使用越方便

    越底层越具体,表达能力越丰富,使用越灵活

    Flink的其他特点

    1、支持事件时间(event-time)和处理时间(processing-time)语义

    2、精确一次(exactly-once) 的状态一致性保证

    3、低延迟,每秒处理数百万个事件,毫秒级延迟

    4、与众多常用存储系统的连接

    5、高可用,动态扩展,实现7*24小时全天候运行

    Flink 和 Spark Streaming

    流(stream)和微批(micro-batching)

     数据模型:

    --- spark采用RDD模型,spark streaming 是 DStream实际上也就是一组组小批数据RDD的集合

    --- flink基本数据模型是数据流,以及事件(Event)序列

    运行时架构:

    --- spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个 

    --- flink是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理

    搭建maven工程 FlinkTutorial

    文件---新建---项目---maven

    在pom文件中插入

    如下内容:

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.apache.flink</groupId>
    4. <artifactId>flink-scala_2.12</artifactId>
    5. <version>1.10.1</version>
    6. </dependency>
    7. <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    8. <dependency>
    9. <groupId>org.apache.flink</groupId>
    10. <artifactId>flink-streaming-scala_2.12</artifactId>
    11. <version>1.10.1</version>
    12. </dependency>
    13. </dependencies>
    14. <build>
    15. <plugins>
    16. <!-- 该插件用于将 Scala 代码编译成 class 文件 -->
    17. <plugin>
    18. <groupId>net.alchim31.maven</groupId>
    19. <artifactId>scala-maven-plugin</artifactId>
    20. <version>3.4.6</version>
    21. <executions>
    22. <execution>
    23. <!-- 声明绑定到 maven 的 compile 阶段 -->
    24. <goals>
    25. <goal>compile</goal>
    26. </goals>
    27. </execution>
    28. </executions>
    29. </plugin>
    30. <plugin>
    31. <groupId>org.apache.maven.plugins</groupId>
    32. <artifactId>maven-assembly-plugin</artifactId>
    33. <version>3.0.0</version>
    34. <configuration>
    35. <descriptorRefs>
    36. <descriptorRef>jar-with-dependencies</descriptorRef>
    37. </descriptorRefs>
    38. </configuration>
    39. <executions>
    40. <execution>
    41. <id>make-assembly</id>

    添加Scala框架 和 Scala文件夹

    在src-main目录下创建一个新目录,命名为:Scala

     

    (在新建目录上)单击右键---将目录标记为----源 根

    然后,创建一个Scala类--object--命名;即可 

     

    首先创建一个  . txt 文件

    在resources目录下创建,命名为:hello

    在新建文件夹中输入一些英语单词,一会进行批处理即可!

     运行代码

    1. package com.atguigu.wc
    2. import org.apache.flink.api.scala.ExecutionEnvironment
    3. import org.apache.flink.api.scala._
    4. object WordCount {
    5. def main(args: Array[String]): Unit = {
    6. //创建一个批处理的执行环境
    7. val env:ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    8. //接收文件
    9. val inputPath:String = "D:\\HYF\\FlinkTutorial\\src\\main\\resources\\hello.txt"
    10. val inputDataSet:DataSet[String] = env.readTextFile(inputPath)
    11. //对数据进行转换处理统计,先分词,再按照word进行分组,最后进行聚合统计
    12. val resultDataSet:DataSet[(String,Int)] = inputDataSet
    13. .flatMap(_.split(" ")) //按照空格对String进行一个分割
    14. .map((_,1)) // _进行分组,1进行求和
    15. .groupBy(0) //以第一个元素作为key,进行分组
    16. .sum(1) //对所有数据的第二个元素求和
    17. resultDataSet.print()
    18. }
    19. }

    运行结果

    运行代码如下

    1. package com.atguigu.wc
    2. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    3. import org.apache.flink.streaming.api.scala._
    4. object StreamWordCount {
    5. def main(args: Array[String]): Unit = {
    6. val env = StreamExecutionEnvironment.getExecutionEnvironment
    7. val inputDataStream:DataStream[String] = env.socketTextStream("localhost",7777)
    8. val resultDataStream:DataStream[(String,Int)] = inputDataStream
    9. .flatMap(_.split(" "))
    10. .filter(_.nonEmpty)
    11. .map((_,1))
    12. .keyBy(0)
    13. .sum(1)
    14. resultDataStream.print()
    15. //启动任务执行
    16. env.execute("stream word count")
    17. }
    18. }

    测试——在 linux 系统中用 netcat 命令进行发送测试

    启动命令 ---- nc -lk 7777 

    输入一些数据即可!

    运行结果:当时监听窗口出现错误了,所以没有监听成功,结果这里就不显示了

  • 相关阅读:
    操作失误损失60亿美元,Excel还能是电脑上的常驻将军吗?
    SSL证书申购指南教程
    upload-labs通关
    Idea中Spring bean自动注入提示错误以及AOP不提示问题
    linux-文件管理
    诸葛广告分析3大能力全面升级,新增巨量引擎渠道广告监测!
    高创新 | CEEMDAN-VMD-BiLSTM-Attention双重分解+双向长短期记忆神经网络+注意力机制多元时间序列预测
    drawcall,batch,setpass
    Redis 设置密码
    工作流之activiti7学习进阶篇
  • 原文地址:https://blog.csdn.net/qq_70085330/article/details/126396109