DataFrame对象构成
层面 | 对象 | 说明 |
---|
结构 | StructType | 描述整个DataFrame的表结构 | StructField | 描述一个列的信息 | 数据 | Column | 记录一列数据并包含列的信息 | Row | 记录一行数据 |
DataFrame对象创建
Hichael,29
Andy,30
Justin,19
2.1 基于RDD的创建方式一
又可称 从RDD转化为DafaFrame [利用反射机制推断RDD模式]
from pyspark.sql import SparkSession
from pyspark import SparkConf
if __name__ == '__main__':
spark = SparkSession.builder\
.config(conf= SparkConf())\
.appName("CREATE DataFrame 01")\
.master("local[*]")\
.getOrCreate()
sc = spark.sparkContext
rdd = sc.textFile("./data/people.txt")\
.map(lambda line : line.strip().split(","))\
.map(lambda line : (line[0] , int(line[1])))
df = spark.createDataFrame(rdd,schema= ['name' , 'age'])
df.printSchema()
df.show()
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
+-------+---+
| name|age|
+-------+---+
|Hichael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
df = spark.createDataFrame(rdd)
df.printSchema()
df.show()
root
|-- _1: string (nullable = true)
|-- _2: long (nullable = true)
+-------+---+
| _1| _2|
+-------+---+
|Hichael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
- show(n=20, truncate=True, vertical=False) API 说明
参数 | 官方解释 | 个人理解 |
---|
n | Number of rows to show. | 指定显示DataFrame中的行数 | truncate | If set to True , truncate strings longer than 20 chars by default. If set to a number greater than one, truncates long strings to length truncate and align cells right. | 默认为True,意为对列中数据截断,若列的字符长度超过20,便以…替代,设为False,便不再截断,全部显示 | vertical | If set to True , print output rows vertically (one line per column value). | 若设为true,便竖直打印Row对象 |
- 教材[林子雨 主编 Spark编程基础]示例
将Rdd中字符串构造为 Row对象 ,从而在创建DataFrame时无需指定Schema 构造Row对象时,形参名不可省略
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import Row
if __name__ == '__main__':
spark = SparkSession.builder\
.config(conf= SparkConf())\
.appName("CREATE DataFrame 01")\
.master("local[*]")\
.getOrCreate()
sc = spark.sparkContext
rdd = sc.textFile("./data/people.txt")\
.map(lambda line : line.strip().split(","))\
.map(lambda line : Row(name = line[0] , age = int(line[1])))
df = spark.createDataFrame(rdd)
df.printSchema()
df.show(2)
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
+-------+---+
| name|age|
+-------+---+
|Hichael| 29|
| Andy| 30|
+-------+---+
only showing top 2 rows
2.2 基于RDD的创建方式二
又可称 从RDD转化为DafaFrame [使用编程方式定义RDD模式] 创建表结构信息
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import *
if __name__ == '__main__':
spark = SparkSession.builder\
.config(conf= SparkConf())\
.appName("CREATE DataFrame 01")\
.master("local[*]")\
.getOrCreate()
sc = spark.sparkContext
rdd = sc.textFile("./data/people.txt")\
.map(lambda line : line.strip().split(","))\
.map(lambda line : (line[0] , int(line[1])))
schema = StructType()\
.add("name" , StringType() , nullable=False)\
.add("age" , IntegerType() , nullable=True)
df = spark.createDataFrame(rdd,schema)
df.printSchema()
df.show()
root
|-- name: string (nullable = false)
|-- age: integer (nullable = true)
+-------+---+
| name|age|
+-------+---+
|Hichael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
2.3 基于RDD的创建方式三
使用rdd的 toDF()方法
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import *
if __name__ == '__main__':
spark = SparkSession.builder\
.config(conf= SparkConf())\
.appName("CREATE DataFrame 01")\
.master("local[*]")\
.getOrCreate()
sc = spark.sparkContext
rdd = sc.textFile("./data/people.txt")\
.map(lambda line : line.strip().split(","))\
.map(lambda line : (line[0] , int(line[1])))
schema = StructType()\
.add("name" , StringType() , nullable=False)\
.add("age" , IntegerType() , nullable=True)
print("toDF中 传入一个参数: 列名列表[只传列名,类型自动推断,默认该列均为允许为空]")
df = rdd.toDF([ "name" ,"age"])
df.printSchema()
df.show()
print("toDF中 传入一个参数: 表结构[完整的schema描述对象StructType]")
df = rdd.toDF(schema)
df.printSchema()
df.show()
toDF中 传入一个参数: 列名列表[只传列名,类型自动推断,默认该列均为允许为空]
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
+-------+---+
| name|age|
+-------+---+
|Hichael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
toDF中 传入一个参数: 表结构[完整的schema描述对象StructType]
root
|-- name: string (nullable = false)
|-- age: integer (nullable = true)
+-------+---+
| name|age|
+-------+---+
|Hichael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
2.4 基于RDD的创建方式四
文件 /usr/local/spark/examples/src/main/resources/people.json 无需创建,解压spark时便存在
from pyspark.sql import SparkSession
from pyspark import SparkConf
if __name__ == '__main__':
spark = SparkSession.builder\
.config(conf = SparkConf())\
.appName("pandas")\
.master("local[*]")\
.getOrCreate()
df = spark.read.json("/usr/local/spark/examples/src/main/resources/people.json")
df.printSchema()
df.show()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
2.5 基于pandas的创建方式
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkConf
if __name__ == '__main__':
spark = SparkSession.builder\
.config(conf = SparkConf())\
.appName("pandas")\
.master("local[*]")\
.getOrCreate()
df = pd.read_csv("./data/people.txt" , names = ["name" , "age"] ,header=None)
df = spark.createDataFrame(df)
df.printSchema()
df.show()
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
+-------+---+
| name|age|
+-------+---+
|Hichael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
|