• windows + anaconda 安装PySpark3.0.1


    1、背景

            Spark作为分布式内存计算框架,可以广泛应用在数据处理、分析等应用场景。因此,希望借助Spark高性能的处理项目中的数据,搭建此开发环境,深入了解Spark的处理能力与实现机制。

    2、开发环境

            在windows10上使用Anaconda作为Python运行与开发环境,搭建PySpark3.0.1的Python开发环境,并执行PI.py和WordCount.py示例程序。

    3、下载Spark和Hadoop安装包

            1、前往Spark官网,下载spark-3.0.1-bin-hadoop2.7.tgz安装包

            2、前往Hadoop官网,下载与Spark对应的hadoop-2.7.1.tar.gz安装包

            3、在网上下载winutils.exe工具,windows下使用Hadoop接口需要依赖winutils.exe

    4、PySpark环境搭建

            1、安装JDK8

            2、解压spark-3.0.1-bin-hadoop2.7.tgz到安装目录,例如D:\install

            3、解压hadoop-2.7.1.tar.gz到安装目录,例如D:\install

            4、将winutils.exe拷贝到hadoop安装目录下的bin目录中 

            5、在windows环境变量中,设置环境变量:

    变量项变量值
    HADOOP_HOMED:\install\hadoop-2.7.1
    CLASSPATH%HADOOP_HOME%\bin\winutils.exe
    SPARK_HOMED:\install\spark-3.0.1-bin-hadoop2.7

            6、打开Anaconda,创建新的环境env_spark,指定Python版本为3.8

            7、在env_spark环境中打开CMD窗口,执行pip install pyspark==3.0.1,指定安装PySpark3.0.1版本。

    5、执行PySpark示例程序

            打开Anaconda,打开Spyder(Python代码IDE),分别执行PI.py和WordCount.py示例。

    PI.py代码如下:

    1. import sys
    2. from random import random
    3. from operator import add
    4. from pyspark.sql import SparkSession
    5. if __name__ == "__main__":
    6. """
    7. Usage: pi [partitions]
    8. """
    9. spark = SparkSession\
    10. .builder\
    11. .appName("PythonPi")\
    12. .getOrCreate()
    13. partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    14. n = 100000 * partitions
    15. def f(_: int) -> float:
    16. x = random() * 2 - 1
    17. y = random() * 2 - 1
    18. return 1 if x ** 2 + y ** 2 <= 1 else 0
    19. count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    20. print("Pi is roughly %f" % (4.0 * count / n))
    21. spark.stop()

            Spyder中执行结果如下:

            WordCount.py代码如下:

    1. from pyspark.sql import SparkSession
    2. from pyspark import SparkContext
    3. sparksession = SparkSession.builder.appName("SimpleApp").getOrCreate()
    4. sc = sparksession.sparkContext
    5. lines = sc.textFile('D:\install\hadoop-2.7.1\README.txt')
    6. rdd = lines.flatMap(lambda line : line.split(' ')).map(lambda word : (word, 1)).reduceByKey(lambda agg, cur: agg+cur)
    7. print(rdd.collect())
    8. sc.stop()

             WordCount.py执行结果如下:

    6、注意事项

            1、安装高版本的PySpark3.3.1后,执行WordCount.py 时报异常org.apache.spark.SparkException: Python worker failed to connect back。因此使用PySpark3.0.1版本。 

            

  • 相关阅读:
    微信开发者工具开发小程序代码自动热加载/重载/部署
    算法---一和零(Kotlin)
    DMA原理
    【数据结构】树与二叉树
    Pytorch中关于forward函数的理解与用法
    Python 国家地震台网 地震数据集完整分析、pyecharts、plotly,分析强震次数、震级分布、震级震源关系、发生位置、发生时段、最大震级、平均震级
    数据结构——二叉树提升
    【PyQt】多窗口使用信号-槽传递数据
    【block的基本使用 Objective-C语言】
    广度优先搜索
  • 原文地址:https://blog.csdn.net/gaixm/article/details/128023884