安装笔Spark Local 模式搭建文档
在本地使用单机多线程模拟Spark集群中的各个角色
安装包下载
目前Spark最新稳定版本:课程中使用目前Spark最新稳定版本:3.1.x系列
https://spark.apache.org/docs/3.1.2/index.html
★注意1:
Spark3.0+基于Scala2.12
Downloads | Apache Spark
★注意2:
目前企业中使用较多的Spark版本还是Spark2.x,如Spark2.2.0、Spark2.4.5都使用较多,但未来Spark3.X肯定是主流,毕竟官方高版本是对低版本的兼容以及提升
Spark Release 3.0.0 | Apache Spark
-
-
- 将安装包上传并解压
说明: 只需要上传至node1即可, 以下操作都是在node1执行的
cd /export/software rz 上传 解压: cd 更名: (两种方式二选一即可, 推荐软连接方案) cd /export/server 方式一: 软连接方案: ln -s spark-3.1.2-bin-hadoop3.2 spark 方式二: 直接重命名: mv spark-3.1.2-bin-hadoop3.2 spark |
目录结构说明:
-
-
- 测试
Spark的local模式, 开箱即用, 直接启动bin目录下的spark-shell脚本
cd /export/server/spark/bin ./spark-shell |
说明:
sc:SparkContext实例对象:
spark:SparkSession实例对象
40./40:Web监控页面端口号
●Spark-shell说明:
1.直接使用./spark-shell
表示使用local 模式启动,在本机启动一个SparkSubmit进程
2.还可指定参数 --master,如:
spark-shell --master local[N] 表示在本地模拟N个线程来运行当前任务
spark-shell --master local[*] 表示使用当前机器上所有可用的资源
3.不携带参数默认就是
spark-shell --master local[*]
4.后续还可以使用--master指定集群地址,表示把任务提交到集群上运行,如
./spark-shell --master spark://node01:7077,node02:7077
5.退出spark-shell
使用 :quit
-
- PySpark环境安装
同学们可能有疑问, 我们不是学的Spark框架吗? 怎么会安装一个叫做PySpark呢?
这里简单说明一下:
PySpark: 是Python的库, 由Spark官方提供. 专供Python语言使用. 类似Pandas一样,是一个库
Spark: 是一个独立的框架, 包含PySpark的全部功能, 除此之外, Spark框架还包含了对R语言\ Java语言\ Scala语言的支持. 功能更全.? 可以认为是通用Spark。
功能 | PySpark | Spark | 底层语言 | Scala(JVM) | Scala(JVM) | 上层语言支持 | Python | Python\Java\Scala\R | 集群化\分布式运行 | 支持 | 支持 | 定位 | Python库 (客户端) | 标准框架 (客户端和服务端) | 是否可以Daemon运行 | No | Yes | 使用场景 | 生产环境集群化运行 | 生产环境集群化运行 |
若安装PySpark需要首先具备Python环境,这里使用Anaconda环境,安装过程如下:
-
-
- 下载Anaconda环境包
安装版本:Anaconda | Individual Edition
Python3.8.8版本:Anaconda3-2021.05-Linux-x86_64.sh
-
-
- 安装Anaconda环境
此环境三台节点都是需要安装的, 以下演示在node1安装, 其余两台也是需要安装的
cd /export/software rz 上传Anaconda脚本环境 执行脚本: bash Anaconda3-2021.05-Linux-x86_64.sh 不断输入空格, 直至出现以下解压, 然后输入yes 此时, anaconda需要下载相关的依赖包, 时间比较长, 耐心等待即可.... 配置anaconda的环境变量: vim /etc/profile ##增加如下配置 export ANACONDA_HOME=/root/anaconda3/bin export PATH=$PATH:$ANACONDA_HOME/bin 重新加载环境变量: source /etc/profile 修改bashrc文件 sudo vim ~/.bashrc 添加如下内容: export PATH=~/anaconda3/bin:$PATH |
说明:
profile 其实看名字就能了解大概了, profile 是某个用户唯一的用来设置环境变量的地方, 因为用户可以有多个 shell 比如 bash, sh, zsh 之类的, 但像环境变量这种其实只需要在统一的一个地方初始化就可以了, 而这就是 profile. bashrc bashrc 也是看名字就知道, 是专门用来给 bash 做初始化的比如用来初始化 bash 的设置, bash 的代码补全, bash 的别名, bash 的颜色. 以此类推也就还会有 shrc, zshrc 这样的文件存在了, 只是 bash 太常用了而已. |
-
-
- 启动anaconda并测试
注意: 请将当前连接node1的节点窗口关闭,然后重新打开,否则无法识别
说明: 发现在重新进入客户端后, 发现前面多个 base? , 表示进入到anaconda的base默认环境中
说明: 如果不想看到这个说明: 可以尝试退出base环境 修改: sudo vim ~/.bashrc 文件即可 ?在此文件的最后面添加一行内容: ?????? conda deactivate |
-
-
- Anaconda相关组件介绍[了解]
Anaconda(水蟒):是一个科学计算软件发行版,集成了大量常用扩展包的环境,包含了 conda、Python 等 180 多个科学计算包及其依赖项,并且支持所有操作系统平台。下载地址:https://www.continuum.io/downloads
- 安装包:pip install xxx,conda install xxx
- 卸载包:pip uninstall xxx,conda uninstall xxx
- 升级包:pip install upgrade xxx,conda update xxx
Jupyter Notebook:启动命令
功能如下:
- Anaconda自带,无需单独安装
- 实时查看运行过程
- 基本的web编辑器(本地)
- ipynb 文件分享
- 可交互式
- 记录历史运行结果
修改jupyter显示的文件路径:
通过jupyter notebook --generate-config命令创建配置文件,之后在进入用户文件夹下面查看.jupyter隐藏文件夹,修改其中文件jupyter_notebook_config.py的202行为计算机本地存在的路径。
IPython:
???? 命令:ipython,其功能如下
???? 1.Anaconda自带,无需单独安装
???? 2.Python的交互式命令行 Shell
???? 3.可交互式
???? 4.记录历史运行结果
???? 5.及时验证想法
Spyder:
???? 命令:spyder,其功能如下
???? 1.Anaconda自带,无需单独安装
???? 2.完全免费,适合熟悉Matlab的用户
???? 3.功能强大,使用简单的图形界面开发环境
下面就Anaconda中的conda命令做详细介绍和配置。
- conda命令及pip命令
conda管理数据科学环境,conda和pip类似均为安装、卸载或管理Python第三方包。
conda install? 包名??? pip install 包名 conda uninstall 包名?? pip uninstall 包名 conda install -U 包名?? pip install -U 包名 |
(2) Anaconda设置为国内下载镜像
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/ conda config --set show_channel_urls yes |
(3)conda创建虚拟环境
conda env list conda create py_env python=3.8.8 #创建python3.8.8环境 activate py_env?? #激活环境 deactivate py_env #退出环境 |
-
-
- PySpark安装
三个节点也是都需要安装pySpark的
-
-
-
- 方式1:直接安装PySpark
安装如下:
使用PyPI安装PySpark如下:也可以指定版本安装 pip install pyspark 或者指定清华镜像(对于网络较差的情况): pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark? # 指定清华镜像源 如果要为特定组件安装额外的依赖项,可以按如下方式安装(此步骤暂不执行,后面Sparksql部分会执行): pip install pyspark[sql] |
截图如下:
-
-
-
- [安装]方式2:创建Conda环境安装PySpark
#从终端创建新的虚拟环境(沙箱环境),如下所示 conda create -n pyspark_env python=3.8 #创建虚拟环境后,它应该在 Conda 环境列表下可见,可以使用以下命令查看 conda env list #现在使用以下命令激活新创建的环境: source activate pyspark_env 或者 conda activate pyspark_env 如果报错: CommandNotFoundError: Your shell has not been properly configured to use 'conda deactivate'.切换使用 source activate #您可以在新创建的环境中通过使用PyPI安装PySpark来安装pyspark,例如如下。它将pyspark_env在上面创建的新虚拟环境下安装 PySpark。 pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark #或者,可以从 Conda 本身安装 PySpark: conda install pyspark |
-
-
-
- ?[不推荐]方式3:手动下载安装
将spark对应版本下的python目录下的pyspark复制到anaconda的
Library/Python3/site-packages/目录下即可。
请注意,PySpark 需要JAVA_HOME正确设置的Java 8 或更高版本。如果使用 JDK 11,请设置-Dio.netty.tryReflectionSetAccessible=true,Arrow相关功能才可以使用。
扩展:
conda虚拟环境 命令 查看所有环境 conda info --envs 新建虚拟环境 conda create -n myenv python=3.6 删除虚拟环境 conda remove -n myenv --all 激活虚拟环境 conda activate myenv source activate base 退出虚拟环境 conda deactivate myenv |
-
-
- 初体验-PySpark shell方式
前面的Spark Shell实际上使用的是Scala交互式Shell,实际上 Spark 也提供了一个用 Python 交互式Shell,即Pyspark。
bin/pyspark --master local[*] |
-
- Spark Standalone集群环境
Standalone模式是Spark自带的一种集群模式,不同于前面本地模式启动多个进程来模拟集群的环境,Standalone模式是真实地在多个机器之间搭建Spark集群的环境,完全可以利用该模式搭建多机器集群,用于实际的大数据处理。
节点 | 主节点(master) | 从节点(worker) | 历史服务(history server) | node1 | 是 | 是 | 是 | node2 | 否 | 是 | 否 | node3 | 否 | 是 | 否 |
-
-
- 修改配置文件
说明: 直接对local模型下的spark进行更改为standalone模式
【workers】
cd /export/server/spark/conf/ cp workers.template? workers vim workers 添加以下内容: node1.itcast.cn node2.itcast.cn node3.itcast.cn |
【spark-env.sh】
cd /export/server/spark/conf cp spark-env.sh.template spark-env.sh vim spark-env.sh 增加如下内容: JAVA_HOME=/export/server/jdk1.8.0_241/ HADOOP_CONF_DIR=/export/server/hadoop-3.3.0/etc/hadoop/ YARN_CONF_DIR=/export/server/hadoop-3.3.0/etc/hadoop/ export SPARK_MASTER_HOST=node1 export SPARK_MASTER_PORT=7077 SPARK_MASTER_WEBUI_PORT=8080 SPARK_WORKER_CORES=1 SPARK_WORKER_MEMORY=1g SPARK_WORKER_PORT=7078 SPARK_WORKER_WEBUI_PORT=8081 SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1.itcast.cn:8020/sparklog/ -Dspark.history.fs.cleaner.enabled=true" |
注意:
?????? Jdk,hadoop, yarn的路径, 需要配置为自己的路径(可能与此位置不一致)
?????? History配置中, 需要指定hdfs的地址, 其中端口号为8020或者9820, 大家需要参考hdfs上对应namenode的通信端口号
【配置spark应用日志】
第一步: 在HDFS上创建应用运行事件日志目录: hdfs dfs -mkdir -p /sparklog/ 第二步: 配置spark-defaults.conf cd /export/server/spark/conf cp spark-defaults.conf.template spark-defaults.conf vim spark-defaults.conf 添加以下内容: spark.eventLog.enabled? true spark.eventLog.dir????? hdfs://node1.itcast.cn:8020/sparklog/ spark.eventLog.compress true |
其中HDFS的地址, 8020 还是9820 需要查看HDFS的界面显示
【log4j.properties】
cd /export/server/spark/conf cp log4j.properties.template log4j.properties vim log4j.properties ## 改变日志级别 |
-
-
- 分发到其他机器
将配置好的将 Spark 安装包分发给集群中其它机器,命令如下:
cd /export/server/ scp -r spark-3.1.2-bin-hadoop3.2/ node2:$PWD scp -r spark-3.1.2-bin-hadoop3.2/ node3:$PWD ##分别在node2, 和node3中创建软连接 ln -s /export/server/spark-3.1.2-bin-hadoop3.2/ ?/export/server/spark |
-
-
- 启动spark Standalone
启动spark集群之前, 一定要先将hadoop启动良好了
- 启动方式1:集群启动和停止
在主节点上启动spark集群
cd /export/server/spark : ? ? sbin/start-history-server.sh |
在主节点上停止spark集群
/export/server/spark/sbin/stop-all.sh |
- 启动方式2:单独启动和停止
在 master 安装节点上启动和停止 master:
start-master.sh stop-master.sh |
在 Master 所在节点上启动和停止worker(work指的是slaves 配置文件中的主机名)
start-slaves.sh stop-slaves.sh |
- WEB UI页面
可以看出,配置了3个Worker进程实例,每个Worker实例为1核1GB内存,总共是3核 3GB 内存。目前显示的Worker资源都是空闲的,当向Spark集群提交应用之后,Spark就会分配相应的资源给程序使用,可以在该页面看到资源的使用情况。
- ?历史服务器HistoryServer:
/export/server/spark/sbin/start-history-server.sh |
WEB UI页面地址:????
-
-
- 连接集群
【spark-shell 连接】
cd /export/server/spark bin/spark-shell --master? spark://node1:7077 |
【pyspark 连接】
cd /export/server/spark ./bin/pyspark --master spark://node1:7077 |
-
- Spark Standalone HA 模式安装
多master模式 其中一个为active节点, 另一个为standby状态
注意: HA模式需要依赖于zookeeper进行协助管理
-
-
- 修改配置
cd /export/server/spark/conf vim spark-env.sh 注释或删除MASTER_HOST内容: # SPARK_MASTER_HOST=node1 增加以下配置: SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181 -Dspark.deploy.zookeeper.dir=/spark-ha" |
说明函数说明:
spark.deploy.recoveryMode:恢复模式
spark.deploy.zookeeper.url:ZooKeeper的Server地址
spark.deploy.zookeeper.dir:保存集群元数据信息的文件、目录。包括Worker、Driver、Application信息。
-
-
- 配置分发
cd /export/server/spark/conf scp -r spark-env.sh node2:$PWD scp -r spark-env.sh node3:$PWD |
-
-
- 启动集群
首先启动zookeeper服务: 三个节点都需要启动 zkServer.sh status? --查看状态 zkServer.sh stop?? --停止命令 zkServer.sh start?? --启动命令 接着在node1启动spark集群 cd /export/server/spark ./sbin/start-all.sh 最后在node2上单独启动一个master cd /export/server/spark ./sbin/start-master.sh |
查看WebUI
- http://node1:8080/
- http://node2:8080/
默认情况下,先启动Master就为Active Master,如下截图所示:
-
- Spark on YARN 环境搭建
- 修改spark-env.sh
cd /export/server/spark/conf vim /export/server/spark/conf/spark-env.sh 添加以下内容: HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop YARN_CONF_DIR=/export/server/hadoop/etc/hadoop 同步到其他两台 cd /export/server/spark/conf scp -r spark-env.sh node2:$PWD scp -r spark-env.sh node3:$PWD |
-
-
- 修改hadoop的yarn-site.xml
node1修改
cd /export/server/hadoop-3.3.0/etc/hadoop/ vim /export/server/hadoop-3.3.0/etc/hadoop/yarn-site.xml 添加以下内容: <configuration> ??? <!-- 配置yarn主节点的位置 --> ??? <property> ??????? <name>yarn.resourcemanager.hostname</name> ??????? <value>node1</value> ??? </property> ??? <property> ??????? <name>yarn.nodemanager.aux-services</name> ??????? <value>mapreduce_shuffle</value> ??? </property> ??? <!-- 设置yarn集群的内存分配方案 --> ??? <property> ??????? <name>yarn.nodemanager.resource.memory-mb</name> ??????? <value>20480</value> ??? </property> ??? <property> ??????? <name>yarn.scheduler.minimum-allocation-mb</name> ??????? <value>2048</value> ??? </property> ??? <property> ??????? <name>yarn.nodemanager.vmem-pmem-ratio</name> ??????? <value>2.1</value> ??? </property> ?? ?<!-- 开启日志聚合功能 --> ??? <property> ??????? <name>yarn.log-aggregation-enable</name> ??????? <value>true</value> ??? </property> ??? <!-- 设置聚合日志在hdfs上的保存时间 --> ??? <property> ??????? <name>yarn.log-aggregation.retain-seconds</name> ??????? <value>604800</value> ??? </property> ??? <!-- 设置yarn历史服务器地址 --> ??? <property> ??????? <name>yarn.log.server.url</name> ??????? <value>http://node1:19888/jobhistory/logs</value> ??? </property> ??? <!-- 关闭yarn内存检查 --> ??? <property> ??????? <name>yarn.nodemanager.pmem-check-enabled</name> ??????? <value>false</value> ??? </property> ??? <property> ??????? <name>yarn.nodemanager.vmem-check-enabled</name> ??????? <value>false</value> ??? </property> </configuration> |
将其同步到其他两台
cd /export/server/hadoop/etc/hadoop scp -r yarn-site.xml node2:$PWD scp -r yarn-site.xml node3:$PWD |
-
-
- Spark设置历史服务地址
cd /export/server/spark/conf cp spark-defaults.conf.template spark-defaults.conf vim spark-defaults.conf 添加以下内容: spark.eventLog.enabled????????????????? true spark.eventLog.dir????????????????????? hdfs://node1:9820/sparklog/ spark.eventLog.compress???????????????? true spark.yarn.historyServer.address??????? node1:18080 |
设置日志级别:
cd /export/server/spark/conf cp log4j.properties.template log4j.properties vim log4j.properties 修改以下内容: |
同步到其他节点
cd /export/server/spark/conf scp -r spark-defaults.conf log4j.properties node2:$PWD scp -r spark-defaults.conf log4j.properties node3:$PWD |
-
-
- 配置依赖spark jar包
当Spark Application应用提交运行在YARN上时,默认情况下,每次提交应用都需要将依赖Spark相关jar包上传到YARN 集群中,为了节省提交时间和存储空间,将Spark相关jar包上传到HDFS目录中,设置属性告知Spark Application应用。
hadoop fs -mkdir -p /spark/jars/ hadoop fs -put /export/server/spark/jars/* /spark/jars/ |
修改spark-defaults.conf
cd /export/server/spark/conf vim spark-defaults.conf 添加以下内容: spark.yarn.jars? hdfs://node1:8020/spark/jars/* |
同步到其他节点
cd /export/server/spark/conf scp -r spark-defaults.conf root@node2:$PWD scp -r spark-defaults.conf root@node3:$PWD |
-
-
- 启动服务
Spark Application运行在YARN上时,上述配置完成
启动服务:HDFS、YARN、MRHistoryServer和Spark HistoryServer,命令如下:
## 启动HDFS和YARN服务,在node1执行命令 start-dfs.sh start-yarn.sh 或 start-all.sh 注意:在onyarn模式下不需要启动start-all.sh(jps查看一下看到worker和master) ## 启动MRHistoryServer服务,在node1执行命令 mr-jobhistory-daemon.sh start historyserver ## 启动Spark HistoryServer服务,,在node1执行命令 /export/server/spark/sbin/start-history-server.sh |
- Spark HistoryServer服务WEB UI页面地址:
-
-
- 提交测试
先将圆周率PI程序提交运行在YARN上,命令如下:
SPARK_HOME=/export/server/spark ${SPARK_HOME}/bin/spark-submit \ --master yarn \ --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \ --conf "spark.pyspark.python=/root/anaconda3/bin/python3" \ ${SPARK_HOME}/examples/src/main/python/pi.py \ 10 |
运行完成在YARN 监控页面截图如下:
设置资源信息,提交运行pi程序至YARN上,命令如下:
SPARK_HOME=/export/server/spark ${SPARK_HOME}/bin/spark-submit \ --master yarn \ --driver-memory 512m \ --executor-memory 512m \ --executor-cores 1 \ --num-executors 2 \ --queue default \ --conf "spark.pyspark.driver.python=/root/anaconda3/bin/python3" \ --conf "spark.pyspark.python=/root/anaconda3/bin/python3" \ ${SPARK_HOME}/examples/src/main/python/pi.py \ 10 |
当pi应用运行YARN上完成以后,从8080 WEB 页面点击应用历史服务连接,查看应用运行状态信息。
-
- SparkSQL与Hive整合
- SparkSQL整合Hive步骤
- 第一步:将hive-site.xml拷贝到spark安装路径conf目录
node1执行以下命令来拷贝hive-site.xml到所有的spark安装服务器上面去
-
-
-
- 第二步:将mysql的连接驱动包拷贝到spark的jars目录下
node1执行以下命令将连接驱动包拷贝到spark的jars目录下,三台机器都要进行拷贝
cd /export/server/hive/lib cp mysql-connector-java-5.1.32.jar? /export/server/spark/jars/ scp mysql-connector-java-5.1.32.jar? root@node2:/export/server/spark/jars/ scp mysql-connector-java-5.1.32.jar? root@node3:/export/server/spark/jars/ |
-
-
-
- 第三步:Hive开启MetaStore服务
(1)修改 hive/conf/hive-site.xml新增如下配置
远程模式部署metastore 服务地址 <?xml?version="1.0"?> <?xml-stylesheet?type="text/xsl"?href="configuration.xsl"?> <configuration> ????<property> ????????<name>hive.metastore.uris</name> ????????<value>thrift://node1:9083</value> ????</property> </configuration> |
2: 后台启动 Hive MetaStore服务
前台启动:
bin/hive --service metastore & |
后台启动:
nohup /export/server/hive/bin/hive --service metastore 2>&1 >> /var/log.log & |
完整的hive-site.xml文件
<configuration> ??? <!-- 存储元数据mysql相关配置 --> ??? <property> ??????? <name>javax.jdo.option.ConnectionURL</name> ??????? <value> jdbc:mysql://node1:3306/hive?createDatabaseIfNotExist=true&useSSL=false&useUnicode=true&characterEncoding=UTF-8</value> ??? </property> ??? <property> ??????? <name>javax.jdo.option.ConnectionDriverName</name> ??????? <value>com.mysql.jdbc.Driver</value> ??? </property> ??? <property> ??????? <name>javax.jdo.option.ConnectionUserName</name> ??????? <value>root</value> ??? </property> ??? <property> ??????? <name>javax.jdo.option.ConnectionPassword</name> ??????? <value>123456</value> ??? </property> ?<!-- H2S运行绑定host --> ??? <property> ??????? <name>hive.server2.thrift.bind.host</name> ??????? <value>node1</value> ??? </property> ??? <!-- 远程模式部署metastore 服务地址 --> ??? <property> ??????? <name>hive.metastore.uris</name> ??????? <value>thrift://node1:9083</value> ??? </property> ??? <!-- 关闭元数据存储授权? --> ??? <property> ??????? <name>hive.metastore.event.db.notification.api.auth</name> ??????? <value>false</value> ??? </property> ??? <!-- 关闭元数据存储版本的验证 --> ??? <property> ??????? <name>hive.metastore.schema.verification</name> ??????? <value>false</value> ??? </property> </configuration> |
-
-
-
- 第四步:测试Sparksql整合Hive是否成功
- [方式1]Spark-sql方式测试
先启动hadoop集群,在启动spark集群,确保启动成功之后node1执行命令,指明master地址、每一个executor的内存大小、一共所需要的核数、mysql数据库连接驱动:
cd /export/server/spark bin/spark-sql --master local[2] --executor-memory 512m --total-executor-cores 1 或 bin/spark-sql --master spark://node1.itcast.cn:7077 --executor-memory 512m --total-executor-cores 1 |
执行成功后的界面:进入到spark-sql 客户端命令行界面
查看当前有哪些数据库, 并创建数据库
show databases; create database sparkhive; |
看到数据的结果,说明sparksql整合hive成功!
注意:日志太多,我们可以修改spark的日志输出级别(conf/log4j.properties)
注意:
在spark2.0版本后由于出现了sparkSession,在初始化sqlContext的时候,会设置默认的spark.sql.warehouse.dir=spark-warehouse,
此时将hive与sparksql整合完成之后,在通过spark-sql脚本启动的时候,还是会在那里启动spark-sql脚本,就会在当前目录下创建一个spark.sql.warehouse.dir为spark-warehouse的目录,存放由spark-sql创建数据库和创建表的数据信息,与之前hive的数据息不是放在同一个路径下(可以互相访问)。但是此时spark-sql中表的数据在本地,不利于操作,也不安全。
所有在启动的时候需要加上这样一个参数:
--conf? spark.sql.warehouse.dir=hdfs://node1:9820/user/hive/warehouse
保证spark-sql启动时不在产生新的存放数据的目录,sparksql与hive最终使用的是hive同一存放数据的目录。如果使用的是spark2.0之前的版本,由于没有sparkSession,不会出现spark.sql.warehouse.dir配置项,不会出现上述问题。
Spark2之后最后的执行脚本,node1执行以下命令重新进去spark-sql
cd /export/server/spark bin/spark-sql \ ?--master spark://node1:7077 \ ?--executor-memory 512m --total-executor-cores 1 \ ?--conf spark.sql.warehouse.dir=hdfs://node1:9820/user/hive/warehouse |
- [方式2]PySpark-Shell方式启动:
bin/spark-shell --master local[3] spark.sql("show databases").show |
如下图:
- [方式3]PySpark-Shell方式启动:
bin/pyspark --master local[2] spark.sql("show databases").show |
- PyCharm整合Hive
- 操作准备
●原理
Hive表的元数据库中,描述了有哪些database、table、以及表有多少列,每一列是什么类型,以及表的数据保存在hdfs的什么位置
执行HQL时,先到MySQL元数据库中查找描述信息,然后解析HQL并根据描述信息生成MR任务,简单来说Hive就是将SQL根据MySQL中元数据信息转成MapReduce执行,但是速度慢
使用SparkSQL整合Hive其实就是让SparkSQL去加载Hive 的元数据库,然后通过SparkSQL执行引擎去操作Hive表
所以首先需要开启Hive的元数据库服务,让SparkSQL能够加载元数据
●API
在Spark2.0之后,SparkSession对HiveContext和SqlContext在进行了统一
可以通过操作SparkSession来操作HiveContext和SqlContext。
-
-
-
- SparkSQL整合Hive MetaStore
默认Spark 有一个内置的 MateStore,使用 Derby 嵌入式数据库保存数据【上面案例】,但是这种方式不适合生产环境,因为这种模式同一时间只能有一个 SparkSession 使用,所以生产环境更推荐使用 Hive 的 MetaStore
SparkSQL 整合 Hive 的 MetaStore 主要思路就是要通过配置能够访问它,并且能够使用 HDFS保存WareHouse,所以可以直接拷贝Hadoop和Hive的配置文件到Spark的配置目录。
-
-
-
- 使用SparkSQL操作集群Hive表
如下为HiveContext的源码解析:
在PyCharm中开发应用,集成Hive读取表的数据进行分析,构建SparkSession时需要设置HiveMetaStore服务器地址及集成Hive选项:
范例演示代码如下:
# -*- coding: utf-8 -*-
# Program function:
from pyspark.sql import SparkSession
import os
os.environ['SPARK_HOME'] = '/export/servers/spark'
PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
if __name__ == '__main__':
??? # _SPARK_HOST = "spark://node1:7077"
??? _SPARK_HOST = "local[3]"
??? _APP_NAME = "test"
??? spark = SparkSession.builder\ ??????? .appName("_01_onhive")\ ??????? .master("local[*]")\ ??????? .config("spark.sql.shuffle.partitions",'4')\ ??????? .config("spark.sql.warehouse.dir","hdfs://node1:8020/user/hive/warehouse")\ ??????? .config("hive.metastore.uris","thrift://node1:9083")\ ??????? .enableHiveSupport()\ ??????? .getOrCreate()
??? spark.sparkContext.setLogLevel("WARN")
??? #PROJECT_ROOT = os.path.dirname(os.path.realpath(__file__))? # 获取项目根目录
??? # print(PROJECT_ROOT)#/export/pyfolder1/pyspark-chapter03_3.8/main
??? #path = os.path.join(PROJECT_ROOT, "data\\edge\\0_fuse.txt")? # 文件路径
??? # 查看有哪些表
??? spark.sql("show databases").show()
??? spark.sql("use sparkhive").show()
??? spark.sql("show tables").show()
??? # 创建表
??? spark.sql(
??????? "create table if not exists person (id int, name string, age int) row format delimited fields terminated by ','")
??? # 加载数据, 数据为当前目录下的person.txt(和src平级)
??? spark.sql("LOAD DATA LOCAL INPATH '/export/pyfolder1/pyspark-chapter03_3.8/data/student.csv' INTO TABLE person")
??? # 查询数据
??? spark.sql("select * from person ").show()
??? print("===========================================================")
??? import pyspark.sql.functions as fn
??? spark.read \
??????? .table("person ") \
??????? .groupBy("name") \
??????? .agg(fn.round(fn.avg("age"), 2).alias("avg_age")) \
??????? .show(10, truncate=False)
??? spark.stop() |
运行程序结果如下:
|