IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 《Spark编程基础-python》考前复习-----考试前不停更---至第五章 -> 正文阅读

[大数据]《Spark编程基础-python》考前复习-----考试前不停更---至第五章

Spark 复习

更新至第四章

第二章

1、Spark的主要组成部分和各部分的作用

SparkSql ->SQL及时查询(满足交互式查询分析需求)

SparkStreaming -> 实时流式计算(满足流计算需求)

Spark Mllib -> 机器学习

Spark Graphx -> 图计算

image-20220606111050921

2、Spark架构

image-20220606135312398

名词解释:

  • DAG:有向无环图,反映RDD之间的依赖关系
  • Executor:运行在工作节点上的一个进程,负责运行任务,并为应用程序存储数据
  • Task:Executord 的工作单元
  • Job:一个作业包含多个RDD和相应RDD上的各种操作
  • Stage:作业的基本调度单位,一个作业会分为多组任务,每个任务称为阶段,也称为任务集

简述Spark架构模型:

  • Spark采用主从架构,一个Master,任务控制节点Driver Program简称为Driver
  • 多个Worker,运行作业任务的工作节点Worker Node,以及每个WN上负责具体任务的执行进程Executor
  • 协调两者的是集群资源(CPU,Memory,带宽)管理器,Cluster Manager 可以是Yarn,也可以是Mesos

简述Spark 各种应用程序,作业,任务,阶段 间的关系

  • 一个Spark应用程序有一个任务控制节点Driver和若干个作业Job构成,一个作业有多个阶段构成,一个阶段又由多个任务构成
  • 每执行一个application 时,Driver 会像Cluster Manager 申请资源,启动Executor 进程,然后向Executor 发送应用程序和文件,然后再Executor上执行任务,任务结束后任务结果写回任务控制节点或写到HDFS或其他的数据库中

3、Spark运行流程

image-20220606140414159

应用程序提交后首先创建基本运行环境

  • Driver 创建SparkContext对象
  • SparkContext作为application 连接集群的通道,负责与ClusterManager通信并进行资源申请,任务分配和监控,此时SC与CM注册并申请运行Executor 的资源
  • CM为Executor 分配资源,并启动Executor 进程,同时Executor 运行情况随心跳发送给资源管理器
  • SC根据RDD依赖关系构建DAG图,并将DAG图发送给DAGScheduler(DAG调度器) 进行解析,将DAG分为多个阶段,计算出各个阶段间的依赖关系,并将每个任务集提交给底层的TaskScheduler(任务调度器)进行处理
  • Executor 向SC申请任务,任务调度器将任务分发给Executor 运行,同时sc将程序代码发给Executor
  • Executor 将执行结果反馈给TaskScheduler,然后反馈给DAGScheduler,运行完毕写入数据后释放资源

Executor 的优点:

  • 程序运行时Executor利用多线程运行任务,减少多进程任务频繁的启动开销
  • Executor 有一个BlockManager 模块,将内存和磁盘共同作为存储设备(内存不够时才会使用磁盘),需要多轮迭代中间结果会写到磁盘中,下次需要则直接读取BM中的数据,而不需要写入HDFS中,减少了网络通信和IO开销

DAGScheduler的作用:

  • 以shuffle 为边界切割stages
  • 基于stages创建TaskSets,并将TaskSets提交给TaskScheduler 请求调度

3、RDD

RDD的相关概念

  1. RDD本质上是一个只读的分区记录集合
  2. RDD的不同分区可以保存到集群的不同节点上,从而可以在集群的不同节点上进行并行计算
  3. RDD高度受限的共享内存模型指的是RDD是只读的记录分区的集合,不能直接修改,只能基于稳定的物理存储中的数据集来创建RDD或者通过其他RDD上执行缺点的转换操作而创建得到新的RDD
  4. RDD提供的转换操作都是粗粒度的数据转换操作,而不针对某个数据项进行细粒度修改,

RDD的惰性机制

RDD的执行过程中,在转换过程中并不真正的发生计算生成RDD,而是记录转换的轨迹,也就是RDD之间的生成的依赖关系,也就是DAG图,当遇到第一个动作类型的操作后才会进行从头到尾的计算

RDD的血缘关系

image-20220606143604853

