RDD的内部运行方式
RDD(Resilient Distributed Datasets)
- 是一个容错的,并行的数据结构,可以让用户显式的将数据存储到磁盘和内存中,并能控制数据的分区
- 提供了一组丰富的操作来操作数据
- 本质是一个只读的分区记录集合,一个RDD可以包含多个分区,每个分区是一个DataSet片段
- RDD之间可以相互依赖(窄依赖,宽依赖)
RDD的分区
- 通过不同的分区,将数据实际映射到不同的Spark节点上
RDD的特点
- 只读不能修改:只能通过转换操作生成一个新的RDD
- 分布式存储:一个RDD通过分区可以分布在多台机器上进行并行数据处理
- 内存计算:可以将全部或部分数据缓存在内存中,且可在多次计算过程中重用
- 具有弹性:在计算过程中,当内存不足时,可以将一部分数据落到磁盘上处理
RDD的常用操作
RDD的创建
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName('RDD_create_demo')
sc = SparkContext(conf)
parallelize
list1 = [1, 2, 3, 4, 5]
rdd1 = sc.parallelize(list1, numSlices = 3)
rdd1.collect()
rdd1.glom().collect()
result : [[1], [2, 3], [4, 5]]
range
rdd2 = sc.range(1, 20, 2, numSlieces = 3)
textFile
rdd3 = sc.textFile('./wordcount.txt', 2)
rdd3.collect()
result : ['hadoop spark flume', 'spark hadoop', 'flume hadoop']
通过RDD衍生
wordsRDD = rdd3.flatMap(lambda line:line.split(" "))
wordsRDD.collect()
result : ['hadoop', 'spark', 'flume', 'spark', 'hadoop', 'flume', 'hadoop']
sc.stop()
RDD算子
RDD算子分类
- Transformation(转换)操作:在一个已经存在的RDD上创建一个新的RDD,将旧的RDD数据转换为另外一种形式后放入新的RDD。如:map, flatMap, filter
- Action(动作)操作:执行各个分区的计算任务,将得到的结果返回到driver中。如reduce, collect,show
算子特点
- 惰性求值:Spark中所有的Transformation是Lazy的,它们不会立即执行获得结果。它们只会记录在数据集上要应用的操作,只有当需要返回结果给Driver时才会执行这些操作,通过DAGScheduler和TaskScheduler分发到集群中运行
- 默认情况下,每一个Action运行的时候,其所关联的所有Transformation RDD都会重新计算,但是也可以使用缓存将RDD持久化到磁盘或内存中,这个是为了下次可以更快的访问,会把数据保存到集群上
操作演示
conf = SparkConf().setMaster("local[*]")
sc = SparkContext(conf=conf)
Map算子
rdd1 = sc.range(5)
rdd2 = rdd1.map(lambda x:x * 2)
rdd2.collect()
result : [0, 2, 4, 6, 8]
flatMap算子
list1 = ['Hello Lily', 'Hello Lucy', 'Hello Tim']
rdd1 = sc.parallelize(list1)
rdd2 = rdd1.map(lambda x:x.split(' '))
rdd3 = rdd1.flatMap(lambda x:x.split(' '))
print(rdd2.collect())
result : [['Hello', 'Lily'], ['Hello', 'Lucy'], ['Hello', 'Tim']]
print(rdd3.collect())
result : ['Hello', 'Lily', 'Hello', 'Lucy', 'Hello', 'Tim']
reduceByKey算子
list1 = ['Hello Lily', 'Hello Lucy', 'Hello Tim']
rdd1 = sc.parallelize(list1)
rdd1 = rdd1.flatMap(lambda x:x.split(' '))
rdd1 = rdd1.map(lambda x:(x, 1))
rdd1.collect()
result : [('Hello', 1), ('Lily', 1), ('Hello', 1), ('Lucy', 1), ('Hello', 1), ('Tim', 1)]
rdd2 = rdd1.reduceByKey(lambda x, y:x+y)
rdd2.collect()
result : [('Lily', 1), ('Hello', 3), ('Lucy', 1), ('Tim', 1)]
未完待续。。。
|