使用pyspark.sql.SparkSession创建DataFrame
数据准备
data/score.txt
孙悟空,语文,87
孙悟空,数学,95
孙悟空,英语,68
大海,语文,94
大海,数学,56
大海,英语,84
宋宋,语文,64
宋宋,数学,86
宋宋,英语,84
婷婷,语文,65
婷婷,数学,85
婷婷,英语,78
读取本地文件
from pyspark.sql import SparkSession
if __name__ == '__main__':
spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext
rdd = (sc.textFile("../data/score.txt")
.map(lambda x: x.split(','))
.map(lambda x: (x[0], x[1], int(x[2]))))
1. 基于RDD和python列表创建
df = spark.createDataFrame(rdd, schema=['name', 'subject', 'score'])
2. 基于表结构描述对象StructType创建
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
schema = StructType([
StructField('name', StringType(), nullable=True),
StructField('subject', StringType(), nullable=False),
StructField('score', IntegerType(), nullable=True)
])
df = spark.createDataFrame(rdd, schema=schema)
3. toDF的方式创建
方式一、toDF( list )
df = rdd.toDF(['name', 'subject', 'score'])
方式二、toDF( StructType )
schema = (StructType()
.add("name", StringType(), nullable=False)
.add("subject", StringType(), nullable=True)
.add("score", IntegerType(), nullable=False)
)
df1 = rdd.toDF(schema)
4. 基于Pandas的DataFrame创建
import pandas as pd
pd_df = pd.DataFrame({
"id": [1, 2, 3]
, "name": ['坤坤', '鸡哥', '倒戈']
, "age": [12, 14, 21]
})
df = spark.createDataFrame(pd_df)
|