Spark: Introduction, Deployment, Principles, and Development Environment Setup
Introduction to Spark
Spark is a fast, general-purpose, scalable big data analytics and computation engine based on in-memory processing.
It is a general-purpose in-memory parallel computing framework developed at UC Berkeley's AMP Lab (Algorithms, Machines, and People Lab).
Spark is backed by many big data companies, including Hortonworks, IBM, Intel, Cloudera, MapR, Pivotal, Baidu, Alibaba, Tencent, JD.com, Ctrip, and Youku Tudou. Baidu applies Spark to its search, Zhidahao, and big data businesses; Alibaba built large-scale graph computation and graph mining systems with GraphX and implemented recommendation algorithms for many production systems; Tencent runs a Spark cluster of 8,000 nodes, the largest known Spark cluster in the world at the time of writing.
Version history
Hadoop version history (for comparison)

Spark version history (for comparison)

Spark vs. the MapReduce framework

RDD: Resilient Distributed Dataset, Spark's core data abstraction
Spark built-in modules

Spark Core: implements Spark's basic functionality, including task scheduling, memory management, fault recovery, and interaction with storage systems. Spark Core also defines the API of the Resilient Distributed Dataset (RDD).
Spark SQL: Spark's package for working with structured data. With Spark SQL you can query data using SQL or Apache Hive's HQL dialect. Spark SQL supports many data sources, such as Hive tables, Parquet, and JSON.
Spark Streaming: Spark's component for stream processing of real-time data. It provides an API for manipulating data streams that closely mirrors the RDD API in Spark Core.
Spark MLlib: a library of common machine learning functionality, including classification, regression, clustering, and collaborative filtering, plus supporting features such as model evaluation and data import.
Spark GraphX: a component for graph-parallel computation and graph mining.
Cluster managers: Spark is designed to scale computation efficiently from a single node to thousands of nodes. To achieve this while staying as flexible as possible, Spark runs on a variety of cluster managers, including Hadoop YARN, Apache Mesos, and the simple scheduler that ships with Spark, known as the standalone scheduler.
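As a rough illustration of how these modules are used from code, here is a minimal sketch combining Spark Core (RDD) and Spark SQL. It assumes a spark-sql_2.11 dependency alongside spark-core, and the people.json file and its columns are hypothetical:

import org.apache.spark.sql.SparkSession

object ModulesSketch {
  def main(args: Array[String]): Unit = {
    // Local SparkSession for experimenting; single entry point for both the RDD and SQL APIs
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("ModulesSketch")
      .getOrCreate()

    // Spark Core: a basic RDD transformation and action
    val squares = spark.sparkContext.parallelize(1 to 5).map(n => n * n).collect()
    println(squares.mkString(", "))

    // Spark SQL: query structured data (the JSON file and its columns are assumptions)
    val people = spark.read.json("input/people.json")
    people.createOrReplaceTempView("people")
    spark.sql("SELECT * FROM people").show()

    spark.stop()
  }
}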
Spark features
Fast:
Compared with Hadoop MapReduce, Spark's in-memory computation is more than 100x faster and its disk-based computation more than 10x faster. Spark implements an efficient DAG execution engine that processes data flows efficiently in memory; intermediate results of a computation stay in memory.
Easy to use:
Spark provides APIs for Java, Python, and Scala, plus more than 80 high-level operators, so users can quickly build a wide range of applications. Spark also ships interactive Python and Scala shells, which make it very convenient to validate an approach against a Spark cluster directly from the shell.
General-purpose:
Spark offers a unified stack: interactive queries (Spark SQL), real-time stream processing (Spark Streaming), machine learning (Spark MLlib), and graph computation (GraphX). These different kinds of processing can be combined seamlessly in the same application, which reduces both development and maintenance effort and the cost of the deployment platform.
Highly compatible:
Spark integrates easily with other open-source products. For example, it can use Hadoop YARN or Apache Mesos as its resource manager and scheduler, and it can process any data Hadoop supports, including HDFS and HBase. This matters especially for users who already run a Hadoop cluster, because they can use Spark's processing power without migrating any data.
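As a small illustration of this compatibility, an existing HDFS file can be read with the plain RDD API. A minimal sketch, assuming the NameNode address used later in this article (hdfs://ops01:8020) and a hypothetical /README.txt path:

import org.apache.spark.{SparkConf, SparkContext}

object HdfsReadSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("HdfsReadSketch")
    val sc = new SparkContext(conf)

    // Read a file that already lives in HDFS; no data migration needed
    // (hdfs://ops01:8020/README.txt is a hypothetical path)
    val lines = sc.textFile("hdfs://ops01:8020/README.txt")
    println(s"line count: ${lines.count()}")

    sc.stop()
  }
}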
Spark deployment
Spark run modes
Deploying Spark broadly falls into two categories: single-machine mode and cluster mode. Most distributed frameworks support a single-machine mode so that developers can debug the framework's runtime environment, but it is not used in production. The rest of this article therefore deploys Spark in cluster mode. The deployment modes Spark currently supports are listed below.
Local mode
Runs a single Spark service locally. Well suited to debugging scenarios such as exploring the Spark directory layout, getting familiar with the configuration files, or running a quick demo.
Standalone mode
Spark's built-in task-scheduling mode: several Spark machines coordinate scheduling among themselves, but only for Spark's own workloads.
YARN mode
Spark uses Hadoop's YARN component for resource and task scheduling; this is Spark genuinely cooperating with an external system.
Mesos mode
Spark uses the Mesos platform for resource and task scheduling. The Spark client connects to Mesos directly; no separate Spark cluster needs to be built.
(Mesos is a cluster management platform. Think of it as a kind of kernel for distributed systems: it allocates cluster resources such as CPU, memory, storage, and network. Frameworks such as Spark, Storm, Hadoop, and Marathon can all run on Mesos.)
Spark official website

Local mode: deployment and usage
Download the package: official site -> Download -> release archives -> spark-2.1.1 -> spark-2.1.1-bin-hadoop2.7.tgz
Extract the Spark package
wangting@ops01:/opt/software >tar -xf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/
Rename the directory
wangting@ops01:/opt/software >cd /opt/module/
wangting@ops01:/opt/module >mv spark-2.1.1-bin-hadoop2.7 spark-local
Directory layout
wangting@ops01:/opt/module >cd spark-local/
wangting@ops01:/opt/module/spark-local >
wangting@ops01:/opt/module/spark-local >ll
total 104
drwxr-xr-x 2 wangting wangting 4096 Apr 26 2017 bin
drwxr-xr-x 2 wangting wangting 4096 Apr 26 2017 conf
drwxr-xr-x 5 wangting wangting 4096 Apr 26 2017 data
drwxr-xr-x 4 wangting wangting 4096 Apr 26 2017 examples
drwxr-xr-x 2 wangting wangting 12288 Apr 26 2017 jars
-rw-r--r-- 1 wangting wangting 17811 Apr 26 2017 LICENSE
drwxr-xr-x 2 wangting wangting 4096 Apr 26 2017 licenses
-rw-r--r-- 1 wangting wangting 24645 Apr 26 2017 NOTICE
drwxr-xr-x 8 wangting wangting 4096 Apr 26 2017 python
drwxr-xr-x 3 wangting wangting 4096 Apr 26 2017 R
-rw-r--r-- 1 wangting wangting 3817 Apr 26 2017 README.md
-rw-r--r-- 1 wangting wangting 128 Apr 26 2017 RELEASE
drwxr-xr-x 2 wangting wangting 4096 Apr 26 2017 sbin
drwxr-xr-x 2 wangting wangting 4096 Apr 26 2017 yarn
Official demo: estimating Pi
wangting@ops01:/opt/module/spark-local >bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[4] /opt/module/spark-local/examples/jars/spark-examples_2.11-2.1.1.jar 20
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/22 10:52:01 INFO SparkContext: Running Spark version 2.1.1
21/07/22 10:52:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/22 10:52:02 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-cee9f744-b8dd-4c75-83be-3884f3b4425b
21/07/22 10:52:02 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
21/07/22 10:52:02 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/22 10:52:02 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/22 10:52:02 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://11.8.37.50:4040
21/07/22 10:52:02 INFO SparkContext: Added JAR file:/opt/module/spark-local/examples/jars/spark-examples_2.11-2.1.1.jar at spark://11.8.37.50:46388/jars/spark-examples_2.11-2.1.1.jar with timestamp 1626922322910
21/07/22 10:52:02 INFO Executor: Starting executor ID driver on host localhost
21/07/22 10:52:04 INFO Executor: Fetching spark://11.8.37.50:46388/jars/spark-examples_2.11-2.1.1.jar with timestamp 1626922322910
21/07/22 10:52:04 INFO TransportClientFactory: Successfully created connection to /11.8.37.50:46388 after 28 ms (0 ms spent in bootstraps)
21/07/22 10:52:04 INFO Utils: Fetching spark://11.8.37.50:46388/jars/spark-examples_2.11-2.1.1.jar to /tmp/spark-86795505-2fa5-4e6e-9331-f26233e462b2/userFiles-174c50ee-3aff-4523-a89b-17910dcd467e/fetchFileTemp632487868763480019.tmp
21/07/22 10:52:04 INFO Executor: Adding file:/tmp/spark-86795505-2fa5-4e6e-9331-f26233e462b2/userFiles-174c50ee-3aff-4523-a89b-17910dcd467e/spark-examples_2.11-2.1.1.jar to class loader
21/07/22 10:52:05 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.386012 s
Pi is roughly 3.140143570071785
21/07/22 10:52:05 INFO SparkUI: Stopped Spark web UI at http://11.8.37.50:4040
21/07/22 10:52:05 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/22 10:52:05 INFO MemoryStore: MemoryStore cleared
21/07/22 10:52:05 INFO BlockManager: BlockManager stopped
21/07/22 10:52:05 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/22 10:52:05 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/22 10:52:05 INFO SparkContext: Successfully stopped SparkContext
21/07/22 10:52:05 INFO ShutdownHookManager: Shutdown hook called
21/07/22 10:52:05 INFO ShutdownHookManager: Deleting directory /tmp/spark-86795505-2fa5-4e6e-9331-f26233e462b2
bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[4] /opt/module/spark-local/examples/jars/spark-examples_2.11-2.1.1.jar 20
- --class: the main class of the application jar to run
- --master local[K]:
  (1) local: no thread count specified; everything runs in a single thread with no parallelism
  (2) local[K]: use K cores, e.g. local[4] runs the computation on 4 cores
  (3) local[*]: automatically use as many threads as the machine has CPU cores, e.g. 4 threads on a 4-core CPU
- spark-examples_2.11-2.1.1.jar: the application jar to run
- 20: the input argument to the program (the number of sampling iterations for estimating Pi; more iterations give a more accurate result; here it simply demonstrates passing an argument)
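For intuition about what this demo computes: SparkPi estimates Pi by random sampling. A minimal sketch of that idea (not the exact code shipped in spark-examples) looks like this, where the trailing argument plays the role of the 20 above:

import scala.math.random
import org.apache.spark.{SparkConf, SparkContext}

object PiSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("PiSketch")
    val sc = new SparkContext(conf)

    val slices = if (args.length > 0) args(0).toInt else 2 // number of partitions / sampling rounds
    val n = 100000 * slices

    // Throw n random darts at the square [-1, 1] x [-1, 1]; the fraction that
    // lands inside the unit circle approximates Pi / 4
    val hits = sc.parallelize(1 to n, slices).map { _ =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y <= 1) 1 else 0
    }.reduce(_ + _)

    println(s"Pi is roughly ${4.0 * hits / n}")
    sc.stop()
  }
}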
Official wordcount example
wordcount counts how many times each word appears across several files, i.e. word frequency statistics.

Create the test directory and files
wangting@ops01:/opt/module/spark-local >mkdir input
wangting@ops01:/opt/module/spark-local >cd input/
wangting@ops01:/opt/module/spark-local/input >echo "hello spark" >> 1.txt
wangting@ops01:/opt/module/spark-local/input >echo "hello scala" >> 1.txt
wangting@ops01:/opt/module/spark-local/input >echo "hello flower" >> 1.txt
wangting@ops01:/opt/module/spark-local/input >echo "hello wangt" >> 1.txt
wangting@ops01:/opt/module/spark-local/input >echo "hello hello" >> 2.txt
wangting@ops01:/opt/module/spark-local/input >echo "hello niubi" >> 2.txt
wangting@ops01:/opt/module/spark-local/input >echo "wang wang" >> 2.txt
wangting@ops01:/opt/module/spark-local/input >echo "wangt ting" >> 2.txt
wangting@ops01:/opt/module/spark-local/input >cat 1.txt
hello spark
hello scala
hello flower
hello wangt
wangting@ops01:/opt/module/spark-local/input >cat 2.txt
hello hello
hello niubi
wang wang
wangt ting
Enter the spark-shell command line
wangting@ops01:/opt/module/spark-local/input >cd ..
wangting@ops01:/opt/module/spark-local >bin/spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/07/22 11:01:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/22 11:01:09 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
21/07/22 11:01:09 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
21/07/22 11:01:10 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://11.8.37.50:4040
Spark context available as 'sc' (master = local[*], app id = local-1626922864098).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.1
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
Note:
- spark-submit and spark-shell are simply two ways to run a job.
- spark-submit uploads a jar to the cluster and executes a Spark job.
- spark-shell is an interactive command-line tool and is itself an Application.
- Opening spark-shell starts a SparkSubmit process listening on port 4040, the application's web UI port. The process and port stay alive as long as the shell session is connected, and are released when the shell exits.
Run the wordcount job
scala> sc.textFile("/opt/module/spark-local/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((ting,1), (scala,1), (hello,7), (flower,1), (spark,1), (niubi,1), (wangt,2), (wang,2))
Notes (tab completion works in the shell):
def textFile(path: String, minPartitions: Int): org.apache.spark.rdd.RDD[String]
textFile() -> reads the local input directory as lines of text
def flatMap[U](f: String => TraversableOnce[U])(implicit evidence$4: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]
flatMap() -> flattens each line into individual words, splitting on spaces
def map[U](f: String => U)(implicit evidence$3: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]
map() -> maps each word to a (word, 1) tuple
def reduceByKey(func: (Int, Int) => Int): org.apache.spark.rdd.RDD[(String, Int)]
reduceByKey() -> aggregates the values for each key by summing them
def collect[U](f: PartialFunction[(String, Int), U])(implicit evidence$29: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]
def collect(): Array[(String, Int)]
collect() -> brings the results back to the Driver for display (the no-argument overload is the one used here)
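The same chain can also be built up step by step in spark-shell, which makes each intermediate RDD easy to inspect; a sketch (the variable names are arbitrary):

// Paste into spark-shell; `sc` is already provided by the shell
val lines  = sc.textFile("/opt/module/spark-local/input") // one element per line of text
val words  = lines.flatMap(_.split(" "))                  // split each line into words
val pairs  = words.map(word => (word, 1))                 // tag each word with a count of 1
val counts = pairs.reduceByKey(_ + _)                     // sum the counts per word
counts.collect().foreach(println)                         // bring the results to the Driver and print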
View in the web UI

Click a job to see more detailed information

Cluster roles
Master & Worker

Driver & Executor

Note:
- Master and Worker are Spark daemons, i.e. processes that must be running for Spark to operate in certain modes.
- Driver and Executor are transient programs that are only started when a concrete job is submitted to the Spark cluster.
Standalone mode: deployment and usage
Standalone mode uses Spark's own resource-scheduling engine to build a Spark cluster consisting of a Master plus Slaves, with Spark running on that cluster.
Do not confuse this with Hadoop's standalone mode. Here, Standalone means the cluster is built with Spark alone, without relying on any other framework; the term is used in contrast to YARN and Mesos.
Machine plan (three nodes are enough; four are used here):
ops01 11.8.37.50 master|worker
ops02 11.8.36.63 worker
ops03 11.8.36.76 worker
ops04 11.8.36.86 worker
Note: add these entries to the /etc/hosts name-resolution file.
wangting@ops01:/opt/module >cat /etc/hosts
127.0.0.1 ydt-cisp-ops01
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
11.16.0.176 rancher.mydomain.com
11.8.38.123 www.tongtongcf.com
11.8.37.50 ops01
11.8.36.63 ops02
11.8.36.76 ops03
11.8.38.86 ops04
11.8.38.82 jpserver ydt-dmcp-jpserver
wangting@ops01:/opt/module >
Extract the package
wangting@ops01:/opt/software >tar -xf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/
Rename the directory
wangting@ops01:/opt/software >mv /opt/module/spark-2.1.1-bin-hadoop2.7 /opt/module/spark-standalone
Configuration directory
wangting@ops01:/opt/software >cd /opt/module/spark-standalone/conf/
wangting@ops01:/opt/module/spark-standalone/conf >
wangting@ops01:/opt/module/spark-standalone/conf >ll
total 32
-rw-r--r-- 1 wangting wangting 987 Apr 26 2017 docker.properties.template
-rw-r--r-- 1 wangting wangting 1105 Apr 26 2017 fairscheduler.xml.template
-rw-r--r-- 1 wangting wangting 2025 Apr 26 2017 log4j.properties.template
-rw-r--r-- 1 wangting wangting 7313 Apr 26 2017 metrics.properties.template
-rw-r--r-- 1 wangting wangting 865 Apr 26 2017 slaves.template
-rw-r--r-- 1 wangting wangting 1292 Apr 26 2017 spark-defaults.conf.template
-rwxr-xr-x 1 wangting wangting 3960 Apr 26 2017 spark-env.sh.template
Edit the slaves file to define the cluster
wangting@ops01:/opt/module/spark-standalone/conf >mv slaves.template slaves
wangting@ops01:/opt/module/spark-standalone/conf >vim slaves
ops01
ops02
ops03
ops04
Edit spark-env.sh to add the master node settings
wangting@ops01:/opt/module/spark-standalone/conf >mv spark-env.sh.template spark-env.sh
wangting@ops01:/opt/module/spark-standalone/conf >vim spark-env.sh
SPARK_MASTER_HOST=ops01
SPARK_MASTER_PORT=7077
Distribute the spark-standalone directory to every node
wangting@ops01:/opt/module >scp -r spark-standalone ops02:/opt/module/
wangting@ops01:/opt/module >scp -r spark-standalone ops03:/opt/module/
wangting@ops01:/opt/module >scp -r spark-standalone ops04:/opt/module/
Check port 8080 and the Spark processes
wangting@ops01:/home/wangting >sudo netstat -tnlpu|grep 8080
wangting@ops01:/home/wangting >
wangting@ops01:/home/wangting >jps -l | grep spark
wangting@ops01:/home/wangting >
Start the spark-standalone cluster
wangting@ops01:/home/wangting >cd /opt/module/spark-standalone/
wangting@ops01:/opt/module/spark-standalone >sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.master.Master-1-ops01.out
ops04: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops04.out
ops03: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops03.out
ops01: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops01.out
ops02: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops02.out
ops04: failed to launch: nice -n 0 /opt/module/spark-standalone/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://ops01:7077
ops04: JAVA_HOME is not set
ops04: full log in /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops04.out
wangting@ops01:/opt/module/spark-standalone >
wangting@ops01:/opt/module/spark-standalone >sudo netstat -tnlpu|grep 8080
tcp6 0 0 :::8080 :::* LISTEN 57689/java
wangting@ops01:/opt/module/spark-standalone >jps -l | grep spark
57809 org.apache.spark.deploy.worker.Worker
57689 org.apache.spark.deploy.master.Master
wangting@ops01:/opt/module/spark-standalone >
Fix "JAVA_HOME is not set"
The services start successfully, but during cluster startup ops04 reports: "ops04: JAVA_HOME is not set".
Switch to the ops04 server
wangting@ops04:/opt/module/spark-standalone >echo $JAVA_HOME
/usr/java8_64/jdk1.8.0_101
wangting@ops04:/opt/module/spark-standalone >vim sbin/spark-config.sh
export JAVA_HOME=/usr/java8_64/jdk1.8.0_101
Switch back to the master (ops01) and restart the services
wangting@ops01:/opt/module/spark-standalone >sbin/stop-all.sh
ops01: stopping org.apache.spark.deploy.worker.Worker
ops03: stopping org.apache.spark.deploy.worker.Worker
ops04: no org.apache.spark.deploy.worker.Worker to stop
ops02: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master
wangting@ops01:/opt/module/spark-standalone >
wangting@ops01:/opt/module/spark-standalone >sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.master.Master-1-ops01.out
ops01: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops01.out
ops03: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops03.out
ops04: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops04.out
ops02: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.worker.Worker-1-ops02.out
wangting@ops01:/opt/module/spark-standalone >
The earlier warning is now resolved.
View the web UI in a browser

Official demo: estimating Pi
wangting@ops01:/opt/module/spark-standalone >bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://ops01:7077 /opt/module/spark-standalone/examples/jars/spark-examples_2.11-2.1.1.jar 20
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/23 15:13:39 INFO SparkContext: Running Spark version 2.1.1
21/07/23 15:13:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/23 15:13:39 INFO SecurityManager: Changing view acls to: wangting
21/07/23 15:13:39 INFO SecurityManager: Changing modify acls to: wangting
21/07/23 15:13:39 INFO SecurityManager: Changing view acls groups to:
21/07/23 15:13:39 INFO SecurityManager: Changing modify acls groups to:
21/07/23 15:13:44 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
21/07/23 15:13:44 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 2.824153 s
Pi is roughly 3.1423635711817854
21/07/23 15:13:44 INFO SparkUI: Stopped Spark web UI at http://11.8.37.50:4040
21/07/23 15:13:44 INFO StandaloneSchedulerBackend: Shutting down all executors
21/07/23 15:13:44 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
21/07/23 15:13:44 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/23 15:13:44 INFO MemoryStore: MemoryStore cleared
21/07/23 15:13:44 INFO BlockManager: BlockManager stopped
21/07/23 15:13:44 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/23 15:13:44 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/23 15:13:44 INFO SparkContext: Successfully stopped SparkContext
21/07/23 15:13:44 INFO ShutdownHookManager: Shutdown hook called
21/07/23 15:13:44 INFO ShutdownHookManager: Deleting directory /tmp/spark-6547bdc7-5117-4c44-8f14-4328fa38ace6
Check the job status in the web UI

Run a job with explicit resource settings
wangting@ops01:/opt/module/spark-standalone >bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://ops01:7077 --executor-memory 8G --total-executor-cores 4 /opt/module/spark-standalone/examples/jars/spark-examples_2.11-2.1.1.jar 20
wangting@ops01:/opt/module/spark-standalone >
Check the new job's status and resource usage in the web UI

Configure the history server
Note: an HDFS environment is assumed to be installed already. If not, deploy one first; for pure experimentation a local (single-node) HDFS is enough, while a cluster deployment assumes an HDFS cluster is already in place.
Once spark-shell stops, the application web UI on port 4040 no longer shows how historical jobs ran, so a history server is normally configured during development to record job runs.
wangting@ops01:/opt/module/spark-standalone >cd conf/
wangting@ops01:/opt/module/spark-standalone/conf >mv spark-defaults.conf.template spark-defaults.conf
wangting@ops01:/opt/module/spark-standalone/conf >
wangting@ops01:/opt/module/spark-standalone/conf >vim spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://ops01:8020/directory
wangting@ops01:/opt/module/spark-standalone/conf >hdfs dfs -ls /
2021-07-23 15:24:45,730 INFO [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
Found 15 items
drwxr-xr-x - wangting supergroup 0 2021-03-17 11:44 /20210317
drwxr-xr-x - wangting supergroup 0 2021-03-19 10:51 /20210319
drwxr-xr-x - wangting supergroup 0 2021-04-24 17:05 /flume
-rw-r--r-- 3 wangting supergroup 338075860 2021-03-12 11:50 /hadoop-3.1.3.tar.gz
drwxr-xr-x - wangting supergroup 0 2021-05-13 15:31 /hbase
drwxr-xr-x - wangting supergroup 0 2021-05-26 16:56 /origin_data
drwxr-xr-x - wangting supergroup 0 2021-06-10 10:31 /spark-history
drwxr-xr-x - wangting supergroup 0 2021-06-10 10:39 /spark-jars
drwxr-xr-x - wangting supergroup 0 2021-06-10 11:11 /student
drwxr-xr-x - wangting supergroup 0 2021-04-04 11:07 /test.db
drwxr-xr-x - wangting supergroup 0 2021-03-19 11:14 /testgetmerge
drwxr-xr-x - wangting supergroup 0 2021-04-10 16:23 /tez
drwx------ - wangting supergroup 0 2021-04-02 15:14 /tmp
drwxr-xr-x - wangting supergroup 0 2021-04-02 15:25 /user
drwxr-xr-x - wangting supergroup 0 2021-06-10 11:43 /warehouse
wangting@ops01:/opt/module/spark-standalone/conf >hdfs dfs -mkdir /directory
2021-07-23 15:25:14,573 INFO [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
wangting@ops01:/opt/module/spark-standalone/conf >hdfs dfs -ls / | grep directory
drwxr-xr-x - wangting supergroup 0 2021-07-23 15:25 /directory
wangting@ops01:/opt/module/spark-standalone/conf >vim spark-env.sh
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://ops01:8020/directory
-Dspark.history.retainedApplications=30"
wangting@ops01:/opt/module/spark-standalone/conf >
wangting@ops01:/opt/module/spark-standalone/conf >scp spark-env.sh spark-defaults.conf ops02:/opt/module/spark-standalone/conf/
wangting@ops01:/opt/module/spark-standalone/conf >scp spark-env.sh spark-defaults.conf ops03:/opt/module/spark-standalone/conf/
wangting@ops01:/opt/module/spark-standalone/conf >scp spark-env.sh spark-defaults.conf ops04:/opt/module/spark-standalone/conf/
wangting@ops01:/opt/module/spark-standalone >sbin/start-history-server.sh
starting org.apache.spark.deploy.history.HistoryServer, logging to /opt/module/spark-standalone/logs/spark-wangting-org.apache.spark.deploy.history.HistoryServer-1-ops01.out
wangting@ops01:/opt/module/spark-standalone >
wangting@ops01:/opt/module/spark-standalone >bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://ops01:7077 --executor-memory 8G --total-executor-cores 4 /opt/module/spark-standalone/examples/jars/spark-examples_2.11-2.1.1.jar 20
Access the job-history page at ip:18080
http://11.8.37.50:18080/

Since this standalone setup is only for experimentation, high-availability configuration is skipped here. In production, YARN mode is by far the most common choice in China, while Mesos mode is also seen abroad.
YARN mode: deployment and usage
YARN mode requires a Hadoop cluster (HDFS and YARN) to be prepared in advance; the deployment was covered in an earlier article: "Hadoop introduction and deployment".

| Service | ops01 (8C32G) | ops02 (8C24G) | ops03 (8C24G) | ops04 (8C24G) | Version |
| --- | --- | --- | --- | --- | --- |
| HDFS | NameNode | DataNode | SecondaryNameNode | DataNode | 3.1.3 |
| YARN | NodeManager | ResourceManager / NodeManager | NodeManager | NodeManager | 3.1.3 |
| MapReduce | √ JobHistoryServer | √ | √ | √ | 3.1.3 |
Stop the Standalone-mode Spark cluster
wangting@ops01:/opt/module/spark-standalone >sbin/stop-all.sh
ops01: stopping org.apache.spark.deploy.worker.Worker
ops04: stopping org.apache.spark.deploy.worker.Worker
ops03: stopping org.apache.spark.deploy.worker.Worker
ops02: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master
wangting@ops01:/opt/module/spark-standalone >
Extract the Spark package and rename the directory
wangting@ops01:/opt/module/spark-standalone >cd /opt/software/
wangting@ops01:/opt/software >tar -xf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/
wangting@ops01:/opt/software >cd /opt/module/
wangting@ops01:/opt/module >mv spark-2.1.1-bin-hadoop2.7 spark-yarn
wangting@ops01:/opt/module >cd spark-yarn/
Edit the spark-env.sh configuration
Append the entry YARN_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop at the end (it points to the Hadoop installation's configuration path).
wangting@ops01:/opt/module/spark-yarn >cd conf/
wangting@ops01:/opt/module/spark-yarn/conf >mv spark-env.sh.template spark-env.sh
wangting@ops01:/opt/module/spark-yarn/conf >vim spark-env.sh
YARN_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop
Start HDFS and YARN
Make sure the HDFS and YARN clusters are running (start them from the hadoop sbin/ directory).
This step differs between environments, depending on where things are deployed locally and which server hosts each component's master.
wangting@ops01:/opt/module/hadoop-3.1.3 >sbin/start-dfs.sh
Starting namenodes on [ops01]
Starting datanodes
Starting secondary namenodes [ops03]
wangting@ops02:/opt/module/hadoop-3.1.3/sbin >./start-yarn.sh
Starting resourcemanager
Starting nodemanagers
Official demo: estimating Pi
wangting@ops01:/opt/module/spark-yarn >bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
/opt/module/spark-yarn/examples/jars/spark-examples_2.11-2.1.1.jar \
20
2021-07-26 11:41:56,166 INFO spark.SparkContext: Running Spark version 2.1.1
2021-07-26 11:41:56,606 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2021-07-26 11:41:56,784 INFO spark.SecurityManager: Changing view acls to: wangting
2021-07-26 11:41:56,784 INFO spark.SecurityManager: Changing modify acls to: wangting
2021-07-26 11:41:56,785 INFO spark.SecurityManager: Changing view acls groups to:
2021-07-26 11:41:56,786 INFO spark.SecurityManager: Changing modify acls groups to:
2021-07-26 11:41:56,786 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(wangting); groups with view permissions: Set(); users with modify permissions: Set(wangting); groups with modify permissions: Set()
2021-07-26 11:42:21,960 INFO scheduler.DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 0.933 s
2021-07-26 11:42:21,965 INFO scheduler.DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.150791 s
Pi is roughly 3.139429569714785
2021-07-26 11:42:21,976 INFO server.ServerConnector: Stopped Spark@61edc883{HTTP/1.1}{0.0.0.0:4040}
2021-07-26 11:42:21,977 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@6063d80a{/stages/stage/kill,null,UNAVAILABLE,@Spark}
2021-07-26 11:42:21,977 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@5ae76500{/jobs/job/kill,null,UNAVAILABLE,@Spark}
2021-07-26 11:42:22,010 INFO cluster.YarnClientSchedulerBackend: Stopped
2021-07-26 11:42:22,015 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
2021-07-26 11:42:22,027 INFO memory.MemoryStore: MemoryStore cleared
2021-07-26 11:42:22,027 INFO storage.BlockManager: BlockManager stopped
2021-07-26 11:42:22,033 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
2021-07-26 11:42:22,037 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
2021-07-26 11:42:22,038 INFO spark.SparkContext: Successfully stopped SparkContext
2021-07-26 11:42:22,040 INFO util.ShutdownHookManager: Shutdown hook called
2021-07-26 11:42:22,041 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-d3cf3aec-be6f-41f0-a950-4521641e6179
Check the YARN ResourceManager web UI on port 8088

The historical job records are visible there.
Spark-on-YARN execution flow
Spark on YARN has two modes, yarn-client and yarn-cluster; the key difference is where the Driver program runs. In yarn-client mode the Driver runs on the client, which suits interactive use and debugging where you want to see the application's output immediately. In yarn-cluster mode the Driver runs inside the ApplicationMaster launched by the ResourceManager, which suits production environments.
Client mode
wangting@ops01:/opt/module/spark-yarn >bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
10
The result appears directly in the console output.
Client-mode job flow

Cluster mode
wangting@ops01:/opt/module/spark-yarn >bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.11-2.1.1.jar \
10

Open the job's details; the final Pi value is visible in its log output.

Cluster-mode job flow

Comparison of the Spark deployment modes (excluding Mesos)

| Mode | Machines with Spark installed | Processes to start | Owner |
| --- | --- | --- | --- |
| Local mode | 1 | None | Spark |
| Standalone mode | 3 | Master and Worker | Spark |
| YARN mode | 1 | Relies on the existing YARN and HDFS | Hadoop |
Spark port summary
1) Spark history server port: 18080 (analogous to the Hadoop history server port 19888)
2) Spark Master web UI port: 8080 (analogous to the Hadoop NameNode web UI port 9870 (50070))
3) Spark Master internal communication port: 7077 (analogous to Hadoop's 8020 (9000) port)
4) Port for viewing the currently running spark-shell job: 4040
5) Hadoop YARN job-monitoring port: 8088
Local development environment + wordcount example
First install the IDEA IDE, plus a Java environment and a Scala environment, and configure Maven in IDEA; guides for this preparation are easy to find online.
The Spark shell is mostly used for quick tests and verification. In production, programs are usually written in an IDE, packaged into a jar, and submitted to the cluster. The most common setup is a Maven project, with Maven managing the jar dependencies.
Write the program
1) Create a Maven project
2) Prepare some wordcount input material
3) Add the project dependencies
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wangting</groupId>
    <artifactId>spark_wt_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>WordCount</finalName>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Note:
- The new project is a Maven project; the project name can be anything you like.
- Create an input directory under the project root, then create 1.txt and 2.txt inside it as the data source for the computation.
1.txt:
hello niubi
nihao niubiplus
scala niubi
spark niubi scala spark
2.txt:
hello wangting
nihao wang
scala ting
spark wangting scala spark
- After adding the configuration to pom.xml, click the Maven refresh button on the right; the dependencies will then be downloaded, so wait for the download to finish.
- Create a scala directory under src/main, right-click it and choose Mark Directory as -> Sources Root, so it turns the same color as the java directory.
- Under scala, create a package, e.g. com.wangting.spark (the name is up to you).
- Under com.wangting.spark, create a new Scala class named WordCount with kind Object.
package com.wangting.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Run locally with as many threads as there are CPU cores
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val sc: SparkContext = new SparkContext(conf)

    // Read the local test file (adjust the path to your own project location)
    val textRDD: RDD[String] = sc.textFile("E:\\spark_wt_test\\input\\1.txt")
    // Split each line into words
    val flatMapRDD: RDD[String] = textRDD.flatMap(_.split(" "))
    // Map each word to a (word, 1) tuple
    val mapRDD: RDD[(String, Int)] = flatMapRDD.map((_, 1))
    // Sum the counts per word
    val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
    // Collect the results to the Driver and print them
    val res: Array[(String, Int)] = reduceRDD.collect()
    res.foreach(println)

    sc.stop()
  }
}
Add the packaging plugin configuration to pom.xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.0.0</version>
    <configuration>
        <archive>
            <manifest>
                <mainClass>com.wangting.spark.WordCount</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
Note where this plugin block goes: build -> plugins -> plugin.
Workflow recap

Note:
The above is only an outline of the development workflow; the specifics of each step will differ in real scenarios. The goal is simply to become familiar with the steps the development process goes through.
- Prepare IDEA and configure the Maven environment.
- Create a Maven project.
- Create an input directory in the project with some test material, as a stand-in for fetching data from HDFS while testing the code.
- Under src/main, create a scala directory alongside java and mark it as Sources Root.
- Add the required dependencies to the pom file; afterwards, open the Maven panel near the top right and click the refresh-like button so the dependencies are downloaded locally.
- Keep the code under the scala package.
- Finally, run package from the Lifecycle section of the Maven panel.
- After packaging, the project gets a target directory; the jar name is the one defined in the pom (WordCount), so if everything went well there is a WordCount.jar.
- Finally, upload the jar to the server and run it; that completes the development workflow (see the sketch after this list for an argument-driven variant of the main class that is easier to run on the cluster).
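To run the same logic unchanged both locally and on the cluster, a common pattern (a sketch, not part of the project above) is to take the input path from the command line and leave the master to spark-submit:

package com.wangting.spark

import org.apache.spark.{SparkConf, SparkContext}

object WordCountCluster {
  def main(args: Array[String]): Unit = {
    // No setMaster here: the master comes from spark-submit (--master yarn, spark://..., local[*], ...)
    val conf = new SparkConf().setAppName("WordCountCluster")
    val sc = new SparkContext(conf)

    // Input path is the first command-line argument, e.g. a local or HDFS directory
    val inputPath = args(0)
    sc.textFile(inputPath)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)

    sc.stop()
  }
}

It would then be submitted with something like bin/spark-submit --class com.wangting.spark.WordCountCluster --master yarn WordCount.jar /input (the class name and paths here are illustrative).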
Attaching the Spark source code
1. Download the spark-2.1.1 source zip from the official site and unpack it locally.
2. In IDEA, when Ctrl-clicking into a method cannot resolve the source, a prompt appears: Download Sources / Choose Sources.
3. Pick Choose Sources and select the unpacked Spark source directory in the path chooser.
4. After associating the path, click OK to import; that completes the setup.
Handling a local-execution error on some setups
Due to version issues, if the local operating system is Windows and the program touches anything Hadoop-related (for example writing files to HDFS), you will hit the following exception:
21/07/29 10:07:41 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:293)
at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362)
at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$29.apply(SparkContext.scala:1013)
at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$29.apply(SparkContext.scala:1013)
at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:179)
at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:179)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:179)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:198)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)


Note: you only need this parameter to be present; whether Hadoop is actually deployed at that path does not matter. As long as the parameter exists, the run can proceed, because the local tests here only read local static files. With the environment parameter added, the error no longer appears (screenshot below).
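One common workaround (a sketch under assumptions: D:\hadoop is a hypothetical directory whose bin subdirectory contains winutils.exe) is to point hadoop.home.dir at that directory before the SparkContext is created, or equivalently to set the HADOOP_HOME environment variable:

import org.apache.spark.{SparkConf, SparkContext}

object WinutilsFix {
  def main(args: Array[String]): Unit = {
    // Tell the Hadoop client libraries where to find bin\winutils.exe
    // (D:\hadoop is an assumed path; HADOOP_HOME=D:\hadoop works the same way)
    System.setProperty("hadoop.home.dir", "D:\\hadoop")

    val conf = new SparkConf().setMaster("local[*]").setAppName("WinutilsFix")
    val sc = new SparkContext(conf)
    println(sc.textFile("input").count())
    sc.stop()
  }
}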