一系列的转换和动作操作生成的RDD之间形成了血缘关系,而通过血缘关系连接起来的结果不需要通过磁盘来保存中间数据,而是直接通过管道式流入下一个操作进行处理,避免多次转换操作之间数据同步的等待

RDD的特性及其原因

  • 高容错性:
    • 常见的容错机制:数据复制和日志记录
    • RDD只读,不可修改,如果要修改必须生成新的RDD建立血缘关系(天然容错),不需要通过数据冗余的方式实现容错,只需要通过依赖关系通过计算得到丢失分区的数据,而不需要回滚整个系统
    • RDD只提供粗粒度的操作,rdd依赖只需要记录这种粗粒度的操作而不需要记录具体的的数据和各种细粒度操作的日志,降低容错开销
  • 中间结果存储在内存:RDD转换间的中间数据是持久化在内存上而不是磁盘上,所以可以减少IO开销
  • 存放的数据可以是Java对象,减少了不必要的对象序列化和反序列化的开销

RDD的依赖关系

分类:

  • 宽依赖,不可以进行流水线优化
    • 存在shuffle操作(数据分发的过程,存在IO和网络开销)
    • 多对一
    • 包含shuffle操作
  • 窄依赖,可以进行流水线优化
    • 不存在shuffle操作
    • 一对多
    • 一对一
    • 不包含shuffle 操作

流水线优化:

每个RDD操作其实都可以视为是fork/join模式,也就是同一个任务中各个部分并行进行计算后将每个部分的结果合并起来然后进入下一个阶段的模式

那么对于每一个RDD的准换过程可以视为不停的进行for,join,fork,join,这就意味着每一次的转换的join必须等待最慢的节点执行完才能进行下一步的操作,其次join操作本身也是需要下一阶段去同步等待的,如果这些操作都是窄依赖的话,那么不同分区之间的数据变化并不会影响到其他分区

image-20220606154407726

这样就没有必要在每一个过程中都进行数据的合并,可以将若干的fork /join 合并成一个fork/join,这个过程称之为流水线优化

描述下面图中Stage 的划分过程:

image-20220606154809674

核心:DAG回溯算法

过程:

  1. RDD-G触发了写入操作,所以Spark内核首先会从触发Action操作的RDD开始从后向前进行回溯
  2. 先为RDD-G创建一个Stage3
  3. 向前回溯过程中首先发现RDD-F到RDD-G的过程是宽依赖,所以为RDD-F创建一个新的Stage2
  4. 因为RDD-D -> RDD-F 及RDD-E->RDD-F 均为窄依赖,所以DFG都为同一个Stage2
  5. 因为RDD-C -> RDD-D为窄依赖,所以CDRF均为一个Stage2
  6. 因为RDD-B -> RDD-G 为窄依赖,所以BG为同一Stage3
  7. 因为RDD-A -> RDD-B 为宽依赖,所以为RDD-A创建新的Stage1

RDD的运行过程:

image-20220606155837595

  1. 创建RDD对象
  2. SC负责计算RDD之间的依赖关系,并创建DAG图,提交给DAGScheduler
  3. DAGScheduler 把DAG图分解为多个阶段,每个阶段包含多个任务,每个任务被TaskScheduler 分发给各个工作阶段上的Executor 进程去执行

4、Spark 部署模式

  • Local 单机模式
  • Standalone 模式(集群)
    • 使用Spark自带的资源调度管理器
  • Spark Mesos模式(集群)
    • scala语言开发的资源调度管理器,效率更高
  • Spark Yarn模式
    • hadoop 生态资源调度管理器

第四章

Hadoop is good
Spark is fast
Spark is better

1、创建RDD

从文件中创建

#从linux 本地文件创建
lines = sc.textFile("file:///home/user/code/py/temp.txt")
#从hdfs中创建
lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
lines = sc.textFile("/user/hadoop/word.txt")
#默认到/user/用户名目录下去寻找数据,也就是hdfs 的用户主目录
lines = sc.textFile("word.txt")

读取数据后每一行都是RDD中的一个元素

通过集合创建

array = [1,2,3,4,5]
rdd = sc.paralleize(array)

2、transform API

filter(func)

  • func = true 则输出
  • func = false 则丢弃
