• Flink学习12:DataStreaming API


     1.概念

    2.DatStream编程

     需要代码实现的就3部分,1.数据源  2.转换 3.sink指定输出格式

    2.1 创建环境

    val env = ExecutionEnvironment.getExecutionEnvironment

    语句比较固定

     2.2 数据源

     内置数据源主要有3种:

     2.2.1 文本数据源

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    object readText {
      def main(args: Array[String]): Unit = {
    
        //create env
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        //create datasource
        val dataStream = env.readTextFile(filePath = "C:\\doc\\temp\\1.正行项目介绍.txt")
        
    
        //print
        dataStream.print()
        
    
        //execute
        env.execute()
    
    
      }
    
    }
    

    2.2.2  套接字流

    使用Ubantu系统自带的NC生成一个socket数据源

    然后编写代码,监听socket数据

    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    object socketSourceTest {
      def main(args: Array[String]): Unit = {
    
        //create env
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        //create datasource
        val socketDataStream = env.socketTextStream("localhost", 9999, '\n')
    
        //print
        socketDataStream.print()
    
        //execute
        env.execute()
    
      }
    
    }
    
    

    输出结果

    2.2.3 集合数据源

     
    

     

    import org.apache.flink.api.scala._
    object CollectionSourceTest {
    
      def main(args: Array[String]): Unit = {
    
    
        //create env
        val env = ExecutionEnvironment.getExecutionEnvironment
    
        //collection data source
        val collectionDataStream = env.fromElements(Tuple1(1L, 2L), Tuple1(3L, 4L))
    
    
        //print
        collectionDataStream.print()
    
      }
    
    }
    输出结果:

    外部数据源(kafka)

     kafka的broker

    borker即是kafka集群的每台机器Topic是一类数据的集合

     

     

    Partition

    是Topic数据的物理分区

     

    Producer

    负责生成数据到kafka的broker中

     

     

    consumer

     

    consumer Group

    为consumer指定对应的consumer Group

     

    kafka的安装

    下载kafka

    kafka_2.12-3.2.0kafkakakakakakakakakakakakaka-Java文档类资源-CSDN下载

    下载完成后直接解压

    tar -zxvf kafka_2.12-3.2.0.tgz

    修改kafka配置文件

    vi /opt/kafka/config/server.properties

    增加3个配置

    listeners=PLAINTEXT://10.31.126.10:9092

    advertised.listeners=PLAINTEXT://10.31.126.10:9092

    zookeeper.connect=10.31.126.10:2181

    启动kafka

    1.先启动zookeeper服务

    cd /opt/kafka

    ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
     

    开启以后,不要关闭当前窗口,不然的话zookeeper服务会中断

    ps: 如果报错:/opt/kafka/bin/kafka-run-class.sh: line 342: /opt/kafka/echo/bin/java: No such file or directory

    可以看下 echo $JAVA_HOME 是不是路径打印不出。

    解决方案: 执行    source /etc/profile ,再打印下  echo $JAVA_HOME,看下是否正常。

    2.开启kafka服务

    cd /opt/kafka

    ./bin/kafka-server-start.sh ./config/server.properties

    开启以后,不要关闭当前窗口,不然的话kafka服务会中断

    3.测试kafka

    bin/kafka-topics.sh --create --bootstrap-server 10.31.126.10:9092 --replication-factor 1 --partitions 1 --topic wordTest

    ps:在较新版本(2.2 及更高版本)的 Kafka 不再需要 ZooKeeper 连接字符串,即- -zookeeper localhost:2181。使用 Kafka Broker的 --bootstrap-server localhost:9092来替代- -zookeeper localhost:2181。
     

    2.2一下版本:./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication 1 --partitions 1 --topic wordsendertest

    来启动
     

     

    查看topic列表

    bin/kafka-topics.sh --bootstrap-server 10.31.126.10:9092 --list

    查看对应Topic描述: 

    bin/kafka-topics.sh --describe --bootstrap-server 10.31.126.10:9092 --topic wordTest

    删除topic

    bin/kafka-topics.sh -delete --bootstrap-server hp2:9092 --topic hello

  • 相关阅读:
    大一作业HTML网页作业:中华传统文化题材网页设计(纯html+css实现)
    Python&OpenCV自动人脸打马赛克&调色系统[源码&UI操作界面&部署教程]
    c++ 学习之 初始化列表初始化属性
    Hadoop分布式集群搭建教程
    linux内核模块编译方法详解
    C++类模板
    K-近邻算法(KNN)
    Nginx介绍 安装
    羧基修饰青色乳胶微球100nm
    ELK - CentOS7 安装 elasticsearch 7.17.5
  • 原文地址:https://blog.csdn.net/hzp666/article/details/126152676