一、spark
(一)介绍
运行速度快,比hadoop MapReduce快的多 包含:Spark SQL,Spark Streaming、Mlib (machine learning) 、Graphx(graph)
(Ⅰ)spark起源
Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的,后贡献给Apache。是一种快速、通用、可扩展的大数据分析引擎。它是不断壮大的大数据分析解决方案家族中备受关注的明星成员,为分布式数据集的处理提供了一个有效框架,并以高效的方式处理分布式数据集。Spark集批处理、实时流处理、交互式查询、机器学习与图计算于一体,避免了多种运算场景下需要部署不同集群带来的资源浪费。目前,Spark社区也成为大数据领域和Apache软件基金会最活跃的项目之一,其活跃度甚至远超曾经只能望其项背的Hadoop。
(Ⅱ)Spark的技术背景
1、无论是工业界还是学术界,都已经广泛使用高级集群编程模型来处理日益增长的数据,如MapReduce和Dryad。这些系统将分布式编程简化为自动提供位置感知性调度、容错以及负载均衡,使得大量用户能够在商用集群上分析超大数据集。
2、大多数现有的集群计算系统都是基于非循环的数据流模型。即从稳定的物理存储(如分布式文件系统)中加载记录,记录被传入由一组确定性操作构成的DAG(Directed Acyclic Graph,有向无环图),然后写回稳定存储。DAG数据流图能够在运行时自动实现任务调度和故障恢复。
3、尽管非循环数据流是一种很强大的抽象方法,但仍然有些应用无法使用这种方式描述。这类应用包括:①机器学习和图应用中常用的迭代算法(每一步对数据执行相似的函数) ②交互式数据挖掘工具(用户反复查询一个数据子集)
4、基于数据流的框架并不明确支持工作集,所以需要将数据输出到磁盘,然后在每次查询时重新加载,这会带来较大的开销。针对上述问题,Spark实现了一种分布式的内存抽象,称为弹性分布式数据集(Resilient Distributed Dataset,RDD )。它支持基于工作集的应用,同时具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
(二)spark vs MapReduce
hadoop的问题在于一个hadoop job会进行多次磁盘的读写 spark允许在内存中输入输出
(Ⅰ)MapReduce存在的问题
1、一个 Hadoop job 通常都是这样的: 1)从 HDFS 读取输入数据; 2)在 Map 阶段使用用户定义的 mapper function, 然后把结果Spill到磁盘; 3)在 Reduce 阶段,从各个处于 Map 阶段的机器中读取 Map 计算的中间结果,使用用户定义的 reduce function, 通常最后把结果写回 HDFS;
2、Hadoop的问题在于,一个 Hadoop job 会进行多次磁盘读写,比如写入机器本地磁盘,或是写入分布式文件系统中(这个过程包含磁盘的读写以及网络传输)。考虑到磁盘读取比内存读取慢了几个数量级,所以像 Hadoop 这样高度依赖磁盘读写的架构就一定会有性能瓶颈。
3、此外,在实际应用中我们通常需要设计复杂算法处理海量数据, 而且不是一个 Hadoop job 可以完成的。比如机器学习领域,需要大量使用迭代的方法训练机器学习模型。而像 Hadoop 的基本模型就只包括了一个 Map 和 一个 Reduce 阶段,想要完成复杂运算就需要切分出无数单独的 Hadoop jobs, 而且每个 Hadoop job 都是磁盘读写大户,这就让 Hadoop 显得力不从心。
4、随着业界对大数据使用越来越深入,大家都呼唤一个更强大的处理框架,能够真正解决更多复杂的大数据问题。
(Ⅱ)Spark的优势
1、2009年,美国加州大学伯克利分校的 AMPLab 设计并开发了名叫 Spark 的大数据处理框架。真如其名,Spark 像燎原之火,迅猛占领大数据处理框架市场。
2、Spark 没有像 Hadoop 一样使用磁盘读写,内存存储输入数据而转用性能高得多的、处理中间结果、和存储最终结果。在大数据的场景中,很多计算都有循环往复的特点,像 Spark 这样允许在内存中缓存输入输出,上一个 job 的结果马上可以被下一个使用,性能自然要比 Hadoop MapReduce 好得多。
3、同样重要的是,Spark 提供了更多灵活可用的数据操作,比如 filter, join, 以及各种对 key value pair 的方便操作,甚至提供了一个通用接口,让用户根据需要开发定制的数据操作。
4、此外,Spark 本身作为平台也开发了 streaming 处理框架 spark streaming, SQL 处理框架 Dataframe, 机器学习库 MLlib, 和图处理库 GraphX. 如此强大,如此开放,基于 Spark 的操作,应有尽有。
(Ⅲ)Hadoop 的 MapReduce 为什么不使用内存存储?
是历史原因。当初 MapReduce 选择磁盘,除了要保证数据存储安全以外,更重要的是当时企业级数据中心购买大容量内存的成本非常高,选择基于内存的架构并不现实;现在 Spark 真的赶上了好时候,企业可以轻松部署多台大内存机器,内存大到可以装载所有要处理的数据。
二、spark安装
(一)单机模式
(Ⅰ)上传文件
(Ⅱ)解压并修改文件名
[root@hadoop01 software]# cd /home/software/
[root@hadoop01 software]# tar -zxvf spark-2.0.1-bin-hadoop2.7.tgz
[root@hadoop01 software]# mv spark-2.0.1-bin-hadoop2.7/ spark
(Ⅲ)修改配置文件
[root@hadoop01 spark]# cd /home/software/spark/conf
[root@hadoop01 conf]# cp spark-env.sh.template spark-env.sh
[root@hadoop01 conf]# vim spark-env.sh
[root@hadoop01 conf]#
添加如下内容
SPARK_LOCAL_IP=hadoop01
(Ⅳ)启动
[root@hadoop01 spark]# cd /home/software/spark/bin
[root@hadoop01 bin]# sh spark-shell --master=local
在浏览器访问,若在windows上配置过hadoop01的ip可输入:hadoop01:4040否则,输入IP地址:4040(如192.168.232.129:4040)
Spark context available as 'sc' (master = local, app id = local-1490336686508).
Spark session available as 'spark'.
(二)集群模式
(三)On yarn(集群,资源管理交给yarn调度)
三、RDD
(一)RDD介绍
(Ⅰ)概述
1、RDD就是带有分区的集合类型 弹性分布式数据集(RDD),特点是可以并行操作,并且是容错的。有两种方法可以创建RDD: 1)执行Transform操作(变换操作), 2)读取外部存储系统的数据集,如HDFS,HBase,或任何与Hadoop有关的数据源。 2、它是spark提供的一个特殊集合类。 1)诸如普通的集合类型,如传统的Array:(1,2,3,4,5)是一个整体,但转换成RDD后,我们可以对数据进行Partition(分区)处理,这样做的目的就是为了分布式。 2)可以让这个RDD有两个分区,那么有可能是这个形式:RDD(1,2) (3,4)。这样设计的目的在于:可以进行分布式运算。 3、在spark交互模式下,编写函数时,可以通过tab提示【自动补全代码】
(Ⅱ)查看RDD
1、scala>rdd.collect 收集rdd中的数据组成Array返回,此方法将会把分布式存储的rdd中的数据集中到一台机器中组建Array。 在生产环境下一定要慎用这个方法,容易内存溢出。
2、查看RDD的分区数量:scala>rdd.partitions.size
3、查看RDD每个分区的元素:scala>rdd.glom.collect 此方法会将每个分区的元素以Array形式返回
4、不以分区显示rdd中数据:rdd.collect;以分区的形式显示rdd中的数据: rdd.glom.collect
(Ⅲ)分区概念
在上图中, 一个RDD有item1~item25个数据,共5个分区,分别在3台机器上进行处理。
此外,spark并没有原生的提供rdd的分区查看工具 我们可以自己来写一个 示例代码:
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
object su {
def debug[T: ClassTag](rdd: RDD[T]) = {
rdd.mapPartitionsWithIndex((i: Int, iter: Iterator[T]) => {
val m = scala.collection.mutable.Map[Int, List[T]]()
var list = List[T]()
while (iter.hasNext) {
list = list :+ iter.next
}
m(i) = list
m.iterator
}).collect().foreach((x: Tuple2[Int, List[T]]) => {
val i = x._1
println(s"partition:[$i]")
x._2.foreach { println }
})
}
}
(Ⅳ)简单使用(RDD的创建与查看)
创建数组
scala> val data = Array(1,2,3,4,5,6)
data: Array[Int] = Array(1, 2, 3, 4, 5, 6)
转化(创建)为RDD(两种方式)
scala> val rdd = sc.parallelize(data)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> val rdd2 = sc.makeRDD(data)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:26
不以分区的形式显示rdd中的数据
scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6)
以分区的形式显示rdd中的数据
scala> rdd.glom.collect
res1: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6))
分两个区建立RDD,分区是为了提高处理数据的并行度
scala> val rdd3 = sc.makeRDD(data,2)
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:26
查看RDD的分区数量
scala> rdd3.partitions.size
res2: Int = 2
scala> rdd3.glom.collect
res3: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6))
从本地文件读取数据生成RDD
scala> val rdd4 = sc.textFile("/home/bok")
rdd4: org.apache.spark.rdd.RDD[String] = /home/bok MapPartitionsRDD[6] at textFile at <console>:24
scala> rdd4.collect
res4: Array[String] = Array(shuihu`)
[root@hadoop01 home]# hadoop fs -put bok /
从hadoop文件读取数据生成RDD
scala> val rdd4 = sc.textFile("hdfs://hadoop01:9000/bok")
rdd4: org.apache.spark.rdd.RDD[String] = hdfs://hadoop01:9000/bok MapPartitionsRDD[8] at textFile at <console>:24
scala> rdd4.collect
res5: Array[String] = Array(shuihu`)
生成RDD的方法:
val rdd1 = sc.parallelize(data)
val rdd2 = sc.parallelize(data,2)
val rdd3 = sc.makeRDD(data)
(本地)
val rdd4 = sc.textFile("/home/bok")
(远程 hadoop)
val rdd4 = sc.textFile("hdfs://hadoop01:9000/bok")
当数据丢失时,hadoop会进入安全模式,如果一直处于安全模式,可以手动退出安全模式:hadoop dfsadmin -safemode leave
(二) RDD操作
(Ⅰ)Transformation(变换)操作
Transformation(变换)操作属于懒操作(算子),不会真正触发RDD的处理计算。
(Ⅱ)action操作
才会触发真正的计算
(Ⅲ)具体操作
1、mapPartitions
scala> val rdd = sc.makeRDD(List(1,2,3,4,5,6),2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at makeRDD at <console>:24
scala> rdd.glom.collect
res8: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6))
scala> val r1 = rdd.mapPartitions{x=>{
| val result=List[Int]()
| var i = 0
| while(x.hasNext){
| i += x.next()
| }
| result.::(i).iterator
| }}
r1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at mapPartitions at <console>:26
scala> r1.collect
res9: Array[Int] = Array(6, 15)
2、集合操作(并、交、差、去重)
scala> val rdd1 = sc.makeRDD(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at makeRDD at <console>:24
scala> val rdd2 = sc.makeRDD(3 to 9)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at makeRDD at <console>:24
scala> val r1 = rdd1.union(rdd2)
r1: org.apache.spark.rdd.RDD[Int] = UnionRDD[15] at union at <console>:28
scala> r1.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5, 3, 4, 5, 6, 7, 8, 9)
scala> val r1 = rdd1.intersection(rdd2)
r1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[21] at intersection at <console>:28
scala> r1.collect
res11: Array[Int] = Array(4, 3, 5)
scala> val r1 = rdd1.subtract(rdd2)
r1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[25] at subtract at <console>:28
scala> r1.collect
res12: Array[Int] = Array(1, 2)
scala> val r1 = rdd1.union(rdd2).distinct
r1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[29] at distinct at <console>:28
scala> r1.collect
res13: Array[Int] = Array(4, 6, 8, 2, 1, 3, 7, 9, 5)
scala>
3、groupBy
scala> val rdd = sc.parallelize(List(("cat",2), ("dog",5),("cat",4),("dog",3),("cat",6),("dog",3),("cat",9),("dog",1)),2);
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:24
scala> rdd.groupByKey()
res14: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[31] at groupByKey at <console>:27
scala> res14.collect
res15: Array[(String, Iterable[Int])] = Array((dog,CompactBuffer(5, 3, 3, 1)), (cat,CompactBuffer(2, 4, 6, 9)))
4、统计单词出现的次数
scala> sc.textFile("hdfs://hadoop01:9000/txt/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://hadoop01:9000/txt/result")
(三)RDD依赖关系
(Ⅰ)RDD的依赖关系
RDD和它依赖的parent RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。 1)窄依赖指的是每一个parent RDD的Partition最多被子RDD的一个Partition使用,如下图所示。 2)宽依赖指的是多个子RDD的Partition会依赖同一个parent RDD的Partition。
我们可以从不同类型的转换来进一步理解RDD的窄依赖和宽依赖的区别,如下图所示。
(Ⅱ)窄依赖
对于窄依赖操作,它们只是将Partition的数据根据转换的规则进行转化,并不涉及其他的处理,可以简单地认为只是将数据从一个形式转换到另一个形式。
窄依赖底层的源码:
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
def getParents(partitionId: Int): Seq[Int]
override def rdd: RDD[T] = _rdd
}
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int) = List(partitionId)
}
**所以对于窄依赖,并不会引入昂贵的Shuffle。**所以执行效率非常高。如果整个DAG中存在多个连续的窄依赖,则可以将这些连续的窄依赖整合到一起连续执行,中间不执行shuffle 从而提高效率,这样的优化方式称之为流水线优化。 此外,针对窄依赖,如果子RDD某个分区数据丢失,只需要找到父RDD对应依赖的分区,恢复即可。但如果是宽依赖,当分区丢失时,最糟糕的情况是要重算所有父RDD的所有分区。
(Ⅲ)宽依赖
对于groupByKey这样的操作,子RDD的所有Partition(s)会依赖于parent RDD的所有Partition(s),子RDD的Partition是parent RDD的所有Partition Shuffle的结果。
宽依赖的源码:
class ShuffleDependency[K, V, C](
@transient _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {
override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle =
_rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.size, this)
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
(Ⅳ)Shuffle概述
spark中一旦遇到宽依赖就需要进行shuffle的操作,所谓的shuffle的操作的本质就是将数据汇总后重新分发的过程。
这个过程数据要汇总到一起,数据量可能很大所以不可避免的需要进行数据落磁盘的操作,会降低程序的性能,所以spark并不是完全内存不读写磁盘,只能说它尽力避免这样的过程来提高效率 。
spark中的shuffle,在早期的版本中,会产生多个临时文件,但是这种多临时文件的策略造成大量文件的同时的读写,磁盘的性能被分摊给多个文件,每个文件读写效率都不高,影响spark的执行效率。所以在后续的spark中(1.2.0之后的版本)的shuffle中,只会产生一个文件,并且数据会经过排序再附加索引信息,减少了文件的数量并通过排序索引的方式提升了性能。
(四)RDD容错机制
分布式系统通常在一个机器集群上运行,同时运行的几百台机器中某些出问题的概率大大增加,所以容错设计是分布式系统的一个重要能力。
Spark以前的集群容错处理模型,像MapReduce,将计算转换为一个有向无环图(DAG)的任务集合,这样可以通过重复执行DAG里的一部分任务来完成容错恢复。但是由于主要的数据存储在分布式文件系统中,没有提供其他存储的概念,容错过程需要在网络上进行数据复制,从而增加了大量的消耗。所以,分布式编程中经常需要做检查点,即将某个时机的中间数据写到存储(通常是分布式文件系统)中。
RDD也是一个DAG,每一个RDD都会记住创建该数据集需要哪些操作,跟踪记录RDD的继承关系,这个关系在Spark里面叫lineage(血缘关系)。当一个RDD的某个分区丢失时,RDD是有足够的信息记录其如何通过其他RDD进行计算,且只需重新计算该分区,这是Spark的一个创新。
(五)RDD的缓存
(Ⅰ)概述
相比Hadoop MapReduce来说,Spark计算具有巨大的性能优势,其中很大一部分原因是Spark对于内存的充分利用,以及提供的缓存机制。
(Ⅱ)RDD持久化(缓存)
持久化在早期被称作缓存(cache),但缓存一般指将内容放在内存中。虽然持久化操作在绝大部分情况下都是将RDD缓存在内存中,但一般都会在内存不够时用磁盘顶上去(比操作系统默认的磁盘交换性能高很多)。当然,也可以选择不使用内存,而是仅仅保存到磁盘中。所以,现在Spark使用持久化(persistence)这一更广泛的名称。
如果一个RDD不止一次被用到,那么就可以持久化它,这样可以大幅提升程序的性能,甚至达10倍以上。
默认情况下,RDD只使用一次,用完即扔,再次使用时需要重新计算得到,而持久化操作避免了这里的重复计算,实际测试也显示持久化对性能提升明显,这也是Spark刚出现时被人称为内存计算框架的原因。
假设首先进行了RDD0→RDD1→RDD2的计算作业,那么计算结束时,RDD1就已经缓存在系统中了。在进行RDD0→RDD1→RDD3的计算作业时,由于RDD1已经缓存在系统中,因此RDD0→RDD1的转换不会重复进行,计算作业只须进行RDD1→RDD3的计算就可以了,因此计算速度可以得到很大提升。 持久化的方法是调用persist()函数,除了持久化至内存中,还可以在persist()中指定storage level参数使用其他的类型,具体如下:
1)MEMORY_ONLY : 将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中. 如果内存空间不够,部分数据分区将不会被缓存,在每次需要用到这些数据时重新进行计算. 这是默认的级别。
cache()方法对应的级别就是MEMORY_ONLY级别
2)MEMORY_AND_DISK:将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
3)MEMORY_ONLY_SER :将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用 fast serialize时会节省更多的空间,但是在读取时会使得 CPU 的 read 变得更加密集。如果内存空间不够,部分数据分区将不会被缓存,在每次需要用到这些数据时重新进行计算。
4)MEMORY_AND_DISK_SER :类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
5)DISK_ONLY:只在磁盘上缓存 RDD。
6)MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. :与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
7)OFF_HEAP 将数据存储在 off-heap memory 中。使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。使用堆外内存的好处:可能会利用到更大的内存存储空间。但是对于数据的垃圾回收会有影响,需要程序员来处理
注意,可能带来一些GC回收问题。
Spark 也会自动持久化一些在 shuffle 操作过程中产生的临时数据(比如 reduceByKey),即便是用户并没有调用持久化的方法。这样做可以避免当 shuffle 阶段时如果一个节点挂掉了就得重新计算整个数据的问题。如果用户打算多次重复使用这些数据,我们仍然建议用户自己调用持久化方法对数据进行持久化。
(Ⅲ)使用缓存
scala> import org.apache.spark.storage._ scala> val rdd1=sc.makeRDD(1 to 5) scala> rdd1.cache //cache只有一种默认的缓存级别,即MEMORY_ONLY scala> rdd1.persist(StorageLevel.MEMORY_ONLY)
(Ⅳ)缓存数据的清除
Spark 会自动监控每个节点上的缓存数据,然后使用 least-recently-used (LRU) 机制来处理旧的缓存数据。如果你想手动清理这些缓存的 RDD 数据而不是去等待它们被自动清理掉, 可以使用 RDD.unpersist( ) 方法。
四、DAG
(一)概念
Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG。接下来以“Word Count”为例,详细描述这个DAG生成的实现过程。
Spark Scala版本的Word Count程序如下: 1: val file=sc.textFile(“hdfs://hadoop01:9000/hello1.txt”) 2: val counts = file.flatMap(line => line.split(" ")) 3: .map(word => (word, 1)) 4: .reduceByKey(_ + _) 5: counts.saveAsTextFile(“hdfs://…”)
file和counts都是RDD,其中file是从HDFS上读取文件并创建了RDD,而counts是在file的基础上通过flatMap、map和reduceByKey这三个RDD转换生成的。最后,counts调用了动作saveAsTextFile,用户的计算逻辑就从这里开始提交的集群进行计算。那么上面这5行代码的具体实现是什么呢?
1)行1:sc是org.apache.spark.SparkContext的实例,它是用户程序和Spark的交互接口,会负责连接到集群管理者,并根据用户设置或者系统默认设置来申请计算资源,完成RDD的创建等。 sc.textFile(“hdfs://…”)就完成了一个org.apache.spark.rdd.HadoopRDD的创建,并且完成了一次RDD的转换:通过map转换到一个org.apache.spark.rdd.MapPartitions-RDD。也就是说,file实际上是一个MapPartitionsRDD,它保存了文件的所有行的数据内容。
2)行2:将file中的所有行的内容,以空格分隔为单词的列表,然后将这个按照行构成的单词列表合并为一个列表。最后,以每个单词为元素的列表被保存到MapPartitionsRDD。
3)行3:将第2步生成的MapPartittionsRDD再次经过map将每个单词word转为(word,1)的元组。这些元组最终被放到一个MapPartitionsRDD中。
4)行4:首先会生成一个MapPartitionsRDD,起到map端combiner的作用;然后会生成一个ShuffledRDD,它从上一个RDD的输出读取数据,作为reducer的开始;最后,还会生成一个MapPartitionsRDD,起到reducer端reduce的作用。
5)行5:向HDFS输出RDD的数据内容。最后,调用org.apache.spark.SparkContext#runJob向集群提交这个计算任务。
RDD之间的关系可以从两个维度来理解:一个是RDD是从哪些RDD转换而来,也就是RDD的parent RDD(s)是什么;还有就是依赖于parent RDD(s)的哪些Partition(s)。这个关系,就是RDD之间的依赖,org.apache.spark.Dependency。根据依赖于parent RDD(s)的Partitions的不同情况,Spark将这种依赖分为两种,一种是宽依赖,一种是窄依赖。
(二)DAG的生成与Stage的划分
(Ⅰ)DAG的生成
原始的RDD(s)通过一系列转换就形成了DAG。RDD之间的依赖关系,包含了RDD由哪些Parent RDD(s)转换而来和它依赖parent RDD(s)的哪些Partitions,是DAG的重要属性。
借助这些依赖关系,DAG可以认为这些RDD之间形成了Lineage(血统,血缘关系)。借助Lineage,能保证一个RDD被计算前,它所依赖的parent RDD都已经完成了计算;同时也实现了RDD的容错性,即如果一个RDD的部分或者全部的计算结果丢失了,那么就需要重新计算这部分丢失的数据。
RDD是分布式的,弹性的,容错的数据结构
(Ⅱ)Spark的Stage(阶段)
Spark在执行任务(job)时,首先会根据依赖关系,将DAG划分为不同的阶段(Stage)。
处理流程是: 1)Spark在执行Transformation类型操作时都不会立即执行,而是懒执行(计算) 2)执行若干步的Transformation类型的操作后,一旦遇到Action类型操作时,才会真正触发执行(计算) 3)执行时,从当前Action方法向前回溯,如果遇到的是窄依赖则应用流水线优化,继续向前找,直到碰到某一个宽依赖 4)因为宽依赖必须要进行shuffle,无法实现优化,所以将这一次段执行过程组装为一个stage 5)再从当前宽依赖开始继续向前找。重复刚才的步骤,从而将整个DAG还分为若干的stage 在stage内部可以执行流水线优化,而在stage之间没办法执行流水线优化,因为有shuffle。但是这种机制已经尽力的去避免了shuffle。
(Ⅲ)Spark的Job和Task
原始的RDD经过一系列转换后(一个DAG),会在最后一个RDD上触发一个动作,这个动作会生成一个Job。 **所以可以这样理解:**一个DAG对应一个Spark的Job。
在Job被划分为一批计算任务(Task)后,这批Task会被提交到集群上的计算节点去计算 Spark的Task分为两种: 1)org.apache.spark.scheduler.ShuffleMapTask 2)org.apache.spark.scheduler.ResultTask 简单来说,DAG的最后一个阶段会为每个结果的Partition生成一个ResultTask,其余所有的阶段都会生成Shuff fleMapTask。
(Ⅳ)可视化理解窄依赖和宽依赖
案例 单词统计
scala>val data=sc.textFile("/home/software/hello.txt",2) scala> data.flatMap(.split(" ")).map((,1)).reduceByKey(+).collect
1)打开web页面控制台(ip:4040端口地址),刷新,会发现刚才的操作会出现在页面上 2)点击 Description下的 collect at…… 进入job的详细页面 3)点击 DAG Visualization 会出现如下图形
五、案例
(一)案例一:WordCount
实现步骤 1)创建spark的项目 在scala中创建项目 导入spark相关的jar包 jars中的所有如下图 2)开发spark相关代码 代码示例:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object WordCountDriver {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("spark://hadoop01:9000").setAppName("wordcount")
val sc=new SparkContext(conf)
val data=sc.textFile("hdfs://hadoop01:9000/txt/words.txt", 2)
val result=data.flatMap { x => x.split(" ") }.map { x => (x,1) }.reduceByKey(_+_)
result.saveAsTextFile("hdfs://hadoop01:9000/wcresult")
}
}
3)将写好的项目打成jar,上传到服务器,进入bin目录 执行:spark-submit --class cn.tedu.WordCountDriver /home/software/spark/conf/wc.jar
(二)案例二:求平均值
案例文件: 1 16 2 74 3 51 4 35 5 44 6 95 7 5 8 29 10 60 11 13 12 99 13 7 14 26 正确答案:42
代码示例一:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object AverageDriver {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("local").setAppName("AverageDriver")
val sc=new SparkContext(conf)
val data=sc.textFile("d://average.txt")
val ageData=data.map { line=>{line.split(" ")(1).toInt}}
val ageSum=ageData.reduce(_+_)
val pepopleCount=data.count()
val average=ageSum/pepopleCount
println(average)
}
}
代码示例二:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object AverageDriver {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("local").setAppName("AverageDriver")
val sc=new SparkContext(conf)
val data=sc.textFile("d://average.txt",3)
val ageData=data.map { line=>{line.split(" ")(1).toInt}}
val ageSum=ageData.mapPartitions{it=>{
val result=List[Int]()
var i=0
while(it.hasNext){
i+=it.next()
}
result.::(i).iterator
}}.reduce(_+_)
val pepopleCount=data.count()
val average=ageSum/pepopleCount
println(average)
}
}
六、Spark
(一)Spark框架核心概念
1.RDD。弹性分布式数据集,是Spark最核心的数据结构。有分区机制,所以可以分布式进行处理。有容错机制,通过RDD之间的依赖关系来恢复数据。 2.依赖关系。RDD的依赖关系是通过各种**Transformation(变换)**来得到的。父RDD和子RDD之间的依赖关系分两种:①窄依赖 ②宽依赖 ①针对窄依赖:父RDD的分区和子RDD的分区关系是:一对一 窄依赖不会发生Shuffle,执行效率高,spark框架底层会针对多个连续的窄依赖执行流水线优化,从而提高性能。例如 map flatMap等方法都是窄依赖方法 ②针对宽依赖:父RDD的分区和子RDD的分区关系是:一对多 宽依赖会产生shuffle,会产生磁盘读写,无法优化。
3.DAG。有向无环图,当一整条RDD的依赖关系形成之后,就形成了一个DAG。一般来说,一个DAG,最后都至少会触发一个Action操作,触发执行。一个Action对应一个Job任务。 4.Stage。一个DAG会根据RDD之间的依赖关系进行Stage划分,流程是:以Action为基准,向前回溯,遇到宽依赖,就形成一个Stage。遇到窄依赖,则执行流水线优化(将多个连续的窄依赖放到一起执行) 5.task。任务。一个分区对应一个task。可以这样理解:一个Stage是一组Task的集合 6.RDD的Transformation(变换)操作:懒执行,并不会立即执行 7.RDD的Action(执行)操作:触发真正的执行
(二)Spark架构
(Ⅰ)概述
为了更好地理解调度,我们先来鸟瞰一下集群模式下的Spark程序运行架构图。 1. Driver Program 用户编写的Spark程序称为Driver Program。每个Driver程序包含一个代表集群环境的SparkContext对象,程序的执行从Driver程序开始,所有操作执行结束后回到Driver程序中,在Driver程序中结束。如果你是用spark shell,那么当你启动 Spark shell的时候,系统后台自启了一个 Spark 驱动器程序,就是在Spark shell 中预加载的一个叫作 sc 的 SparkContext 对象。如果驱动器程序终止,那么Spark 应用也就结束了。
2. SparkContext对象 每个Driver Program里都有一个SparkContext对象,职责如下: 1)SparkContext对象联系 cluster manager(集群管理器),让 cluster manager 为Worker Node分配CPU、内存等资源。此外, cluster manager会在 Worker Node 上启动一个执行器(专属于本驱动程序)。 2)和Executor进程交互,负责任务的调度分配 监控。
3. cluster manager 集群管理器 它对应的是Master进程。集群管理器负责集群的资源调度,比如为Worker Node分配CPU、内存等资源。并实时监控Worker的资源使用情况。一个Worker Node默认情况下分配一个Executor(进程)。 从图中可以看到sc和Executor之间画了一根线条,这表明:程序运行时,sc是直接与Executor进行交互的。 所以,cluster manager 只是负责资源的管理调度,而任务的分配和结果处理它不管。
4.Worker Node Worker节点。集群上的计算节点,对应一台物理机器
5.Worker进程 它对应Worder进程,用于和Master进程交互,向Master注册和汇报自身节点的资源使用情况,并管理和启动Executor进程
6.Executor 负责运行Task计算任务,并将计算结果回传到Driver中。
7.Task 在执行器上执行的最小单元。比如RDD Transformation操作时对RDD内每个分区的计算都会对应一个Task。
(三)Spark调度模块
(Ⅰ)概述
之前我们提到:Driver 的sc负责和Executor交互,完成任务的分配和调度,在底层,任务调度模块主要包含两大部分: 1)DAGScheduler 2)TaskScheduler 它们负责将用户提交的计算任务按照DAG划分为不同的阶段并且将不同阶段的计算任务提交到集群进行最终的计算。整个过程可以使用下图表示 RDD Objects可以理解为用户实际代码中创建的RDD,这些代码逻辑上组成了一个DAG。
DAGScheduler主要负责分析依赖关系,然后将DAG划分为不同的Stage(阶段),其中每个Stage由可以并发执行的一组Task构成,这些Task的执行逻辑完全相同,只是作用于不同的数据。
在DAGScheduler将这组Task划分完成后,会将这组Task提交到TaskScheduler。TaskScheduler通过Cluster Manager 申请计算资源,比如在集群中的某个Worker Node上启动专属的Executor,并分配CPU、内存等资源。接下来,就是在Executor中运行Task任务,如果缓存中没有计算结果,那么就需要开始计算,同时,计算的结果会回传到Driver或者保存在本地。
(Ⅱ)Scheduler的实现概述
任务调度模块涉及的最重要的三个类是: 1)org.apache.spark.scheduler.DAGScheduler 前面提到的DAGScheduler的实现。 将一个DAG划分为一个一个的Stage阶段(每个Stage是一组Task的集合) 然后把Task Set 交给TaskScheduler模块。 2)org.apache.spark.scheduler.TaskScheduler 它的作用是为创建它的SparkContext调度任务,即从DAGScheduler接收不同Stage的任务。向Cluster Manager 申请资源。然后Cluster Manager收到资源请求之后,在Worker为其启动进程 3)org.apache.spark.scheduler.SchedulerBackend 是一个trait,作用是分配当前可用的资源,具体就是向当前等待分配计算资源的Task分配计算资源(即Executor),并且在分配的Executor上启动Task,完成计算的调度过程。 4)AKKA是一个网络通信框架,类似于Netty,此框架在Spark1.8之后已全部替换成Netty
(Ⅲ)任务调度流程图
(四)Spark Shuffle详解
(Ⅰ)概述
Shuffle,翻译成中文就是洗牌。之所以需要Shuffle,还是因为具有某种共同特征的一类数据需要最终汇聚(aggregate)到一个计算节点上进行计算。这些数据分布在各个存储节点上并且由不同节点的计算单元处理。以最简单的Word Count为例,其中数据保存在Node1、Node2和Node3;
经过处理后,这些数据最终会汇聚到Nodea、Nodeb处理,如下图所示。 这个数据重新打乱然后汇聚到不同节点的过程就是Shuffle。但是实际上,Shuffle过程可能会非常复杂: 1)数据量会很大,比如单位为TB或PB的数据分散到几百甚至数千、数万台机器上。 2)为了将这个数据汇聚到正确的节点,需要将这些数据放入正确的Partition,因为数据大小已经大于节点的内存,因此这个过程中可能会发生多次硬盘续写。 3)为了节省带宽,这个数据可能需要压缩,如何在压缩率和压缩解压时间中间 做一个比较好的选择? 4)数据需要通过网络传输,因此数据的序列化和反序列化也变得相对复杂。
一般来说,每个Task处理的数据可以完全载入内存(如果不能,可以减小每个Partition的大小),因此Task可以做到在内存中计算。但是对于Shuffle来说,如果不持久化这个中间结果,一旦数据丢失,就需要重新计算依赖的全部RDD,因此有必要持久化这个中间结果。所以这就是为什么Shuffle过程会产生文件的原因。 如果Shuffle过程不落地,①可能会造成内存溢出 ②当某分区丢失时,会重新计算所有父分区数据
(Ⅱ)Shuffle Write
Shuffle Write,即数据是如何持久化到文件中,以使得下游的Task可以获取到其需要处理的数据的(即Shuffle Read)。在Spark 0.8之前,Shuffle Write是持久化到缓存的,但后来发现实际应用中,shuffle过程带来的数据通常是巨量的,所以经常会发生内存溢出的情况,所以在Spark 0.8以后,Shuffle Write会将数据持久化到硬盘,再之后Shuffle Write不断进行演进优化,但是数据落地到本地文件系统的实现并没有改变。 1)Hash Based Shuffle Write 在Spark 1.0以前,Spark只支持Hash Based Shuffle。因为在很多运算场景中并不需要排序,因此多余的排序只能使性能变差,比如Hadoop的Map Reduce就是这么实现的,也就是Reducer拿到的数据都是已经排好序的。实际上Spark的实现很简单:每个Shuffle Map Task根据key的哈希值,计算出每个key需要写入的Partition然后将数据单独写入一个文件,这个Partition实际上就对应了下游的一个Shuffle Map Task或者Result Task。因此下游的Task在计算时会通过网络(如果该Task与上游的Shuffle Map Task运行在同一个节点上,那么此时就是一个本地的硬盘读写)读取这个文件并进行计算。 Hash Based Shuffle Write存在的问题 由于每个Shuffle Map Task需要为每个下游的Task创建一个单独的文件,因此文件的数量就是: number(shuffle_map_task)*number(result_task)。
如果Shuffle Map Task是1000,下游的Task是500,那么理论上会产生500000个文件(对于size为0的文件Spark有特殊的处理)。生产环境中Task的数量实际上会更多,因此这个简单的实现会带来以下问题: 1)每个节点可能会同时打开多个文件,每次打开文件都会占用一定内存。假设每个Write Handler的默认需要100KB的内存,那么同时打开这些文件需要50GB的内存,对于一个集群来说,还是有一定的压力的。尤其是如果Shuffle Map Task和下游的Task同时增大10倍,那么整体的内存就增长到5TB。
2)从整体的角度来看,打开多个文件对于系统来说意味着随机读,尤其是每个文件比较小但是数量非常多的情况。而现在机械硬盘在随机读方面的性能特别差,非常容易成为性能的瓶颈。如果集群依赖的是固态硬盘,也许情况会改善很多,但是随机写的性能肯定不如顺序写的。
2)Sort Based Shuffle Write 在Spark 1.2.0中,Spark Core的一个重要的升级就是将默认的Hash Based Shuffle换成了Sort Based Shuffle,即spark.shuffle.manager从Hash换成了Sort,对应的实现类分别是org.apache.spark.shuffle.hash.HashShuffleManager和org.apache.spark.shuffle.sort.SortShuffleManager。 那么Sort Based Shuffle“取代”Hash Based Shuffle作为默认选项的原因是什么? 正如前面提到的,Hash Based Shuffle的每个Mapper都需要为每个Reducer写一个文件,供Reducer读取,即需要产生M*R个数量的文件,如果Mapper和Reducer的数量比较大,产生的文件数会非常多。 而Sort Based Shuffle的模式是:每个Shuffle Map Task不会为每个Reducer生成一个单独的文件;相反,它会将所有的结果写到一个文件里,同时会生成一个Index文件, Reducer可以通过这个Index文件取得它需要处理的数据。避免产生大量文件的直接收益就是节省了内存的使用和顺序Disk IO带来的低延时。节省内存的使用可以减少GC的风险和频率。而减少文件的数量可以避免同时写多个文件给系统带来的压力。
Sort Based Write实现详解 Shuffle Map Task会按照key相对应的Partition ID进行Sort,其中属于同一个Partition的key不会Sort。因为对于不需要Sort的操作来说,这个Sort是负收益的;要知道之前Spark刚开始使用Hash Based的Shuffle而不是Sort Based就是为了避免Hadoop Map Reduce对于所有计算都会Sort的性能损耗。对于那些需要Sort的运算,比如sortByKey,这个Sort在Spark 1.2.0里还是由Reducer完成的。
①答出shuffle的定义 ②spark shuffle的特点 ③spark shuffle的目的 ④spark shuffel的实现类,即对应优缺点
七、Spark SQL
(一)Spark SQL
(Ⅰ)概述
Spark为结构化数据处理引入了一个称为Spark SQL的编程模块。它提供了一个称为DataFrame(数据框)的编程抽象,DF的底层仍然是RDD,并且可以充当分布式SQL查询引擎。
(Ⅱ)SparkSQL的由来
SparkSQL的前身是Shark。在Hadoop发展过程中,为了给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具,Hive应运而生,是当时唯一运行在hadoop上的SQL-on-Hadoop工具。但是,MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,运行效率较低。 后来,为了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具开始产生,其中表现较为突出的是: 1)MapR的Drill 2)Cloudera的Impala 3)Shark 其中Shark是伯克利实验室Spark生态环境的组件之一,它基于Hive实施了一些改进,比如引入缓存管理,改进和优化执行器等,并使之能运行在Spark引擎上,从而使得SQL查询的速度得到10-100倍的提升。 但是,随着Spark的发展,对于野心勃勃的Spark团队来说,Shark对于hive的太多依赖(如采用hive的语法解析器、查询优化器等等),制约了Spark的One Stack rule them all的既定方针,制约了spark各个组件的相互集成,所以提出了sparkSQL项目。 SparkSQL抛弃原有Shark的代码,汲取了Shark的一些优点,如内存列存储(In-Memory Columnar Storage)、Hive兼容性等,重新开发了SparkSQL代码。
由于摆脱了对hive的依赖性,SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便。
2014年6月1日,Shark项目和SparkSQL项目的主持人Reynold Xin宣布:停止对Shark的开发,团队将所有资源放SparkSQL项目上,至此,Shark的发展画上了句号。
(Ⅲ)SparkSql特点
1)引入了新的RDD类型SchemaRDD,可以像传统数据库定义表一样来定义SchemaRDD 2)在应用程序中可以混合使用不同来源的数据,如可以将来自HiveQL的数据和来自SQL的数据进行Join操作。 3)内嵌了查询优化框架,在把SQL解析成逻辑执行计划之后,最后变成RDD的计算
(Ⅳ)为什么sparkSQL的性能会得到怎么大的提升呢?
主要sparkSQL在下面几点做了优化: 1)内存列存储(In-Memory Columnar Storage) 列存储的优势: ①海量数据查询时,不存在冗余列问题。如果是基于行存储,查询时会产生冗余列,消除冗余列一般在内存中进行的。或者基于行存储的查询,实现物化索引(建立B-tree B+tree),但是物化索引也是需要耗费cpu的 ②基于列存储,每一列数据类型都是同质的,好处一可以避免数据在内存中类型的频繁转换。好处二可以采用更高效的压缩算法,比如增量压缩算法,二进制压缩算法。性别:男 女 男 女 0101 SparkSQL的表数据在内存中存储不是采用原生态的JVM对象存储方式,而是采用内存列存储,如下图所示。 该存储方式无论在空间占用量和读取吞吐率上都占有很大优势。 对于原生态的JVM对象存储方式,每个对象通常要增加12-16字节的额外开销(toString、hashcode等方法),如对于一个270MB的电商的商品表数据,使用这种方式读入内存,要使用970MB左右的内存空间(通常是2~5倍于原生数据空间)。
另外,使用这种方式,每个数据记录产生一个JVM对象,如果是大小为200GB的数据记录,堆栈将产生1.6亿个对象,这么多的对象,对于GC来说,可能要消耗几分钟的时间来处理(JVM的垃圾收集时间与堆栈中的对象数量呈线性相关。显然这种内存存储方式对于基于内存计算的spark来说,很昂贵也负担不起)
SparkSql的存储方式:对于内存列存储来说,将所有原生数据类型的列采用原生数组来存储,将Hive支持的复杂数据类型(如array、map等)先序化后并接成一个字节数组来存储。
此外,基于列存储,每列数据都是同质的,所以可以降低数据类型转换的CPU消耗。此外,可以采用高效的压缩算法来压缩,是的数据更少。比如针对二元数据列,可以用字节编码压缩来实现(010101)
这样,每个列创建一个JVM对象,从而可以快速的GC和紧凑的数据存储;额外的,还可以使用低廉CPU开销的高效压缩方法(如字典编码、行长度编码等压缩方法)降低内存开销;更有趣的是,对于分析查询中频繁使用的聚合特定列,性能会得到很大的提高,原因就是这些列的数据放在一起,更容易读入内存进行计算。
(二)SparkSQL入门
(Ⅰ)概述
SparkSql将RDD封装成一个DataFrame对象,这个对象类似于关系型数据库中的表。
(Ⅱ)创建DataFrame对象
DataFrame就相当于数据库的一张表。它是个只读的表,不能在运算过程再往里加元素。 RDD.toDF(“列名”)
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21
scala> rdd.toDF("id")
res0: org.apache.spark.sql.DataFrame = [id: int]
scala> res0.show#默认只显示20条数据
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
+---+
scala> res0.printSchema #查看列的类型等属性
root
|-- id: integer (nullable = true)
(Ⅲ)创建多列DataFrame对象
DataFrame就相当于数据库的一张表。
scala> sc.parallelize(List( (1,"beijing"),(2,"shanghai") ) )
res3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[5] at parallelize at <console>:22
scala> res3.toDF("id","name")
res4: org.apache.spark.sql.DataFrame = [id: int, name: string]
scala> res4.show
+---+--------+
| id| name|
+---+--------+
| 1| beijing|
| 2|shanghai|
+---+--------+
例如3列的
scala> sc.parallelize(List( (1,"beijing",100780),(2,"shanghai",560090),(3,"xi'an",600329)))
res6: org.apache.spark.rdd.RDD[(Int, String, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:22
scala> res6.toDF("id","name","postcode")
res7: org.apache.spark.sql.DataFrame = [id: int, name: string, postcode: int]
scala> res7.show
+---+--------+--------+
| id| name|postcode|
+---+--------+--------+
| 1| beijing| 100780|
| 2|shanghai| 560090|
| 3| xi'an| 600329|
+---+--------+--------+
可以看出,需要构建几列,tuple就有几个内容。
(Ⅳ)由外部文件构造DataFrame对象
1)txt文件 txt文件不能直接转换成,先利用RDD转换为tuple。然后toDF()转换为DataFrame。
scala> val rdd = sc.textFile("/root/words.txt")
.map( x => (x,1) )
.reduceByKey( (x,y) => x+y )
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:21
scala> rdd.toDF("word","count")
res9: org.apache.spark.sql.DataFrame = [word: string, count: int]
scala> res9.show
+------+-----+
| word|count|
+------+-----+
| spark| 3|
| hive| 1|
|hadoop| 2|
| big| 2|
| scla| 1|
| data| 1|
+------+-----+
2)json文件 文件代码: {“id”:1, “name”:“leo”, “age”:18} {“id”:2, “name”:“jack”, “age”:19} {“id”:3, “name”:“marry”, “age”:17}
代码:
import org.apache.spark.sql.SQLContext
scala>val sqc=new SQLContext(sc)
scala> val tb4=sqc.read.json("/home/software/people.json")
scala> tb4.show
3)jdbc读取 实现步骤: 1)将mysql 的驱动jar上传到spark的jars目录下 2)重启spark服务 3)进入spark客户端 4)执行代码,比如在Mysql数据库下,有一个test库,在test库下有一张表为tabx 执行代码:
import org.apache.spark.sql.SQLContext
scala> val sqc = new SQLContext(sc);
scala> val prop = new java.util.Properties
scala> prop.put("user","root")
scala> prop.put("password","root")
scala>val tabx=sqc.read.jdbc("jdbc:mysql://hadoop01:3306/test","tabx",prop)
scala> tabx.show
+---+----+
| id|name|
+---+----+
| 1| aaa|
| 2| bbb|
| 3| ccc|
| 1| ddd|
| 2| eee|
| 3| fff|
+---+----+
注:如果报权限不足,则进入mysql,执行: grant all privileges on . to ‘root’@‘hadoop01’ identified by ‘root’ with grant option; 然后执行: flush privileges;
(三)SparkSql基础语法
(Ⅰ)通过方法来使用
(1)查询 df.select(“id”,“name”).show();
(2)带条件的查询 df.select(
"
i
d
"
,
"id",
"id",“name”).where($“name” === “bbb”).show()
(3)排序查询 orderBy/sort(
"
列
名
"
)
升
序
排
列
o
r
d
e
r
B
y
/
s
o
r
t
(
"列名") 升序排列 orderBy/sort(
"列名")升序排列orderBy/sort(“列名”.desc) 降序排列 orderBy/sort($“列1” , $“列2”.desc) 按两列排序
df.select(
"
i
d
"
,
"id",
"id",“name”).orderBy(
"
n
a
m
e
"
.
d
e
s
c
)
.
s
h
o
w
d
f
.
s
e
l
e
c
t
(
"name".desc).show df.select(
"name".desc).showdf.select(“id”,
"
n
a
m
e
"
)
.
s
o
r
t
(
"name").sort(
"name").sort(“name”.desc).show tabx.select(
"
i
d
"
,
"id",
"id",“name”).sort(
"
i
d
"
,
"id",
"id",“name”.desc).show (4)分组查询 groupBy(“列名”, …).max(列名) 求最大值 groupBy(“列名”, …).min(列名) 求最小值 groupBy(“列名”, …).avg(列名) 求平均值 groupBy(“列名”, …).sum(列名) 求和 groupBy(“列名”, …).count() 求个数 groupBy(“列名”, …).agg 可以将多个方法进行聚合
scala>val rdd = sc.makeRDD(List((1,“a”,“bj”,100),(2,“b”,“sh”,80),(3,“c”,“gz”,50),(4,“d”,“bj”,45))); scala>val df = rdd.toDF(“id”,“name”,“addr”,“score”); scala>df.groupBy(“addr”).count().show() scala>df.groupBy(“addr”).agg(max(
"
s
c
o
r
e
"
)
,
m
i
n
(
"score"), min(
"score"),min(“score”), count($"*")).show
(5)连接查询 scala>val dept=sc.parallelize(List((100,“caiwubu”),(200,“yanfabu”))).toDF(“deptid”,“deptname”) scala>val emp=sc.parallelize(List((1,100,“zhang”),(2,200,“li”),(3,300,“wang”))).toDF(“id”,“did”,“name”) scala>dept.join(emp,$“deptid” ===
"
d
i
d
"
)
.
s
h
o
w
s
c
a
l
a
>
d
e
p
t
.
j
o
i
n
(
e
m
p
,
"did").show scala>dept.join(emp,
"did").showscala>dept.join(emp,“deptid” ===
"
d
i
d
"
,
"
l
e
f
t
"
)
.
s
h
o
w
左
向
外
联
接
的
结
果
集
包
括
L
E
F
T
O
U
T
E
R
子
句
中
指
定
的
左
表
的
所
有
行
,
而
不
仅
仅
是
联
接
列
所
匹
配
的
行
。
如
果
左
表
的
某
行
在
右
表
中
没
有
匹
配
行
,
则
在
相
关
联
的
结
果
集
行
中
右
表
的
所
有
选
择
列
表
列
均
为
空
值
。
s
c
a
l
a
>
d
e
p
t
.
j
o
i
n
(
e
m
p
,
"did","left").show 左向外联接的结果集包括 LEFT OUTER子句中指定的左表的所有行,而不仅仅是联接列所匹配的行。如果左表的某行在右表中没有匹配行,则在相关联的结果集行中右表的所有选择列表列均为空值。 scala>dept.join(emp,
"did","left").show左向外联接的结果集包括LEFTOUTER子句中指定的左表的所有行,而不仅仅是联接列所匹配的行。如果左表的某行在右表中没有匹配行,则在相关联的结果集行中右表的所有选择列表列均为空值。scala>dept.join(emp,“deptid” === $“did”,“right”).show
(6)执行运算 val df = sc.makeRDD(List(1,2,3,4,5)).toDF(“num”); df.select($“num” * 100).show
(7)使用列表 val df = sc.makeRDD(List((“zhang”,Array(“bj”,“sh”)),(“li”,Array(“sz”,“gz”)))).toDF(“name”,“addrs”) df.selectExpr(“name”,“addrs[0]”).show
(8)使用结构体 {“name”:“陈晨”,“address”:{“city”:“西安”,“street”:“南二环甲字1号”}} {“name”:“娜娜”,“address”:{“city”:“西安”,“street”:“南二环甲字2号”}}
val df = sqlContext.read.json(“file:///root/work/users.json”) dfs.select(“name”,“address.street”).show
(9)其他 df.count//获取记录总数 val row = df.first()//获取第一条记录 val take=df.take(2) //获取前n条记录 val value = row.getString(1)//获取该行指定列的值 df.collect //获取当前df对象中的所有数据为一个Array 其实就是调用了df对象对应的底层的rdd的collect方法
(Ⅱ)通过sql语句来调用
(0)创建表 df.registerTempTable(“tabName”) (1)查询 val sqc = new org.apache.spark.sql.SQLContext(sc); val df = sc.makeRDD(List((1,“a”,“bj”),(2,“b”,“sh”),(3,“c”,“gz”),(4,“d”,“bj”),(5,“e”,“gz”))).toDF(“id”,“name”,“addr”); df.registerTempTable(“stu”); sqc.sql(“select * from stu”).show()
(2)带条件的查询 val df = sc.makeRDD(List((1,“a”,“bj”),(2,“b”,“sh”),(3,“c”,“gz”),(4,“d”,“bj”),(5,“e”,“gz”))).toDF(“id”,“name”,“addr”); df.registerTempTable(“stu”); sqc.sql(“select * from stu where addr = ‘bj’”).show()
(3)排序查询 val sqlContext = new org.apache.spark.sql.SQLContext(sc); val df = sc.makeRDD(List((1,“a”,“bj”),(2,“b”,“sh”),(3,“c”,“gz”),(4,“d”,“bj”),(5,“e”,“gz”))).toDF(“id”,“name”,“addr”); df.registerTempTable(“stu”); sqlContext.sql(“select * from stu order by addr”).show() sqlContext.sql(“select * from stu order by addr desc”).show()
(4)分组查询 val sqlContext = new org.apache.spark.sql.SQLContext(sc); val df = sc.makeRDD(List((1,“a”,“bj”),(2,“b”,“sh”),(3,“c”,“gz”),(4,“d”,“bj”),(5,“e”,“gz”))).toDF(“id”,“name”,“addr”); df.registerTempTable(“stu”); sqlContext.sql(“select addr,count(*) from stu group by addr”).show()
(5)连接查询 val sqlContext = new org.apache.spark.sql.SQLContext(sc); val dept=sc.parallelize(List((100,“财务部”),(200,“研发部”))).toDF(“deptid”,“deptname”) val emp=sc.parallelize(List((1,100,“张财务”),(2,100,“李会计”),(3,300,“王艳发”))).toDF(“id”,“did”,“name”) dept.registerTempTable(“deptTab”); emp.registerTempTable(“empTab”); sqlContext.sql(“select deptname,name from dept inner join emp on dept.deptid = emp.did”).show()
(6)执行运算 val sqlContext = new org.apache.spark.sql.SQLContext(sc); val df = sc.makeRDD(List(1,2,3,4,5)).toDF(“num”); df.registerTempTable(“tabx”) sqlContext.sql(“select num * 100 from tabx”).show();
(7)分页查询 val sqlContext = new org.apache.spark.sql.SQLContext(sc); val df = sc.makeRDD(List(1,2,3,4,5)).toDF(“num”); df.registerTempTable(“tabx”) sqlContext.sql(“select * from tabx limit 3”).show();
(8)查看表 sqlContext.sql(“show tables”).show
(9)类似hive方式的操作 scala>val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) scala>hiveContext.sql(“create table if not exists zzz (key int, value string) row format delimited fields terminated by ‘|’”) scala>hiveContext.sql(“load data local inpath ‘file:///home/software/hdata.txt’ into table zzz”) scala>hiveContext.sql(“select key,value from zzz”).show
(10)案例 val sqlContext = new org.apache.spark.sql.SQLContext(sc); val df = sc.textFile(“file:///root/work/words.txt”).flatMap{ _.split(" ") }.toDF(“word”) df.registerTempTable(“wordTab”) sqlContext.sql(“select word,count(*) from wordTab group by word”).show
(四)SparkSql API
实现步骤: 1)打开scala IDE开发环境,创建一个scala工程 2)导入spark相关依赖jar包
3)创建包路径以object类 4)写代码 代码示意:
package cn.tedu.sparksql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
object Demo01 {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("spark://hadoop01:7077").setAppName("sqlDemo01");
val sc=new SparkContext(conf)
val sqlContext=new SQLContext(sc)
val rdd=sc.makeRDD(List((1,"zhang"),(2,"li"),(3,"wang")))
import sqlContext.implicits._
val df=rdd.toDF("id","name")
df.registerTempTable("tabx")
val df2=sqlContext.sql("select * from tabx order by name");
val rdd2=df2.toJavaRDD;
rdd2.saveAsTextFile("file:///home/software/result");
}
}
5)打jar包,并上传到linux虚拟机上 6)在spark的bin目录下 执行:sh spark-submit --class cn.tedu.sparksql.Demo01 ./sqlDemo01.jar 7)最后检验
八、SparkStreaming
(Ⅰ)概述
Spark Streaming是一种构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力,以吞吐量高和容错能力强著称。
(Ⅱ)SparkStreaming VS Storm
大体上两者非常接近,而且都处于快速迭代过程中,即便一时的对比可能某一方占优势。 在Spark老版本中,SparkStreaming的延迟级别达到秒级,而Storm可以达到毫秒级别。而在最新的2.0版本之后,SparkStreaming能够达到毫秒级。
但后者可能很快就追赶上来。比如在性能方面,Spark Streaming刚发布不久,有基准测试显示性能超过Storm几十倍,原因是Spark Streaming采用了小批量模式,而Storm是一条消息一条消息地计算。但后来Storm也推出了称为Trident的小批量计算模式,性能应该不是差距了。而且双方都在持续更新,底层的一个通信框架的更新或者某个路径的代码优化都可能让性能有较大的提升。
目前,sparkStreaming还不能达到一条一条记录的精细控制,还是以batch为单位。所以像Storm一般用于金融领域,达到每笔交易的精细控制。
但是两者的基因不同,更具体地说就是核心数据抽象不同。这是无法改变的,而且也不会轻易改变,这样的基因也决定了它们各自最适合的应用场景。
Spark Streaming的核心抽象是DSTream,里面是RDD,下层是Spark核心DAG调度,所以Spark Streaming的这一基因决定了其粒度是小批量的,无法做更精细地控制。数据的可靠性也是以批次为粒度的,但好处也很明显,就是有可能实现更大的吞吐量。
另外,得益于Spark平台的良好整合性,完成相同任务的流式计算程序与历史批量处理程序的代码基本相同,而且还可以使用平台上的其他模块比如SQL、机器学习、图计算的计算能力,在开发效率上占有优势。而Storm更擅长细粒度的消息级别的控制,比如延时可以实现毫秒级,数据可靠性也是以消息为粒度的。
核心数据抽象的不同导致了它们在计算模式上的本质区别。Spark Streaming在本质上其实是像MR一样的批处理计算,但将批处理的周期从常规的几十分钟级别尽可能缩短至秒级(毫秒级),也算达到了实时计算的延时指标。而且,它支持各类数据源,基本可以实现流式计算的功能,但延时无法进一步缩短了。但Storm的设计初衷就是实时计算,毫秒级的计算当然不在话下,而且后期通过更高级别的Trident也实现了小批次处理功能。
九、遇到问题及解决方法
(一)r1.partitions出错
scala> val r1 = rdd.mapPartitions{x=>{
| val result=List[Int]()
| val i = 0
| while(x.hasNext){
| i += x.next()
| }
| result.::(i).iterator
| }}
<console>:30: error: value += is not a member of Int
Error occurred in an application involving default arguments.
i += x.next()
原因:将var打错了成了val,导致为常量,不可修改
(二)eclipse自动补全快捷键
alt+/
|