Spark 复习
更新至第四章
第二章
1、Spark的主要组成部分和各部分的作用
SparkSql ->SQL及时查询(满足交互式查询分析需求)
SparkStreaming -> 实时流式计算(满足流计算需求)
Spark Mllib -> 机器学习
Spark Graphx -> 图计算
2、Spark架构
名词解释:
- DAG:有向无环图,反映RDD之间的依赖关系
- Executor:运行在工作节点上的一个进程,负责运行任务,并为应用程序存储数据
- Task:Executord 的工作单元
- Job:一个作业包含多个RDD和相应RDD上的各种操作
- Stage:作业的基本调度单位,一个作业会分为多组任务,每个任务称为阶段,也称为任务集
简述Spark架构模型:
- Spark采用主从架构,一个Master,任务控制节点Driver Program简称为Driver
- 多个Worker,运行作业任务的工作节点Worker Node,以及每个WN上负责具体任务的执行进程Executor
- 协调两者的是集群资源(CPU,Memory,带宽)管理器,Cluster Manager 可以是Yarn,也可以是Mesos
简述Spark 各种应用程序,作业,任务,阶段 间的关系
- 一个Spark应用程序有一个任务控制节点Driver和若干个作业Job构成,一个作业有多个阶段构成,一个阶段又由多个任务构成
- 每执行一个application 时,Driver 会像Cluster Manager 申请资源,启动Executor 进程,然后向Executor 发送应用程序和文件,然后再Executor上执行任务,任务结束后任务结果写回任务控制节点或写到HDFS或其他的数据库中
3、Spark运行流程
应用程序提交后首先创建基本运行环境
- Driver 创建SparkContext对象
- SparkContext作为application 连接集群的通道,负责与ClusterManager通信并进行资源申请,任务分配和监控,此时SC与CM注册并申请运行Executor 的资源
- CM为Executor 分配资源,并启动Executor 进程,同时Executor 运行情况随心跳发送给资源管理器
- SC根据RDD依赖关系构建DAG图,并将DAG图发送给DAGScheduler(DAG调度器) 进行解析,将DAG分为多个阶段,计算出各个阶段间的依赖关系,并将每个任务集提交给底层的TaskScheduler(任务调度器)进行处理
- Executor 向SC申请任务,任务调度器将任务分发给Executor 运行,同时sc将程序代码发给Executor
- Executor 将执行结果反馈给TaskScheduler,然后反馈给DAGScheduler,运行完毕写入数据后释放资源
Executor 的优点:
- 程序运行时Executor利用多线程运行任务,减少多进程任务频繁的启动开销
- Executor 有一个BlockManager 模块,将内存和磁盘共同作为存储设备(内存不够时才会使用磁盘),需要多轮迭代中间结果会写到磁盘中,下次需要则直接读取BM中的数据,而不需要写入HDFS中,减少了网络通信和IO开销
DAGScheduler的作用:
- 以shuffle 为边界切割stages
- 基于stages创建TaskSets,并将TaskSets提交给TaskScheduler 请求调度
3、RDD
RDD的相关概念
- RDD本质上是一个只读的分区记录集合
- RDD的不同分区可以保存到集群的不同节点上,从而可以在集群的不同节点上进行并行计算
- RDD高度受限的共享内存模型指的是RDD是只读的记录分区的集合,不能直接修改,只能基于稳定的物理存储中的数据集来创建RDD或者通过其他RDD上执行缺点的转换操作而创建得到新的RDD
- RDD提供的转换操作都是粗粒度的数据转换操作,而不针对某个数据项进行细粒度修改,
RDD的惰性机制
RDD的执行过程中,在转换过程中并不真正的发生计算生成RDD,而是记录转换的轨迹,也就是RDD之间的生成的依赖关系,也就是DAG图,当遇到第一个动作类型的操作后才会进行从头到尾的计算
RDD的血缘关系
一系列的转换和动作操作生成的RDD之间形成了血缘关系,而通过血缘关系连接起来的结果不需要通过磁盘来保存中间数据,而是直接通过管道式流入下一个操作进行处理,避免多次转换操作之间数据同步的等待
RDD的特性及其原因
- 高容错性:
- 常见的容错机制:数据复制和日志记录
- RDD只读,不可修改,如果要修改必须生成新的RDD建立血缘关系(天然容错),不需要通过数据冗余的方式实现容错,只需要通过依赖关系通过计算得到丢失分区的数据,而不需要回滚整个系统
- RDD只提供粗粒度的操作,rdd依赖只需要记录这种粗粒度的操作而不需要记录具体的的数据和各种细粒度操作的日志,降低容错开销
- 中间结果存储在内存:RDD转换间的中间数据是持久化在内存上而不是磁盘上,所以可以减少IO开销
- 存放的数据可以是Java对象,减少了不必要的对象序列化和反序列化的开销
RDD的依赖关系
分类:
- 宽依赖,不可以进行流水线优化
- 存在shuffle操作(数据分发的过程,存在IO和网络开销)
- 多对一
- 包含shuffle操作
- 窄依赖,可以进行流水线优化
- 不存在shuffle操作
- 一对多
- 一对一
- 不包含shuffle 操作
流水线优化:
每个RDD操作其实都可以视为是fork/join模式,也就是同一个任务中各个部分并行进行计算后将每个部分的结果合并起来然后进入下一个阶段的模式
那么对于每一个RDD的准换过程可以视为不停的进行for,join,fork,join,这就意味着每一次的转换的join必须等待最慢的节点执行完才能进行下一步的操作,其次join操作本身也是需要下一阶段去同步等待的,如果这些操作都是窄依赖的话,那么不同分区之间的数据变化并不会影响到其他分区
这样就没有必要在每一个过程中都进行数据的合并,可以将若干的fork /join 合并成一个fork/join,这个过程称之为流水线优化
描述下面图中Stage 的划分过程:
核心:DAG回溯算法
过程:
- RDD-G触发了写入操作,所以Spark内核首先会从触发Action操作的RDD开始从后向前进行回溯
- 先为RDD-G创建一个Stage3
- 向前回溯过程中首先发现RDD-F到RDD-G的过程是宽依赖,所以为RDD-F创建一个新的Stage2
- 因为RDD-D -> RDD-F 及RDD-E->RDD-F 均为窄依赖,所以DFG都为同一个Stage2
- 因为RDD-C -> RDD-D为窄依赖,所以CDRF均为一个Stage2
- 因为RDD-B -> RDD-G 为窄依赖,所以BG为同一Stage3
- 因为RDD-A -> RDD-B 为宽依赖,所以为RDD-A创建新的Stage1
RDD的运行过程:
- 创建RDD对象
- SC负责计算RDD之间的依赖关系,并创建DAG图,提交给DAGScheduler
- DAGScheduler 把DAG图分解为多个阶段,每个阶段包含多个任务,每个任务被TaskScheduler 分发给各个工作阶段上的Executor 进程去执行
4、Spark 部署模式
- Local 单机模式
- Standalone 模式(集群)
- Spark Mesos模式(集群)
- Spark Yarn模式
第四章
Hadoop is good
Spark is fast
Spark is better
1、创建RDD
从文件中创建
lines = sc.textFile("file:///home/user/code/py/temp.txt")
lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
lines = sc.textFile("/user/hadoop/word.txt")
lines = sc.textFile("word.txt")
读取数据后每一行都是RDD中的一个元素
通过集合创建
array = [1,2,3,4,5]
rdd = sc.paralleize(array)
2、transform API
filter(func)
- func = true 则输出
- func = false 则丢弃
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)
lines = sc.textFile("file:///home/user/code/py/temp.txt")
filterTest = lines.filter(lambda line: "Spark" in line)
filterTest.foreach(print)
map(func)
对RDD中的每个元素进行操作
eg1:
lines = sc.parallelize([1,2,3,4,5])
nRdd = lines.map(lambda x: x+10)
nRdd.foreach(print)
eg2:
flatMap(func)
lines = sc.textFile("file:home/user/code/py/testFlatMap.txt")
words = lines.flatMap(lambda line:line.split(" "))
words.foreach(print)
若对RDD中每个元素进行Map操作后中RDD中的元素是集合对象,则flatMap会在此基础上将集合中的每个元素取出来,并作为新的元素置于RDD中
groupByKey()
将RDD中具有相同key的元素合并起来,这些key 对应的值归并成一个集合对象
这个结合对象的类型是ResultIterable类型
reduceByKey(func)
对groupByKey 的结果进行进一步的func 计算,提供的func函数用于实现对valueList 的 reduce 操作
lambda a,b: a+b
每次运算的中间结果会赋值给下一次运算的a,然后读取下一个数作为b
3、Action API
count()
返回RDD中元素的个数
collect()
以数组的形式返回数据集中的所有元素
在Driver 节点运行,所以有网络传输开销
first()
返回数据集中的第一个元素
take(n)
以数组的形式返回数据集中的前n个元素
reduce(func)
通过函数func聚合数据集中的元素
foreach(func)
将数据集中的每个元素传递给func函数作为参数运行
在WorkerNode 中运行,也就是在集群中运行,没有网络开销
4、持久化API
- 对于Spark 来说持久化操作是持久化到内存中的,只有当内存不够用的时候才会将数据持久化到硬盘中
- 持久化操作同理也是一种非action操作,所以仍然符合惰性机制
- 被持久化的RDD往往是在迭代运算中需要经常重复使用的中间结果
- persist() 对一个RDD标记为持久化
- MEMORY_ONLY作为参数
- MEMORY_AND_DISK 内存不足存放磁盘
- unpersist() 将RDD从内存中消除掉
5、指定分区
分区个数
local模式下RDD分区个数 = 集群中CPU核心数
standalone和yarn 模式,默认值是集群中所有CPU数目的总和和2之间取较大值
也可以自定义指定:
sc.textFile(path,partitionNum)
sc.parallelize(array,partitionNum)
对于已经生成的RDD重新制定分区:
rdd.repartition(n)
显示分区个数
len(rdd.glom().collect())
分区方法
自定义分区方法
from pyspark import SparkConf,SparkContext
def MyPartitioner(key):
print("myPartitioner is Running")
print("the key is %d" %key)
return key %10
def main():
print("The main function is Running")
conf = SparkConf().setMaster("local").setAppName("My APP")
sc = SparkContext(conf = conf)
data = sc.parallelize(range(10),5)
data.map(lambda x : (x,1))\
.partitionBy(10,MyPartitioner)\
.map(lambda x :x [0])\
.saveAsTextFile("file:///home/user/code/py/temp")
if __name__ == '__main__':
有多少个分区,最终数据就会写到多少个文件当中
6、键对RDD
创建:
pairRDD = lins.flatMap(lambd line : lin.split(" ")).map(lambda word : (word,1))
pariRDD = sc.paralleize(list).map(lambda word : (word,1))
reduceByKey(func)
聚合运算
先对具有相同键的键值对汇总成一个列表,然后对list value 进行汇总聚合
groupByKey(func)
将Key 相同的键值对归并为,key:值的列表
keys()
生成新的RDD,每个元素都是原RDD的键
values()
将旧RDD的value 返回生成一个新的RDD
sortByKey()
默认情况下是对Key进行升序排序,参数默认是True,若修改参数为False 则进行降序排序
key必须是可比较的对象
sortBy()
sortBy 则可以指定根据什么进行排序,传入两个参数,第一个是一个lambda 表达式,指明排序根据的对象,第二个参数指明升序或者降序排序的方式
lambda x : x 指定排序对象是键值对本身,则默认根据键进行排序
lambda x : x[0] 指定排序对象是键
lambda x : x[1] 指定排序对象是值
combinByKey()
方案一是计算各个分组内的和然后求平均,但是如果某一节点valueList 特别大,就会导致这个节点的压力特别大,所以这种方式可行,但是不是最优选项
- 参数一func函数:对取出的第一个key,value 的value 进行func 操作
- 参数二func函数:分区内的计算规则,通过参数1的func生成的value 与除第一个之外的元素进行相关的合并操作的函数
- 参数三:分区间计算规则,分区间的value 进行func 的合并操作
在这个例子中参数1 将每个分区的第一个key,value 的value 转换为了(value,1)的形式
参数二是同一分区中(这个分区中key是相同的)进行操作的方式,acc 是上面的(value,1)而income 是原来的第二个键值对的key,value 的value值
参数三就是不同的分区间,这时各个分区的**key是还是相同的**,也就是相同key 的分区之间的操作,(key,(value1+value2,number1+number2))然后再进行一个map操作,(key,(value1+value2,【value1+value2]/[number1+number2])
mapValues(func)
将func 作用到每个键值对的value上
join()
作用对象是两个RDD
只有两个RDD中 元素存在相同的key 的时候才会将value归并并且输出,没有相同key 的则不输出
7、几个考察案例
求top值
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("FindTop")
sc = SparkContext(conf = conf)
lines = sc.textFile("file:///home/user/code/py/file")
result1 = lines.filter(lambda line: (len(line.strip())>0) /
and /
(len(line.split(",")) == 4)
)
result2 = result1.map(lambda line : line.split(",")[2])
result3 = result2.map(lambda x : (int(x),""))
result4 = result3.repartition(1)
result5 = result4.sortByKey(false)
result6 = result5.map(lambda x : x[0])
result7 = result6.take(5)
文件排序
from pyspark import SparkContext,SparkConf
index = 0
def getIndex():
global getIndex
index += 1
return index
def main():
conf = SparkConf().setMaster("local").setAppName("sortFile")
sc = SparkContext(conf =conf)
lines = sc.textFile("file:///home/user/code/py/file")
resut1 = lines.filter(lambda line : len(line.strip()) > 0)
result2 = result1.map(lambda x : (int(x.strip()),1))
result3 = result2.repartition(1)
result4 = result3.sortByKey(True)
result5 = result4.map(lambda x : (getIndex(),x[0]))
result5.foreach(print)
result5.saveAsTextFile("file:///home/user/py/code/py/fileSort")
if __name__ == '__main__':
main()
二次排序
from pyspark import SparkContext,SparkConf
from operator import gt
class SecondarySortKey():
def __init__(self,k):
self.column1 = k[0]
self.column2 = k[1]
def __gt__(self,other)
if other.column1 == self.column1:
return gt(self.column2,other.column2)
else:
return gt(self.column1,other.column1)
def main():
conf = SparkConf().setMaster("local").setAppName("sortFile")
sc = SparkContext(conf =conf)
rdd1 = sc.textFile("file:///home/user/code/py/file")
rdd2 = rdd1.filter(lambda x : (len(x.strip() > 0)))
rdd3 = rdd2.map(lambda x : ((int(x.split("")[0]),int(x.split("")[1])),x))
rdd4 = rdd3.map(lambda x : (SecondarySortKey(x[0]),x[1]))
rdd5 = rdd4.sortByKey(False)
rdd6 = rdd5.map(lambda x : x[1])
if __name__ == '__main__':
main()
8、文件读写
- sc.textFile(“path”)
- sc.saveAsTextFile(“文件夹名,将所有分区内容写到该文件夹中”)
9、读写Hbase
第五章
1、SparkSession
from pyspark import SparkContext,SparkConf
from pySpark import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
在pyspark 交互式环境中默认提供SparkSession(spark)和SparkContext(sc)
使用SparkSession 接口替代SQLContext
从不同的数据源中加载数据,并把数据生成DataFrame,支持把DataFrame 转换成SQLContext 自身中的表,然后使用SQL语句来操作数据
2、DataFrame
创建:
spark.read.txt("people.txt")
spark.read.format("text").load("people.txt")
spark.read.json("people.json")
spark.read.format("json").load("people.json")
spark.read.parquet("people.parquet")
spark.read.format("parquet").load("people.parquet")
保存:
df.write.txt("people.txt")
df.write.format("text").save("people.txt")
df.write.json("people.json")
df.write.format("json").save("people.json")
df.write.parquet("people.parquet")
df.write.format("parquet").save("people.parquet")
常用操作:
printSchema()
打印dataFrame 的模式,也就是表有什么字段,分别是什么类型
select(condition)
原sql中的表名.列名表示为:df[“列名”]
传入一个condition 对表进行查询
filter(condition)
condition为true 则留下,false 则滤去
groupBy(列名)
根据某一列进行分组合并,往往和action函数组合在一起
sort()
根据某一列进行排序,若传入多列则进行层次排序
先按age进行排序,然后age相同则按name进行排序
desc为降序排序
asc 为升序排序
withColumn
withColumnRenamed
drop
join
where
show
显示出查询结果
rdd->DataFrame
利用反射机制推断rdd模式
已知数据格式
from pyspark import SparkContext,SparkConf,SparkSession
from pyspark.sql import Row
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
people = spark.sparkContext.textFile("file:///user/local/people.txt")/
.map(lambda line : line.split(","))
.map(lambda p : Row(name = p[0],age=int(p[1])))
schemaPeople = spark.createDataFrame(people)
people.createOrReplaceTempView("people")
personDF = spark.sql("select name,age from people where age >20")
personRDD = personDF.rdd.map(lambda p :"Name:"+p.name+","+"Age:"+str(p.age))
personRDD.foreach(print)
利用编程方式
from pyspark import SparkContext,SparkConf,SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row
schemaString = "name age"
fields = [StructField(field_name,StringType,True)] for field_name in /
schemaString.split(" ")
schema = StructType(fields)
lines = spark.sparkContext.textFile("file:///user/local/people.txt")
parts = lines.map(lambda x : x.split(","))
people = parts.map(lambda p: Row(p[0],p[1].strip()))
schemaPeople = spark.createDataFrame(people.schema)
|