from pyspark import SparkConf,SparkContext
#生成配置上下文信息,Master是local模式,app名称是MyApp
conf = SparkConf().setMaster("local").setAppName("My App")
#生成SparkContext 对象
sc = SparkContext(conf = conf)
lines = sc.textFile("file:///home/user/code/py/temp.txt")
filterTest = lines.filter(lambda line: "Spark" in line)
filterTest.foreach(print)

image-20220606163920701

map(func)

对RDD中的每个元素进行操作

eg1:

lines = sc.parallelize([1,2,3,4,5])
nRdd = lines.map(lambda x: x+10)
nRdd.foreach(print)

eg2:

image-20220606164316338

flatMap(func)

lines = sc.textFile("file:home/user/code/py/testFlatMap.txt")
words = lines.flatMap(lambda line:line.split(" "))
words.foreach(print)

若对RDD中每个元素进行Map操作后中RDD中的元素是集合对象,则flatMap会在此基础上将集合中的每个元素取出来,并作为新的元素置于RDD中

image-20220606164533699

groupByKey()

将RDD中具有相同key的元素合并起来,这些key 对应的值归并成一个集合对象

image-20220606164818929

image-20220606164722518

这个结合对象的类型是ResultIterable类型

reduceByKey(func)

对groupByKey 的结果进行进一步的func 计算,提供的func函数用于实现对valueList 的 reduce 操作

image-20220606165100380

lambda a,b: a+b

每次运算的中间结果会赋值给下一次运算的a,然后读取下一个数作为b

3、Action API

count()

返回RDD中元素的个数

collect()

以数组的形式返回数据集中的所有元素

在Driver 节点运行,所以有网络传输开销

first()

返回数据集中的第一个元素

take(n)

以数组的形式返回数据集中的前n个元素

reduce(func)

通过函数func聚合数据集中的元素

foreach(func)

将数据集中的每个元素传递给func函数作为参数运行

在WorkerNode 中运行,也就是在集群中运行,没有网络开销

4、持久化API

  • 对于Spark 来说持久化操作是持久化到内存中的,只有当内存不够用的时候才会将数据持久化到硬盘中
  • 持久化操作同理也是一种非action操作,所以仍然符合惰性机制
  • 被持久化的RDD往往是在迭代运算中需要经常重复使用的中间结果
  • persist() 对一个RDD标记为持久化
    • MEMORY_ONLY作为参数
      • ===RDD.cache()
    • MEMORY_AND_DISK 内存不足存放磁盘
  • unpersist() 将RDD从内存中消除掉

5、指定分区

分区个数

local模式下RDD分区个数 = 集群中CPU核心数

standalone和yarn 模式,默认值是集群中所有CPU数目的总和和2之间取较大值

也可以自定义指定:

#指定路径和分区个数
sc.textFile(path,partitionNum)
sc.parallelize(array,partitionNum)

对于已经生成的RDD重新制定分区:

rdd.repartition(n)

显示分区个数

len(rdd.glom().collect())

分区方法

image-20220606173046854

自定义分区方法

from pyspark import SparkConf,SparkContext

def MyPartitioner(key):
     print("myPartitioner is Running")
     print("the key is %d" %key)
     return key %10

def main():
     print("The main function is Running")
     conf = SparkConf().setMaster("local").setAppName("My APP")
     sc = SparkContext(conf = conf)
     data = sc.parallelize(range(10),5)
     data.map(lambda x : (x,1))\
             .partitionBy(10,MyPartitioner)\
             .map(lambda x :x [0])\
             .saveAsTextFile("file:///home/user/code/py/temp")

if __name__ == '__main__':

image-20220606173205565

有多少个分区,最终数据就会写到多少个文件当中

6、键对RDD

创建:

pairRDD = lins.flatMap(lambd line : lin.split(" ")).map(lambda word : (word,1))
pariRDD = sc.paralleize(list).map(lambda word : (word,1))

reduceByKey(func)

聚合运算

先对具有相同键的键值对汇总成一个列表,然后对list value 进行汇总聚合

groupByKey(func)

将Key 相同的键值对归并为,key:值的列表

keys()

生成新的RDD,每个元素都是原RDD的键

values()

将旧RDD的value 返回生成一个新的RDD

sortByKey()

默认情况下是对Key进行升序排序,参数默认是True,若修改参数为False 则进行降序排序

key必须是可比较的对象

sortBy()

sortBy 则可以指定根据什么进行排序,传入两个参数,第一个是一个lambda 表达式,指明排序根据的对象,第二个参数指明升序或者降序排序的方式

