IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spark-spark程序与pyspark交互-submit的参数-RDD的特性-初识对象数据集-算子类型 -> 正文阅读

[大数据]spark-spark程序与pyspark交互-submit的参数-RDD的特性-初识对象数据集-算子类型

spark程序与pyspark交互流程

交互的流程图

在这里插入图片描述

说明

  • 以提交到yarn集群为例:部署方式为 client

  1- 由执行的spark-submit脚本提交任务,会在当前这个节点根据提交的信息启动一个Driver程序, 由这个Driver程序向yarn的主节点提交任务操作,yarn会认为这个任务启动一个applicationmaster程序,后续与任务相关的操作都找这个applicationmaster程序即可(任意节点的都可以启动和Driver不在一个节点)
  2- applicationmaster会根据executor资源信息行yarn的主节点申请源,用于启动executor
  3- 当applicationmaster拿到用于启动executor的资源后,通知相应的从节点启动executor执行器即可,当对应节点的执行器启动后,反向响应给Driver程序
  4- Driver程序就会开始进行任务的执行流程图生成,此时会产生一个DAG的有向无环图,执行各个stage节点,以及划分区数的操作
     4.1:先根据py4j读取spark程序中的代码,创建       sparkcontext对象
     4.2:检查后续一共使用到哪些算子,再根据这些算子划分stage阶段(每个节点划分为几个区,对应有几个线程执行),并生成DAG执行流程图
     4.3:通知executor进行执行对应的任务操作即可
     4.4:监控各个executor执行进度,等待执行完成,关闭sparkcontext对象,释放资源,通知applicationmaster已经执行完成

5- executor接收到Driver程序分配任务后, 开始运行执行任务即可, 如果任务的结果需要返回给Driver, 此时将结果数据返回即可, 如果不需要, 直接输出操作, 那么Task程序就直接将结果输出即可

6- AppMaster收到Driver程序处理完成的信息后, 通知yarn的主节点, 任务执行完成, 回收资源并关闭整个任务

  • 以提交到yarn集群为例:部署方式为 cluster
跟部署方式为 client相对比
第1- 步不同:由执行的spark-submit脚本提交的任务给 resourcemanager(yarn的主节点),yarn会为这个任务启动一个applicationMaster程序,后续与任务相关的操作都找这个applicationMaster即可 (任意某个从节点)2- 步不同:appMaster会根据任务的信息分别启动Driver程序(让自己同时升级为Driver程序),然后根据executor资源信息向yarn主节点申请资源, 用于启动executor

  • 以提交到spark集群为例 部署方式为 client
1- 由执行spark-submit脚本提交任务, 会在当前这个节点根据提交信息启动一个Driver程序, Driver程序会根据资源信息,向Master申请资源, 用于启动executor

2- 当Driver拿到用于启动executor的资源后, 通知相对应的从节点启动executor执行器即可, 当对应节点执行器启动后, 反向响应给Driver程序, 已经启动好了

3- Driver程序就会开始进行任务的执行流程图生成. 此时就会产生DAG执行流程图, 划分各个执行stage节点, 以及划分分区数等操作:
    3.1:首先根据py4j 读取spark程序中代码,创建sparkContext对象
    3.2:检查后续一共使用到那些算子, 根据这些算子划分stage阶段(每个节点划分为几个分区, 对应有几个线程执行), 并生成DAG执行流程图
    3.3:通知各个executor进行执行对应任务操作即可
    3.4:监控各个executor执行进度, 等待执行完成, 关闭sparkContext对象, 释放资源, 通知Master , 任务已经执行完成

4- executor接收到Driver程序分配任务后, 开始运行执行任务即可, 如果任务的结果需要返回给Driver, 此时将结果数据返回即可, 如果不需要, 直接输出操作, 那么Task程序就直接将结果输出即可

5- master收到任务处理完成的信息后, 回收资源即可
  • 以提交到spark集群为例 部署方式为 cluster
与部署方式为 client对比
第1- 步不同:由执行spark-submit脚本提交任务到 spark集群的Master节点, Master会根据提交信息, 在某一个worker节点上, 启动一个Driver程序
。。。。。。。。。。
剩下的步骤相同

