• spark(day01)


    Official site

    https://spark.apache.org/
    Spark download
    https://archive.apache.org/dist/spark/spark-2.0.1/
    
    Spark was open-sourced by UC Berkeley's AMP Lab and later donated to Apache. It is a fast, general-purpose, scalable big-data analytics engine. A high-profile member of the ever-growing family of big-data analytics solutions, it provides an effective framework for processing distributed datasets and does so efficiently. Spark combines batch processing, real-time stream processing, interactive queries, machine learning, and graph computation in a single engine, avoiding the wasted resources of deploying a separate cluster for each kind of workload. Today the Spark community is among the most active projects in the big-data world and the Apache Software Foundation, far surpassing even Hadoop, which it once could only chase.

    Introduction to Spark
    1. Spark is a fast, general-purpose distributed computing framework for processing massive datasets.

    Commonly used big-data compute frameworks today:
    1. MapReduce (offline batch processing)
    2. Spark (offline batch + real-time processing)
    3. Flink (real-time processing)
    4. Storm (real-time processing)

    Spark's performance:
    Processing entirely in memory, Spark is up to 100x faster than MapReduce; even processing on disk it is about 10x faster.

    (Figure: Hadoop MapReduce's disk-based execution vs. Spark's in-memory execution)

    As the figure above shows, Hadoop MapReduce is a heavily disk-bound framework: every shuffle produces a large amount of disk I/O. Moreover, in some workloads, such as iterative algorithms (gradient descent, logistic regression, etc.) that repeatedly reuse the result of one step in the computation chain, large amounts of recomputation occur, which again means frequent disk I/O and therefore poor performance.

    In summary, Spark covers all the major areas of big-data processing:
    1. Offline batch processing
    2. Interactive queries
    3. Data warehousing
    4. Real-time stream computation
    5. Machine-learning model building
    The Spark team's design goal is clearly one stack for every big-data application scenario; the benefit is easy integration and seamless interoperation.

    ===============================

    Spark's core data structure: the RDD (Resilient Distributed Dataset)

    ===============================

    Running Spark
    1. Local single-machine mode, for practice or testing
    2. Standalone cluster mode
    3. On-YARN cluster mode

    Download the Spark tarball and unpack it.

    [root@hadoop01 conf]# pwd
    /home/presoftware/spark-2.0.1-bin-hadoop2.7/conf
    [root@hadoop01 conf]# mv spark-env.sh.template spark-env.sh
    [root@hadoop01 conf]# vim spark-env.sh
    SPARK_LOCAL_IP=hadoop01
    #where temporary shuffle files are stored
    SPARK_LOCAL_DIRS=/home/presoftware/spark-2.0.1-bin-hadoop2.7/tmp
    #JDK path
    export JAVA_HOME=/home/presoftware/jdk1.8.0_181
    

    Start local mode

    [root@hadoop01 bin]# sh spark-shell --master=local
    
    

    Basic operations

    scala> val a1=Array(1,2,3,4)
    a1: Array[Int] = Array(1, 2, 3, 4)
    
    
    scala> a1.max
    res1: Int = 4
    
    

    How should you think about Spark's RDD?
    When starting out, treat an RDD as a collection type (analogous to Array or List), but a special one:
    1. RDDs have a partitioning mechanism, so a dataset can be processed in a distributed, parallel fashion, which greatly improves throughput
    2. RDDs have a fault-tolerance mechanism (lost data can be recovered)


    Two ways to create an RDD:
    1. Convert an ordinary collection (Array or List) into an RDD

    scala> val a1=Array(1,2,3,4)
    a1: Array[Int] = Array(1, 2, 3, 4)
    
    scala> a1.max
    res1: Int = 4
    
    scala> val r1=sc.parallelize(a1,2)
    r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
    
    


    scala> r1.partitions.size
    res3: Int = 2
    
    scala> r1.glom.collect
    res4: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4))
    
    

    Partitioning practice

    scala> val l1=List("hello","hello","world","hello")
    l1: List[String] = List(hello, hello, world, hello)
    
    scala> val r2=sc.parallelize(l1,4)
    r2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:26
    
    scala> r2.glom.collect
    res5: Array[Array[String]] = Array(Array(hello), Array(hello), Array(world), Array(hello))
    
    scala> r2.collect
    res6: Array[String] = Array(hello, hello, world, hello)
    
    scala> val l2=List(1,2,3,4)   // l2 was not shown in the original session; any list of Ints works
    l2: List[Int] = List(1, 2, 3, 4)

    scala> val r3=sc.parallelize(l2,2)
    r3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:26
    
    scala> r3.filter{x=> x%2==0}
    res9: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at filter at <console>:29
    
    scala> res9.collect
    res10: Array[Int] = Array(2, 4)
    
    
    scala> val r4=sc.makeRDD(List(1,2,3,4),2)
    r4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at <console>:24
    
    scala> r4.collect
    res11: Array[Int] = Array(1, 2, 3, 4)
    

    2. Read an external storage file: Spark turns the file's contents into an RDD

    Local file ---> RDD

    [root@hadoop01 ~]# cd /home/
    [root@hadoop01 home]# vim 1.txt
    
    hello world
    hello hadoop
    hello spark
    
    scala> val data1=sc.textFile("file:///home/1.txt",2)
    data1: org.apache.spark.rdd.RDD[String] = file:///home/1.txt MapPartitionsRDD[8] at textFile at <console>:24
    
    scala> data1.collect
    res13: Array[String] = Array(hello world, hello hadoop, hello spark)
    
    

    Exercise

    scala> data1.flatMap{x=>x.split(" ")}
    res14: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at flatMap at <console>:27
    
    scala> res14.collect
    res15: Array[String] = Array(hello, world, hello, hadoop, hello, spark)
    
    

    Exercise

    [root@hadoop01 home]# vim 2.txt
    
    192.168.231.11
    192.168.231.12
    192.168.231.13
    
    
    [root@hadoop01 home]# vim 3.txt
    
    192.168.231.12
    192.168.231.13
    
    
    scala> val data2=sc.textFile("file:///home/2.txt",2)
    data2: org.apache.spark.rdd.RDD[String] = file:///home/2.txt MapPartitionsRDD[11] at textFile at <console>:24
    
    
    scala> val data3=sc.textFile("file:///home/3.txt",2)
    data3: org.apache.spark.rdd.RDD[String] = file:///home/3.txt MapPartitionsRDD[13] at textFile at <console>:24
    
    
    scala> val r1=data2.intersection(data3)
    r1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[19] at intersection at <console>:28
    
    scala> r1.collect
    res16: Array[String] = Array(192.168.231.12, 192.168.231.13)
    
    

    HDFS file ---> RDD

    Start HDFS
    [root@hadoop01 home]# start-dfs.sh
    
    Check and upload
    [root@hadoop01 home]# hadoop fs -ls /
    [root@hadoop01 home]# hadoop fs -mkdir /data
    [root@hadoop01 home]# hadoop fs -put /home/1.txt /data
    
    
    
    scala> val data4=sc.textFile("hdfs://hadoop01:9000/data/1.txt",2)
    data4: org.apache.spark.rdd.RDD[String] = hdfs://hadoop01:9000/data/1.txt MapPartitionsRDD[23] at textFile at <console>:24
    
    scala> data4.collect
    res18: Array[String] = Array(hello world, hello hadoop, hello spark)
    
    
    

    RDD operations
    Overall there are two kinds of operations:
    1. Transformations (lazy operations): calling one does not trigger computation immediately;
    each call to a lazy method simply returns a new RDD
    2. Actions (eager operations): calling one triggers execution of the pending lazy methods, e.g. collect
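
    A quick way to see this laziness for yourself (a minimal sketch, run in spark-shell where sc is predefined; the println side effect is only for illustration):

    // map is lazy: defining doubled prints nothing and launches no job
    scala> val doubled = sc.makeRDD(List(1,2,3,4), 2).map { n => println("computing " + n); n * 2 }

    // collect is an action: only now do the "computing ..." lines appear, followed by Array(2, 4, 6, 8)
    scala> doubled.collect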

    map

    scala> val r1=sc.makeRDD(List(1,2,3,4),2)
    r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at makeRDD at <console>:24
    
    scala> val r2=r1.map{num=>num.toString}
    r2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[25] at map at <console>:26
    
    scala> r2.collect
    res19: Array[String] = Array(1, 2, 3, 4)
    


    Sum r1 per partition

    
    scala> r1.glom.collect
    res21: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4))

    scala> val r3=r1.mapPartitions{it=>
         | val result=List[Int]()
         | var sum=0
         | while(it.hasNext){
         | sum=sum+it.next
         | }
         | result.::(sum).iterator
         | }
    r3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[28] at mapPartitions at <console>:26
    
    scala> r3.glom.collect
    res24: Array[Array[Int]] = Array(Array(3), Array(7))
    
    scala> r3.collect
    res25: Array[Int] = Array(3, 7)
    
    
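    The same per-partition sum can be written more compactly, since mapPartitions only requires a function from iterator to iterator (a sketch, not from the original session):

    scala> val r3b = r1.mapPartitions { it => Iterator(it.sum) }

    scala> r3b.collect   // Array(3, 7), same as r3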
    scala> val r4=r1.mapPartitionsWithIndex{(index,it)=>
         | val result=List[String]()
         | var sum=0
         | while(it.hasNext){
         | sum=sum+it.next
         | }
         | result.::(index+"|"+sum).iterator
         | }
    r4: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[30] at mapPartitionsWithIndex at <console>:26
    
    scala> r4.glom.collect
    res26: Array[Array[String]] = Array(Array(0|3), Array(1|7))
    
    
    
    scala> val r5=sc.textFile("file:///home/2.txt",2)
    r5: org.apache.spark.rdd.RDD[String] = file:///home/2.txt MapPartitionsRDD[33] at textFile at <console>:24
    
    scala> val r6=sc.textFile("file:///home/3.txt",2)
    r6: org.apache.spark.rdd.RDD[String] = file:///home/3.txt MapPartitionsRDD[35] at textFile at <console>:24
    
    
    scala> val r7=r5.union(r6)
    r7: org.apache.spark.rdd.RDD[String] = UnionRDD[36] at union at <console>:28
    
    scala> r7.glom.collect
    res27: Array[Array[String]] = Array(Array(192.168.231.11, 192.168.231.12), Array(192.168.231.13), Array(192.168.231.12, 192.168.231.13), Array())
    
    scala> r7.collect
    res28: Array[String] = Array(192.168.231.11, 192.168.231.12, 192.168.231.13, 192.168.231.12, 192.168.231.13)
    
    scala> r7.distinct
    res29: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[40] at distinct at <console>:31
    
    scala> res29.collect
    res30: Array[String] = Array(192.168.231.11, 192.168.231.12, 192.168.231.13)
    
    
    
    scala> val r8=r5.intersection(r6)
    r8: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[46] at intersection at <console>:28
    
    scala> r8.collect
    res31: Array[String] = Array(192.168.231.12, 192.168.231.13)
    
    
    scala> val r9=r5.subtract(r6)
    r9: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[54] at subtract at <console>:28
    
    scala> r9.collect
    res33: Array[String] = Array(192.168.231.11)
    
    
    scala> val r10=sc.makeRDD(List(("bj",1),("sh",2),("bj",3),("sh",4)),2)
    r10: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[55] at makeRDD at <console>:24
    
    scala> val r11=r10.groupBy{x=>x._1}
    r11: org.apache.spark.rdd.RDD[(String, Iterable[(String, Int)])] = ShuffledRDD[57] at groupBy at <console>:26
    
    scala> r11.collect
    res34: Array[(String, Iterable[(String, Int)])] = Array((bj,CompactBuffer((bj,1), (bj,3))), (sh,CompactBuffer((sh,2), (sh,4))))
    
    
    scala> val r12=r10.groupByKey
    r12: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[58] at groupByKey at <console>:26
    
    scala> r12.collect
    res35: Array[(String, Iterable[Int])] = Array((bj,CompactBuffer(1, 3)), (sh,CompactBuffer(2, 4)))
    
    scala> val r13=sc.makeRDD(List((1,"bj"),(2,"sh"),(3,"bj"),(4,"sh")),2)
    r13: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[59] at makeRDD at <console>:24
    
    scala> val r14=r13.map{x=>(x._2,x._1)}.groupByKey
    r14: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[63] at groupByKey at <console>:26
    
    scala> r14.collect
    res37: Array[(String, Iterable[Int])] = Array((bj,CompactBuffer(1, 3)), (sh,CompactBuffer(2, 4)))
    
    

    The difference between groupBy and groupByKey: groupBy keeps the whole element in each group, while groupByKey keeps only the values

    scala> val r14=r13.map{x=>(x._2,x._1)}.groupBy{x=>x._1}
    r14: org.apache.spark.rdd.RDD[(String, Iterable[(String, Int)])] = ShuffledRDD[66] at groupBy at <console>:26
    
    scala> r14.collect
    res38: Array[(String, Iterable[(String, Int)])] = Array((bj,CompactBuffer((bj,1), (bj,3))), (sh,CompactBuffer((sh,2), (sh,4))))
    
    
    scala> val r15=sc.makeRDD(List(("hello",1),("world",2),("hello",1),("hello",3)),2)
    
    scala> val r16=r15.reduceByKey{(a,b)=>a+b}
    
    scala> r16.collect
    res0: Array[(String, Int)] = Array((hello,5), (world,2))
    
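    Compared with groupByKey followed by a sum, reduceByKey pre-aggregates values inside each partition before the shuffle, so much less data crosses the network. Both give the same totals on r15 (a sketch, not from the original session):

    scala> r15.groupByKey.map{case(k,vs)=>(k,vs.sum)}.collect   // every (key,value) pair is shuffled
    scala> r15.reduceByKey(_+_).collect                         // partial sums are shuffled instead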
    
    [root@hadoop01 home]# cat 1.txt 
    hello world
    hello hadoop
    hello spark
    
    scala> val r17=sc.textFile("file:///home/1.txt",2)
    
    scala> val r19=r17.flatMap { x => x.split(" ") }.map{world=>(world,1)}.reduceByKey{_+_}
    r19: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[10] at reduceByKey at <console>:26
    
    scala> r19.collect
    res4: Array[(String, Int)] = Array((hello,3), (world,1), (spark,1), (hadoop,1))
    
    
    aggregateByKey(zeroValue)(seqOp, combOp) first folds the values of each key within every partition using seqOp (starting from zeroValue), then merges the per-partition results across partitions with combOp:

    val rdd=sc.makeRDD(List(("a",1),("b",2),("a",2),("b",3),("a",5),("a",3),("b",3),("b",3)),2)

    partition 0 --- seqOp _+_
    a: 1,2   => (a,3)
    b: 2,3   => (b,5)

    partition 1 --- seqOp _+_
    a: 5,3   => (a,8)
    b: 3,3   => (b,6)

    combOp _*_
    a: 3*8
    b: 5*6


    scala> rdd.aggregateByKey(0)(_+_, _*_)
    
    scala> res10.collect
    res11: Array[(String, Int)] = Array((b,30), (a,24))
    
    
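    Because seqOp and combOp are independent, they can differ in kind. For example, on the same rdd, take the per-partition maximum per key and then sum those maxima across partitions (a sketch, not from the original session):

    scala> rdd.aggregateByKey(0)(math.max(_,_), _+_).collect
    // partition 0 maxima: a->2, b->3; partition 1 maxima: a->5, b->3; combined: (a,7), (b,6)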
    scala> val r19=sc.makeRDD(List((3,"sz"),(1,"bj"),(2,"sh"),(4,"gz")),2)
    r19: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[17] at makeRDD at <console>:24
    
    scala> r19.sortBy{x=>x._1}
    res16: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[22] at sortBy at <console>:27
    
    scala> res16.collect
    res17: Array[(Int, String)] = Array((1,bj), (2,sh), (3,sz), (4,gz))
    
    
    scala> r19.sortByKey(true)   // ascending
    res20: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[25] at sortByKey at <console>:27
    
    scala> res20.collect
    res21: Array[(Int, String)] = Array((1,bj), (2,sh), (3,sz), (4,gz))
    
    
    scala> r19.sortByKey(false)  // descending
    res22: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[28] at sortByKey at <console>:27
    
    scala> res22.collect
    res23: Array[(Int, String)] = Array((4,gz), (3,sz), (2,sh), (1,bj))
    
    
    
    scala> val rdd1=sc.makeRDD(List(("cat",1),("dog",2)))
    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[29] at makeRDD at <console>:24
    
    scala> val rdd2=sc.makeRDD(List(("cat",3),("dog",4),("tiger",9)))
    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at makeRDD at <console>:24
    
    scala> rdd1.join(rdd2)
    res24: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[33] at join at <console>:29
    
    scala> res24.collect
    res25: Array[(String, (Int, Int))] = Array((dog,(2,4)), (cat,(1,3)))
    
    scala> rdd2.join(rdd1).collect
    res26: Array[(String, (Int, Int))] = Array((dog,(4,2)), (cat,(3,1)))
    
    
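    Outer-join variants exist as well; for example, leftOuterJoin keeps every key of the left RDD and wraps the right-hand value in Option (a sketch, not from the original session; element order may vary):

    scala> rdd2.leftOuterJoin(rdd1).collect
    // something like: Array((dog,(4,Some(2))), (tiger,(9,None)), (cat,(3,Some(1))))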


    Cartesian product

    scala> val rdd1=sc.makeRDD(List(1,2,3))
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[37] at makeRDD at <console>:24
    
    scala> val rdd2=sc.makeRDD(List("a","b"))
    rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[38] at makeRDD at <console>:24
    
    scala> rdd1.cartesian(rdd2).collect
    res27: Array[(Int, String)] = Array((1,a), (1,b), (2,a), (2,b), (3,a), (3,b))
    
    

    Increasing the partition count: coalesce(n, shuffle) changes an RDD's number of partitions; to grow the count, shuffle must be true

    scala> val r22=r19.coalesce(4,true)
    r22: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[44] at coalesce at <console>:26
    
    scala> r22.partitions.size
    res30: Int = 4
    
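    Shrinking the partition count needs no shuffle, and repartition(n) is simply shorthand for coalesce(n, shuffle = true) (a sketch, not from the original session):

    scala> r19.coalesce(1).partitions.size    // 1 -- merging down avoids a shuffle
    scala> r19.repartition(4).partitions.size // 4 -- same as coalesce(4, true)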

    Actions
    Eager methods

    scala> val r23=sc.makeRDD(List(1,2,3,4),2)
    r23: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[45] at makeRDD at <console>:24
    
    scala> val r24=r23.map{num=>num*2}
    r24: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[46] at map at <console>:26
    
    scala> r24.collect
    res31: Array[Int] = Array(2, 4, 6, 8)
    
    scala> r24.reduce{_+_}
    res32: Int = 20
    
    
    scala> r24.count
    res33: Long = 4
    
    scala> r24.first
    res34: Int = 2
    
    
    scala> r24.take(3)
    res35: Array[Int] = Array(2, 4, 6)
    

    takeOrdered(n): sort ascending and return the first n elements

    scala> val r25=sc.makeRDD(List(3,1,2,5,4),2)
    r25: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[47] at makeRDD at <console>:24
    
    scala> r25.takeOrdered(3)
    res37: Array[Int] = Array(1, 2, 3)
    
    

    top(n): the first n elements in descending order

    scala> r25.top(5)
    res38: Array[Int] = Array(5, 4, 3, 2, 1)
    
    
    scala> r25.max
    res39: Int = 5
    
    scala> r25.sum
    res40: Double = 15.0
    

    Saving results

    scala> r25.saveAsTextFile("file:///home/result")
    
    [root@hadoop01 home]# cd result/
    [root@hadoop01 result]# ls
    part-00000  part-00001  _SUCCESS
    [root@hadoop01 result]# pwd
    /home/result
    [root@hadoop01 result]# cat part-00000
    3
    1
    
    scala> r25.saveAsTextFile("hdfs://hadoop01:9000/sparkresult")
    
    
    [root@hadoop01 ~]# hadoop fs -ls /
    Found 18 items
    -rw-r--r--   1 root supergroup  210606807 2021-09-04 22:21 /a.rpm
    drwxr-xr-x   - root supergroup          0 2022-08-28 17:30 /data
    drwxr-xr-x   - root supergroup          0 2022-07-16 20:18 /flume
    drwxr-xr-x   - root supergroup          0 2021-08-26 21:10 /gyj
    drwxr-xr-x   - root supergroup          0 2022-08-01 14:01 /hbase
    drwxr-xr-x   - root supergroup          0 2021-08-26 18:38 /log
    drwxr-xr-x   - root supergroup          0 2021-11-20 12:44 /mr
    drwxr-xr-x   - root supergroup          0 2022-07-21 18:40 /orders
    drwxr-xr-x   - root supergroup          0 2022-07-21 18:42 /oresers
    drwxr-xr-x   - root supergroup          0 2022-07-21 18:40 /products
    drwxr-xr-x   - root supergroup          0 2021-11-20 13:39 /result
    drwxr-xr-x   - root supergroup          0 2022-07-17 13:37 /score
    drwxr-xr-x   - root supergroup          0 2022-09-02 16:01 /sparkresult
    drwxr-xr-x   - root supergroup          0 2022-07-17 10:03 /student
    drwx-wx-wx   - root supergroup          0 2022-07-17 08:20 /tmp
    drwxr-xr-x   - root supergroup          0 2022-07-17 08:13 /user
    drwxr-xr-x   - root supergroup          0 2022-07-22 19:02 /words
    drwxr-xr-x   - root supergroup          0 2022-07-23 19:18 /zebra
    [root@hadoop01 ~]# hadoop fs -ls /sparkresult
    Found 3 items
    -rw-r--r--   3 root supergroup          0 2022-09-02 16:01 /sparkresult/_SUCCESS
    -rw-r--r--   3 root supergroup          4 2022-09-02 16:01 /sparkresult/part-00000
    -rw-r--r--   3 root supergroup          6 2022-09-02 16:01 /sparkresult/part-00001
                   
    

    Read data from HDFS, process it, and write the result back to HDFS

    
    scala> val r26=sc.textFile("hdfs://hadoop01:9000/data/1.txt")
    
    scala> val r27=r26.flatMap{x=>x.split(" ")}.map{x=>(x,1)}.groupByKey.map{case(x,y)=>(x,y.sum)}
    
    scala> r27.collect
    res48: Array[(String, Int)] = Array((spark,1), (hadoop,1), (hello,3), (world,1))
    
    scala> r27.saveAsTextFile("hdfs://hadoop01:9000/dataspark")
    
    

    Check from the command line

    [root@hadoop01 ~]# hadoop fs -cat /dataspark/part-00000
    (spark,1)
    (hadoop,1)
    (hello,3)
    (world,1)
    
    

    View the result in Eclipse

    scala> r15.collect
    res56: Array[(String, Int)] = Array((hello,1), (world,2), (hello,1), (hello,3))

    scala> r15.countByKey
    res57: scala.collection.Map[String,Long] = Map(hello -> 3, world -> 1)
    
    
    
    scala> r15.foreach{println}
    (hello,1)
    (world,2)
    (hello,1)
    (hello,3)
    
    
    scala> r15.foreach{x=>println(x)}
    (hello,1)
    (world,2)
    (hello,1)
    (hello,3)
    
    
    

    Note:
    collect gathers every element into a single local collection on the driver, which can easily cause an out-of-memory error; use it with caution in production
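
    Safer habits when the dataset may be large (a sketch; bigRdd stands for a hypothetical large RDD):

    scala> bigRdd.take(10)          // pulls only 10 elements to the driver
    scala> bigRdd.foreach(println)  // runs on the executors; nothing is gathered on the driver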

    Summary:
    Common lazy methods (transformations)

    1.map
    2.flatMap
    3.filter
    4.sortBy
    5.sortByKey
    6.groupBy
    7.groupByKey
    8.reduceByKey
    9.intersection
    10.union
    11.subtract
    12.distinct
    13.coalesce (changes the partition count)
    
    

    Common actions (eager methods)

    1.reduce
    2.max
    3.min
    4.sum
    5.count
    6.take
    7.foreach
    8.collect (use with caution in production)
    9.saveAsTextFile
    10.first
    

    Working in Eclipse
    1. Create a new Scala project
    2. Import the Spark jars
    3. Select all the jars, right-click ---- Build Path

    (Figure: the full WordCount computation chain as a DAG)

    The figure above shows the whole WordCount computation chain. Spark abstracts a computation chain into a DAG (Directed Acyclic Graph).
    The DAG records the dependencies (lineage) between RDDs.
    A dependency is the transformation (lazy method) between a parent RDD and a child RDD.
    These dependencies are what make RDD data fault-tolerant: if an RDD partition's data is lost, it can be recovered.

    Dependencies between Spark RDDs

    Spark's transformations (lazy methods) create dependencies between RDDs. Thanks to these dependencies, when a child partition's data is lost, it can be recovered from the parent partition's data plus the dependency.
    Overall there are two kinds of dependency:
    1. Narrow dependency: parent and child partitions have a one-to-one relationship.
    Typical examples are map and filter; such operations simply transform a parent partition's data and place it in the corresponding child partition, hence one-to-one.
    A narrow dependency involves no shuffle and no disk I/O, so it executes very efficiently. When a DAG contains several consecutive narrow dependencies, Spark's engine merges them and executes them together; this is called pipeline optimization.

    2. Wide dependency: parent and child partitions have a one-to-many relationship.
    Typical examples are groupBy and reduceByKey; such operations distribute data to different child partitions according to some grouping condition.
    A wide dependency involves a shuffle and therefore disk I/O, and cannot be pipelined.

    In short, Spark does have a shuffle phase, but it avoids shuffles wherever possible to improve execution efficiency.
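
    You can inspect these dependencies yourself: RDD.toDebugString prints the lineage, and the indentation marks the stage boundary introduced by each wide dependency (a minimal sketch; the exact RDD ids will differ):

    scala> val wc = sc.textFile("hdfs://hadoop01:9000/data/1.txt").flatMap{_.split(" ")}.map{(_,1)}.reduceByKey(_+_)

    scala> wc.toDebugString
    // prints something like:
    // (2) ShuffledRDD[4] at reduceByKey ...        <- wide dependency, new stage
    //  +-(2) MapPartitionsRDD[3] at map ...        <- narrow dependencies, pipelined together
    //     |  MapPartitionsRDD[2] at flatMap ...
    //     |  hdfs://hadoop01:9000/data/1.txt ...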

    Spark's stage division
    A small example to make stages concrete:

    scala> val data=sc.textFile("hdfs://hadoop01:9000/data",2)
    data: org.apache.spark.rdd.RDD[String] = hdfs://hadoop01:9000/data MapPartitionsRDD[1] at textFile at <console>:24
    
    scala> val result=data.flatMap{_.split(" ")}.map{(_,1)}.reduceByKey(_+_)
    result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:26
    
    scala> result.saveAsTextFile("hdfs://hadoop01:9000/result04")
    
    

    Spark Stage and Task
    Spark divides a DAG into multiple stages. The division rule:
    walk backwards from the Action; cut a new stage at each wide dependency, and apply pipeline optimization across consecutive narrow dependencies.
    A stage is essentially a set of tasks.
    Task: each partition of an RDD corresponds to one task.

    For example, pipeline optimization can merge what would otherwise be 8 tasks into 4.

    Spark cluster setup (standalone)
    1. Configure the first node

    [root@hadoop01 conf]# pwd
    /home/presoftware/spark-2.0.1-bin-hadoop2.7/conf
    [root@hadoop01 conf]# vim spark-env.sh
     SPARK_LOCAL_IP=hadoop01
    #where temporary shuffle files are stored
    SPARK_LOCAL_DIRS=/home/presoftware/spark-2.0.1-bin-hadoop2.7/tmp
    #JDK path
     export JAVA_HOME=/home/presoftware/jdk1.8.0_181
    
    
    [root@hadoop01 conf]# pwd
    /home/presoftware/spark-2.0.1-bin-hadoop2.7/conf
    [root@hadoop01 conf]# mv slaves.template slaves
    [root@hadoop01 conf]# vim slaves
    #the worker nodes
    hadoop01
    hadoop02
    hadoop03
    
    

    2. Configure the second and third nodes

    Copy the configured installation from the first node to the second and third nodes:
    [root@hadoop01 presoftware]# scp -r  spark-2.0.1-bin-hadoop2.7 root@hadoop03:/home/presoftware/
    [root@hadoop02 conf]# vim spark-env.sh
    Change SPARK_LOCAL_IP to each host's own name:
    SPARK_LOCAL_IP=hadoop02 (on hadoop02)
    SPARK_LOCAL_IP=hadoop03 (on hadoop03)
    

    Start the Spark cluster (master and workers). Whichever node you run the start script on becomes the master.

    [root@hadoop01 presoftware]# cd spark-2.0.1-bin-hadoop2.7/sbin/
    [root@hadoop01 sbin]# ls
    slaves.sh                  start-mesos-shuffle-service.sh  stop-mesos-dispatcher.sh
    spark-config.sh            start-shuffle-service.sh        stop-mesos-shuffle-service.sh
    spark-daemon.sh            start-slave.sh                  stop-shuffle-service.sh
    spark-daemons.sh           start-slaves.sh                 stop-slave.sh
    start-all.sh               start-thriftserver.sh           stop-slaves.sh
    start-history-server.sh    stop-all.sh                     stop-thriftserver.sh
    start-master.sh            stop-history-server.sh
    start-mesos-dispatcher.sh  stop-master.sh
    
    [root@hadoop01 sbin]# sh start-all.sh 
    

    Stop the cluster

    [root@hadoop01 sbin]# sh stop-all.sh 
    

    Connect to the Spark cluster (from the bin directory)

    [root@hadoop01 bin]# sh spark-shell --master spark://hadoop01:7077
    

    Open the Spark cluster's web console

    http://hadoop01:8080/
    


    Test that the cluster works
    1. Read data from a local file

    scala> val data=sc.textFile("file:///home/1.txt",3)
    data: org.apache.spark.rdd.RDD[String] = file:///home/1.txt MapPartitionsRDD[1] at textFile at <console>:24
    
    scala> val result=data.flatMap{_.split(" ")}.map{(_,1)}.reduceByKey{_+_}
    result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:26
    
    scala> result.collect
    [Stage 0:>                                                          (0 + 3) / 3]22/09/03 10:14:39 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 192.168.253.131): java.io.FileNotFoundException: File file:/home/1.txt does not exist
    
    

    This fails because 1.txt does not exist on the other workers.
    Fix: create the same directory and file on the other two servers, or simply put the file into HDFS instead.

    2. Read from HDFS

    scala> val data=sc.textFile("hdfs://hadoop01:9000/data",3)
    data: org.apache.spark.rdd.RDD[String] = hdfs://hadoop01:9000/data MapPartitionsRDD[6] at textFile at <console>:24
    
    scala> val result=data.flatMap{_.split(" ")}.map{(_,1)}.reduceByKey{_+_}
    result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:26
    
    scala> result.collect
    res1: Array[(String, Int)] = Array((world,1), (hadoop,1), (hello,3), (spark,1)) 
    

    Example: WordCount as a standalone program

    package cn.tedu.wordcount
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    
    object Driver {
      def main(args: Array[String]): Unit = {
        
    //--Create the Spark configuration object.
       //--At minimum set: (1) the master, "local" meaning single-machine local mode; (2) the application name (job id)
        val conf=new SparkConf().setMaster("local").setAppName("wordcount")
        
    //--Create the Spark context; it is used to create RDDs, connect to Spark, and operate the cluster
        val sc=new SparkContext(conf)
        
        val data=sc.textFile("hdfs://hadoop01:9000/data", 2)
        
        val result=data.flatMap { line => line.split(" ")}
          .map { word => (word,1) }
          .reduceByKey{_+_}
        
          
    //--Call an action to trigger the computation
        //result.foreach{println}
        
    //--The output path must not exist beforehand
        result.saveAsTextFile("hdfs://hadoop01:9000/result02")
      }
    }
    
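    To run the same Driver on the standalone cluster instead of locally, the only required change is the master URL (a sketch; packaging and spark-submit details are omitted):

    val conf=new SparkConf().setMaster("spark://hadoop01:7077").setAppName("wordcount")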
    Exercise 1: process average.txt and compute the mean of the second column
    1 16
    2 74
    3 51
    4 35
    5 44
    6 95
    7 5
    8 29
    10 60
    11 13
    12 99
    13 7
    14 26
    
    
    

    Solution

    package cn.tedu.wordcount
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    
    object DriverAverage {
    //compute the mean of the second column of average.txt
      def main(args: Array[String]): Unit = {
        val conf=new SparkConf().setMaster("local").setAppName("average")
        val sc=new SparkContext(conf);
        val data=sc.textFile("hdfs://hadoop01:9000/average", 3);
        
    val sum=data.map{line=>line.split(" ")(1).toInt}.reduce{(a,b)=>a+b}//Int sum: dividing two Ints would truncate (precision loss)
        val sum2=data.map{line=>line.split(" ")(1).toInt}.sum()
        val count=data.count().toInt
        val average=sum2/count
        println(average)
      }
    }
    
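    The mean can also be computed in a single pass over the data, avoiding the separate count job (a sketch in the same program style):

    val (s, n) = data.map{line => (line.split(" ")(1).toDouble, 1L)}
                     .reduce{(a, b) => (a._1 + b._1, a._2 + b._2)}
    println(s / n)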
    Exercise 2: process MaxMin.txt and return the maximum male height (191)
    Exercise 3: process MaxMin.txt and return the full line holding the maximum male height, e.g. 8 M 191
    1 M 174
    2 F 165
    3 M 172
    4 M 180
    5 F 160
    6 F 162
    7 M 172
    8 M 191
    9 F 175
    10 F 167
    
    
    

    Solution

    package cn.tedu.wordcount
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    
    object DriverMaxMin {
        def main(args: Array[String]): Unit = {
          val conf=new SparkConf().setMaster("local").setAppName("maxmin")
          val sc=new SparkContext(conf)
          val data=sc.textFile("hdfs://hadoop01:9000/maxmin", 3)
          val max=data.filter { line => line.split(" ")(1)=="M" }.map { line => line.split(" ")(2) }.max()
          val min=data.filter { line => line.split(" ")(1)=="M" }.map { line => line.split(" ")(2) }.min()
          println("最大值:"+max+"~~~~最小值:"+min)
        }
    }
    
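    Note that the solution above compares heights as strings, which only works while every height has the same number of digits; converting to Int first is safer (a sketch):

    val maxH=data.filter { line => line.split(" ")(1)=="M" }.map { line => line.split(" ")(2).toInt }.max()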

    Print the full line with the maximum height

    package cn.tedu.wordcount
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object DriverMin {
      def main(args: Array[String]): Unit = {
          val conf=new SparkConf().setMaster("local").setAppName("min")
          val sc=new SparkContext(conf)
          val data=sc.textFile("hdfs://hadoop01:9000/maxmin", 3)
          val max=data.filter { line => line.split(" ")(1)=="M" }.sortBy{x=> -x.split(" ")(2).toInt}.take(1)
          max.foreach { println }
          println(max.mkString)
        }
    }
    
    
    
    Exercise 4: process topk.txt and return the 3 most frequent words, e.g. (hello,10) (hive,6) (hadoop,4)
    
    hello world bye world
    hello hadoop bye hadoop
    hello world java web
    hadoop scala java hive
    hadoop hive redis hbase
    hello hbase java redis
    

    Solution

    package cn.tedu.wordcount
    
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    
    object DriverTopk {
      def main(args: Array[String]): Unit = {
        val conf=new SparkConf().setMaster("local").setAppName("topk")
        val sc=new SparkContext(conf)
        val data=sc.textFile("hdfs://hadoop01:9000/topk",3)
        val topk=data.flatMap { line => line.split(" ") }.map { word => (word,1) }.reduceByKey{_+_}.sortBy{case(x,y)=>(-y)}
    //alternative: use top(3) with an Ordering on the count
        val topk2=data.flatMap { line => line.split(" ") }.map { word => (word,1) }.reduceByKey{_+_}.top(3)(Ordering.by{x=> x._2})
        
    //    topk.take(3).foreach{println}
        topk2.foreach{println}
    
        
      }
    }
    