lambda x : x 指定排序对象是键值对本身,则默认根据键进行排序

lambda x : x[0] 指定排序对象是键

lambda x : x[1] 指定排序对象是值

combinByKey()

image-20220606203148948

image-20220606203655370

方案一是计算各个分组内的和然后求平均,但是如果某一节点valueList 特别大,就会导致这个节点的压力特别大,所以这种方式可行,但是不是最优选项

  • 参数一func函数:对取出的第一个key,value 的value 进行func 操作
  • 参数二func函数:分区内的计算规则,通过参数1的func生成的value 与除第一个之外的元素进行相关的合并操作的函数
  • 参数三:分区间计算规则,分区间的value 进行func 的合并操作

image-20220606205322227

在这个例子中参数1 将每个分区的第一个key,value 的value 转换为了(value,1)的形式

参数二是同一分区中(这个分区中key是相同的)进行操作的方式,acc 是上面的(value,1)而income 是原来的第二个键值对的key,value 的value值

参数三就是不同的分区间,这时各个分区的**key是还是相同的**,也就是相同key 的分区之间的操作,(key,(value1+value2,number1+number2))然后再进行一个map操作,(key,(value1+value2,【value1+value2]/[number1+number2])

mapValues(func)

将func 作用到每个键值对的value上

join()

作用对象是两个RDD

只有两个RDD中 元素存在相同的key 的时候才会将value归并并且输出,没有相同key 的则不输出

image-20220606200841481

7、几个考察案例

求top值

image-20220606210409669

from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("FindTop")
sc = SparkContext(conf = conf)

#制定文件目录而非文件,会把文件目录中的所有文件全部加载到RDD中
lines = sc.textFile("file:///home/user/code/py/file")
#对数据进行第一次处理,留下的数据必须满足两个条件,去头去尾空格后字符串长度大于0
#将每一行以,为间隔符分开后长度大于4
result1 = lines.filter(lambda line: (len(line.strip())>0) /
        and /
        (len(line.split(",")) == 4)
        )
#取出所有的payment
result2 = result1.map(lambda line : line.split(",")[2])
#将所有的元素转换为(number,"")的形式
result3 = result2.map(lambda x : (int(x),""))
#重置分区数置为1,保证全局有序,因为如果不加这一条,下一行代码执行完只会在
#某个子分区中有序,而不是全局有序
result4 = result3.repartition(1)
#true 升序, false 降序
result5 = result4.sortByKey(false)
#去掉临时加进来的字符串
result6 = result5.map(lambda x : x[0])
#取前5个payment
result7 = result6.take(5)

文件排序

image-20220606212945725

from pyspark import SparkContext,SparkConf

index = 0

def  getIndex():
        global getIndex
        index += 1
        return index
def main():

        conf = SparkConf().setMaster("local").setAppName("sortFile")
        sc = SparkContext(conf =conf)
        lines = sc.textFile("file:///home/user/code/py/file")
        resut1 = lines.filter(lambda line : len(line.strip()) > 0)
        result2 = result1.map(lambda x : (int(x.strip()),1))
        result3 = result2.repartition(1)
        result4 = result3.sortByKey(True)
        result5 = result4.map(lambda x : (getIndex(),x[0]))
        result5.foreach(print)
        #写入目录当中
        result5.saveAsTextFile("file:///home/user/py/code/py/fileSort")

if __name__ == '__main__':
        main()

二次排序

image-20220606214924608

image-20220606215616858

from pyspark import SparkContext,SparkConf
from operator import gt

class SecondarySortKey():
        #构造函数,python好像不需要显示声明成员变量
        #这里构造的这个SecondarySortKey 对象两个字段就是原来的一行两个value  
        def __init__(self,k):
                self.column1 = k[0]
                self.column2 = k[1]

        #这里应该是实现了一个名为operator接口的比较函数
        #在SortByKey这个函数中底层应该调用的是这个operator接口,这里返回值为boolean
        def __gt__(self,other)
                #如果column1 两者相同,就返回column2的比较结果
                if other.column1 == self.column1:
                    	#若a > b则返回true,这时意味着升序的时候 a在b后面
                        return gt(self.column2,other.column2)
                else:
                        #否则就返回column1 的比较结果,这里返回值为bol,如果a > b则返回true
                        return gt(self.column1,other.column1)

