spark程序与pyspark交互流程
交互的流程图
说明
- 以提交到yarn集群为例:部署方式为 client
1- 由执行的spark-submit脚本提交任务,会在当前这个节点根据提交的信息启动一个Driver程序, 由这个Driver程序向yarn的主节点提交任务操作,yarn会认为这个任务启动一个applicationmaster程序,后续与任务相关的操作都找这个applicationmaster程序即可(任意节点的都可以启动和Driver不在一个节点)
2- applicationmaster会根据executor资源信息行yarn的主节点申请源,用于启动executor
3- 当applicationmaster拿到用于启动executor的资源后,通知相应的从节点启动executor执行器即可,当对应节点的执行器启动后,反向响应给Driver程序
4- Driver程序就会开始进行任务的执行流程图生成,此时会产生一个DAG的有向无环图,执行各个stage节点,以及划分区数的操作
4.1:先根据py4j读取spark程序中的代码,创建 sparkcontext对象
4.2:检查后续一共使用到哪些算子,再根据这些算子划分stage阶段(每个节点划分为几个区,对应有几个线程执行),并生成DAG执行流程图
4.3:通知executor进行执行对应的任务操作即可
4.4:监控各个executor执行进度,等待执行完成,关闭sparkcontext对象,释放资源,通知applicationmaster已经执行完成
5- executor接收到Driver程序分配任务后, 开始运行执行任务即可, 如果任务的结果需要返回给Driver, 此时将结果数据返回即可, 如果不需要, 直接输出操作, 那么Task程序就直接将结果输出即可
6- AppMaster收到Driver程序处理完成的信息后, 通知yarn的主节点, 任务执行完成, 回收资源并关闭整个任务
- 以提交到yarn集群为例:部署方式为 cluster
跟部署方式为 client相对比
第1- 步不同:由执行的spark-submit脚本提交的任务给 resourcemanager(yarn的主节点),yarn会为这个任务启动一个applicationMaster程序,后续与任务相关的操作都找这个applicationMaster即可 (任意某个从节点)
第2- 步不同:appMaster会根据任务的信息分别启动Driver程序(让自己同时升级为Driver程序),然后根据executor资源信息向yarn主节点申请资源, 用于启动executor
- 以提交到spark集群为例 部署方式为 client
1- 由执行spark-submit脚本提交任务, 会在当前这个节点根据提交信息启动一个Driver程序, Driver程序会根据资源信息,向Master申请资源, 用于启动executor
2- 当Driver拿到用于启动executor的资源后, 通知相对应的从节点启动executor执行器即可, 当对应节点执行器启动后, 反向响应给Driver程序, 已经启动好了
3- Driver程序就会开始进行任务的执行流程图生成. 此时就会产生DAG执行流程图, 划分各个执行stage节点, 以及划分分区数等操作:
3.1:首先根据py4j 读取spark程序中代码,创建sparkContext对象
3.2:检查后续一共使用到那些算子, 根据这些算子划分stage阶段(每个节点划分为几个分区, 对应有几个线程执行), 并生成DAG执行流程图
3.3:通知各个executor进行执行对应任务操作即可
3.4:监控各个executor执行进度, 等待执行完成, 关闭sparkContext对象, 释放资源, 通知Master , 任务已经执行完成
4- executor接收到Driver程序分配任务后, 开始运行执行任务即可, 如果任务的结果需要返回给Driver, 此时将结果数据返回即可, 如果不需要, 直接输出操作, 那么Task程序就直接将结果输出即可
5- master收到任务处理完成的信息后, 回收资源即可
- 以提交到spark集群为例 部署方式为 cluster
与部署方式为 client对比
第1- 步不同:由执行spark-submit脚本提交任务到 spark集群的Master节点, Master会根据提交信息, 在某一个worker节点上, 启动一个Driver程序
。。。。。。。。。。
剩下的步骤相同
spark-submit想关的参数
spark-submit.sh脚本的作用
--master :用于指定提交到那个资源调度平台 (可选择: local| spark | yarn ....)
-- deploy-mode: 用于指定提交部署方式(可选择: client 和 cluster)
--conf : 用于设置相关配置信息
python-file : 指定spark的python脚本
args: 添加程序入口参数, 如果没有参数是可以不配置的
spark-core的内容(核心部分)
RDD的基本介绍
1)在早期的计算模型: 单机模型
比如: pandas , mysql
依赖于单个节点的性能
适用于: 少量数据集统计分析的处理
在计算过程中,数据都是在一个进程中的,不断地进行迭代计算操作
2)当数据量大了以后, 单机的这种计算的模式就无法支撑了,此时需要分布式的计算的模型
核心:让多个节点参与计算, 将计算任务进行划分, 将这个部分交给各个节点进行运行, 运算后, 将结果进行汇总
比如:MapReduce, spark
MapReduce计算的模型:
在计算过程中, 每一个MR都是有两部分组成: map 和 reduce,在计算过程中, 需要将数据从磁盘读取内存中, 从内存落入磁盘, 再从磁盘读取到内存中, 这样导致整个IO变大, 不断的与磁盘进行交互, 整个执行效率 也是比较低的
由于一个MR只有map和reduce节点, map进行分布式计算, 计算reduce汇总统计, 如果需要进行多次的分布式计算和多次聚合统计(迭代计算), 对于MR来说, 必须使用多个MR进行串行执行了,而这样的执行导致每一个MR都需要重新申请资源, 回收资源, 大量的时候都消耗在了资源申请和回收上, 而且这种操作中间结果只能保存在磁盘中, 导致效率比较低
正因为有了这个MR问题后, 此时想办法解决问题,解决问题思路:
1) 是否可以让中间结果都保存在内存中, 这样效率是不是就比较高了
2) 是否可以在一个程序中完成多次的不断迭代计算操作
什么是RDD
RDD:弹性的分布式数据集
RDD的目的:主要用于支持更加高效的迭代计算
注:1,2,3是必须有的 4,5可选
1- 可以分区的:每一个分区对应了一个task线程
2- 计算函数: 对每一个分区进行计算操作
3- 存在依赖关系
4- 对于k-v数据存在分区计算函数
5- 移动数据不如移动计算(将计算程序运行在离数据越近越好)
1- 可分区的:分区是抽象定的分区,仅仅是定义分区的信息规则
2- 只读的特性:一个RDD对象中数据是不可变的
3- 依赖: RDD与RDD之间是存在依赖关系的:依赖的关系越长,整个血缘关系越长,血缘关系越长重新计算的代价越大
依赖关系可以分为:宽依赖的窄依赖
4- 缓存:当需要对一个RDD的结果进行重复使用的时候, 可以将这个RDD的计算结果缓存起来, 减少后续重新计算的资源和时间消耗
5- checkpoint: 检测点
当依赖链条比较长的时候, 如果其中一个算子计算失败, 重新计算一次的代价是很大的, 需要重新将整个依赖关系全部重塑, 非常耗费资源, 可以同检测点将血缘关系打断, 在对应打断点上记录当前结果数据, 这样后续即使使用了, 也不需要重塑全部依赖流程 提升容错能留
如何获取RDD对象
from pyspark import SparkContext,SparkConf
import os
os.environ["SPARK_HOME"]="/export/server/spark"
os.environ["PYSPARK_PYTHON"]="/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"]="/root/anaconda3/bin/python"
if __name__ == '__main__':
print("构建RDD对象: 方式一演示")
conf = SparkConf().setMaster("local[*]").setAppName("init0")
sc = SparkContext(conf=conf)
rdd = sc.parallelize(["张三", "李四", "王五", "赵六", "田七"],6)
rdd2 = rdd.map(lambda name: (name, 1))
print(rdd2.getNumPartitions())
print(rdd2.glom().collect())
from pyspark import SparkContext, SparkConf
import os
os.environ["SPARK_HOME"] = "/export/server/spark"
os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/bin/python"
if __name__ == '__main__':
print("演示构建RDD的第二种方式: 加载外部数据集")
conf = SparkConf().setMaster("local[*]").setAppName("init1")
sc = SparkContext(conf=conf)
rdd = sc.textFile("file:///export/data/workspace/_02_pyspark_core/data/")
print(rdd.getNumPartitions())
print(rdd.glom().collect())
1) 默认情况下, 分区数量取决于 Master参数设置, 以及linux服务器的cpu的数量设置
2) 支持手动设置数据的分区数量:
parallelize(初始数据集, 分区数量)
3) 如何获取分区数量:getNumPartitions()
4) 如果获取每一个分区下的数据: glom().collect()
在用对象获取文件的时候用wholeTextFiles这个算子,可以避免多个文件都进行分区,避免造成资源的浪费
rdd = sc.wholeTextFiles("file:///export/data/workspace/_02_pyspark_core/data")
RDD算子操作
RDD算子的分类
1)一类是转换算子(transformastion)
1- 会返回一个新的RDD对象
2- 所有的转换算子函数都是lazy(惰性),不会立即执行,只有遇到了动作算子函数触发
3- 不负责数据的存储,仅仅是为了定义计算的规则
2)一类是动作算子(action)
1- 不会返回RDD,无返回值(saveAsTextFile),或直接返回计算结果(collect)
2- 会立即执行生成一个DAG的有向无环图执行任务,一个spark程序中有多少个动作算子,也就代表着有多少个任务
如:count, first, take,collect
算子的介绍
1)值类型转换算子:数据类型只有value 或者说算子只对value对处理
rdd = sc.parallelize(range(10))
需求: 请将 0-9的列表数据, 每一个数据都新增+1操作
rdd = sc.parallelize(range(10))
rdd2 = rdd.map(lambda num: num+1)
rdd2.collect()
结果为:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(range(10))
需求: 请将 0-9的列表数据,将其中 偶数分为一组 奇数分为一组
rdd = sc.parallelize(range(10))
rdd2= rdd.groupBy(lambda num: 'o' if(num%2 == 0) else 'j' )
rdd2.collect()
[('o', <pyspark.resultiterable.ResultIterable object at 0x7fd7faa41940>), ('j', <pyspark.resultiterable.ResultIterable object at 0x7fd7fab88fd0>)]
>>> rdd2.mapValues(list).collect()
[('o', [0, 2, 4, 6, 8]), ('j', [1, 3, 5, 7, 9])]
对key value中value数据执行map操作, 对其一对一转换操作
rdd = sc.parallelize(range(10))
需求: 请将 0-9的列表数据, 请将 >5的数据过滤掉
rdd = sc.parallelize(range(10))
rdd2 = rdd.filter(lambda num: num > 5)
rdd2.collect()
结果:
[6, 7, 8, 9]
需求: 将 姓名信息 进行切分, 然后将其所有姓名放置在一个列表中, 一个姓名就是一个列表元素
希望结果为: ["张三,老张,老王,老李,李四,周八,李九,王五,赵六,田七"]
rdd = sc.parallelize(["张三 老张 老王 老李","李四 周八 李九","王五 赵六 田七"])
rdd2 = rdd.flatMap(lambda names: names.split(" "))
rdd2.collect()
结果为:
['张三', '老张', '老王', '老李', '李四', '周八', '李九', '王五', '赵六', '田七']
2)双值类型的转换算子
rdd1 = sc.parallelize([1,2,3,4,5])
rdd2 = sc.parallelize([6,7,3,4,9])
分别计算其交集和并集:
rdd3 = rdd1.union(rdd2)
rdd4 = rdd1.intersection(rdd2)
rdd3.collect()
结果:
[1, 2, 3, 4, 5, 6, 7, 3, 4, 9]
rdd4.collect()
结果
[3, 4]
3)key-values类型的转换算子
rdd = sc.parallelize([('c01','张三'), ('c02','老张'), ('c01','老王'), ('c03','老李'), ('c02','李四'), ('c02','周八'), ('c04','李九'), ('c03','王五'), ('c02','赵六'), ('c03','田七')])
需求; 根据key分组, 求出每个班级人员
rdd2 = rdd.groupByKey()
rdd2.mapValues(list).collect()
结果:
[('c01', ['张三', '老王']), ('c02', ['老张', '李四', '周八', '赵六']), ('c03', ['老李', '王五', '田七']), ('c04', ['李九'])]
rdd = sc.parallelize([('c01','张三'), ('c02','老张'), ('c01','老王'), ('c03','老李'), ('c02','李四'), ('c02','周八'), ('c04','李九'), ('c03','王五'), ('c02','赵六'), ('c03','田七')])
需求; 根据key分组, 求出每个班级有多少个人
rdd2 = rdd.map(lambda x:(x[0],1)).reduceByKey(lambda agg,curr : agg+curr)
rdd2.collect()
结果:
[('c01', 2), ('c02', 4), ('c03', 3), ('c04', 1)]
rdd = sc.parallelize([(2,'c01'), (4,'c02'), (3,'c03'), (1,'c04')])
rdd2 = rdd.sortByKey(ansending=False).map(lambda num: (num[1],num[0]))
print(rdd2.collect())
rdd = sc.parallelize([1,2,3,1,2,2,1,4,5,2,3])
print(rdd.countByValue())
defaultdict(<class 'int'>, {1: 3, 2: 4, 3: 2, 4: 1, 5: 1})
|