Spark简介
Spark 是加州大学伯克利分校AMP 实验室开发的通用内存并行计算框架。Spark 使用Scala 语言进行实现,它是一种面向对象、函数式编程语言,能够像操作本地集合对象一样轻松地操作分布式数据集,具有以下特点。
- 运行速度快:
Spark 拥有DAG 执行引擎,支持在内存中对数据进行迭代计算。官方提供的数据表明,如果数据由磁盘读取,速度是Hadoop MapReduce 的10倍以上,如果数据从内存中读取,速度可以高达100多倍。 - 易用性好:
Spark 不仅支持Scala 编写应用程序,而且支持Java 和Python 等语言进行编写,特别是Scala 是一种高效、可拓展的语言,能够用简洁的代码处理较为复杂的处理工作。 - 通用性强:
Spark 生态圈即BDAS (伯克利数据分析栈)包含了Spark Core 、Spark SQL 、Spark Streaming 、MLLib 和GraphX 等组件,这些组件分别处理Spark Core 提供内存计算框架、SparkStreaming 的实时处理应用、Spark SQL 的即席查询、MLlib 或MLbase 的机器学习和GraphX 的图处理。 - 随处运行:
Spark 具有很强的适应性,能够读取HDFS 、Cassandra 、HBase 、S3 和Techyon 为持久层读写原生数据,能够以Mesos 、YARN 和自身携带的Standalone 作为资源管理器调度job ,来完成Spark 应用程序的计算。
Spark结构设计
Spark运行架构包括集群资源管理器(Cluster Manager)、运行作业任务的工作节点(Worker Node)、每个应用的任务控制节点(Driver)和每个工作节点上负责具体任务的执行进程(Executor)。其中,集群资源管理器可以是Spark自带的资源管理器,也可以是YARN或Mesos等资源管理框架。
在驱动程序中,通过SparkContext主导应用的执行,SparkContext可以连接不同类型的Cluster Manager(YARN、Mesos),连接后,获得集群节点上的Executor,Spark的主入口,每个JVM只能有一个活跃的。
- Spark 2.0+加入了
SparkSession 应用程序的主入口:包含了SparkContext、SQLContext、HiveContext以及StreamingContext - 一个Worker节点默认一个Executor,可通过SPARK_WORKER_INSTANCES调整
搭建Spark Linux集群
Spark 作为一个数据处理框架和计算引擎,被设计在所有常见的集群环境中运行,在国内工作中主流的环境为 YARN 。 接下来我们来学习在强大的Yarn环境下Spark是如何工作的。
先搭建Hadoop的集群环境,确保可以正常工作,在此基础上继续搭建Spark集群。
修改Hadoop配置文件yarn-site.xml
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
Yarn application has already ended! It might have been killed or unable to launch application master.
安装Spark配置环境
[root@node01 conf]
[root@node01 local]
[root@node01 local]
[root@node01 conf]
[root@node01 conf]
export JAVA_HOME=/usr/local/src/jdk
export HADOOP_HOME=/usr/local/src/hadoop
export HADOOP_CONF_DIR=/usr/local/src/hadoop/etc/hadoop
export LD_LIBRARY_PATH=/usr/local/src/hadoop/lib/native
export SPARK_MASTER_IP=master
export SPARK_LOCAL_DIRS=/usr/local/src/spark/tmp
export SPARK_LOG_DIR=/usr/local/src/spark/logs
export SPARK_MASTER_PORT=7077
export SPARK_DRIVER_MEMORY=512M
[root@node01 conf]
node02
node03
[root@node01 conf]
export SPARK_HOME=/usr/local/spark-yarn
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
[root@node01 conf]
[root@node01 local]
[root@node01 conf]
[root@node01 sbin]
[root@node01 spark-yarn]
15600 NameNode
15782 SecondaryNameNode
15927 ResourceManager
16648 Jps
16569 Master
查看Web界面,默认访问端口是 8080
Spark YARN平台启动测试
export LD_LIBRARY_PATH=/usr/local/src/hadoop/lib/native
hdfs dfs -mkdir /spark-yarn-jars
hdfs dfs -put ${SPARK_HOME}/jars/*.jar /spark-yarn-jars
vim spark-defaults.conf
spark.yarn.jars hdfs://master:9000/spark-yarn-jars/*.jar
spark-shell --master yarn --deploy-mode client
在YARN平台上运行Spark提供的默认案例
spark-submit --master yarn --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.11-2.0.0.jar 10
--class Spark程序中包含主函数的类
--master Spark程序运行的模式(环境)
Spark RDD简介
MapReduce框架都是把中间结果写入到HDFS中,带来了大量的数据复制、磁盘IO和序列化开销。显然,如果能将结果保存在内存当中,就可以大量减少IO。RDD就是为了满足这种需求而出现的,它提供了一个抽象的数据架构,我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理,不同RDD之间的转换操作形成依赖关系,可以实现管道化,从而避免了中间结果的落地存储,大大降低了数据复制、磁盘IO和序列化开销。 RDD (Resilient Distributed Datasets 弹性分布式数据集),是Spark 中最重要的概念,可以简单的把RDD 理解成一个提供了许多操作接口的数据集合,和一般数据集不同的是,其实际数据被划分为一到多个分区,所有分区数据分布存储于一批机器中(内存或磁盘中),这里的分区可以简单地和Hadoop HDFS里面的文件块来对比理解。 RDD 主要有两大类操作,分别为转换(Transformations )和操作(Actions )。转换主要是指把原始数据集加载到RDD 以及把一个RDD 转换为另外一个RDD 。而操作主要指把RDD 存储到硬盘或触发转换执行。
RDD原理
Spark 中的所有转换都是懒惰(Lazy )操作,它们只是记住了需要这样的转换操作,并不会马上执行,只有等到Actions 操作的时候才会根据RDD的依赖关系生成DAG,并从起点开始真正启动计算过程进行计算。
先经过转换textFile 把数据从HDFS 加载到RDDA 以及RDDC ,这时其实RDDA 和RDDC 中都没有数据的,再到后面的转换flatMap 、map 、reduceByKey 等,分别把RDDA 转换为RDDB 到RDDF 以及RDDC 到RDDE 等,这些转换都是没有执行的。它们相当于先做了个计划,但是并没有具体执行,等到执行操作saveAsSequenceFile 时,才开始真正触发并执行任务。
这一系列处理称为一个“血缘关系(Lineage)",即DAG拓扑排序的结果。采用惰性调用,通过血缘关系连接起来的一系列RDD操作就可以实现管道化 (pipeline),避免了多次转换操作之间数据同步的等待,而且不用担心有过多的中间数据,因为这些具有血缘关系的操作都管道化了,一个操作得到的结果不需要保存为中间数据,而是直接管道式地流入到下一个操作进行处理。同时,这种通过血缘关系就是把一系列操作进行管道化连接的设计方式,也使得管道中每次操作的计算变得相对简单,保证了每个操作在处理逻辑上的单一性;相反,在MapReduce的设计中,为了尽可能地减少MapReduce过程,在单个MapReduce中会写入过多复杂的逻辑。
RDD Transformation & Action
常用的Transformation 函数
函数 | 描述 |
---|
map(f:T=>U) | 对RDD 数据集中的每个元素都使用f ,返回一个新的RDD | filter(f:T=>Bool) | 对RDD 数据集中的每个元素都使用f ,返回使用f 为true 的元素构成的RDD | flatMap(f:T=>seq(U)) | 与map 相似,但是每个输入项都可以映射到0个或多个输出项(因此func应该返回Seq而不是单个项) | distinct([n]) | 返回一个新的数据集,其中包含源数据集的不同元素。 | groupByKey(numTasks) | 返回(K,Seq(V)) ,根据相同的键分组 | sortByKey(asc) | 在(K,V) 对的数据集上调用时,返回(K,V) 对的数据集,按布尔值指定,按键以升序或降序排序asc | reduceByKey(f:(V,V)=>V) | 用一个给定的f 作用在groupByKey 而产生的(K,Seq(V)) ,比如求和 | union(otherDataset) | 返回一个新的dataset ,包含源dataset 和给定dataset 的元素的集合 |
常用的Action 函数
函数 | 描述 |
---|
reduce(f:(T,T)=>T) | 通过函数f 聚集数据集中的所有元素,f 函数接收两个参数,返回一个值 | collect() | 返回数据集中所有的元素 | count(p:Int=>Bool) | 返回数据集中所有元素的个数 | take(n:Int) | 返回前n 个元素 | saveAsTextFile(path) | 将数据集的元素以textfile 的形式保存到本地文件系统、hdfs 或者任何其它Hadoop 支持的文件系统 | saveAsObjectFile(path) | 使用Java 序列化以简单的格式编写数据集的元素,然后可以使用进行加载SparkContext.objectFile 。 | foreach(f:T=>U) | 对数据集中的每个元素都执行函数f |
创建RDD
RDD是一个容错的、只读的、可进行并行操作的数据结构,是一个分布在集群各个节点的存放元素的集合。RDD有3种不同的创建方法。一种是对程序种存在的基本数据结构中的集合进行并行化(如Set、List、Array),另一种是通过已有RDD转换得到新的RDD,这两种都是通过内存已有集合创建RDD。还有一种是直接读取外部存储的数据集。
val spark = SparkSession.builder().master("local").appName("RDD Transform").getOrCreate()
val rdd = spark.sparkContext.makeRDD(List(1, 2, 3, 4, 5))
def mapFn(num: Int): Int = num * 2
val mapRDD = rdd.map(mapFn)
val mapRDD = rdd.map(_*2)
mapRDD.foreach(println)
从内存中已有的数据创建RDD
parallelize方式
SparkContext 类中有两个方法:parallelize 和makeRDD 。通过parallelize 或makeRDD 可将单机数据创建为分布式RDD。 parallelize(Seq seq, int numSlices) 有两个参数可以输入,要转换的集合,必须是Seq 集合,分区个数,默认是2。
Seq是一种序列。所谓序列,指的是一类具有一定长度的可迭代访问的对象,其中每个元素均带有一个从0开始计数的固定索引位置。
val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5, 6), 1)
val sum = rdd.reduce(_ + _)
print(sum, rdd.partitions.size)
makeRDD方式
makeRDD(seq:Seq[T], numSlices) ,底层通过parallelize 函数把一般数据结构加载为RDD ,默认分区是1
val rdd = spark.sparkContext.makeRDD(Seq(1, 2, 3, 4, 5, 6), 1)
rdd.groupBy(item => item % 2).foreach(println)
(0,CompactBuffer(2, 4, 6))
(1,CompactBuffer(1, 3, 5))
从外部存储创建RDD
从外部存储创建RDD 是指直接读取一个存放在文件系统的数据文件创建RDD ,通过SparkContext 对象的textFile 方法读取数据集,支持多种类型数据集,如目录、文本文件、压缩文件和通配符匹配的文件等。并且允许设定分区个数。
spark.sparkContext.textFile("spark_core_1\\src\\main\\resources\\1.txt")
.flatMap(item => item.split(" "))
.map(item => (item, 1))
.reduceByKey(_+_)
.foreach(println)
(Spark,1)
(Hello,2)
(Scala,1)
RDD转换和操作函数
map :将原来RDD的每个数据项通过map中的用户自定义函数f转换成一个新的RDD,map操作不会改变RDD的分区数目
spark.sparkContext.parallelize(List(1, 2, 3, 4)).map(item => item + 10).foreach(println)
spark.sparkContext.parallelize(List("Hello", "Python", "Hello", "Hadoop", "Spark")).map(item => (item, 1)).foreach(println)
flatMap :使用flatMap对集合中的每个元素进行操作再扁平化(把二维数组拉平为一维数组)
spark.sparkContext.parallelize(List("Hello Spark", "Hello Scala", "Hello Spark","Spark")).map(item=>item.split(" ")).foreach(println)
[["Hello","Spark"],["Hello","Scala"],["Hello","Spark"],["Spark"]]
spark.sparkContext.parallelize(List("Hello Spark", "Hello Scala", "Hello Spark","Spark")).flatMap(item=>item.split(" ")).foreach(println)
["Hello","Spark","Hello","Scala","Hello","Spark","Spark"]
val rdd = spark.sparkContext.makeRDD(List(List(1, 2), 3, List(4, 5)))
rdd.flatMap(item=>{
item match {
case list: List[_] => list
case data: Int => List(data)
}
})
rdd.flatMap{
case list: List[_] => list
case data: Int => List(data)
}
sortBy :是对标准RDD进行排序的方法,可接受三个参数
- 参数1是一个函数
f:(T) => K ,左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。 - 参数2是
ascending ,决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序。 - 参数3是
numPartitions ,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等,即为 this.partitions.size 。
val rdd = spark.sparkContext.parallelize(List(("Hello", 3), ("Spark", 2), ("Scala", 8), ("Java", 2)))
rdd.sortBy(item => item._1, ascending = false, 1).foreach(println)
(Scala,8)
(Hello,3)
(Spark,2)
(Java,2)
sortByKey :作用于 Key-Value 形式的 RDD ,并对 Key 进行排序,可接受两个参数,分别是 ascending 和 numPartitions
val rdd = spark.sparkContext.parallelize(List(("Hello", 3), ("Spark", 2), ("Scala", 8), ("Java", 2)))
rdd.sortByKey(ascending = true, 1).foreach(println)
(Hello,3)
(Java,2)
(Scala,8)
(Spark,2)
take :用于获取 RDD 中从 0 到 size-1 下标的元素
val rdd = spark.sparkContext.parallelize(List(("Hello", 3), ("Spark", 2), ("Scala", 8), ("Java", 2)))
rdd.take(2).foreach(println)
(Hello,3)
(Spark,2)
filter :对元素进行过滤,对每个元素应用给定的函数,返回值为true的元素在RDD中保留,为false的则被过滤掉
val rdd = spark.sparkContext.parallelize(List(1, 2, 3, 4, 5, 6, 8, 9, 0, 1, 1, 2, 3, 4, 4, 5))
rdd.filter(item => item % 2 == 0).foreach(println)
distinct** :去重,针对RDD中重复的元素,只保留一个元素
val rdd = spark.sparkContext.parallelize(List(1, 2, 3, 4, 5, 6, 8, 9, 0, 1, 1, 2, 3, 4, 4, 5))
rdd.distinct.foreach(println)
union :合并RDD,需要保证两个RDD元素类型一致
val rdd1 = spark.sparkContext.parallelize(List(1, 2, 3))
val rdd2 = spark.sparkContext.parallelize(List(3, 4, 5))
rdd1.union(rdd2).foreach(println)
groupBy :根据指定参数进行分组
val rdd = spark.sparkContext.makeRDD(List("Hello", "Spark", "Hello", "Scala"))
rdd.groupBy(item => item)
.map{case (word, itr) => (word, itr.size)}
.foreach(println)
(Spark,1)
(Hello,2)
(Scala,1)
collect :返回RDD中所有的元素 count() :计算RDD中所有元素个数 saveAsTextFile :把RDD保存到HDFS中
键值对RDD操作
有很多种创建键值对RDD的方式,很多存储键值对的数据格式会在读取时直接返回由其键值对组成的PairRDD。 join :把两个rdd根据键值进行分组整合起来
val rdd1 = spark.sparkContext.parallelize(List(("K1","V1"),("K2","V2"),("K3","V3")))
val rdd2 = spark.sparkContext.parallelize(List(("K1","W1"),("K2","W2")))
rdd1.join(rdd2).foreach(println)
zip :将两个RDD组合成Key/Value形式的RDD,这里要求两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
val rdd1 = spark.sparkContext.makeRDD(1 to 5,2)
val rdd2 = spark.sparkContext.makeRDD(Seq("A","B","C","D","E"),2)
rdd1.zip(rdd2).foreach(println)
keys /values :获取键值对的所有键/值
val kvRDD = spark.sparkContext.makeRDD(List(("Hello", 2), ("Spark", 4), ("Hello", 4), ("Scala", 11)))
kvRDD.keys.foreach(println)
kvRDD.values.foreach(println)
mapValues :是针对键值对(Key,Value)类型的数据中的Value进行Map操作,而不对Key进行处理
kvRDD.mapValues(item => item * 2).foreach(println)
lookup :返回指定K的所有V值
val kvRDD = spark.sparkContext.makeRDD(List(("Hello", 2), ("Spark", 4), ("Hello", 4), ("Scala", 11)))
kvRDD.lookup("Spark").foreach(println)
reduceByKey :把相同key的value进行操作
val kvRDD = spark.sparkContext.makeRDD(List(("Hello", 2), ("Spark", 4), ("Hello", 4), ("Scala", 11)))
kvRDD.reduceByKey((num1, num2) => num1 + num2).foreach(println)
keyBy :为各个元素,按指定的函数生成key,形成key-value 的RDD。
val rdd = spark.sparkContext.parallelize(List("dog","tiger","lion","cat","eagle"))
rdd.keyBy(item=>item.length).foreach(println)
创建Maven项目
Spark由Scala 语言开发的,接下来的开发所使用的语言也为Scala,当前使用的Spark版本为2.4.7 ,默认采用的Scala编译版本为2.11.7 。开发前请保证IDEA开发工具中含有Scala开发插件。
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.4</version>
</dependency>
</dependencies>
编写WordCount案例
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("wordCount")
val context = new SparkContext(conf)
val rdd = context.textFile("spark_core_1\\src\\main\\resources\\*.txt")
val wordRDD = rdd.flatMap(item=>item.split(" "))
val groupRDD = wordRDD.groupBy(item=>item)
val wordCountRDD = groupRDD.map(item=>(item._1,item._2.size))
wordCountRDD.collect.foreach(item=>println(item))
wordCountRDD.repartition(1).saveAsTextFile("spark_core_1\\output\\demo1")
}
Spark聚合优化
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("wordCount")
val context = new SparkContext(conf)
val rdd = context.textFile("spark_core_1\\src\\main\\resources\\*.txt")
val wordRDD = rdd.flatMap(item => item.split(" "))
val wordTupleRDD = wordRDD.map(item => (item, 1))
val wordCountRDD = wordTupleRDD.reduceByKey((num1, num2) => num1 + num2)
wordCountRDD.collect.foreach(item=>println(item))
wordCountRDD.repartition(1).saveAsTextFile("spark_core_1\\output\\demo2")
}
使用IDEA打包案例到Spark集群运行
package com.example
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("wordCount")
val context = new SparkContext(conf)
context.textFile(args(0)).flatMap(line=>line.split(",")).groupBy(word=>word).map(item=>(item._1,item._2.size)).foreach(println)
}
}
先使用 maven 的 clear 命令清空项目,再使用 **IDEA Build** 菜单编译项目,最后使用maven 的 package 打包,并上传到 /root/spark_data 目录备用。
[root@master ~]$ spark-submit --master local --class com.example.WordCount spark_core-1.0-SNAPSHOT.jar file:///root/spark_data/words.txt
[root@master ~]$ spark-submit --master local --class com.example.WordCount spark_core-1.0-SNAPSHOT.jar /data/words.txt
(spark,4)
(hive,2)
(hadoop,4)
(java,2)
文件的存储与读取
在实际开发中,所要读取的文本格式不仅仅只是普通的文本文件,还包含其他类型的文本,例如JSON 、SequenceFile 。并且,当数据计算或处理结束后,通常需要将结果保存,用于应用或分析。 Spark支持一些常见文件格式如下:
格式名称 | 结构化 | 描述 |
---|
文本文件 | 否 | 普通的文本文件,每一行一条记录 | JSON | 半结构化 | 常见的基于文本的格式,半结构化;大多数库都要求每一行一条记录 | CSV | 是 | 非常常见的基于文本的格式,通常在电子表格应用中使用 | SequenceFile | 是 | 一种用于键值对数据的文件,常见Hadoop文件格式 |
JSON文件的读取与存储
- JSON文件读取,将数据作为文本文件,对JSON数据进行解析。这种方法要求文件每行是一条JSON记录,如果记录跨行,就需要读取整个文件,对每个文件进行解析。
{"name":"jack","age":12}
{"name":"lili","age":22}
{"name":"cc","age":11}
{"name":"vv","age":13}
{"name":"lee","age":14}
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
import com.alibaba.fastjson.JSON
import org.apache.spark.{SparkConf, SparkContext}
object SparkRDD {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("json read write")
val sc = new SparkContext(conf)
val jsonRDD = sc.textFile("file:///D:\\spark_data\\people.json")
.map(JSON.parseObject)
.map(item=>(item.getString("name").toUpperCase,item.getInteger("age")+10))
jsonRDD.repartition(1).saveAsTextFile("output/json")
}
}
CSV文件的读取与存储
逗号分隔值(CSV )文件每行都有固定数据的字段,字段间用逗号隔开。在CSV 的所有数据字段剧没有包含换行符的情况下,可以使用textFile 读取并解析数据。如果在字段中嵌有换行符,就需要完整读入每个文件,然后解析各段。
# CSV格式
name;age;job
Jorge;30;Developer
Bob;32;Developer
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("json read write")
val sc = new SparkContext(conf)
val csvRDD = sc.textFile("file:///D:\\spark_data\\people.csv")
.map(line=>line.split(";").toList)
.map(item=>item.mkString(","))
csvRDD.repartition(1).saveAsTextFile("/output/csv")
}
SequenceFile的读取与存储
SequenceFile 是由没有相对关系结构的键值对组成的常用Hadoop格式。
SequenceFile 有专门读取接口,可以调用SparkContext 的sequenceFile(path,keyClass,valueClass,minPartitions) 实现。SequenceFile 使用的是Hadoop 的Writable 类型,所以keyClass 和valueClass 参数必须定义为正确的Writable 类。SequenceFile 文件的存储首先需要保证有一个键值对类型的RDD ,然后直接调用saveSequenceFile(path) 保存数据,自动将键和值转化为Writable 类型。
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.io.{IntWritable, Text}
object SparkRDD {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("json read write")
val sc = new SparkContext(conf)
sc.sequenceFile("output/sequence",classOf[Text],classOf[IntWritable])
.map{case (x,y)=>(x.toString,y.get)}
.foreach(println)
}
}
|