def main():

        conf = SparkConf().setMaster("local").setAppName("sortFile")
        sc = SparkContext(conf =conf)
        rdd1 = sc.textFile("file:///home/user/code/py/file")
        rdd2 = rdd1.filter(lambda x : (len(x.strip() > 0)))
        rdd3 = rdd2.map(lambda x : ((int(x.split("")[0]),int(x.split("")[1])),x))
        rdd4 = rdd3.map(lambda x : (SecondarySortKey(x[0]),x[1]))
        rdd5 = rdd4.sortByKey(False)
        rdd6 = rdd5.map(lambda x : x[1])
        
if __name__ == '__main__':
        main()

8、文件读写

  • sc.textFile(“path”)
  • sc.saveAsTextFile(“文件夹名,将所有分区内容写到该文件夹中”)

9、读写Hbase

第五章

1、SparkSession

#创建SparkSession
from pyspark import SparkContext,SparkConf
from pySpark import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

在pyspark 交互式环境中默认提供SparkSession(spark)和SparkContext(sc)

使用SparkSession 接口替代SQLContext

从不同的数据源中加载数据,并把数据生成DataFrame,支持把DataFrame 转换成SQLContext 自身中的表,然后使用SQL语句来操作数据

2、DataFrame

创建:

spark.read.txt("people.txt")
spark.read.format("text").load("people.txt")
spark.read.json("people.json")
spark.read.format("json").load("people.json")
spark.read.parquet("people.parquet")
spark.read.format("parquet").load("people.parquet")

保存:

df.write.txt("people.txt")
df.write.format("text").save("people.txt")
df.write.json("people.json")
df.write.format("json").save("people.json")
df.write.parquet("people.parquet")
df.write.format("parquet").save("people.parquet")

常用操作:

printSchema()

打印dataFrame 的模式,也就是表有什么字段,分别是什么类型

image-20220607092133142

select(condition)

原sql中的表名.列名表示为:df[“列名”]

传入一个condition 对表进行查询

image-20220607092250291

image-20220607093033830

filter(condition)

condition为true 则留下,false 则滤去

groupBy(列名)

根据某一列进行分组合并,往往和action函数组合在一起

image-20220607093004173

sort()

根据某一列进行排序,若传入多列则进行层次排序

image-20220607092612945

先按age进行排序,然后age相同则按name进行排序

desc为降序排序

asc 为升序排序

withColumn

image-20220607092830297

image-20220607092919770

withColumnRenamed

image-20220607092851293

drop

image-20220607092931833

join

image-20220607092940446

where

image-20220607093018335

show

显示出查询结果

rdd->DataFrame

image-20220607093153159

利用反射机制推断rdd模式

已知数据格式

from pyspark import SparkContext,SparkConf,SparkSession
from pyspark.sql import Row
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

people = spark.sparkContext.textFile("file:///user/local/people.txt")/
        .map(lambda line : line.split(","))
        #因为这里已知表的模式,所有可以直接赋值
        .map(lambda p : Row(name = p[0],age=int(p[1])))
#rdd转DataFrame,前提是RDD中的每个元素都是一个Row对象
schemaPeople = spark.createDataFrame(people)
#不能直接对DataFrame进行查询,必须创建一个SparkSession 中的临时视图
people.createOrReplaceTempView("people")
#传入Sql语句进行查询
personDF = spark.sql("select name,age from people where age >20")
personRDD = personDF.rdd.map(lambda p :"Name:"+p.name+","+"Age:"+str(p.age))
personRDD.foreach(print)
利用编程方式
from pyspark import SparkContext,SparkConf,SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row
#并不知道数据的格式,而是通过指定的情况引入处理,所谓编程方式其实理解成generate更好一些
#制作表头
schemaString = "name age"
fields = [StructField(field_name,StringType,True)] for field_name in /
        schemaString.split(" ")
schema = StructType(fields)


lines = spark.sparkContext.textFile("file:///user/local/people.txt")
parts = lines.map(lambda x : x.split(","))
people = parts.map(lambda p: Row(p[0],p[1].strip()))
#参数1是生成的RDD数据,参数2是表头
schemaPeople = spark.createDataFrame(people.schema)
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-06-08 19:06:49  更:2022-06-08 19:08:19 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 5:05:11-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码