PySpark集群完全分布式搭建
本文的目的是使读者对spark的安装流程有一个清晰的认识,并且能根据本文的内容搭建一个属于自己的完全分布式Spark集群,并在此基础上增加pyspark的分布式环境。
阅读本文前,有几个点需要注意:
-
本文假设读者有Hadoop的搭建基础,并且成功搭建了完全分布式的Hadoop集群,因此本文不会对该方面的知识进行铺垫。 -
本文假设读者有在Linux上安装anaconda或者minconda的基础,并且成功的在每一个节点上的相同路径下配置好了相应的环境。(该过程可以每个节点一一配置,也在可以配置好某个节点后,把配置好的文件打包发送到所有节点再解压,因为略占篇幅、且不为本文重点是故省略)
寻找合适的Spark安装包
spark的官网为:https://spark.apache.org/
进入官网后可进入下载页面:https://spark.apache.org/downloads.html
下载页面的核心部分如下所示:

下载Spark安装包时需要特别注意发行版的兼容性问题,特别是Spark版本与Hadoop版本的兼容性、以及Spark版本与Scala版本的兼容性。
虽然当前最新的Spark版本已经更新到3.3.0了,最新版的hadoop也更新到3.3.4,不过因为笔者的Hadoop版本为3.2.2,所以使用的Spark安装包为spark-3.1.1-bin-hadoop3.2.tgz,对应的Scala版本为2.12.12。
笔者更鼓励读者使用最新的安装包进行尝试,但是如果已经安装好了某一版本的hadoop,那更建议去官网的历史发行版页找到对应版本的Spark安装包进行下载。
解压Spark安装包与Scala安装包
假设读者已经下载好了Spark和Scala的安装包,并且上传到了主节点的某一文件路径。
笔者使用的主节点为:westgisB052
存放Spark安装包的路径为:~/pkg/spark-3.1.1-bin-hadoop3.2.tgz
存放Scala安装包的路径为:~/pkg/scala-2.12.12.tgz
Spark解压后存放的目标路径为:~/bigdata/
Scala解压后存放的目标路径为:~/program/
所以在配置环境变量时,SPARK_HOME =/home/G22/bigdata/spark ,SCALA_HOME=/home/G22/program/scala
注1:配置环境变量时,指定变量的取值必须为绝对路径,~/bigdata/spark 指向的绝对路径就是/home/G22/bigdata/spark ,/home/G22 为笔者的用户根目录,简写为~ ,~/program/scala 同理。
注2:解压后的压缩包会带版本号的后缀,不过笔者觉得不太美观,所以还会进行重命名操作。
接下来,根据上述准备好的路径,我们可以执行:
tar -zxvf ~/pkg/spark-3.1.1-bin-hadoop3.2.tgz -C ~/bigdata/
tar -zxvf ~/pkg/scala-2.12.12.tgz -C ~/program/
mv ~/bigdata/spark* ~/bigdata/spark
mv ~/program/scala* ~/program/scala
配置环境变量
-
配置scala环境下的spark只需要在~/.bashrc 文件中添加如下六句:
export SCALA_HOME=/home/G22/program/scala
export PATH=$PATH:$SCALA_HOME/bin
export SPARK_HOME=/home/G22/bigdata/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
需要注意的是,读者需要根据自己spark的存放路径,更改SPARK_HOME 的取值。此外,即使读者只想配置python环境下的Spark集群,也要配置SPARK_HOME ,并将其的bin 目录添加进$PATH 变量,即添加最后3句。 -
配置pyspark环境时,还需要在~/.bashrc 中添加以下配置:
export PYSPARK_PYTHON=$MINIC_HOME/bin/python
export PYSPARK_DRIVER_PYTHON=$MINIC_HOME/bin/python
export LD_LIBRARY_PATH=$MINIC_HOME/lib/:$LD_LIBRARY_PATH
export PYTHONPATH=$(ZIPS=("$SPARK_HOME"/python/lib/*.zip); IFS=:; echo "${ZIPS[*]}"):$PYTHONPATH
第一行和第二行的意义是指定pyspark启动和执行任务时使用的python解释器; 第三行和第四行的意义是指定pyspark运行时,加载模块库的路径。
注:此处的$MINIC_HOME 为笔者minconda的安装路径,具体为~/minconda3 ,对应的绝对路径为/home/G22/minconda3 ,读者需要根据自己minconda或者anoconda的安装路径进行修改。
-
检验环境变量是否配置成功: source ~/.bashrc
run-example SparkPi
如果执行成功,会在屏幕输出Spark的运行日志信息,以及运行结果:Pi is roughly 3.146675733378667 ,该运行结果夹杂在运行日志信息中间。
修改配置文件
-
修改$SPARK_HOME/conf/spark-env.sh 解压Spark后,其conf 目录下本身并不存在spark-env.sh 文件,只有spark-env.sh.template 文件,因此我们首先需要基于后者生成前者,命令如下: cd $SPARK_HOME/conf/
cp ./spark-env.sh.template ./spark-env.sh
之后编辑新生成的文件spark-env.sh ,添加如下内容:
export JAVA_HOME=/home/G22/bigdata/java
export SCALA_HOME=/home/G22/bigdata/scala
export HADOOP_HOME=/home/G22/bigdata/hadoop
export HADOOP_CONF_DIR=/home/G22/bigdata/hadoop/etc/hadoop
export YARN_CONF_DIR==/home/G22/bigdata/hadoop/etc/hadoop
export SPARK_MASTER_HOST=westgisB052
export SPARK_MASTER_PORT=7077
export SPARK_PID_DIR=/home/G22/bigdata/spark/data/pid
export SPARK_DIST_CLASSPATH=$(/home/G22/bigdata/hadoop/bin/hadoop classpath)
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://westgisB052:9000/directory
-Dspark.history.retainedApplications=30"
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=8G
export SPARK_EXECUTOR_CORES=1
export SPARK_EXECUTOR_MEMORY=2G
export SPARK_DRIVER_MEMORY=1G
可以看到添加的内容分为3个部分:第一部分是纵向配置,用于指定Spark的底层依赖,因为Spark依赖编程语言Java和Scala,所以需要设置JAVA_HOME 和SCALA_HOME ,因为我们选择的Spark发行版是基于兼容版本的Hadoop构造的,所以也要指定Hadoop相关的配置,如HADOOP_HOME 、HADOOP_CONF_DIR 、YARN_CONF_DIR 。 第二部分是Spark的主要配置,用于指定Master节点的IP或者主机名、Master和其它节点进行交互的端口、Spark守护进程pid的存放路径、Spark的依赖包的路径、Spark的历史服务器设置等。(以上5个配置分别按顺序与配置文件中part2部分的内容对应) 第三部分是Spark的资源配置,Spark是主从式架构,组件的角色包括主节点Master,从节点Worker,资源管理器ClusterManager(Spark有多种运行模式,在YARN模式下运行时ClusterManager是YARN;在Standalone模式下运行时,ClusterManager是主节点Master担任),而一个Worker节点又可以包含一个或多个Executor,每个Executor是一个进程,专门用于执行具体的计算任务。因此在进行集群配置时,可以按不同的粒度对Worker和Executor进行资源配置,在笔者上述的配置中:给每个Worker节点分配了4个CPU的核心、8GB的内存、给每个Executor分配了1个核心和2GB内存。(至于Part3的最后一个配置:Spark会在ClusterManager节点上启动一个Drive进程作为Spark应用程序的入口,此外Driver还包含SparkContext实例,负责向集群申请资源、向master注册信息,作业调度,作业解析、生成Stage并调度Task到Executor上等功能的实现,而Driver进程完成这些功能是需要内存的,因此SPARK_DRIVER_MEMORY 参数指定的就是Driver进程可使用的内存资源)
上述的配置文件建议读者认真仔细地阅读,理解每一个参数的含义,并根据自己的配置修改每一个环境变量的取值。
-
修改$SPARK_HOME/conf/workers 和spark-env.sh 一样,其在conf 目录下本身并不存在,但存在workers.template 文件,因此我们首先需要基于后者生成前者,命令如下: cd $SPARK_HOME/conf/
cp workers.template workers
workers 配置文件的配置很简单,只需要把workers 里的内容全部替换成从节点的主机名或者IP即可,笔者的内容为: westgisB053
westgisB054
westgisB055
westgisB056
修改完上述两个配置文件后,Spark的配置文件就已经全部配置完毕了,此时可以将配置好的Spark文件打包,分发到从节点后解压,更新环境变量,则Spark就配置成功了。
打包分发
该过程同配置hadoop时,将配置好的Hadoop打包分发的过程类似,可以使用for循环来进行批量分发和解压、修改环境变量,执行命令如下:
cd $SPARK_HOME
cd ..
tar -zcf spark.tar.gz ./spark
for i in westgisB0{53..57}
do
scp ./spark.tar.gz $i:~;
scp ~/.bashrc $i:~;
done
for i in westgisB0{53..57}
do
ssh $i "tar -zxvf ~/spark.tar.gz -C ~/bigdata/"
ssh $i "rm ~/spark.tar.gz"
ssh $i "source ~/.bashrc"
done
验证Spark集群是否搭建成功
-
启动Spark集群 start-dfs.sh
start-master.sh
start-workers.sh
-
查看集群中是否存在Spark的Java守护进程 for i in westgisB0{52..57}
do
ssh $i "hostname;jps;echo"
done
如果结果同笔者类似,每个节点都成功的启动了安排的守护进程,则配置成功: westgisB052
15477 SecondaryNameNode
15210 NameNode
16154 Jps
15899 Master
westgisB053
11907 Worker
11593 DataNode
12079 Jps
westgisB054
1448 Worker
1145 DataNode
1625 Jps
westgisB055
24212 Jps
23720 DataNode
24011 Worker
westgisB056
30112 Jps
29879 Worker
29544 DataNode
westgisB057
10079 Jps
解释:笔者的主节点为westgisB052,应该存在Master进程;客户端为westgisB057,理论上不存在守护进程;从节点为westgisB053~westgisB056,应该存在Worker进程。 -
查看Spark的web界面 在windows主机的浏览器中输入网址:主节点IP:8080 ,若跳转页面如下,说明Spark集群配置成功且Web界面可用。 
提交一个简单的Spark与HDFS集成的应用程序
-
上传数据文件到HDFS: 假设我们的应用程序从HDFS的路径/user/G22/data/test 下读取数据
hdfs dfs -mkdir -p /user/G22/data/test
cd $SPARK_HOME
hdfs dfs -put ./README.md /user/G22/data/test
hdfs dfs -ls /user/G22/data/test
-
以Standolne模式启动spark-shell: spark支持交互式数据分析以及对大型代码项目进行编译和运行,此处使用的spark-shell是Spark为使用者提供的交互式解释器,每输入一条指令,spark-shell就会翻译和执行。在启动spark-shell时,可以指定使用的集群模式(如local、standolne、yarn等)、还可以指定为spark-shell分配的硬件资源等设置。 以standolne启动spark-shell的命令如下 spark-shell --master spark://westgisB052:7077
执行效果如下: [G22@westgisB052 ~]$ spark-shell --master spark://westgisB052:7077
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/G22/bigdata/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/G22/bigdata/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2022-10-06 00:15:24,006 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://westgisB052:4040
Spark context available as 'sc' (master = spark://westgisB052:7077, app id = app-20221006001533-0000).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.1
/_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_251)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
上述内容中的最后一行有scala> 的提示符,代表我们当前输入的命令不再由linux上的bash 解释器进行翻译和执行,而是由spark-shell的scala 解释器进行翻译和执行,此时我们便可输入scala语句进行交换。 -
输入简单的scala指令进行交互式分析:
val logFile = "/user/G22/data/test/"
val logData = sc.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
若执行上述语句后,得到的最终结果如下,这说明Spark能够成功地运行scala应用程序: Lines with a: 64, Lines with b: 32
此时,在spark-shell界面输入:q ,敲击回车,则可退出spark-shell。
PySpark分布式应用程序测试
上一个步骤中,我们已经成功的执行了scala版本的Spark应用程序,我们现在将上面的程序修改为python版的,再启动PySpark进行运行。
-
启动PySpark pyspark --master spark://westgisB052:7077
执行效果如下,同spark-shell类似,pyspark也启动了一个交互式终端,不过与spark-shell不同的地方是pyspark使用python进行交互。 [G22@westgisB052 ~]$ pyspark --master spark://westgisB052:7077
Python 3.8.13 (default, Mar 28 2022, 11:38:47)
[GCC 7.5.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/G22/bigdata/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/G22/bigdata/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2022-10-06 00:38:58,300 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.1.1
/_/
Using Python version 3.8.13 (default, Mar 28 2022 11:38:47)
Spark context Web UI available at http://westgisB052:4040
Spark context available as 'sc' (master = spark://westgisB052:7077, app id = app-20221006003900-0001).
SparkSession available as 'spark'.
>>>
-
输入python指令进行交互
logFile = "/user/G22/data/test/"
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda line: 'a' in line).count()
numBs = logData.filter(lambda line: 'b' in line).count()
print('Lines with a: %s, Lines with b: %s' % (numAs, numBs))
若执行上述语句后,得到的最终结果如scala程序运行的结果一致,则说明PySpark完全分布式环境配置成功。
|