• 【大数据学习篇6】 Spark操作统计分析数据操作


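    通过前面的文章安装好环境后，下面就可以开始操作了。注意：第1节中读取的MySQL表 order_stat 需要先按第2节的语句在 test 库中创建并插入数据；写入用的 order_stat2 也建议按第2、3节预先建好并设置 utf8mb4 字符集，以免中文乱码。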

    1. Spark操作
     

      [hd@master ~]$ spark-shell
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    2022-09-14 23:13:12,403 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Spark context Web UI available at http://192.168.159.129:4040
    Spark context available as 'sc' (master = local[*], app id = local-1663168393546).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
          /_/
    
    Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala>
    
    scala> val rdd = sc.textFile("/data/tall_sum.csv")

    rdd: org.apache.spark.rdd.RDD[String] = /data/tall_sum.csv MapPartitionsRDD[1] at textFile at <console>:24
    
    scala> rdd.collect

    res0: Array[String] = Array(1,178.80,0.00,上海,2020-02-21 00:00:00,,0.00, 2,21.00,21.00,内蒙古自治区,2020-02-20 23:59:54,2020-02-21 00:00:02,0.00, 3,37.00,0.00,安徽省,2020-02-20 23:59:35,,0.00, 4,157.00,157.00,湖南省,2020-02-20 23:58:34,2020-02-20 23:58:44,0.00, 5,64.80,0.00,江苏省,2020-02-20 23:57:04,2020-02-20 23:57:11,64.80, 6,327.70,148.90,浙江省,2020-02-20 23:56:39,2020-02-20 23:56:53,178.80, 7,357.00,357.00,天津,2020-02-20 23:56:36,2020-02-20 23:56:40,0.00, 8,53.00,53.00,浙江省,2020-02-20 23:56:12,2020-02-20 23:56:16,0.00, 9,43.00,0.00,湖南省,2020-02-20 23:54:53,2020-02-20 23:55:04,43.00, 10,421.00,421.00,北京,2020-02-20 23:54:28,2020-02-20 23:54:33,0.00, 11,267.90,0.00,北京,2020-02-20 23:54:24,2020-02-20 23:54:31,267.90, 12,37.00,37.00,四川省,2020-02-20 23:54:24,2020-02-20 23:54:31,0.00, 13,53.00,53.00,上海,2020-02-...
    scala>
    
    scala> val rdd1 = rdd.map(_.split(","))

    rdd1: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:26
    
    scala> rdd1.collect

    res1: Array[Array[String]] = Array(Array(1, 178.80, 0.00, 上海, 2020-02-21 00:00:00, "", 0.00), Array(2, 21.00, 21.00, 内蒙古自治区, 2020-02-20 23:59:54, 2020-02-21 00:00:02, 0.00), Array(3, 37.00, 0.00, 安徽省, 2020-02-20 23:59:35, "", 0.00), Array(4, 157.00, 157.00, 湖南省, 2020-02-20 23:58:34, 2020-02-20 23:58:44, 0.00), Array(5, 64.80, 0.00, 江苏省, 2020-02-20 23:57:04, 2020-02-20 23:57:11, 64.80), Array(6, 327.70, 148.90, 浙江省, 2020-02-20 23:56:39, 2020-02-20 23:56:53, 178.80), Array(7, 357.00, 357.00, 天津, 2020-02-20 23:56:36, 2020-02-20 23:56:40, 0.00), Array(8, 53.00, 53.00, 浙江省, 2020-02-20 23:56:12, 2020-02-20 23:56:16, 0.00), Array(9, 43.00, 0.00, 湖南省, 2020-02-20 23:54:53, 2020-02-20 23:55:04, 43.00), Array(10, 421.00, 421.00, 北京, 2020-02-20 23:54:28, 2020-02-20 23:54:33, 0.00), Array(11, 267.90...
    scala> case class Order(orderNo:Int,deal:Double,pay:Double,province:String,orderTime:String,payTime:String,refund:Double)

    defined class Order
    
    scala> val rdd2 = rdd1.map(x=>Order(x(0).toInt,x(1).toDouble,x(2).toDouble,x(3),x(4),x(5),x(6).toDouble))

    rdd2: org.apache.spark.rdd.RDD[Order] = MapPartitionsRDD[3] at map at <console>:30
    scala> rdd2.collect

    res2: Array[Order] = Array(Order(1,178.8,0.0,上海,2020-02-21 00:00:00,,0.0), Order(2,21.0,21.0,内蒙古自治区,2020-02-20 23:59:54,2020-02-21 00:00:02,0.0), Order(3,37.0,0.0,安徽省,2020-02-20 23:59:35,,0.0), Order(4,157.0,157.0,湖南省,2020-02-20 23:58:34,2020-02-20 23:58:44,0.0), Order(5,64.8,0.0,江苏省,2020-02-20 23:57:04,2020-02-20 23:57:11,64.8), Order(6,327.7,148.9,浙江省,2020-02-20 23:56:39,2020-02-20 23:56:53,178.8), Order(7,357.0,357.0,天津,2020-02-20 23:56:36,2020-02-20 23:56:40,0.0), Order(8,53.0,53.0,浙江省,2020-02-20 23:56:12,2020-02-20 23:56:16,0.0), Order(9,43.0,0.0,湖南省,2020-02-20 23:54:53,2020-02-20 23:55:04,43.0), Order(10,421.0,421.0,北京,2020-02-20 23:54:28,2020-02-20 23:54:33,0.0), Order(11,267.9,0.0,北京,2020-02-20 23:54:24,2020-02-20 23:54:31,267.9), Order(12,37.0,37.0,四川省,2020-02-20 23:54:24,2020-...
    scala> val df = rdd2.toDF

    2022-09-14 23:19:17,272 WARN conf.HiveConf: HiveConf of name hive.server2.thrift.client.user does not exist
    2022-09-14 23:19:17,272 WARN conf.HiveConf: HiveConf of name hive.server2.thrift.client.password does not exist
    2022-09-14 23:19:18,509 WARN conf.HiveConf: HiveConf of name hive.server2.thrift.client.user does not exist
    2022-09-14 23:19:18,509 WARN conf.HiveConf: HiveConf of name hive.server2.thrift.client.password does not exist
    2022-09-14 23:19:20,805 WARN metastore.ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
    2022-09-14 23:19:20,947 WARN conf.HiveConf: HiveConf of name hive.server2.thrift.client.user does not exist
    2022-09-14 23:19:20,948 WARN conf.HiveConf: HiveConf of name hive.server2.thrift.client.password does not exist
    df: org.apache.spark.sql.DataFrame = [orderNo: int, deal: double ... 5 more fields]
    
    scala> df.show

    +-------+-----+-----+--------+-------------------+-------------------+------+
    |orderNo| deal|  pay|province|          orderTime|            payTime|refund|
    +-------+-----+-----+--------+-------------------+-------------------+------+
    |      1|178.8|  0.0|      上海|2020-02-21 00:00:00|                   |   0.0|
    |      2| 21.0| 21.0|  内蒙古自治区|2020-02-20 23:59:54|2020-02-21 00:00:02|   0.0|
    |      3| 37.0|  0.0|     安徽省|2020-02-20 23:59:35|                   |   0.0|
    |      4|157.0|157.0|     湖南省|2020-02-20 23:58:34|2020-02-20 23:58:44|   0.0|
    |      5| 64.8|  0.0|     江苏省|2020-02-20 23:57:04|2020-02-20 23:57:11|  64.8|
    |      6|327.7|148.9|     浙江省|2020-02-20 23:56:39|2020-02-20 23:56:53| 178.8|
    |      7|357.0|357.0|      天津|2020-02-20 23:56:36|2020-02-20 23:56:40|   0.0|
    |      8| 53.0| 53.0|     浙江省|2020-02-20 23:56:12|2020-02-20 23:56:16|   0.0|
    |      9| 43.0|  0.0|     湖南省|2020-02-20 23:54:53|2020-02-20 23:55:04|  43.0|
    |     10|421.0|421.0|      北京|2020-02-20 23:54:28|2020-02-20 23:54:33|   0.0|
    |     11|267.9|  0.0|      北京|2020-02-20 23:54:24|2020-02-20 23:54:31| 267.9|
    |     12| 37.0| 37.0|     四川省|2020-02-20 23:54:24|2020-02-20 23:54:31|   0.0|
    |     13| 53.0| 53.0|      上海|2020-02-20 23:53:50|2020-02-20 23:57:09|   0.0|
    |     14| 34.9|  0.0|      天津|2020-02-20 23:53:44|                   |   0.0|
    |     15| 96.8|  0.0|     贵州省|2020-02-20 23:51:37|                   |   0.0|
    |     16| 80.8| 80.8|      天津|2020-02-20 23:51:29|2020-02-20 23:51:35|   0.0|
    |     17| 37.0| 37.0|     辽宁省|2020-02-20 23:51:22|2020-02-20 23:51:30|   0.0|
    |     18|119.0|119.0|      上海|2020-02-20 23:50:55|2020-02-20 23:51:12|   0.0|
    |     19| 37.0| 37.0|     浙江省|2020-02-20 23:50:48|2020-02-20 23:51:00|   0.0|
    |     20|238.0|238.0|      上海|2020-02-20 23:50:08|2020-02-20 23:50:17|   0.0|
    +-------+-----+-----+--------+-------------------+-------------------+------+
    only showing top 20 rows
    
    scala> df.createOrReplaceTempView("v_order")

    scala> spark.sql("select * from v_order ").show

    +-------+-----+-----+--------+-------------------+-------------------+------+
    |orderNo| deal|  pay|province|          orderTime|            payTime|refund|
    +-------+-----+-----+--------+-------------------+-------------------+------+
    |      1|178.8|  0.0|      上海|2020-02-21 00:00:00|                   |   0.0|
    |      2| 21.0| 21.0|  内蒙古自治区|2020-02-20 23:59:54|2020-02-21 00:00:02|   0.0|
    |      3| 37.0|  0.0|     安徽省|2020-02-20 23:59:35|                   |   0.0|
    |      4|157.0|157.0|     湖南省|2020-02-20 23:58:34|2020-02-20 23:58:44|   0.0|
    |      5| 64.8|  0.0|     江苏省|2020-02-20 23:57:04|2020-02-20 23:57:11|  64.8|
    |      6|327.7|148.9|     浙江省|2020-02-20 23:56:39|2020-02-20 23:56:53| 178.8|
    |      7|357.0|357.0|      天津|2020-02-20 23:56:36|2020-02-20 23:56:40|   0.0|
    |      8| 53.0| 53.0|     浙江省|2020-02-20 23:56:12|2020-02-20 23:56:16|   0.0|
    |      9| 43.0|  0.0|     湖南省|2020-02-20 23:54:53|2020-02-20 23:55:04|  43.0|
    |     10|421.0|421.0|      北京|2020-02-20 23:54:28|2020-02-20 23:54:33|   0.0|
    |     11|267.9|  0.0|      北京|2020-02-20 23:54:24|2020-02-20 23:54:31| 267.9|
    |     12| 37.0| 37.0|     四川省|2020-02-20 23:54:24|2020-02-20 23:54:31|   0.0|
    |     13| 53.0| 53.0|      上海|2020-02-20 23:53:50|2020-02-20 23:57:09|   0.0|
    |     14| 34.9|  0.0|      天津|2020-02-20 23:53:44|                   |   0.0|
    |     15| 96.8|  0.0|     贵州省|2020-02-20 23:51:37|                   |   0.0|
    |     16| 80.8| 80.8|      天津|2020-02-20 23:51:29|2020-02-20 23:51:35|   0.0|
    |     17| 37.0| 37.0|     辽宁省|2020-02-20 23:51:22|2020-02-20 23:51:30|   0.0|
    |     18|119.0|119.0|      上海|2020-02-20 23:50:55|2020-02-20 23:51:12|   0.0|
    |     19| 37.0| 37.0|     浙江省|2020-02-20 23:50:48|2020-02-20 23:51:00|   0.0|
    |     20|238.0|238.0|      上海|2020-02-20 23:50:08|2020-02-20 23:50:17|   0.0|
    +-------+-----+-----+--------+-------------------+-------------------+------+
    only showing top 20 rows
    
    scala> spark.sql("select province,sum(deal) val from v_order group by province ").show

    +--------+------------------+
    |province|               val|
    +--------+------------------+
    |   西藏自治区|            489.72|
    |     辽宁省|107355.93000000007|
    |     浙江省|         203126.96|
    | 广西壮族自治区| 35140.09999999999|
    |     海南省|          16828.18|
    |     河北省|106561.56000000004|
    |     福建省|37075.529999999984|
    |     湖南省|102929.22000000007|
    | 宁夏回族自治区|           4804.92|
    |      天津|124564.24000000003|
    |     陕西省|          59450.93|
    |     山西省|46568.799999999996|
    |  内蒙古自治区|           36827.0|
    |     甘肃省|          14294.76|
    |     贵州省|          32274.16|
    |     湖北省|            8581.7|
    |     四川省|188948.12000000005|
    |    黑龙江省| 35058.28999999999|
    |     广东省|227855.27999999968|
    |      重庆|108975.65000000008|
    +--------+------------------+
    only showing top 20 rows
    
    scala> val df1 = spark.sql("select province,sum(deal) val from v_order group by province ")

    df1: org.apache.spark.sql.DataFrame = [province: string, val: double]
    
    scala> df1.show

    +--------+------------------+
    |province|               val|
    +--------+------------------+
    |   西藏自治区|            489.72|
    |     辽宁省|107355.93000000007|
    |     浙江省|         203126.96|
    | 广西壮族自治区| 35140.09999999999|
    |     海南省|          16828.18|
    |     河北省|106561.56000000004|
    |     福建省|37075.529999999984|
    |     湖南省|102929.22000000007|
    | 宁夏回族自治区|           4804.92|
    |      天津|124564.24000000003|
    |     陕西省|          59450.93|
    |     山西省|46568.799999999996|
    |  内蒙古自治区|           36827.0|
    |     甘肃省|          14294.76|
    |     贵州省|          32274.16|
    |     湖北省|            8581.7|
    |     四川省|188948.12000000005|
    |    黑龙江省| 35058.28999999999|
    |     广东省|227855.27999999968|
    |      重庆|108975.65000000008|
    +--------+------------------+
    only showing top 20 rows
    
    ###读取MySQL数据
    scala>  spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://localhost:3306/test", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "order_stat", "user" -> "hive", "password" -> "123456")).load().show()

    +---+------+--------+--------+
    | id|rowkey|province|     val|
    +---+------+--------+--------+
    |  1|stat01|      GD|32003.98|
    +---+------+--------+--------+
    ###写入MySQL
    scala> df1.write.format("jdbc").mode("append").options(Map("url" -> "jdbc:mysql://localhost:3306/test?characterEncoding=utf8", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "order_stat2", "user" -> "hive", "password" -> "123456")).save()

    ###读取MySQL数据
    scala> spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://localhost:3306/test", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "order_stat2", "user" -> "hive", "password" -> "123456")).load().show()

    +--------+------------------+
    |province|               val|
    +--------+------------------+
    |   西藏自治区|            489.72|
    |     辽宁省|107355.93000000007|
    |     浙江省|         203126.96|
    | 广西壮族自治区| 35140.09999999999|
    |     海南省|          16828.18|
    |     河北省|106561.56000000004|
    |     福建省|37075.529999999984|
    |     湖南省|102929.22000000007|
    | 宁夏回族自治区|           4804.92|
    |      天津|124564.24000000003|
    |     陕西省|          59450.93|
    |     山西省|46568.799999999996|
    |  内蒙古自治区|           36827.0|
    |     贵州省|          32274.16|
    |     甘肃省|          14294.76|
    |     四川省|188948.12000000005|
    |     湖北省|            8581.7|
    |     广东省|227855.27999999968|
    |    黑龙江省| 35058.28999999999|
    |      重庆|108975.65000000008|
    +--------+------------------+
    only showing top 20 rows
    

    2. MySQL操作

    [hd@master ~]$ mysql -u hive -p

    Enter password:
    Welcome to the MariaDB monitor.  Commands end with ; or \g.
    Your MariaDB connection id is 48
    Server version: 10.4.18-MariaDB MariaDB Server
    
    Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.
    
    Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
    
    MariaDB [(none)]> show databases;
    +--------------------+
    | Database           |
    +--------------------+
    | hive               |
    | information_schema |
    | mysql              |
    | performance_schema |
    | test               |
    +--------------------+
    5 rows in set (0.003 sec)
    
    MariaDB [(none)]> use test
    Database changed
    MariaDB [test]> show tables;
    Empty set (0.001 sec)
    ###设计一个通用的表，用来存放不同统计的数据
    MariaDB [test]> CREATE TABLE `order_stat` (`id` int NOT NULL AUTO_INCREMENT,`rowkey` varchar(20) DEFAULT NULL,  `province` varchar(25) DEFAULT NULL,  `val` double DEFAULT NULL,  KEY `id` (`id`)) ;

    Query OK, 0 rows affected (0.004 sec)
    
    MariaDB [test]> select * from order_stat;

    Empty set (0.001 sec)
    MariaDB [test]> insert into order_stat(rowkey,province,val) values('stat01','GD',32003.98);

    Query OK, 1 row affected (0.001 sec)
    MariaDB [test]>
    MariaDB [test]>
    MariaDB [test]> CREATE TABLE `order_stat2` (
        ->   `province` VARCHAR(25) DEFAULT NULL,
        ->   `val` DOUBLE DEFAULT NULL
        -> )
        -> ;

    Query OK, 0 rows affected (0.003 sec)
    
    MariaDB [test]>
    MariaDB [test]> select * from order_stat2;

    Empty set (0.000 sec)
    MariaDB [test]>
    MariaDB [(none)]> select * from  test.order_stat2;

    +--------------------------+--------------------+
    | province                 | val                |
    +--------------------------+--------------------+
    | 西藏自治区               |             489.72 |
    | 辽宁省                   | 107355.93000000007 |
    | 浙江省                   |          203126.96 |
    | 广西壮族自治区           |  35140.09999999999 |
    | 海南省                   |           16828.18 |
    | 河北省                   | 106561.56000000004 |
    | 福建省                   | 37075.529999999984 |
    | 湖南省                   | 102929.22000000007 |
    | 宁夏回族自治区           |            4804.92 |
    | 天津                     | 124564.24000000003 |
    | 陕西省                   |           59450.93 |
    | 山西省                   | 46568.799999999996 |
    | 内蒙古自治区             |              36827 |
    | 贵州省                   |           32274.16 |
    | 甘肃省                   |           14294.76 |
    | 四川省                   | 188948.12000000005 |
    | 湖北省                   |             8581.7 |
    | 广东省                   | 227855.27999999968 |
    | 黑龙江省                 |  35058.28999999999 |
    | 重庆                     | 108975.65000000008 |
    | 新疆维吾尔自治区         |            10112.9 |
    | 山东省                   |  175046.1300000001 |
    | 河南省                   |  90619.72000000003 |
    | 吉林省                   |           42040.92 |
    | 青海省                   |             2396.2 |
    | 上海                     |  544907.6299999994 |
    | 江西省                   | 36791.649999999994 |
    | 安徽省                   |           61378.67 |
    | 北京                     | 231055.48999999993 |
    | 江苏省                   | 227930.92999999985 |
    | 云南省                   |  75769.32000000002 |
    +--------------------------+--------------------+
    31 rows in set (0.000 sec)
    

    3. MySQL中文乱码

    使用MySQL的root用户执行以下字符集设置（建议在Spark写入数据之前执行）。另外，Spark写入时的JDBC URL中要带上 characterEncoding=utf8 参数（如上文写入MySQL的命令所示）。

    ##修改整库的默认字符集（只影响之后新建的表，已有的表需要单独修改）
    ALTER DATABASE <database_name> CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci;   

    ##修改已有表的字符集（会同时转换表中已有的字符列）
    ALTER TABLE <table_name> CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; 

    MariaDB [(none)]> ALTER DATABASE test  CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ;

    Query OK, 1 row affected (0.002 sec)
    
    MariaDB [(none)]>
    MariaDB [(none)]> ALTER TABLE test.order_stat2  CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

    Query OK, 0 rows affected (0.010 sec)
    Records: 0  Duplicates: 0  Warnings: 0
  • 相关阅读:
    tcp字节传输(java)-自定义包头和数据识别
    Catheon Gaming任命Activision Blizzard前亚太区负责人Mark Aubrey担任首席执行官
    【随想】每日两题Day.10(实则一题)
    Open Inventor 10.12 Crack
    java毕业生设计住房公积金筹集子系统的网站系统计算机源码+系统+mysql+调试部署+lw
    十、pygame小游戏开发
    研发过程中的文档管理与工具
    Ensp用windows回环口连接cloud配置
    零售抄表系统是什么?
    网络原理之封装和分用,网络编程套接字
  • 原文地址:https://blog.csdn.net/m0_56073435/article/details/130635611