一、Spark对RDD的计算原理
RDD是弹性分布式数据集,即Spark对数据的核心抽象,Spark对数据的操作其实就是对RDD的创建、转换、计算。每个RDD都会分区,这些分区会被Spark自动分发到集群上去并行操作。只有第一次对RDD的计算发生时,Spark才会对RDD进行并行执行,在创建转换的环节先不予操作,这样可以只计算RDD的结果而不是将所有数据先保存再遍历计算,可以节省空间,提高处理速度。 RDD会在每次被处理时进行重新计算,保留最后一次处理的结果数据。如果需要对同一个RDD进行重复计算,可以用==RDD.persist()==将此RDD进行保存,RDD.unpersist()可以将不用的分区移除。
Spark程序的工作方式:
- 外部导入文件创建为RDD
- 使用filter、map等将RDD转换为新的RDD
- 将需要重用的RDD保存起来
- RDD的首次计算发生时,Spark开始对计算进行优化然后并行执行
val lines=sc.textFile("README.md")
val test=sc.parallelize([1,2,3])
val line_python=lines.filter(line => line.contains("python"))
val line_python.persist
println(line_python.count())
line_python.take(10).foreach(println)
line_python.collect()
line_python.saveAsTextFile()
二、向Spark传递函数
Python、Scala通过向Spark传递函数来操作RDD,Scala传递的函数及函数引用的数据必须是可序列化的,否则就会报错NotSerializableException。当传递的函数包含某一对象时,会把整个对象都引用;而当函数所在对象是某个对象中的一部分时,会把整个对象全部发到工作节点上,这会大大影响计算性能。所以此时需要先把函数所在对象定义为局部变量,然后传递这个局部变量就好。
Python为例
errors=rdd.filter(lambda x:"error" in x)
def fun(s):
return "error" in s
errors=rdd.filter(fun)
def fun(rdd,test);
return rdd.filter(lambda x:test.query in x)#整个test都会被发到工作节点上
def fun(rdd,test):
query=test.query
return rdd.filter(lambda x:query in x)
函数
1、RDD转化函数
2、RDD计算函数
rdd.aggregate()详解 这个函数可以返回与RDD中元素类型不同的值。以初始值为基数,通过函数1与RDD中的每个元素进行计算(相当于与各节点上的分区数据进行计算),每次计算的结果通过函数2进行合并(合并所有节点的结果),从而实现累积。
- Python:rdd.aggregate((初始值value),(函数1,(函数2)))
a=sc.parallelize([1,2,3,3])
a_sumcount=a.aggregate((0,0),(lambda value,i:(value[0]+i,value[1]+1),(lambda x,y:(x[0]+y[0],x[1]+y[1]))))
a_avg=a_sumcount[0]/a_sumcount[1]
print(a_avg)
- Scala:rdd.aggregate(初始值value)(函数1,函数2)
val rdd=sc.parallelize([1,2,3,3])
val rdd_sumcount=rdd.aggregate((0,0))((value,i)=>(value._1+i,value._2+1),(x,y)=>(x._1+y._1,x._2+y._2))
val rdd_avg=rdd_sumcount._1/rdd_sumcount._2
3、键值对RDD函数
(上文展示的RDD的函数基本也都适用与键值对RDD) rdd.combineByKey()详解 rdd.combineByKey()这个函数也可以返回与RDD中元素类型不同的值 rdd.combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner)
- 给每个分区首次出现的每个键对应的值赋一个初始值createCombiner
- 以初始值为基数,通过mergeValue与各自分区中相同键对应的值进行计算
- 每个分区计算的结果通过mergeCombiners进行合并,从而实现累积
- partitioner可以设置分区数量,默认为Spark自动划分
Scala为例:
val rdd=sc.parallelize([1,2,3,3])
val rdd_avg=rdd.combineByKey((v)=>(v,1),(v,i)=>(v._1+i,v._2+1),(x,y)=>(x._1+y._1,x._2+y._2),2).map{case (x,y)=>(x,y._1/y._2)}
rdd_avg.collectAsMap().map(println(_))
三、并行优化
Spark会根据集群的大小自动推断分区数量,但是我们可以根据实际情况来自定义分区数量,从而减少并行度,优化性能。大部分操作函数都可以在最后设置一个参数partitioner来指定分区数。 当我们不通过操作函数来分区,可以直接对文件进行分区。
查看RDD的分区数:
- scala:rdd.partitions.size()
- python:rdd.getNumPartitions()
设置分区数:
- rdd.repartition()
- rdd.coalesce()
|