Spark: Reading CSV Files into an RDD
1 Prepare the Data
Before we start, let's assume we have the following CSV files with comma-delimited contents in the folder "c:/tmp/files"; these files are used to demonstrate the examples.
File Name | File Contents
---|---
text01.csv | Col1,Col2<br>One,1<br>Eleven,11
text02.csv | Col1,Col2<br>Two,2<br>Twenty One,21
text03.csv | Col1,Col2<br>Three,3
text04.csv | Col1,Col2<br>Four,4
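To reproduce the sample data locally, here is a minimal sketch that writes the four files above; the folder path follows the one mentioned in the text, so adjust it to your environment:

import java.io.{File, PrintWriter}

object CreateSampleFiles {
  def main(args: Array[String]): Unit = {
    // Sample folder from the text above; adjust for your environment.
    val dir = new File("c:/tmp/files")
    dir.mkdirs()
    val samples = Map(
      "text01.csv" -> "Col1,Col2\nOne,1\nEleven,11",
      "text02.csv" -> "Col1,Col2\nTwo,2\nTwenty One,21",
      "text03.csv" -> "Col1,Col2\nThree,3",
      "text04.csv" -> "Col1,Col2\nFour,4"
    )
    samples.foreach { case (name, content) =>
      val writer = new PrintWriter(new File(dir, name))
      try writer.write(content) finally writer.close()
    }
  }
}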
2 Load a CSV File into an RDD
The textFile() method reads each CSV record as a String and returns an RDD[String]; hence we need to write additional code in Spark to transform the RDD[String] into an RDD[Array[String]] by splitting each string record on a delimiter.
The example below reads the files into an RDD named "rddFromFile", where each element of the RDD is one line represented as a String.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object CSVDataDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("csvDemo")
      .master("local").getOrCreate()
    // Read every file in the folder; each line becomes one String element.
    val rddFromFile: RDD[String] = spark.sparkContext.textFile("InData/SparkScalaExampleData/CSVData02/*")
    // Split each record on the comma delimiter, giving RDD[Array[String]].
    val rddSplit: RDD[Array[String]] = rddFromFile.map(f => f.split(","))
    rddSplit.foreach(f => {
      println("Col1:" + f(0) + ",Col2:" + f(1))
    })
  }
}
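If you prefer typed access over positional indexing into the arrays, here is a minimal sketch that continues from the rddSplit value above and maps each row into a case class. The Record name and its fields are assumptions for illustration, not part of the original example:

// Illustrative case class for typed access; the name and fields are assumptions.
case class Record(col1: String, col2: String)

val typedRdd: RDD[Record] = rddSplit
  .filter(_.length >= 2)            // guard against malformed lines
  .map(a => Record(a(0), a(1)))
typedRdd.foreach(r => println(s"Col1:${r.col1},Col2:${r.col2}"))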
3 Skip the Header When Reading a CSV File
When a CSV file contains a header row with column names and you want to read and process it with a Spark RDD, you need to skip the header yourself, because there is no way to tell the RDD API that your file has a header.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object CSVDataDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("csvDemo")
      .master("local").getOrCreate()
    val rddFromFile: RDD[String] = spark.sparkContext.textFile("InData/SparkScalaExampleData/CSVData02/header.csv")
    val rddCsv: RDD[Array[String]] = rddFromFile.map(f => f.split(","))
    println("-------- Original CSV data ---------")
    rddCsv.foreach(f => println(f.mkString(",")))
    // Drop the first row of the first partition, i.e. the header line.
    val withoutHeader: RDD[Array[String]] = rddCsv.mapPartitionsWithIndex { (idx, iter) =>
      if (idx == 0) iter.drop(1) else iter
    }
    println("-------- CSV data with the header row removed ---------")
    withoutHeader.foreach(f => println(f.mkString(",")))
  }
}
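An alternative sketch for skipping the header, assuming the rddFromFile value from the example above: read the header line with first() and filter out every matching line. Unlike mapPartitionsWithIndex, which only drops the first row of the first partition, this also removes the extra header rows you get when several files, each with its own header, are read into one RDD; the trade-off is that any data row identical to the header would be dropped as well.

// Continuing from rddFromFile above.
val header: String = rddFromFile.first()
val dataOnly: RDD[Array[String]] = rddFromFile
  .filter(line => line != header)   // drop every line equal to the header
  .map(_.split(","))
dataOnly.foreach(f => println(f.mkString(",")))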
4 Read Multiple CSV Files into an RDD
To read multiple CSV files in Spark, just call the textFile() method on the SparkContext object and pass all the file names as one comma-separated string. The example below reads the text01.csv and text02.csv files into a single RDD.
import org.apache.spark.sql.SparkSession

object CSVDataDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("csvDemo")
      .master("local").getOrCreate()
    // Pass multiple file paths as a single comma-separated string.
    val rdd4 = spark.sparkContext.textFile("InData/SparkScalaExampleData/CSVData02/text01.csv" +
      ",InData/SparkScalaExampleData/CSVData02/text02.csv")
    rdd4.foreach(f => {
      println(f)
    })
  }
}
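As far as I know, each entry in this comma-separated list can itself be a directory or a wildcard pattern, so the list form shown here can be mixed with the glob form used in the next section.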
5 Read All CSV Files in a Directory into an RDD
To read all CSV files in a directory or folder, just pass the directory path (here with a wildcard pattern) to the textFile() method.
import org.apache.spark.sql.SparkSession

object CSVDataDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("csvDemo")
      .master("local").getOrCreate()
    // A wildcard pattern picks up every file under the directory.
    val rdd4 = spark.sparkContext.textFile("InData/SparkScalaExampleData/CSVData02/*")
    rdd4.foreach(f => {
      println(f)
    })
  }
}
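If you also need to know which file each record came from, SparkContext offers wholeTextFiles(), which returns an RDD of (filePath, fileContent) pairs. A minimal sketch, continuing from the spark session above and reusing the same sample folder:

// Each element is (filePath, entire file content as one String).
val filesRdd = spark.sparkContext.wholeTextFiles("InData/SparkScalaExampleData/CSVData02/*")
filesRdd.foreach { case (path, content) =>
  println(s"File: $path")
  content.split("\n").foreach(println)
}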
6 Summary
In this tutorial you have learned how to read a single CSV file, multiple CSV files, and all the CSV files in a directory/folder into a single Spark RDD. You have also learned how to skip the header while reading CSV files into an RDD.