spark-submit想关的参数

spark-submit.sh脚本的作用

  • 用于将spark程序提交到指定的资源调度平台上进行运行,并且在提交过程中,可以对资源设置相关的配置信息

  • 基本参数

--master :用于指定提交到那个资源调度平台 (可选择: local| spark | yarn ....)

-- deploy-mode:  用于指定提交部署方式(可选择: client 和 cluster)

--conf :  用于设置相关配置信息

python-file : 指定spark的python脚本

args:  添加程序入口参数, 如果没有参数是可以不配置的

spark-core的内容(核心部分)

RDD的基本介绍

  • MR的计算过程

在这里插入图片描述

  • RDD的计算过程

在这里插入图片描述

  • 背景说明
1)在早期的计算模型: 单机模型
   比如: pandas , mysql
   依赖于单个节点的性能
   适用于: 少量数据集统计分析的处理
   在计算过程中,数据都是在一个进程中的,不断地进行迭代计算操作

2)当数据量大了以后, 单机的这种计算的模式就无法支撑了,此时需要分布式的计算的模型
   核心:让多个节点参与计算, 将计算任务进行划分, 将这个部分交给各个节点进行运行, 运算后, 将结果进行汇总
   比如:MapReduce, spark

MapReduce计算的模型:
   在计算过程中, 每一个MR都是有两部分组成: map  和 reduce,在计算过程中, 需要将数据从磁盘读取内存中, 从内存落入磁盘, 再从磁盘读取到内存中, 这样导致整个IO变大, 不断的与磁盘进行交互, 整个执行效率 也是比较低的
   由于一个MR只有map和reduce节点, map进行分布式计算, 计算reduce汇总统计, 如果需要进行多次的分布式计算和多次聚合统计(迭代计算), 对于MR来说, 必须使用多个MR进行串行执行了,而这样的执行导致每一个MR都需要重新申请资源, 回收资源, 大量的时候都消耗在了资源申请和回收上, 而且这种操作中间结果只能保存在磁盘中, 导致效率比较低
   
   正因为有了这个MR问题后, 此时想办法解决问题,解决问题思路: 
		1) 是否可以让中间结果都保存在内存中, 这样效率是不是就比较高了
		2) 是否可以在一个程序中完成多次的不断迭代计算操作

什么是RDD

RDD:弹性的分布式数据集 
RDD的目的:主要用于支持更加高效的迭代计算
  • RDD的五大特性
注:123是必须有的  4,5可选

1- 可以分区的:每一个分区对应了一个task线程
2- 计算函数: 对每一个分区进行计算操作
3- 存在依赖关系
4- 对于k-v数据存在分区计算函数
5- 移动数据不如移动计算(将计算程序运行在离数据越近越好)
  • RDD的五个特点
1- 可分区的:分区是抽象定的分区,仅仅是定义分区的信息规则
2- 只读的特性:一个RDD对象中数据是不可变的
3- 依赖: RDDRDD之间是存在依赖关系的:依赖的关系越长,整个血缘关系越长,血缘关系越长重新计算的代价越大
       依赖关系可以分为:宽依赖的窄依赖
4- 缓存:当需要对一个RDD的结果进行重复使用的时候, 可以将这个RDD的计算结果缓存起来, 减少后续重新计算的资源和时间消耗
5- checkpoint: 检测点
   当依赖链条比较长的时候, 如果其中一个算子计算失败, 重新计算一次的代价是很大的, 需要重新将整个依赖关系全部重塑, 非常耗费资源, 可以同检测点将血缘关系打断, 在对应打断点上记录当前结果数据, 这样后续即使使用了, 也不需要重塑全部依赖流程  提升容错能留

如何获取RDD对象

  • 构建RDD对象的方式主要有两种:

在这里插入图片描述

  • 第一种通过 parallelize()来构建
from pyspark import SparkContext,SparkConf
import os

# 目的: 锁定远端操作环境, 避免存在多个版本环境的问题
os.environ["SPARK_HOME"]="/export/server/spark"
os.environ["PYSPARK_PYTHON"]="/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"]="/root/anaconda3/bin/python"

