package pro_spark
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Demonstrates creating an RDD from a text file with `SparkContext.textFile`
 * and printing every line on the driver.
 *
 * The comment at the end of this object collects notes on how `textFile`
 * decides how many partitions the resulting RDD gets.
 *
 * @author YaPeng Li
 * @version 0.0.1
 */
object FileCreateRDDByPartition04 {

  /**
   * Entry point: reads a text file into an RDD and prints its lines.
   *
   * @param args optional; when present, `args(0)` is used as the input path,
   *             otherwise the path `"input"` is read (the original behaviour)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf =
      new SparkConf().setMaster("local[*]").setAppName("FileCreateRDDByPartition")
    val sc = new SparkContext(sparkConf)
    try {
      val inputPath = args.headOption.getOrElse("input")
      val fileRDD: RDD[String] = sc.textFile(inputPath)
      // collect() brings every line back to the driver; acceptable for this small local demo.
      fileRDD.collect().foreach(println)
    } finally {
      // Always release the SparkContext, even if reading or printing the file fails.
      sc.stop()
    }
  }

  /*
   * Study notes: how `sc.textFile` decides the number of partitions.
   * (Excerpts from the Spark and Hadoop sources. They are kept as a comment rather
   * than an unused string literal, so the compiler does not warn about a pure
   * expression in statement position.)
   *
   * 1. SparkContext.textFile delegates to hadoopFile with TextInputFormat and keeps
   *    only the Text value (`pair._2`) of each (LongWritable, Text) pair:
   *
   *      def textFile(
   *          path: String,
   *          minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
   *        assertNotStopped()
   *        hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
   *          minPartitions).map(pair => pair._2.toString).setName(path)
   *      }
   *
   * 2. hadoopFile broadcasts the Hadoop configuration and builds a HadoopRDD. It
   *    also registers a callback that sets the input path on the JobConf:
   *
   *      def hadoopFile[K, V](
   *          path: String,
   *          inputFormatClass: Class[_ <: InputFormat[K, V]],
   *          keyClass: Class[K],
   *          valueClass: Class[V],
   *          minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
   *        assertNotStopped()
   *
   *        // This is a hack to enforce loading hdfs-site.xml.
   *        // See SPARK-11227 for details.
   *        FileSystem.getLocal(hadoopConfiguration)
   *
   *        // A Hadoop configuration can be about 10 KiB, which is pretty big, so broadcast it.
   *        val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
   *        val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
   *        new HadoopRDD(
   *          this,
   *          confBroadcast,
   *          Some(setInputPathsFunc),
   *          inputFormatClass,
   *          keyClass,
   *          valueClass,
   *          minPartitions).setName(path)
   *      }
   *
   *    The line that hands the input path over to Hadoop:
   *
   *      val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
   *
   * 3. Hadoop's FileInputFormat (old mapred API) then computes the split sizes.
   *    NOTE(review): numSplits is presumably the minPartitions passed down by
   *    HadoopRDD — confirm against HadoopRDD.getPartitions.
   *
   *      long totalSize = 0;                       // compute total size
   *      for (FileStatus file: files) {            // check we have valid files
   *        if (file.isDirectory()) {
   *          throw new IOException("Not a file: "+ file.getPath());
   *        }
   *        totalSize += file.getLen();
   *      }
   *
   *      long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
   *      long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
   *        FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
   *
   * 4. Data is stored (split) according to offsets.
   *    NOTE(review): this presumably means that splits are byte ranges of the file,
   *    and that each line is keyed by its LongWritable offset — confirm.
   */
}