一、Spark RDD
RDD:resilient distributed dataset (RDD)
每个spark程序都有一个driver program运行main函数,在cluster集群上执行各种并行操作。我们也可以将RDD持久化到内存,便于在并行操作中重用。RDD 是 Spark 最基本的数据抽象,它是只读的、分区记录的集合,支持并行操作,可以由外部数据集或其他 RDD 转换而来,它具有以下特性:
- 一个 RDD 由一个或者多个分区(Partitions)组成。对于 RDD 来说,每个分区会被一个计算任务所处理,用户可以在创建 RDD 时指定其分区个数,如果没有指定,则默认采用程序所分配到的 CPU 的核心数;
- RDD 拥有一个用于计算分区的函数 compute;
- RDD 会保存彼此间的依赖关系,RDD 的每次转换都会生成一个新的依赖关系,这种 RDD 之间的依赖关系就像流水线一样。在部分分区数据丢失后,可以通过这种依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算;
- Key-Value 型的 RDD 还拥有 Partitioner(分区器),用于决定数据被存储在哪个分区中,目前 Spark 中支持 HashPartitioner(按照哈希分区) 和 RangeParationer(按照范围进行分区);
- 一个优先位置列表 (可选),用于存储每个分区的优先位置 (prefered location)。对于一个 HDFS 文件来说,这个列表保存的就是每个分区所在的块的位置,按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能的将计算任务分配到其所要处理数据块的存储位置。
二、使用RDD functions完成任务2的统计逻辑
from pyspark.sql import SparkSession
from pyspark import SparkFiles
import pandas as pd
spark = SparkSession.builder.appName('pyspark').getOrCreate()
spark.sparkContext.addFile('https://cdn.coggle.club/Pokemon.csv')
df = spark.read.csv("file://"+SparkFiles.get("Pokemon.csv"), header=True, inferSchema= True)
df = df.withColumnRenamed('Sp. Atk', 'SpAtk')
df = df.withColumnRenamed('Sp. Def', 'SpDef')
df = df.withColumnRenamed('Type 1', 'Type1')
df = df.withColumnRenamed('Type 2', 'Type2')
df.show()
+--------------------+-----+------+-----+---+------+-------+-----+-----+-----+----------+---------+
| Name|Type1| Type2|Total| HP|Attack|Defense|SpAtk|SpDef|Speed|Generation|Legendary|
+--------------------+-----+------+-----+---+------+-------+-----+-----+-----+----------+---------+
| Bulbasaur|Grass|Poison| 318| 45| 49| 49| 65| 65| 45| 1| false|
| Ivysaur|Grass|Poison| 405| 60| 62| 63| 80| 80| 60| 1| false|
| Venusaur|Grass|Poison| 525| 80| 82| 83| 100| 100| 80| 1| false|
|VenusaurMega Venu...|Grass|Poison| 625| 80| 100| 123| 122| 120| 80| 1| false|
| Charmander| Fire| null| 309| 39| 52| 43| 60| 50| 65| 1| false|
| Charmeleon| Fire| null| 405| 58| 64| 58| 80| 65| 80| 1| false|
| Charizard| Fire|Flying| 534| 78| 84| 78| 109| 85| 100| 1| false|
|CharizardMega Cha...| Fire|Dragon| 634| 78| 130| 111| 130| 85| 100| 1| false|
|CharizardMega Cha...| Fire|Flying| 634| 78| 104| 78| 159| 115| 100| 1| false|
| Squirtle|Water| null| 314| 44| 48| 65| 50| 64| 43| 1| false|
| Wartortle|Water| null| 405| 59| 63| 80| 65| 80| 58| 1| false|
| Blastoise|Water| null| 530| 79| 83| 100| 85| 105| 78| 1| false|
|BlastoiseMega Bla...|Water| null| 630| 79| 103| 120| 135| 115| 78| 1| false|
| Caterpie| Bug| null| 195| 45| 30| 35| 20| 20| 45| 1| false|
| Metapod| Bug| null| 205| 50| 20| 55| 25| 25| 30| 1| false|
| Butterfree| Bug|Flying| 395| 60| 45| 50| 90| 80| 70| 1| false|
| Weedle| Bug|Poison| 195| 40| 35| 30| 20| 20| 50| 1| false|
| Kakuna| Bug|Poison| 205| 45| 25| 50| 25| 25| 35| 1| false|
| Beedrill| Bug|Poison| 395| 65| 90| 40| 45| 80| 75| 1| false|
|BeedrillMega Beed...| Bug|Poison| 495| 65| 150| 40| 15| 80| 145| 1| false|
+--------------------+-----+------+-----+---+------+-------+-----+-----+-----+----------+---------+
only showing top 20 rows
rdd = df.rdd
cols = df.columns
for i in range(len(cols)):
print('-'*10,cols[i],'-'*10)
print('不同的取值个数:',len(dict(rdd.map(lambda x: x[i]).countByValue())))
print('空值个数:',rdd.filter(lambda x: x[i] == None).count())
"""
---------- Name ----------
不同的取值个数: 799
空值个数: 0
---------- Type1 ----------
不同的取值个数: 18
空值个数: 0
---------- Type2 ----------
不同的取值个数: 19
空值个数: 386
---------- Total ----------
不同的取值个数: 200
空值个数: 0
---------- HP ----------
不同的取值个数: 94
空值个数: 0
---------- Attack ----------
不同的取值个数: 111
空值个数: 0
---------- Defense ----------
不同的取值个数: 103
空值个数: 0
---------- SpAtk ----------
不同的取值个数: 105
空值个数: 0
---------- SpDef ----------
不同的取值个数: 92
空值个数: 0
---------- Speed ----------
不同的取值个数: 108
空值个数: 0
---------- Generation ----------
不同的取值个数: 6
空值个数: 0
---------- Legendary ----------
不同的取值个数: 2
空值个数: 0
"""
Reference
[1] 官方文档RDD Programming Guide [2] https://blog.csdn.net/qq_56870570/article/details/118177403?spm=1001.2014.3001.5506 [3] Spark入门阶段一之扫盲笔记 [4] (重点)SPARK官方教程系列快速入门 [5] Spark RDD 简介
|