if __name__ == '__main__':
    print("构建RDD对象: 方式一演示")

    # 1) 构建sparkContext对象
    # local[*]:  * 表示当前环境中的cpu是几核的, * 就表示运行多少个线程
    conf = SparkConf().setMaster("local[*]").setAppName("init0")
    sc = SparkContext(conf=conf)


    #2) 通过 paralleleize 获取RDD对象
    # 通过初始化数据集方式来构建一个RDD对象
    rdd = sc.parallelize(["张三", "李四", "王五", "赵六", "田七"],6)
    #rdd = sc.parallelize(range(10))

    rdd2 = rdd.map(lambda name: (name, 1))

    #3) 打印这个RDD对象中数据
    # getNumPartitions()  是用于获取当前这个rdd有多少个分区数
    # glom() : 获取每一个分区的数据
    print(rdd2.getNumPartitions())
    print(rdd2.glom().collect())

  • 第二种通过加载外部数据源的方式构建

from pyspark import SparkContext, SparkConf
import os

# 目的: 锁定远端操作环境, 避免存在多个版本环境的问题
os.environ["SPARK_HOME"] = "/export/server/spark"
os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/bin/python"

if __name__ == '__main__':
    print("演示构建RDD的第二种方式: 加载外部数据集")

    # 1) 创建 sparkContext对象
    conf = SparkConf().setMaster("local[*]").setAppName("init1")
    sc = SparkContext(conf=conf)

    #2) 构建RDD对象: 加载外部数据集
    # 请问 : 这里的写的路径对不对?  1  2
    # 注意: file:/// 本地路径 指的是 linux的本地路径 不是windows本地路径
    # 注意:
    #   在加载外部数据集, 如果数据是本地路径,文件有多少个, 自动划分多少个分区数, 如果加载HDFS, 有多少个block 对应就有多少个分区
    #   注意:  如果文件的分区数量小于了 local[*]的数量, 依然会以local[*]数量为标准
    rdd = sc.textFile("file:///export/data/workspace/_02_pyspark_core/data/")

    # 3) 打印 rdd数据
    print(rdd.getNumPartitions())
    print(rdd.glom().collect())
    
  • 说明
1) 默认情况下, 分区数量取决于 Master参数设置, 以及linux服务器的cpu的数量设置
2) 支持手动设置数据的分区数量: 
	parallelize(初始数据集, 分区数量)
3) 如何获取分区数量:getNumPartitions()
4) 如果获取每一个分区下的数据: glom().collect()	

  • 如何处理大量小文件,避免开启太多的分区
在用对象获取文件的时候用wholeTextFiles这个算子,可以避免多个文件都进行分区,避免造成资源的浪费

 rdd = sc.wholeTextFiles("file:///export/data/workspace/_02_pyspark_core/data")
 

RDD算子操作

RDD算子的分类

1)一类是转换算子(transformastion)
   1- 会返回一个新的RDD对象
   2- 所有的转换算子函数都是lazy(惰性),不会立即执行,只有遇到了动作算子函数触发
   3- 不负责数据的存储,仅仅是为了定义计算的规则


2)一类是动作算子(action)
   1- 不会返回RDD,无返回值(saveAsTextFile),或直接返回计算结果(collect)
   2- 会立即执行生成一个DAG的有向无环图执行任务,一个spark程序中有多少个动作算子,也就代表着有多少个任务


如:count, first, take,collect

算子的介绍

  • 转换算子

1)值类型转换算子:数据类型只有value 或者说算子只对value对处理

#map算子
rdd = sc.parallelize(range(10))

