Spark 官方文档
1,spark 概述
Apache Spark 是一个快速通用的集群计算系统,它提供了提供了java,scala,python和R的高级API,以及一个支持一般图计算的优化引擎。它同样也一系列丰富的高级工具包括:Spark sql 用于sql和结构化数据处理,MLlib用于机器学习,Graphx用于图数据处理,以及Spark Streaming用于流数据处理。
2,快速入门
本教程对使用spark进行简单介绍。首先我们会通过spark的交互式 shell工具介绍Python和scalade API,然后展示如何通过java,scala和Python编写一个spark应用程序。 为了方便参照该指南进行学习。请先到 Spark 网站 下载一个 Spark 发布包。由于我们暂时还不会用到 HDFS,所以你可以下载对应任意 Hadoop 版本的 Spark 发布包。 Spark 2.0 版本之前, Spark 的核心编程接口是弹性分布式数据集(RDD)。Spark 2.0 版本之后, RDD 被 Dataset 所取代, Dataset 跟 RDD 一样也是强类型的, 但是底层做了更多的优化。Spark 目前仍然支持 RDD 接口, 你可以在 RDD 编程指南 页面获得更完整的参考,但是我们强烈建议你转而使用比 RDD 有着更好性能的 Dataset。想了解关于 Dataset 的更多信息请参考 Spark SQL, DataFrame 和 Dataset 编程指南。
2.1使用 Spark Shell 进行交互式分析
Spark shell 提供了一个简单的方式去学习API,同时它也是一个强大的分布式分析工具。park Shell 既支持 Scala(Scala 运行在 Java 虚拟机上,所以可以很方便的引用现有的 Java 库)也支持 Python。 Scala: 到spark安装目录bin下找到spark-shell :
./bin/spark-shell
**注意:**Python方式略,在之后的介绍中,只展示scala方式,python方式,可见官网详情: Spark 最主要的抽象概念就是一个叫做 Dataset 的分布式数据集。Dataset 可以从 Hadoop InputFormats(例如 HDFS 文件)创建或者由其他 Dataset 转换而来。下面我们利用 Spark 源码目录下 README 文件中的文本来新建一个 Dataset:
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
你可以直接调用Action算子从Dataset中获取数据,或者转换该 Dataset 以获取一个新的 Dataset。更多细节请参阅 API 文档 。
scala> textFile.count()
res0: Long = 126
scala> textFile.first()
res1: String = # Apache Spark
现在我们将该 Dataset 转换成一个新的 Dataset。我们调用 filter 这个 transformation 算子返回一个只包含原始文件数据项子集的新 Dataset。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09
我们可以将 transformation 算子和 action 算子连在一起:
scala> textFile.filter(line => line.contains("Spark")).count()
res3: Long = 15
2.2更多关于DataSet的操作
Dataset action 和 transformation 算子可以用于更加复杂的计算。比方说我们想要找到文件中包含单词数最多的行。
Scala
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15
首先,使用 map 算子将每一行映射为一个整数值,创建了一个新的 Dataset。然后在该 Dataset 上调用 reduce 算子找出最大的单词计数。map 和 reduce 算子的参数都是 cala 函数字面量(闭包),并且可以使用任意语言特性或 Scala/Java 库。例如,我们可以很容易地调用其他地方声明的函数。为了使代码更容易理解,下面我们使用Math.max():
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
因 Hadoop 而广为流行的 MapReduce 是一种通用的数据流模式。Spark 可以很容易地实现 MapReduce 流程:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
这里我们调用 flatMap 这个 transformation 算子将一个行的 Dataset 转换成了一个单词的 Dataset, 然后组合 groupByKey 和 count 算子来计算文件中每个单词出现的次数,生成一个包含(String, Long)键值对的 Dataset。为了在 shell 中收集到单词计数, 我们可以调用 collect 算子:
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
2.3缓存
Spark 还支持把数据集拉到集群范围的内存缓存中。当数据需要反复访问时非常有用,比如查询一个小的热门数据集或者运行一个像 PageRank 这样的迭代算法。作为一个简单的示例,我们把 linesWithSpark 这个数据集缓存起来。
Scala
scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15
用 Spark 浏览和缓存一个 100 行左右的文本文件看起来确实有点傻。但有趣的部分是这些相同的函数可以用于非常大的数据集,即使这些数据集分布在数十或数百个节点上。如 RDD 编程指南 中描述的那样, 你也可以通过 bin/spark-shell 连接到一个集群,交互式地执行上面那些操作。
2.4自包含的(self-contained)应用程序
假设我们想使用 Spark API 编写一个自包含(self-contained)的 Spark 应用程序。下面我们将快速过一下一个简单的应用程序,分别使用 Scala(sbt编译),Java(maven编译)和 Python(pip) 编写 Scala 首先创建一个非常简单的 Spark 应用程序 – 简单到连名字都叫 SimpleApp.scala:
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md"
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
spark.stop()
}
}
**注意:**应用程序需要定义一个 main 方法,而不是继承 scala.App。scala.App 的子类可能不能正常工作。 这个程序只是统计 Spark README 文件中包含‘a’和包含’b’的行数。注意,你需要把 YOUR_SPARK_HOME 替换成 Spark 的安装目录。与之前使用 Spark Shell 的示例不同,Spark Shell 会初始化自己的 SparkSession 对象, 而我们需要初始化 SparkSession 对象作为程序的一部分。 我们调用 SparkSession.builder 来构造一个 [[SparkSession]] 对象, 然后设置应用程序名称, 最后调用 getOrCreate 方法获取 [[SparkSession]] 实例。 为了让 sbt 能够正常工作,我们需要根据一个标准规范的 Scala 项目目录结构来放置 SimpleApp.scala 和 build.sbt 文件。一切准备就绪后,我们就可以创建一个包含应用程序代码的 JAR 包,然后使用 spark-submit 脚本运行我们的程序。
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.12/simple-project_2.12-1.0.jar
...
Lines with a: 46, Lines with b: 23
3 RDD编程
3.1,概述:
总的来说,每个 Spark 应用程序都包含一个驱动器(driver)程序,驱动器程序运行用户的 main 函数,并在集群上执行各种并行操作。Spark 最重要的一个抽象概念就是弹性分布式数据集(resilient distributed dataset – RDD), RDD是一个可分区的元素集合,这些元素分布在集群的各个节点上,并且可以在这些元素上执行并行操作。RDD通常是通过HDFS(或者Hadoop支持的其它文件系统)上的文件,或者驱动器中的Scala集合对象来创建或转换得到。其次,用户也可以请求Spark将RDD持久化到内存里,以便在不同的并行操作里复用之;最后,RDD具备容错性,可以从节点失败中自动恢复。 Spark 第二个重要抽象概念是共享变量,共享变量是一种可以在并行操作之间共享使用的变量。默认情况下,当Spark把一系列任务调度到不同节点上运行时,Spark会同时把每个变量的副本和任务代码一起发送给各个节点。但有时候,我们需要在任务之间,或者任务和驱动器之间共享一些变量。Spark 支持两种类型的共享变量:广播变量 和 累加器,广播变量可以用于在各个节点上缓存数据,而累加器则是用来执行跨节点的 “累加” 操作,例如:计数和求和。 本文将会使用 Spark 所支持的所有语言来展示 Spark 的这些特性。如果你能启动 Spark 的交互式shell动手实验一下,效果会更好(对于 Scala shell请使用bin/spark-shell,而对于python,请使用bin/pyspark)。
3.2,链接Saprk
Spark 2.4.3(目前官网最新版本)默认使用Scala2.11版本进行构建与并发的(备注:不同的spark版本使用不同的scala的版本进行构建与并发,不如:spark2.2.1 使用scala2.11;如果想用 Scala 写应用程序,你需要使用兼容的 Scala 版本(如:2.11.X)。 要编写 Spark 应用程序,你需要添加 Spark 的 Maven 依赖。Spark 依赖可以通过以下 Maven 坐标从 Maven 中央仓库中获得:
groupId = org.apache.spark
artifactId = spark-core_2.12
version = 2.4.3
另外,如果你想要访问 HDFS 集群,那么需要添加对应 HDFS 版本的 hadoop-client 依赖:
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最后,你需要在程序中添加下面几行来引入一些 Spark 类:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
(在 Spark 1.3.0 版本之前,你需要显示地 import org.apache.spark.SparkContext._ 来启用必要的隐式转换) (使用java,python方式初始化链接spark详情请见官网)
3.3 初始化spark
Spark 程序需要做的第一件事就是创建一个 SparkContext 对象,SparkContext 对象决定了 Spark 如何访问集群。而要新建一个 SparkContext 对象,你还得需要构造一个 SparkConf 对象,SparkConf对象包含了你的应用程序的配置信息。 每个JVM进程中,只能有一个活跃(active)的 SparkContext 对象。如果你非要再新建一个,那首先必须将之前那个活跃的 SparkContext 对象stop()掉。
Scala
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
appName 参数值是你的应用展示在集群UI上的应用名称。master参数值是Spark, Mesos or YARN cluster URL 或者特殊的“local”(本地模式)。实际上,一般不应该将master参数值硬编码到代码中,而是应该用spark-submit脚本的参数来设置。然而,如果是本地测试或单元测试中,你可以直接在代码里给master参数写死一个”local”值。
3.3.1:使用shell
Scala 在 Spark Shell 中,默认已经为你新建了一个 SparkContext 对象,变量名为sc。所以 spark-shell 里不能自建SparkContext对象。你可以通过–master参数设置要连接到哪个集群,而且可以给–jars参数传一个逗号分隔的jar包列表,以便将这些jar包加到classpath中。你还可以通过–packages设置逗号分隔的maven工件列表,以便增加额外的依赖项。同样,还可以通过–repositories参数增加maven repository地址。 下面是一个示例,在本地4个CPU core上运行的实例。
$ ./bin/spark-shell --master local[4]
或者,将 code.jar 添加到 classpath 下:
$ ./bin/spark-shell --master local[4] --jars code.jar
通过 maven标识添加依赖:
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
spark-shell –help 可以查看完整的选项列表。实际上,spark-shell 是在后台调用 spark-submit 来实现其功能的(spark-submit script.)。
3.3.2 弹性分布式数据集
Spark的核心概念是弹性分布式数据集(RDD),RDD是一个可容错、可并行操作的分布式元素集合。总体上有两种方法可以创建 RDD 对象:由驱动程序中的集合对象通过并行化操作创建,或者从外部存储系统中数据集加载(如:共享文件系统、HDFS、HBase或者其他Hadoop支持的数据源)。
3.3.2.1 并行集合
并行集合是以一个已有的集合对象(例如:Scala Seq)为参数,调用 SparkContext.parallelize() 方法创建得到的 RDD。集合对象中所有的元素都将被复制到一个可并行操作的分布式数据集中。例如,以下代码将一个1到5组成的数组并行化成一个RDD:
Scala
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
一旦创建成功,该分布式数据集(上例中的distData)就可以执行一些并行操作。如,distData.reduce((a, b) => a + b),这段代码会将集合中所有元素加和。后面我们还会继续讨论分布式数据集上的各种操作。
并行集合的一个重要参数是分区(partition),即这个分布式数据集可以分割为多少分区(partition)。Spark将会为集群中的每个分区(partition)运行一个task。一般情况下,在集群中每个CPU对应2-4个分区。通常,Spark会基于集群的情况,自动设置这个分区数。当然,你也可以通过设置第二参数parallelize (比如: sc.parallelize(data, 10)来手动设置分区数量。注意:Spark代码里有些地方仍然使用分片(slice)这个术语,这只不过是分区的一个别名,主要为了保持向后兼容。
3.3.2.2 外部数据集
Spark 可以通过Hadoop所支持的任何数据源来创建分布式数据集,包括:本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支持的文件格式包括:文本文件(text files)、SequenceFiles,以及其他 Hadoop 支持的输入格式(InputFormat)。 文本文件创建RDD可以用 SparkContext.textFile 方法。这个方法输入参数是一个文件的URI(本地路径,或者 hdfs://,s3n:// 等),其输出RDD是一个文本行集合。以下是一个简单示例:
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
一旦RDD被创建以后,distFile 就可以执行数据集的一些操作。比如,我们可以把所有文本行的长度加和:distFile.map(s => s.length).reduce((a, b) => a + b)。 以下是一些读取文本文件的一些注意点: 1,如果是本地文件系统路径,那么这个文件必须在所有的 worker 节点上能够以相同的路径访问到。所以要么把文件复制到所有worker节点上同一路径下,要么挂载一个共享文件系统。 2,所有 Spark 基于文件输入的方法(包括textFile)都支持输入参数为:目录,压缩文件,以及通配符。例如:textFile(“/my/directory”), textFile(“/my/directory/.txt”), 以及 textFile(“/my/directory/.gz”)。 3,textFile 方法同时还支持第二个可选参数,用以控制数据的分区个数。默认地,Spark会为文件的每一个block创建一个分区(HDFS上默认block大小为128MB,老版本block的大小为64M),你可以通过传递一个更大的参数来上调分区数。注意:分区数不能少于block个数。 除了文本文件之外,Spark的Scala API还支持其他几种数据格式: 1,SparkContext.wholeTextFiles 可以读取一个包含很多小文本文件的目录,并且以 (filename, content) 键值对的形式返回结果。这与textFile 不同,textFile只返回文件的内容,每行作为一个元素。 wholeTextFiles,分区数由数据位置决定,在一些情况下,可能导致分区数特别少,对于这些情况,wholeTextFiles提供了第二个可选的参数,来控制极少分区数的情况。 2,对于 SequenceFiles,可以调用 SparkContext.sequenceFile[K, V],其中 K 和 V 分别是文件中 key 和 value 的类型。这些类型都应该是Hadoop Writable 接口的子类, 如:IntWritable and Text 等。另外,Spark 允许你为一些常用Writable指定原生类型,例如:sequenceFile[Int, String] 将自动读取 IntWritable 和 Text。 4,对于其他的 Hadoop InputFormat,你可以用 SparkContext.hadoopRDD 方法,并传入任意的 JobConf 对象和 InputFormat,以及 key class、value class。这和设置 Hadoop job 的输入源是同样的方法。你还可以使用 SparkContext.newAPIHadoopRDD,该方法接收一个基于新版Hadoop MapReduce API (org.apache.hadoop.mapreduce)的InputFormat作为参数。 5,RDD.saveAsObjectFile 和 SparkContext.objectFile 支持将 RDD 中元素以 Java 对象序列化的格式保存成文件。虽然这种序列化方式不如 Avro 效率高,却为保存 RDD 提供了一种简便方式。
3.4 RDD算子
RDDS支持俩种类型的算子:转换算子(transformations),从已有的RDD创建一个新的RDD,和actions算子,通过对数据集(RDD)的计算,将计算结果值返回给driver驱动器。例如:map 是一个 transformation 算子,它将数据集中每个元素传给一个指定的函数,并将该函数返回结果构建为一个新的RDD;而 reduce 是一个 action 算子,它可以将 RDD 中所有元素传给指定的聚合函数,并将最终的聚合结果返回给驱动器(还有一个 reduceByKey 算子,返回一个分布式数据集(RDD))。 在spark中所有的转换算子都是懒加载的,也就是说,transformation 算子并不立即计算结果,而是记录下对基础数据集(如:一个数据文件)的转换操作,只有等到某个 action 算子需要计算一个结果返回给驱动器的时候,transformation 算子所记录的操作才会被计算。这样的设计,可以使spark运行的更加高效,例如:map算子创建了一个数据集,同时该数据集下一步会调用reduce算子,那么Spark将只会返回reduce的最终聚合结果(单独的一个数据)给驱动器,而不是将map所产生的数据集整个返回给驱动器。 默认情况,每次调用 action 算子的时候,每个由 transformation 转换得到的RDD都会被重新计算。然而,你也可以通过调用 persist(或者cache)操作来持久化一个 RDD,这意味着 Spark 将会把 RDD 的元素都保存在集群中,因此下一次访问这些元素的速度将大大提高。同时,Spark 还支持将RDD元素持久化到内存或者磁盘上,甚至可以支持跨节点多副本。
3.4.1 基础
以下简要说明以下RDD的基本操作,示例如下:
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
其中,第一行是从外部文件加载数据,并创建一个基础RDD。这时候,数据集并没有加载进内存除非有其他操作施加于lines,这时候的lines RDD其实可以说只是一个指向 data.txt 文件的指针。第二行,用lines通过map转换得到一个lineLengths RDD,同样,lineLengths也是懒惰计算的。最后,我们使用 reduce算子计算长度之和,reduce是一个action算子。此时,Spark将会把计算分割为一些小的任务,分别在不同的机器上运行,每台机器上都运行相关的一部分map任务,并在本地进行reduce,并将这些reduce结果都返回给驱动器。 如果我们后续需要重复用到 lineLengths RDD,我们可以增加一行: lineLengths.persist() 这一行加在调用 reduce 之前,则 lineLengths RDD 首次计算后,Spark会将其数据保存到内存中。
3.4.2 给spark传入函数
Spark 的 API 很多都依赖于在驱动程序中向集群传递操作函数。以下是两种建议的实现方式: 1,函数(Anonymous function syntax),这种方式代码量比较少。 2,全局单件中的静态方法。例如,你可以按如下方式定义一个 object MyFunctions 并传递其静态成员函数 MyFunctions.func1:
object MyFunctions {
def func1(s: String): String = { ... }
}
dd.map(MyFunctions.func1)
注意:技术上来说,你也可以传递一个类对象实例上的方法(不是单例对象),不过这会致传递函数的同时,需要把相应的对象也发送到集群中各节点上。例如:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
如果我们创建一个新的MyClass实例,并调用其 doStuff 方法,同时doStuff中的 map算子引用了该MyClass实例上的 func1 方法,那么,整个MyClass对象将被发送到集群中所有节点上。这就类似去写一个rdd.map(x => this.func1(x)). 类似的,如果应用外部对象的成员变量,也会导致对整个对象实例的引用:
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
上面的代码对 field 的引用等价于 rdd.map(x => this.field + x),这将导致应用整个this对象。为了避免类似问题,最简单的方式就是,将field复制到一个本地临时变量中,而不是从外部直接访问之,如下:
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
3.4.3 理解闭包
Spark里一个比较难的事情就是,理解在整个集群上跨节点执行的变量和方法的作用域以及生命周期。Spark里一个频繁出现的问题就是RDD算子在变量作用域之外修改了其值。下面的例子,我们将会以foreach() 算子为例,来递增一个计数器counter,不过类似的问题在其他算子上也会出现。 3.4.3.1示例: 考虑如下例子,我们将会计算RDD中原生元素的总和,如果不是在同一个 JVM 中执行,其表现将有很大不同。例如,这段代码如果使用Spark本地模式(–master=local[n])运行,和在集群上运行(例如,用spark-submit提交到YARN上)结果完全不同。
Scala
var counter = 0
var rdd = sc.parallelize(data)
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
3.4.3.2,本地模式 VS 集群模式
上面这段代码其行为是没有被定义的,并且可能不能按照预期运行工作。为运行Job,spark将RDD操作的处理分解为多个任务,每个任务在一个exeutor中运行。在执行之前,spark计算任务是闭包的(closuer)。闭包(closuer)是执行器在RDD上执行计算时必须可见的变量和方法(比如:本例中的foreach())。闭包(closure)被序列化,并且发送到每个执行器(executor)中。 上例中,现在发送到每个执行器(executor)的闭包中的变量是副本,因此。当在foreach()函数中引用counter变量时,它已不再是diver节点的计数器(counter)。在driver节点的内存中,仍然存在一个counter变量,但是执行器已经不能再访问它!执行器只能看到序列化闭包中的副本。由于计数器(counter)的所有操作都引用了序列化闭包中的值,计数器(countter)的值最终仍然为0。 在本地模式,某些情况下,foreach算子实际上将和驱动器(driver)在同一个JVM中执行,并且将引用相同的原始计数器(counter),并且可能实际会跟新它。 为了确保在这些场景中能定义良好的行为,应该使用累加器(Accumulator)。累加器(Accumulator)在Spark中专门用于提供一种机制,在集群中跨工作节点执行拆分任务时,安全的更新变量。在本指南的累加器部分(Accumulators)会更加详细的说明。 通常,闭包(closures)结构 类似于循环或局部定义的方法,不应该被用于转换一些全局的状态。Spark没有定义或者保证从闭包(closures)外部引用对象的突变。有一些执行此操作的代码可能在本地模式下工作,但这只是偶然发生的,这样的代码在分布式模式下将不会像预期的那样工作。如果在一些全局计算时,使用累加器(Accumulator)是必须的。
3.4.3.3,打印RDD元素
另一种常用来打印RDD中所有元素的语法是使用rdd.foreach(println) 或者 rdd.map(println)方法。在单台机器上,这将产生预期的结果并打印RDD中所有的元素。然而,在集群模式下,被执行器调用的输出stdout 被正在写入执行器的stdout 取代,而不是驱动器(driver)上的stdout,因此驱动器(driver)上的stdout不会显示输出结果。想要在驱动器(driver)上打印所有的RDD元素,一种方式是使用collect()方法,首先将RDD提取到driver节点,因此可以使用rdd.collect().foreach(println)。这种方式可能导致驱动器(driver)节点的内存溢出,因为collect()算计将整个RDD中的数据收集到回迁到单台机器上。如果你只是需要打印RDD中的一小部分元素,一种更安全的策略是使用take()算子:rdd.take(100).foreach(println)。
3.4.4,使用key-value键值对
虽然,大部分Spark算子都能在包含任何对象类型的RDD上工作,但是有一小部分特殊的算子只能在key-value键值对的RDD上运行。这些算子最常见的应用是在分布式混洗(shuffle)过程,比如对key元素的分组或者聚合。 在scala中,这些算子在包含Tuple2对象(内建于scala语言中,可以这样简单的被创建:(a,b))的RDD上自动可用。Key-value键值对算子在ParRDDFunction类上可用,这个类型也会自动包装到包含tuples的RDD上。 例如:下面的这个例子将reduceByKey算子应用到key-value键值对来计算文件中每行文本出现的次数:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
我们也可以使用counts.sortByKey(),例如:对键值对按字母顺序排列,最终使用counts.collect()将排序结果以对象数组拉取到驱动器(driver)的内存中。 注意:当我们在使用自定义的对象作为key-value键值对算子中的key时,必须确保实现一个自定义的equals()方法带有一个匹配的hashCode()方法。完整详细的描述,请参考Object.hashCode()文档。
3.4.5 转换算子
下列列表描述了一些spark支持的经常使用的算子。详细的说明请参阅RDD API文档和键值对RDD方法文档: Transformation算子 含义 map(func) 返回一个新的分布式数据集,其中每个元素都是由源RDD中一个元素经func转换得到的。
filter(func) 返回一个新的数据集,其中包含的元素来自源RDD中元素经func过滤后(func返回true时才选中)的结果
flatMap(func) 类似于map,但每个输入元素可以映射到0到n个输出元素(所以要求func必须返回一个Seq而不是单个元素)
mapPartitions(func) 类似于map,但基于每个RDD分区(或者数据block)独立运行,所以如果RDD包含元素类型为T,则 func 必须是 Iterator => Iterator 的映射函数。
mapPartitionsWithIndex(func) 类似于 mapPartitions,只是func 多了一个整型的分区索引值,因此如果RDD包含元素类型为T,则 func 必须是 Iterator => Iterator 的映射函数。
sample(withReplacement, fraction, seed) 采样部分(比例取决于 fraction )数据,同时可以指定是否使用回置采样(withReplacement),以及随机数种子(seed)
union(otherDataset) 返回源数据集和参数数据集(otherDataset)的并集
intersection(otherDataset) 返回源数据集和参数数据集(otherDataset)的交集
distinct([numTasks])) 返回对源数据集做元素去重后的新数据集
groupByKey([numTasks]) 只对包含键值对的RDD有效,如源RDD包含 (K, V) 对,则该算子返回一个新的数据集包含 (K, Iterable) 对。注意:如果你需要按key分组聚合的话(如sum或average),推荐使用 reduceByKey或者 aggregateByKey 以获得更好的性能。注意:默认情况下,输出计算的并行度取决于源RDD的分区个数。当然,你也可以通过设置可选参数 numTasks 来指定并行任务的个数。
reduceByKey(func, [numTasks]) 如果源RDD包含元素类型 (K, V) 对,则该算子也返回包含(K, V) 对的RDD,只不过每个key对应的value是经过func聚合后的结果,而func本身是一个 (V, V) => V 的映射函数。另外,和 groupByKey 类似,可以通过可选参数 numTasks 指定reduce任务的个数。
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 如果源RDD包含 (K, V) 对,则返回新RDD包含 (K, U) 对,其中每个key对应的value都是由 combOp 函数 和 一个“0”值zeroValue 聚合得到。允许聚合后value类型和输入value类型不同,避免了不必要的开销。和 groupByKey 类似,可以通过可选参数 numTasks 指定reduce任务的个数。
sortByKey([ascending], [numTasks]) 如果源RDD包含元素类型 (K, V) 对,其中K可排序,则返回新的RDD包含 (K, V) 对,并按照 K 排序(升序还是降序取决于 ascending 参数)
join(otherDataset, [numTasks]) 如果源RDD包含元素类型 (K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中将包含内关联后key对应的 (K, (V, W)) 对。外关联(Outer joins)操作请参考 leftOuterJoin、rightOuterJoin 以及 fullOuterJoin 算子。
cogroup(otherDataset, [numTasks]) 如果源RDD包含元素类型 (K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中包含 (K, (Iterable, Iterable))。该算子还有个别名:groupWith
cartesian(otherDataset) 如果源RDD包含元素类型 T 且参数RDD(otherDataset)包含元素类型 U,则返回的新RDD包含前二者的笛卡尔积,其元素类型为 (T, U) 对。
pipe(command, [envVars]) 以shell命令行管道处理RDD的每个分区,如:Perl 或者 bash 脚本。RDD中每个元素都将依次写入进程的标准输入(stdin),然后按行输出到标准输出(stdout),每一行输出字符串即成为一个新的RDD元素。
coalesce(numPartitions) 将RDD的分区数减少到numPartitions。当以后大数据集被过滤成小数据集后,减少分区数,可以提升效率。
repartition(numPartitions) 将RDD数据重新混洗(reshuffle)并随机分布到新的分区中,使数据分布更均衡,新的分区个数取决于numPartitions。该算子总是需要通过网络混洗所有数据。
repartitionAndSortWithinPartitions(partitioner) 根据partitioner(spark自带有HashPartitioner和RangePartitioner等)重新分区RDD,并且在每个结果分区中按key做排序。这是一个组合算子,功能上等价于先 repartition 再在每个分区内排序,但这个算子内部做了优化(将排序过程下推到混洗同时进行),因此性能更好。
3.4.6 操作算子
以下列表,列举了一些spark支持,经常使用的操作算子。详细的信息请参阅RDD API文档和键值对RDD方法文档。 Action算子 作用 reduce(func) 将RDD中元素按func进行聚合(func是一个 (T,T) => T 的映射函数,其中T为源RDD元素类型,并且func需要满足 交换律 和 结合律 以便支持并行计算)
collect() 将数据集中所有元素以数组形式返回驱动器(driver)程序。通常用于,在RDD进行了filter或其他过滤操作后,将一个足够小的数据子集返回到驱动器内存中。
count() 返回数据集中元素个数
first() 返回数据集中首个元素(类似于 take(1) )
take(n) 返回数据集中前 n 个元素
takeSample(withReplacement,num, [seed]) 返回数据集的随机采样子集,最多包含 num 个元素,withReplacement 表示是否使用回置采样,最后一个参数为可选参数seed,随机数生成器的种子。
takeOrdered(n, [ordering]) 按元素排序(可以通过 ordering 自定义排序规则)后,返回前 n 个元素
saveAsTextFile(path) 将数据集中元素保存到指定目录下的文本文件中(或者多个文本文件),支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。保存过程中,Spark会调用每个元素的toString方法,并将结果保存成文件中的一行。
saveAsSequenceFile(path)(Java and Scala) 将数据集中元素保存到指定目录下的Hadoop Sequence文件中,支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。适用于实现了Writable接口的键值对RDD。在Scala中,同样也适用于能够被隐式转换为Writable的类型(Spark实现了所有基本类型的隐式转换,如:Int,Double,String 等)
saveAsObjectFile(path)(Java and Scala) 将RDD元素以Java序列化的格式保存成文件,保存结果文件可以使用 SparkContext.objectFile 来读取。
countByKey() 只适用于包含键值对(K, V)的RDD,并返回一个哈希表,包含 (K, Int) 对,表示每个key的个数。
foreach(func) 在RDD的每个元素上运行 func 函数。通常被用于累加操作,如:更新一个累加器(Accumulator ) 或者 和外部存储系统互操作。
Spark RDD API同时也公开了一些算子的异步版本,比如:foreachde foreachAsync,它会向调用者立即返回一个FutureAction,而不是在操作完成时阻塞。这可用于管理或者等待动作的异步执行。
3.4.7 混洗算子(Shuffle Operations)
Spark中有一些算子能够触发众所周知的shuffle操作。Spark中的shuffle操作是将数据重新分布,以此来实现数据可以跨partition进行分组。Shuffle过程通常需要跨executors和machines之间进行数据拷贝,所以导致shuffle过程是一个复杂并且代价昂贵的操作。
3.4.7.1 背景:
为了很好的理解shuffle过程中发生了什么,可以以reduceByKey操作为例来考虑。reduceByKey算子会生成一个新的RDD,将原RDD中的相同key对应的所有value值组合为一个二元组(tuple):(key,与key关联的所有value值执行reduce函数产生的结果)。这个算子的难点是,对于某个key,它关联的所有value值,并不一定分布在同一个partition中,甚至不在同一台机器上,但是这些value值必须在相同的位置才能计算结果。 在spark中,为了进行某些特性的操作,数据通常分布到所需要的各个partition中。在分析过程中个,一个task任务将操作一个partition的数据。因此,为了组织在单个reduceByKey中的reduce任务中执行所有的数据,Spark需要执行一个all-to-all的操作。这必须从读取Rdd中的所有partition,并从中找到所有key对应的所有value值,并且将每个key对应得 value值放到一起,以便后续计算每个key对应的value的最终结果,这个过程就叫shuffle。 虽然,shuffle过程之后,每个partition中的数据的顺序都是确定的,并且分区自身的顺序也是确定的,但是这些数据的顺序并不确定。如果需要shuffle过程后,分区内的数据元素有序,则可以使用以下方式: 1,mapPartition 为每个partition排序,比如:.sorted; 2,repartitionAndSortWithinPartitions:重分区的同时,高效的对分区排序; 3,sortBy:对RDD元素进行全局排序; 能导致shuffle过程的操作有:重分区算子类(repartition),比如:reparation算子,和coalesce。‘Bykey’类算子(除了counting)如:groupByKey和reduceByKey。以及join类算子:比如,cogroup和join。
3.4.7.2性能影响
Shuffle过程是一个分厂昂贵的操作,因为它包括磁盘I/O,数据序列化,以及网络I/O。为了组织好shuffle过程的数据,spark需要生成task任务集,一系列map任务去组织数据,一系列reduce任务去计算数据。这些命名来源于MapReduce,和spark的map和reduce算子没有直接关系。 另外,单独的map task任务的输出结果尽量保存在内存中,一直到内存方不下。然后,这些数据会根据目标partition进行排序,写入一个文件。在reduce过程中,task任务读取与之紧密相关的已经排好序的blocks。 某些shuffle算子会导致非常明显的堆内存增长,因为这些算子在传输数据前后,会在内存中维护组织数据记录的各种数据结构。特别的,reduceByKey和aggregateByKey算子会在map端创建这些数据结构,而‘byKey’系列的算子会在reduce端创建数据结构。当数据在内存无法存储时,spark会将这些数据落盘存储到磁盘上,当然这会导致额外的磁盘I/O和垃圾回收的开销。 Shuffle操作还会在磁盘上产生大量的临时文件。比如在Spark1.3版本中,这些文件将一直保留到与其对应的RDD不在使用且被垃圾回收之后才会被删除。这么做的原因是,如果该RDD的血统信息(即:生成RDD的父RDD以及其爷爷RDD,统称为RDD的血统)被重新计算时,则不需要重新生成这些shuffle文件。如果应用程序保留这些RDD的引用或者GC启动频率较低,那么垃圾回收只能隔很长一段时间才能发生。这就意味着长时间运行的Spark job会占用大量的磁盘空间。Spark的临时存储目录是由spark.local.dir 配置参数指定,在初始化Spark contex时。 Shuffle操作受到大量参数控制,详细的参数信息请参考Spark Configuration Guide.
3.5 RDD 持久化
Spark中其中一个最重要的能力就是持久化(persisting或者 caching)数据集到内存中,从而可以在跨操作之间可以复用这些数据。当一个RDD被持久化后,那么每个节点会将该节点计算的任何partition存储到内存中,并且在该数据集(或者由该数据集衍生出的其他数据集)上的其他操作中复用partition中的数据。这使得后续的操作更快(通常能提升10倍)。因此,缓存对于迭代算法和快速交互分析是一个非常有用的工具。 可以使用persisit()或者cache()方法来标记一个需要持久化的RDD。在该RDD被一个action算子第一个计算时,该RDD就会被缓存到该节点的内存中。Spark的缓存具有容错功能-如果RDD中的任何一个partition数据丢失,那么spark会根据其血统信息自动重新计算。 另外,每个被持久化的RDD可以使用不同的存储级别。例如,你可以将数据集存储到磁盘,或者以Java 序列化对象(为了节省空间)保存到内存中,或者跨节点多副本存储。这些存储等级的设置通过给persist方法传递参数StorageLevel。Cache()方法本身是一个使用默认存储级别的快捷方式,默认存储等级是StorageLevel.MEMORY_ONLY(以未序列化的java对象存储在内存中),所有存储等级如下: Storage Level Meaning MEMORY_ONLY RDD以未序列化的java对象存储在JVM中,如果一个RDD未能全部存储到内存中,那么一部分分区中的数据将不会被缓存,而是在需要的时候从新计算,这是默认的缓存方式
MEMORY_AND_DISK 以未序列化后的java对象存储在JVM中,如果RDD中的数据不能再内存中全部存储,那么部分partition数据存储到磁盘,在需要的时候,从磁盘中读取。
MEMORY_ONLY_SER 以序列化的java对象存储RDD(每个分区中以字节数组存储)。通常这种方式比未序列化对象更节省空间,特别是使用更快的序列化方式,但是在的去的时候,更加消耗CPU MEMORY_AND_DISK_SER 与MEMORY_ONLY_SER方式类似,不同的是当分区的数据在内存中无法全部存储时,将该部分数据存储到磁盘上,而不是每次需要的时候从新计算。
DISK_ONLY 将RDD中的数据全部存储到磁盘中。 MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc 和上面存储默认类似,但是每个分区的只在集群中的俩个节点保留副本。
OFF_HEAP (experimental) 和MEMPRY_ONLY_SER类似。只是将数据存储到堆外内存。这里需要堆外空间来存储。
注意:在python中,通常使用Pickle library方式来序列化存储对象,所以是否选择序列化级别无关紧要。在python中可用的存储级别有:MEMORY_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_ONLY and DISK_ONLY_2。
Spark中,自动会将shuffle过程中产生的中间数据缓存(比如:reduceBykey),甚至需要用户调用persisit()方法。这样做是为了避免在shuffle过程中一个节点失败而重新计算数据。当然,还是建议用户在对需要重复使用的结果RDD调用persist()方法。
3.5.1 选择哪种缓存级别
Spark的缓存等级主要在于在内存使用和CPU之间做一些权衡。我们建议使用以下步骤来选择一种合适的存储级别: 1,如果RDD能够使用默认的存储方式(MEMORY_ONLY),那么尽量使用默认的存储方式,这样是CPU效率最高的方式,可以尽可能快速的执行spark算子; 2,如果默认存储级别不能满足需求,那么尽量使用MEMOTY_ONLY_SER并且选择一个搞笑的序列化协议来尽量节省数据的存储空间,同时执行速度也不错。 3,尽量不要将数据存储到磁盘上,除非重新计算数据的代价非常大,或者需要过滤巨大的数据。否则的话,从新计算一个分区和从磁盘上读取数据速度差不多。 4,如果需要支持容错,那么可以使用带副本的缓存级别(比如:使用spark来服务web请求)。所有的存储等级都通过从新计算丢失的数据来提供了完全容错机制,但是副本机制可以是task任务继续运行而不需要等待重新计算丢失的分区数据。
3.5.2 删除数据
Spark会自动监控每个节点上缓存数据的使用率,并且会使用最近最少使用(LRU)方式将旧数据删除,如果你想手动将RDD中的数据移除内存来取代等待数据自动从内存中移除,那么可以使用RDD.unpersist()方法。
3.6 共享变量
通常,当我们给Spark算子(比如:map或者reduce)传递一个函数时,这些函数将会在远程的集群节点上运行,并且这些函数引用的变量都是各个节点上独自的副本。这些变量被复制到每台机器上,并且对远程机器上变量的任何更新都不会传播回driver程序。通常来说,跨task任务读写共享变量的效率非常低下。但是,spark提供了俩种比较通用的共享变量:广播变量(broadcast variables)和累加器(accumulators)
3.6.1 广播变量
广播变量允许编程人员在每台机器上缓存一份只读变量,而不是将副本随着task任务一起分发。广播变量可以被用来:比如,在每个节点缓存一个大型输入数据集的副本,这是一种比较高效的方式。Spark还尝试使用高效的广播算法来分配广播变量以降低通信成本。 Spark的操作会由一组stage 来操作,stage之间被‘shuffle’过程分割开来。Spark会自动广播在每个stage中的task任务中需要使用的公共的数据。被广播的数据被序列化后缓存,然后在task任务使用之前反序列化。这意味着,只有在跨多个stage中的task任务之间需要使用共同的数据,或者以序列化和反序列化方式缓存数据的时候,显示的创建广播变量才是必须的。 广播变量可以通过调用SparkContext.broadcast(v)的方式来将变量v广播。广播变量是对变量v的包装,或者它的value值时,通过调用value方法。具体的方式如下:
Scala:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
在广播变量被创建之后,集群中的任何方法都不应该再使用原始的v数据,这样才能避免数据v被传输多次。另外,对象v被广播之后不应该再被更新,这样才能确保每个阶段拿到相同的广播值。(比如:如果变量被更新后,一个新的节点拿更新后的值,那么该节点与其他节点v的值将不一样)。
3.6.2 累加器(Accumulators)
累加器是只通过关联和交换操作累计“添加”的变量,因此可以高效的支持并行计算。它可以被用来实现累加器(比如:在MapReduce中)或者求总和。Spark原生支持数字类型的累加器,并且开发者可以添加新的类型。 作为开发者,即可以创建有个命名的累加器,也可以创建一个未命名的累加器。正如下图所示,一个命名的累加器(在被初始化为counter)将会Web UI上展示每个stage中修改累加器。Spark在“Tasks”任务表中显示每个task任务修改累加器的值。
在UI上追踪累加器的执行,对理解程序在每个stage中运行是非常有帮助的。(注意:这种方式并不支持Python) 一个数字类型的累加器可以通过调用SparkContext.longAccumulator() 或者 SparkContext.doubleAccumulator()方法分别去做Long类型和Double类型的累加值。Task任务运行在集群中,可以使用add方法做累加。但是,executor程序不能去读取累加器的值,只有driver程序可以读取累加器的value值,通过使用它的value方法。 下面的示例展示了累加器用来累加一个数组元素的值。
Scala:
scala> val accum = sc.longAccumulator("My Accumulator")accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))...10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.valueres2: Long = 10
当然,以上代码支持了Long类型的累加器,开发者也可以创建自己的类型,通过继承AccumulatorV2。AccumulatorV2抽象类有一系列方法用来被子类重写,方法:reset 方法来将累加器的值重新设置为0;add方法用来给累加器增加另一个值;merge方法用来将其他相同类型的累加器合并到一个。其他需要必须被重写的方法在API 文档中有详细讲解。比如:假设我们有一个MyVector类提供了数学的verctors,我们可以这样写:
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
private val myVector: MyVector = MyVector.createZeroVector
def reset(): Unit = {
myVector.reset()
}
def add(v: MyVector): Unit = {
myVector.add(v)
}
...}
**注意:**当开发着定义了自己的AccumulatorV2类型,那么结果类型可以与添加的元素类型不同。 对于只在action算子内部执行跟新的累加器,spark保证每个task任务对累加器的跟新将只执行一次,即:重新执行task任务将不会再更新累加器的值。在转换(transformations)算子中,用户应该能意识到,如果tasks或者job中的stage被重新执行时,每个task任务中对累加器的更新操作将不仅执行一次。 累加器并不会改变Spark的懒运算模型。如果累加器的值在一个RDD的操作中被更新,那么RDD作为action的一部分被计算时,累加器的value值将只被更新一次。因此,当在一个懒加载执行的转换算子比如map()中,累加器的更新不能保证被执行。 下面的示例证明该属性:
Scala
val accum = sc.longAccumulatordata.map { x => accum.add(x); x }
|