• 大数据分析与应用实验任务八


    大数据分析与应用实验任务八

    实验目的

    • 进一步熟悉pyspark程序运行方式;
    • 熟练掌握pysaprk RDD基本操作相关的方法、函数。

    实验任务

    进入pyspark实验环境,在图形界面的pyspark命令行窗口中完成下列任务:

    在实验环境中自行选择路径新建以自己姓名拼音命名的文件夹,后续代码中涉及的文件请保存到该文件夹下(需要时文件夹中可以创建新的文件夹)。

    一、 参考书上例子,理解并完成RDD常用操作(4.1.2节内容);
    1.转换操作
    (1)filter(func)

    filter(func)操作会筛选出满足函数 func 的元素,并返回一个新的数据集。例如:

    lines = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt") 
    linesWithSpark = lines.filter(lambda line: "Spark" in line) 
    linesWithSpark.foreach(print) 
    
    • 1
    • 2
    • 3

    image-20231116111150747

    (2)map(func)

    map(func)操作将每个元素传递到函数 func 中,并将结果返回为一个新的数据集。例如:

    data = [1,2,3,4,5] 
    rdd1 = sc.parallelize(data) 
    rdd2 = rdd1.map(lambda x:x+10) 
    rdd2.foreach(print) 
    
    • 1
    • 2
    • 3
    • 4

    image-20231116111301428

    下面是另外一个实例:

    lines = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt") 
    words = lines.map(lambda line:line.split(" ")) 
    words.foreach(print) 
    
    • 1
    • 2
    • 3

    image-20231116111354416

    (3)flatMap(func)

    flatMap(func)与 map()相似,但每个输入元素都可以映射到 0 或多个输出结果。例如:

    lines = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt") 
    words = lines.flatMap(lambda line:line.split(" ")) 
    words.foreach(print) 
    
    • 1
    • 2
    • 3

    image-20231116111511592

    (4)groupByKey()

    groupByKey()应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集。下面给

    出一个简单实例,代码如下:

    words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1),("is",1),("fast",1),("Spark",1),("is",1),("better",1)]) 
    words1 = words.groupByKey() 
    words1.foreach(print) 
    
    • 1
    • 2
    • 3

    image-20231116111553622

    (5)reduceByKey(func)

    reduceByKey(func)应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每

    个值是将每个 key 传递到函数 func 中进行聚合后得到的结果。这里给出一个简单实例,代码如下:

    words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1),("is",1),("fast",1),("Spark",1),("is",1),("better",1)]) 
    words1 = words.reduceByKey(lambda a,b:a+b) 
    words1.foreach(print)
    
    • 1
    • 2
    • 3

    image-20231116111642594

    2.行动操作
    rdd = sc.parallelize([1,2,3,4,5]) 
    rdd.count() 
    rdd.first() 
    rdd.take(3) 
    rdd.reduce(lambda a,b:a+b) 
    rdd.collect() 
    rdd.foreach(lambda elem:print(elem))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    image-20231116111735249

    3.惰性机制
    lineslzy = sc.textFile("file:///root/Desktop/luozhongye/wordlzy.txt") 
    lineLengths = lineslzy.map(lambda s:len(s)) 
    totalLength = lineLengths.reduce(lambda a,b:a+b) 
    print(totalLength)
    
    • 1
    • 2
    • 3
    • 4

    image-20231116111910718

    二、 参考书上例子,理解并完成键值对RDD常用操作(4.2.2节内容);

    常用的键值对转换操作包括 reduceByKey(func)、groupByKey()、keys、values、sortByKey()、sortBy()、mapValues(func)、join()和 combineByKey 等。

    1.reduceByKey(func)
    pairRDD = sc.parallelize([("Hadoop",1),("Spark",1),("Hive",1),("Spark",1),("罗忠烨",1)]) 
    pairRDD.reduceByKey(lambda a,b:a+b).foreach(print) 
    
    • 1
    • 2

    image-20231116112042871

    2.groupByKey()
    list = [("spark",1),("spark",2),("hadoop",3),("hadoop",5)] 
    pairRDD = sc.parallelize(list) 
    pairRDD.groupByKey() 
    pairRDD.groupByKey().foreach(print)
    
    • 1
    • 2
    • 3
    • 4

    image-20231116112213649

    对于一些操作,既可以通过 reduceByKey()得到结果,也可以通过组合使用 groupByKey()和 map()操作得到结果,二者是“殊途同归”,下面是一个实例:

    words = ["one", "two", "two", "three", "three", "three"] 
    wordPairsRDD = sc.parallelize(words).map(lambda word:(word, 1)) 
    wordCountsWithReduce = wordPairsRDD.reduceByKey(lambda a,b:a+b) 
    wordCountsWithReduce.foreach(print) 
    wordCountsWithGroup = wordPairsRDD.groupByKey().map(lambda t:(t[0],sum(t[1]))) 
    wordCountsWithGroup.foreach(print) 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    image-20231116121259280

    3.keys
    list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)] 
    pairRDD = sc.parallelize(list) 
    pairRDD.keys().foreach(print)
    
    • 1
    • 2
    • 3

    image-20231116112613497

    4.values
    list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1),("luozhongyeTop1",1)] 
    pairRDD = sc.parallelize(list) 
    pairRDD.values().foreach(print)
    
    • 1
    • 2
    • 3

    image-20231116112822976

    5.sortByKey()
    list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)] 
    pairRDD = sc.parallelize(list) 
    pairRDD.foreach(print) 
    pairRDD.sortByKey().foreach(print)
    
    • 1
    • 2
    • 3
    • 4

    image-20231116113006599

    6.sortBy()
    d1 = sc.parallelize([("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9),("luozhongye",1)]) 
    d1.reduceByKey(lambda a,b:a+b).sortByKey(False).collect()
    
    • 1
    • 2

    image-20231116113102891

    sortByKey(False)括号中的参数 False 表示按照降序排序,如果没有提供参数 False,则默认采用升序排序(即参数取值为 True)。从排序后的效果可以看出,所有键值对都按照 key 的降序进行了排序,因此输出[(‘g’, 21), (‘f’, 29), (‘e’, 17), (‘d’, 9), (‘c’, 27), (‘b’, 38), (‘a’, 42)]。但是,如果要根据 21、29、17 等数值进行排序,就无法直接使用 sortByKey()来实现,这时可以使用 sortBy(),代码如下:

    d1 = sc.parallelize([("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)]) 
    d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x,False).collect() 
    d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[0],False).collect() 
    d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x:x[1],False).collect() 
    
    • 1
    • 2
    • 3
    • 4

    image-20231116113232289

    7.mapValues(func)
    list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)] 
    pairRDD = sc.parallelize(list) 
    pairRDD1 = pairRDD.mapValues(lambda x:x+1) 
    pairRDD1.foreach(print)
    
    • 1
    • 2
    • 3
    • 4

    image-20231116113332961

    8.join()
    pairRDD1 = sc.parallelize([("spark",1),("spark",2),("hadoop",3),("hadoop",5)]) 
    pairRDD2 = sc.parallelize([("spark","fast")]) 
    pairRDD3 = pairRDD1.join(pairRDD2) 
    pairRDD3.foreach(print)
    
    • 1
    • 2
    • 3
    • 4

    image-20231116113434643

    9.combineByKey

    (1)createCombiner:在第一次遇到 key 时创建组合器函数,将 RDD 数据集中的 V 类型值转换成 C 类型值(V => C);

    (2)mergeValue:合并值函数,再次遇到相同的 key 时,将 createCombiner 的 C 类型值与这次传入的 V 类型值合并成一个 C 类型值(C,V)=>C;

    (3)mergeCombiners:合并组合器函数,将 C 类型值两两合并成一个 C 类型值;

    (4)partitioner:使用已有的或自定义的分区函数,默认是 HashPartitioner;

    (5)mapSideCombine:是否在 map 端进行 Combine 操作,默认为 true。

    下面通过一个实例来解释如何使用 combineByKey 操作。假设有一些销售数据,数据采用键值对的形式,即<公司,当月收入>,要求使用 combineByKey 操作求出每个公司的总收入和每月平均收入,并保存在本地文件中。

    为了实现该功能,可以创建一个代码文件“/root/Desktop/luozhongye/Combinelzy.py”,并输入如下代码:

    #!/usr/bin/env python3 
    from pyspark import SparkConf, SparkContext 
    
    conf = SparkConf().setMaster("local").setAppName("Combine ") 
    
    sc = SparkContext(conf = conf) 
    
    data = sc.parallelize([("company-1",88),("company-1",96),("company-1",85),("company-2",94),("company-2",86),("company-2",74),("company-3",86),("company-3",88),("company-3",92)],3) 
    
    res = data.combineByKey(lambda income:(income,1),lambda acc,income:(acc[0]+income, acc[1]+1),lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1])).map(lambda x:(x[0],x[1][0],x[1][0]/float(x[1][1])))
    
    res.repartition(1).saveAsTextFile("file:///root/Desktop/luozhongye/combineresultlzy")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    执行如下命令运行该程序:

    cd /root/Desktop/luozhongye 
    /usr/local/spark/bin/spark-submit Combinelzy.py
    
    • 1
    • 2

    image-20231116113910011

    三、 逐行理解并运行4.4.1实例“求TOP值”。

    假设在某个目录下有若干个文本文件,每个文本文件里面包含了很多行数据,每行数据由 4 个字段的值构成,不同字段值之间用逗号隔开,4 个字段分别为 orderid、userid、payment 和 productid,要求求出 Top N 个 payment 值。如下为一个样例文件 file0lzy.txt:

    1,1768,50,155 
    2,1218,600,211 
    3,2239,788,242 
    4,3101,28,599 
    5,4899,290,129 
    6,3110,54,1201 
    7,4436,259,877 
    8,2369,7890,27
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    实现上述功能的代码文件“/root/Desktop/luozhongye/TopN.py”的内容如下:

    # !/usr/bin/env python3 
    from pyspark import SparkConf, SparkContext
    
    conf = SparkConf().setMaster("local").setAppName("ReadHBase")
    sc = SparkContext(conf=conf)
    lines = sc.textFile("file:///root/Desktop/luozhongye/file0.txt")
    result1 = lines.filter(lambda line: (len(line.strip()) > 0) and (len(line.split(",")) ==
                                                                     4))
    result2 = result1.map(lambda x: x.split(",")[2])
    result3 = result2.map(lambda x: (int(x), ""))
    result4 = result3.repartition(1)
    result5 = result4.sortByKey(False)
    result6 = result5.map(lambda x: x[0])
    result7 = result6.take(5)
    for a in result7:
    	print(a)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    image-20231116121013386
    = result1.map(lambda x: x.split(“,”)[2])
    result3 = result2.map(lambda x: (int(x), “”))
    result4 = result3.repartition(1)
    result5 = result4.sortByKey(False)
    result6 = result5.map(lambda x: x[0])
    result7 = result6.take(5)
    for a in result7:
    print(a)

    ![在这里插入图片描述](https://img-blog.csdnimg.cn/a3c32d0053e2481c83318d6c27bb68bb.png)
    
    
    
    • 1
    • 2
    • 3
  • 相关阅读:
    C语言入门 Day_13 二维数组
    Redis JDBC
    01下班后一小时| 《JavaScript 悟道》 读书笔记
    【毕业设计源码】基于SSM的高校学籍信息管理系统的设计与实现
    Leetcode 242:有效的字母异位词
    webpack常见的loader和plugins,及它们各自作用机制
    10款功能强大的网络嗅探工具应用分析
    支持向量机--svm.SVC类
    redis报错 Error getaddrinfo ENOTFOUND
    idea实现Docker 远程部署项目
  • 原文地址:https://blog.csdn.net/qq_45473330/article/details/134439005