一、 RDD
弹性分布式数据集(resilient distributed dataset),控制不同机器上的不同数据分区,通过“partitionBy”对数据在不同机器上进行重排。一个task对应一个分区。
元数据类型
分区列表、 计算每个分区的函数、 对父RDD的依赖列表、 对Key-Value对数据类型RDD的分区器,控制分区策略和分区数。
创建方式
- Hadoop文件系统输入创建
从集合中创建: intRDD = sc.parallelize([4,1,2,5,5,6,8]) kvRDD.collect() # 动作 kvRDD1 = sc.parallelize([(2,3),(3,5),(4,6),(1,2)]) kvRDD.collect() # action 从文件读取:如果是分布式,需要将文件复制到不同地方 本地 lines = sc.textFile(“file:///home/hadoop/temp/word.txt”) lines.foreach(print) 远程 from pyspark import SparkConf,SparkCountext conf = SparkConf().setMaster(“local”).setAppName(“My App”) sc = SparkContext(conf = conf) lines = sc.textFile(“hdfs://hadoop1:9000/tmp/word.txt”) lines.foreach(print)
from pyspark.files import SparkFiles sc.addFile(“word.txt”) sc.textFile(SparkFiles.get(“word.txt”))
- 父RDD转换得到新的RDD
二、Map
map是对RDD 中的每个元素都执行一个指定的函数来产生一个新的RDD。 rdd.map(lambda x:x+1)
三、Reduce
reduce函数对所有的元素调用同一个函数,可以把所有的数据合并在一起,并返回最终的调查结果。 rdd.reduce(lambda x,y:x+y)
reduceByKey函数专门针对键值对类型的数据,生成新的RDD
四、 DAG
有向无环图(Directed Acyclic Graph)连接了一个个RDD, spark的转换操作不会执行,动作操作才会执行。 不同的RDD之间存在宽依赖和窄依赖, 窄依赖是一个父RDD的分区仅对应一个子分区,有利于计算的并行。 宽依赖是一个父RDD的分区对应多个子分区,一个stage中的所有依赖都结束了,才会进入下一个stage.另外如果某个task出错了,需要重算整个stage。
五、Transformation
对RDD进行转换产生新的RDD。
标准的RDD
map
对每一个元素进行操作
mapPartitions
对每个分区进行操作,
flatMap
先操作再扁平化输出
from pyspark import SparkConf,SparkContext
sc.stop()
conf = SparkConf().setMaster("local").setAppName("flatMap")
sc = SparkContext(conf=conf)
rdd1 = sc.parallelize(['hello', 'You are very good'])
new_rdd1 = rdd1.flatMap(lambda x: x.split())
print('new_rdd1 = ', new_rdd1.collect())
filter
返回值为True的结果 result=rdd.filter(lambda x:x%2==0)
distinct
去重
union
两个rdd合并
cartesian
笛卡尔积
groupBy
后面跟agg()聚合方法,按照需要的聚合函数对数据进行分组聚合统计 跟pandas很像
from pyspark.sql.functions import count,min,max,avg,var_pop,stddev_pop
df.select("code", "sku", "gmv").distinct()\
.groupBy("code")\
.agg(
count("sku").alias("sku_cnt"),
min("gmv").alias("min_gmv"),
max("gmv").alias("max_gmv"),
avg("gmv").alias("avg_gmv"),
stddev_pop("gmv").alias("std_gmv"),
var_pop("gmv").alias("var_gmv")
)\
.sort("code")\
.show(200)
pipe
fold
countByvalue
partitionBy
对RDD进行重新分区,控制分区数
配对的RDD collectAsMap reduceByKey countByKey join rightOuterJoin leftOuterJoin combineByKey groupByKey cogroup
6. Action
没有返回新的rdd。
take、first、collect、count、stats均是查看。 reduce和foreach是用来对每个元素进行处理的
reduce
通过函数func聚集数据集中的所有元素。Func函数接受2个参数,前后元素可以关联。
sc.parallelize([11, 2, 8, 9, 5]).reduce(lambda x,y:max(x,y))
take
返回一个数组,由数据集的前n个元素组成
collect
在Driver的程序中,以数组的形式,返回数据集的所有元素。
count
返回数据集的元素个数
first
返回数据集的第一个元素(类似于take(1)
foreach
在数据集的每一个元素上,运行函数func
stats
返回数字列表RDD的统计信息.
|