学习总结
一、数据统计
1.1 读取文件
步骤1:读取文件https://cdn.coggle.club/Pokemon.csv
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('mypyspark').getOrCreate()
from pyspark import SparkFiles
spark.sparkContext.addFile('https://cdn.coggle.club/Pokemon.csv')
df = spark.read.csv(SparkFiles.get('Pokemon.csv'),
header = True,
inferSchema = True)
df.show(5)
并且我们将表头属性有. 的部分进行更名:
df = df.withColumnRenamed('Sp. Atk', 'Sp Atk')
df = df.withColumnRenamed('Sp. Def', 'Sp Def')
df.show(5)
1.2 保存读取的信息
步骤2:将读取的进行保存,表头也需要保存,这里可保存为csv 或者json 格式文件。
df.write.csv(path='Pokemon.csv',mode='overwrite', header=True )
df.write.json(path='Pokemon.json',mode='overwrite' )
1.3 分析每列的类型,取值个数
步骤3:分析每列的类型,取值个数,这里就可以使用df.describe 分析每列的最大值、最小值、平均值、方差等特性了(下面我只贴出HP 的结果)。
for column in df.columns:
df.describe(column).show()
1.4 分析每列是否包含缺失值
步骤4:分析每列是否包含缺失值,打印出有缺失值的样本数,占总数的比例。
df.filter(df['Type 2'].isNull()).count()
df.toPandas().isnull().sum()
Name 0
Type 1 0
Type 2 386
Total 0
HP 0
Attack 0
Defense 0
Sp Atk 0
Sp Def 0
Speed 0
Generation 0
Legendary 0
dtype: int64
打印出有缺失值的样本数,占总数的比例:
for col in df.columns:
print(col, df.filter(df[col].isNull()).count()/df.count())
Name 0.0
Type 1 0.0
Type 2 0.4825
Total 0.0
HP 0.0
Attack 0.0
Defense 0.0
Sp Atk 0.0
Sp Def 0.0
Speed 0.0
Generation 0.0
Legendary 0.0
二、分组聚合
读取文件https://cdn.coggle.club/Pokemon.csv 和刚才的步骤一是一样的。
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('mypyspark').getOrCreate()
from pyspark import SparkFiles
spark.sparkContext.addFile('https://cdn.coggle.club/Pokemon.csv')
df = spark.read.csv(SparkFiles.get('Pokemon.csv'),
header = True,
inferSchema = True)
df.show(5)
2.1 学习groupby分组聚合的使用
和sql的group by一样意思,分组聚合,下面找到Type 1 对应数据进行分组和计数。
df.groupby('Type 1').count().show(5)
还可以分组后(按照Type 1 分组)计算每组内的HP 平均值:
df.groupBy(['Type 1']).avg('HP').show(5)
+------+-----------------+
|Type 1| avg(HP)|
+------+-----------------+
| Water| 72.0625|
|Poison| 67.25|
| Steel|65.22222222222223|
| Rock|65.36363636363636|
| Ice| 72.0|
+------+-----------------+
only showing top 5 rows
而如果是计算每组的个数:
df.groupby('Type 1').count().show(5)
+------+-----+
|Type 1|count|
+------+-----+
| Water| 112|
|Poison| 28|
| Steel| 27|
| Rock| 44|
| Ice| 24|
+------+-----+
only showing top 5 rows
2.2 学习agg分组聚合的使用
和2.1一样实现分组后(按照Type 1 分组)计算每组内的HP 平均值,我们也可以用agg 分组聚合,{'HP': 'mean'} 为参数。
df.groupBy('Type 1').agg({'HP': 'mean'}).show(5)
+------+-----------------+
|Type 1| avg(HP)|
+------+-----------------+
| Water| 72.0625|
|Poison| 67.25|
| Steel|65.22222222222223|
| Rock|65.36363636363636|
| Ice| 72.0|
+------+-----------------+
only showing top 5 rows
当数据量不是很多时,可以使用toPandas 转成pandas的dataframe,然后使用pandas的各种函数:
df.toPandas().groupby('Type 1', as_index=False)['HP'].agg({'hp_mean': 'mean'})
2.3 transform的使用
transform的参数如下,可以看到需要传入一个函数(用于转换数据)。
DataFrame.transform(func: Callable[[…], Series], axis: Union[int, str] = 0, *args: Any, **kwargs: Any) → DataFrame
如果我们要按照Type 1 分组后,求组内关于HP 的平均值,可以将np.mean 函数传入transform ,具体介绍可以参考pyspark官方文档。
df.toPandas().groupby('Type 1')['HP'].transform(np.mean)
0 67.271429
1 67.271429
2 67.271429
3 67.271429
4 69.903846
...
795 65.363636
796 65.363636
797 70.631579
798 70.631579
799 69.903846
Name: HP, Length: 800, dtype: float64
Reference
- pyspark官方文档
- https://spark.apache.org/docs/latest/quick-start.html
- https://spark.apache.org/docs/latest/sql-programming-guide.html
- https://github.com/apache/spark/tree/4f25b3f712/examples/src/main/python
- https://sparkbyexamples.com/pyspark-tutorial/
- https://www.tutorialspoint.com/pyspark/index.htm
- pyspark—agg的用法
|