一、编写和传递转换函数
1、Spark中的函数式编程(Functional Programming in Spark)
- 函数式编程的关键概念
- 函数是编程的基本单元
- 函数只有输入和输出
- 函数可以作为参数传递给其他函数
- Spark的架构基于函数式编程
2、RDD转换程序(RDD Transformation Procedures )
- RDD转换执行一个转换过程
- 一些变换操作实现它们自己的变换逻辑
- 大多数变换操作需要你传递一个函数
- 函数实现您自己的转换过程
- 例如:map and filter
- 这是RDDs和DataFrame/Datasets之间的关键区别
3、传递函数(Passing Functions )
- 传递的函数可以被命名或匿名
- 匿名函数在没有标识符的情况下进行内联定义
- 适合简短、一次性的功能
- 支持多种编程语言
- Python: lambda x: …
- Scala: x => …
- Java 8: x -> …
4、示例:传递命名函数
def toUpper(s):
return s.upper()
myRDD = sc.textFile("purplecow.txt")
myUpperRDD = myRDD.map(toUpper)
for line in myUpperRDD.take(2):
print(line)
I'VE NEVER SEEN A PURPLE COW.
I NEVER HOPE TO SEE ONE;
def toUpper(s: String):
String = { s.toUpperCase }
val myRDD = sc.textFile("purplecow.txt")
val myUpperRDD = myRDD.map(toUpper)
myUpperRDD.take(2).foreach(println)
I'VE NEVER SEEN A PURPLE COW.
I NEVER HOPE TO SEE ONE;
5、示例:传递匿名函数
- Python:使用lambda关键字指定输入参数的名称和返回输出的函数
myUpperRDD = myRDD.map(lambda line: line.upper())
- Scala:使用=>操作符来指定输入参数的名称和返回输出的函数
val myUpperRDD = myRDD.map(line => line.toUpperCase)
- Scala快捷方式:使用下划线(_)表示匿名输入参数
val myUpperRDD = myRDD.map(_.toUpperCase)
6、示例:map and filter转换
myFilteredRDD = myRDD.map(lambda line: line.upper()).filter(lambda line:line.startswith('I'))
val myFilteredRDD = myRDD.map(line => line.toUpperCase).filter(line => line.startsWith("I"))
二、转换执行(Transformation Execution )
1、RDD Execution
- 一个RDD query由一个action完成的一个或多个转换序列组成
- RDD queries延迟执行的
- RDD queries的执行方式不同于DataFrame queries和Dataset queries
- DataFrames和Datasets扫描它们的源以确定schema(当创建时)
- RDDs没有schema,在加载之前不会扫描它们的源
2、RDD Lineage
- 转换基于一个或多个现有RDD创建一个新的RDDs
- 结果RDDs被认为是基础(父)RDD的子RDD
- 子RDD依赖于父RDD
- 一个RDD的谱系(Lineage)是它所依赖的祖先RDD的序列
- 当RDD执行时,它从源开始执行沿袭(Lineage)
- Spark维护每个RDD的血统(Lineage)
- 使用toDebugString查看沿袭(Lineage)
3、RDD Lineage和toDebugString (Scala)
val myFilteredRDD = sc.textFile("purplecow.txt").map(line => line.toUpperCase).filter(line => line.startsWith("I"))
myFilteredRDD.toDebugString
(2) MapPartitionsRDD[7] at filter …
| MapPartitionsRDD[6] at map …
| purplecow.txt
MapPartitionsRDD[5]
at textFile …
| purplecow.txt HadoopRDD[4]
at textFile …
4、RDD Lineage and toDebugString (Python)
- 在Python shell中,toDebugString输出没有很好地显示
myFilteredRDD.toDebugString()
(2) PythonRDD[7] at RDD at PythonRDD.scala:48 []\n | purplecow.txt MapPartitionsRDD[6] … []\n | purplecow.txt HadoopRDD[5] at textFile … []
print myFilteredRDD.toDebugString()
(2) PythonRDD[7] at RDD at PythonRDD.scala:48 []
| purplecow.txt MapPartitionsRDD[6] at textFile …
| purplecow.txt HadoopRDD[5] at textFile …
5、Pipelining(管道)
- 在可能的情况下,Spark将按元素执行一系列转换,因此不存储数据
Scala:
val myFilteredRDD = sc.textFile("purplecow.txt").map(line => line.toUpperCase).filter(line => line.startsWith("I"))
myFilteredRDD.take(2)
val myFilteredRDD = sc.textFile("purplecow.txt").map(line => line.toUpperCase).filter(line => line.startsWith("I"))
myFilteredRDD.take(2)
val myFilteredRDD = sc.textFile("purplecow.txt").map(line => line.toUpperCase).filter(line => line.startsWith("I"))
myFilteredRDD.take(2)
val myFilteredRDD = sc.textFile("purplecow.txt").map(line => line.toUpperCase).filter(line => line.startsWith("I"))
myFilteredRDD.take(2)
I'VE NEVER SEEN A PURPLE COW.
val myFilteredRDD = sc.textFile("purplecow.txt").map(line => line.toUpperCase).filter(line => line.startsWith("I"))
myFilteredRDD.take(2)
I'VE NEVER SEEN A PURPLE COW.
val myFilteredRDD = sc.textFile("purplecow.txt").map(line => line.toUpperCase).filter(line => line.startsWith("I"))
myFilteredRDD.take(2)
I'VE NEVER SEEN A PURPLE COW.
val myFilteredRDD = sc.textFile("purplecow.txt").map(line => line.toUpperCase).filter(line => line.startsWith("I"))
myFilteredRDD.take(2)
I'VE NEVER SEEN A PURPLE COW.
val myFilteredRDD = sc.textFile("purplecow.txt").map(line => line.toUpperCase).filter(line => line.startsWith("I"))
myFilteredRDD.take(2)
I'VE NEVER SEEN A PURPLE COW.
I NEVER HOPE TO SEE ONE;
三、RDDs和DataFrames之间的转换
1、将RDDs转换为DataFrames
- 你可以从RDD创建一个数据帧
- 对于文本等非结构化或半结构化数据非常有用
- 定义架构
- 将基本RDD转换为Row对象(Scala)或列表(Python)的RDD
- 使用SparkSession.createDataFrame
- 你也可以返回一个DataFrame的底层RDD
- 使用DataFrame.rdd属性返回Row对象的RDD
2、示例:从RDD创建DataFrame
示例数据:半结构化文本数据源
02134,Hopper,Grace,52
94020,Turing,Alan,32
94020,Lovelace,Ada,28
87501,Babbage,Charles,49
02134,Wirth,Niklaus,48
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val mySchema = StructType(Array(
StructField("pcode",StringType),
StructField("lastName",StringType),
StructField("firstName",StringType),
StructField("age",IntegerType)))
val rowRDD = sc.textFile("people.txt").map(line => line.split(",")).map(values => Row(values(0),values(1),values(2),values(3).toInt))
val myDF = spark.createDataFrame(rowRDD,mySchema)
myDF.show(2)
+-----+--------+---------+---+
|pcode|lastName|firstName|age|
+-----+--------+---------+---+
|02134| Hopper| Grace| 52|
|94020| Turing| Alan| 32|
+-----+--------+---------+---+
from pyspark.sql.types import *
mySchema = StructType([
StructField("pcode",StringType()),
StructField("lastName",StringType()),
StructField("firstName",StringType()),
StructField("age",IntegerType())])
myRDD = sc.textFile("people.txt").map(lambda line: line.split(",")).map(lambda values:[values[0],values[1],values[2],int(values[3])])
myDF = spark.createDataFrame(myRDD,mySchema)
myDF.show(2)
+-----+--------+---------+---+
|pcode|lastName|firstName|age|
+-----+--------+---------+---+
|02134| Hopper| Grace| 52|
|94020| Turing| Alan| 32|
+-----+--------+---------+---+
3、示例:返回一个DataFrame的底层RDD
myRDD2 = myDF.rdd
for row in myRDD2.take(2): print(row)
Row(pcode=u'02134', lastName=u'Hopper', firstName=u'Grace', age=52) Row(pcode=u'94020', lastName=u'Turing', firstName=u'Alan', age=32)
val myRDD2 = myDF.rdd
myRDD2.take(2).foreach(println)
[02134,Hopper,Grace,52]
[94020,Turing,Alan,32]
四、基本要点
- RDDs(弹性分布式数据集)是Spark中的一个关键概念
- RDD Operations
- 转换在现有RDD的基础上创建新的RDD
- Actions从RDD返回一个值
- RDD转换使用函数式编程
- RDD查询执行是惰性的
- transformation在被操作触发之前不会执行
- 如果可能,同一RDD元素的操作流水线在一起
五、实践练习:使用RDD转换数据
1、探索Loudacre Web日志文件
1、在本节中,您将使用$DEVDATA/weblogs中的数据。查看目录中的.log文件。注意这几行的格式: 2、将“weblogs”目录从本地文件系统复制到HDFS目录“/devsh_loudacre”。
$ hdfs dfs -put $DEVDATA/weblogs /devsh_loudacre/
3、在Spark中,从HDFS中“/devsh_loudacre/weblogs/”目录下上传的web日志数据文件中创建RDD。
pyspark> logsRDD = sc.textFile("/devsh_loudacre/weblogs/")
scala> val logsRDD = sc.textFile("/devsh_loudacre/weblogs/")
4、创建一个只包含JPG文件请求行的RDD。使用带有转换函数的过滤操作,该转换函数接受一个字符串RDD元素并返回一个布尔值。
pyspark> jpglogsRDD = logsRDD.filter(lambda line:".jpg" in line)
scala> val jpglogsRDD = logsRDD.filter(line => line.contains(".jpg"))
5、使用take返回jpglogsRDD中的前五行数据。返回值是字符串列表(Python)或字符串数组(Scala)。
pyspark> jpgLines = jpglogsRDD.take(5)
scala> val jpgLines = jpglogsRDD.take(5)
6、循环并显示take返回的字符串。
pyspark> for line in jpgLines: print(line)
scala> jpgLines.foreach(println)
7、使用映射转换来定义一个新的RDD。从一个简单的map函数开始,该函数返回日志文件中每一行的长度。这将导致一个整数的RDD。
pyspark> lineLengthsRDD = logsRDD.map(lambda line: len(line))
scala> val lineLengthsRDD = logsRDD.map(line => line.length)
8、循环并显示RDD中的前5个元素(整数)。
9、计算行长度不是很有用。相反,尝试通过基于空格分割字符串来映射logsRDD中的每个字符串。结果将是一个RDD,其中每个元素都是字符串列表(Python)或字符串数组(Scala)。每个字符串代表web日志行中的一个“字段”。
pyspark> lineFieldsRDD = logsRDD.map(lambda line: line.split(' '))
scala> val lineFieldsRDD = logsRDD.map(line => line.split(' '))
10、返回lineFieldsRDD的前5个元素。结果将是字符串列表的列表(Python)或字符串数组的数组(Scala)。
pyspark> lineFields = lineFieldsRDD.take(5)
scala> val lineFields = lineFieldsRDD.take(5)
11、显示从take返回的内容。与上面的示例不同,上面的示例返回简单值(字符串和整数)的集合,这次您有一组复合值(包含字符串的数组或列表)。因此,为了正确地显示它们,您需要在lineFields中遍历数组/列表,然后遍历数组/列表中的每个字符串。为了更容易读取输出,使用-------分隔每一组字段值。如果您选择复制并粘贴下面的Pyspark代码到shell中,它可能不会自动正确缩进;在执行命令之前,请确保缩进是正确的。
pyspark> for fields in lineFields: print("-------") for field in fields: print(field)
scala> for (fields <- lineFields){
println("-------")
fields.foreach(println)}
12、现在您知道了map是如何工作的,那么创建一个新的RDD,其中只包含来自日志文件中每一行的IP地址。(IP地址是每行中以空格分隔的第一个字段。)
pyspark> ipsRDD = logsRDD.map(lambda line: line.split(' ')[0])
pyspark> for ip in ipsRDD.take(5): print(ip)
scala> val ipsRDD = logsRDD.map(line => line.split(' ')(0))
scala> ipsRDD.take(5).foreach(println)
13、最后,将IP地址列表保存为文本文件:
pyspark> ipsRDD.saveAsTextFile("/devsh_loudacre/iplist")
scala> ipsRDD.saveAsTextFile("/devsh_loudacre/iplist")
注意:如果您重新运行此命令,您将无法保存到相同的目录,因为它已经存在。再次保存之前,请务必删除HDFS中的目录。
14、列出HDFS目录“/devsh_loudacre/iplist”的内容。检查其中一个文件的内容,以确认它们是正确创建的。
2、映射Weblog条目到IP Address/User ID Pairs
15、使用RDD转换为每个HTML文件请求创建一个包含IP地址和相应用户ID的数据集。(过滤扩展名为.html的文件;忽略对其他文件类型的请求)用户ID是每个日志文件行中的第三个字段。将数据保存到/devsh_loudacre/userips_csv目录下以逗号分隔的文本文件中。确保数据以逗号分隔字符串的形式保存: 16、现在数据是CSV格式,Spark SQL可以轻松使用。将上面创建的/devsh_loudacre/userips_csv中的新CSV文件加载到DataFrame中,然后查看数据和模式。
3、附加练习1:清洁设备状态数据(选做)
4、练习2:转换多行XML文件到CSV文件(选做)
|