需求:  请将 0-9的列表数据, 每一个数据都新增+1操作
rdd = sc.parallelize(range(10))   # 初始化了 0~9的列表数据集
rdd2 = rdd.map(lambda num: num+1)
rdd2.collect()
结果为:
	[1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 

#----------------------------------------------
#groupby算子
rdd = sc.parallelize(range(10))

需求:  请将 0-9的列表数据,将其中 偶数分为一组 奇数分为一组
rdd = sc.parallelize(range(10))   # 初始化了 0~9的列表数据集
rdd2= rdd.groupBy(lambda num: 'o' if(num%2 == 0) else 'j' )

rdd2.collect()
[('o', <pyspark.resultiterable.ResultIterable object at 0x7fd7faa41940>), ('j', <pyspark.resultiterable.ResultIterable object at 0x7fd7fab88fd0>)]
>>> rdd2.mapValues(list).collect()
[('o', [0, 2, 4, 6, 8]), ('j', [1, 3, 5, 7, 9])]

#--------------------------------------------------
#mapvalues算子
对key value中value数据执行map操作, 对其一对一转换操作

#--------------------------------------------------
#filter算子
rdd = sc.parallelize(range(10))

需求:  请将 0-9的列表数据, 请将 >5的数据过滤掉
rdd = sc.parallelize(range(10))   # 初始化了 0~9的列表数据集
rdd2 = rdd.filter(lambda num: num > 5)
rdd2.collect()
结果:
[6, 7, 8, 9]

#---------------------------------------------------
#flatmap算子
需求: 将 姓名信息 进行切分, 然后将其所有姓名放置在一个列表中, 一个姓名就是一个列表元素
希望结果为:  ["张三,老张,老王,老李,李四,周八,李九,王五,赵六,田七"]

rdd = sc.parallelize(["张三 老张 老王 老李","李四 周八 李九","王五 赵六 田七"])
rdd2 = rdd.flatMap(lambda names: names.split(" "))
rdd2.collect()
结果为: 
['张三', '老张', '老王', '老李', '李四', '周八', '李九', '王五', '赵六', '田七']

2)双值类型的转换算子

#union 计算并集(不会去重)
#intersection计算交集

rdd1 = sc.parallelize([1,2,3,4,5])
rdd2 = sc.parallelize([6,7,3,4,9])

分别计算其交集和并集:
rdd3 = rdd1.union(rdd2)
rdd4 = rdd1.intersection(rdd2)

rdd3.collect()
结果:
[1, 2, 3, 4, 5, 6, 7, 3, 4, 9]
rdd4.collect()
结果
[3, 4]

3)key-values类型的转换算子

#groupBykey  将相同的key进行分组操作,将相同的key的values进行合并形成一个列表
rdd = sc.parallelize([('c01','张三'), ('c02','老张'), ('c01','老王'), ('c03','老李'), ('c02','李四'), ('c02','周八'), ('c04','李九'), ('c03','王五'), ('c02','赵六'), ('c03','田七')])

需求; 根据key分组, 求出每个班级人员
rdd2 = rdd.groupByKey()
rdd2.mapValues(list).collect()
结果:
[('c01', ['张三', '老王']), ('c02', ['老张', '李四', '周八', '赵六']), ('c03', ['老李', '王五', '田七']), ('c04', ['李九'])]

#---------------------------------------------------

#reduceBykey 根据key进行分组,求每个班级有多少个人
rdd = sc.parallelize([('c01','张三'), ('c02','老张'), ('c01','老王'), ('c03','老李'), ('c02','李四'), ('c02','周八'), ('c04','李九'), ('c03','王五'), ('c02','赵六'), ('c03','田七')])
需求; 根据key分组, 求出每个班级有多少个人
rdd2 = rdd.map(lambda x:(x[0],1)).reduceByKey(lambda agg,curr : agg+curr)

rdd2.collect()
结果: 
[('c01', 2), ('c02', 4), ('c03', 3), ('c04', 1)]

#-----------------------------------------------

#sortByKey  根据key进行排序操作,默认是升序排序
#需求: 假设有一下数据, 请按照key进行倒序排序
rdd = sc.parallelize([(2,'c01'), (4,'c02'), (3,'c03'), (1,'c04')])

rdd2 = rdd.sortByKey(ansending=False).map(lambda num: (num[1],num[0]))

print(rdd2.collect())
#----------------------------------------------------

#countByValues 根据values进行统计操作

rdd = sc.parallelize([1,2,3,1,2,2,1,4,5,2,3])
print(rdd.countByValue())
defaultdict(<class 'int'>, {1: 3, 2: 4, 3: 2, 4: 1, 5: 1})

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-09 19:37:22  更:2021-11-09 19:39:25 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 4:35:29-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码