Background: We use sparkContext.textFile(path) to process files on HDFS, where path is configured with a * wildcard. The returned RDD only wraps each line of file content and carries no file path information. Our business logic needs to populate fields based on the file path, so the native textFile method does not meet the requirement.

Solution: Call Spark's hadoopFile method to build a HadoopRDD, read the file path from the RDD's input split, and assemble the path and the line content into a tuple for downstream processing.

Implementation: The code below uses the Java Spark API; the same approach works in Scala.

package com.upupfeng.util;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.spark.api.java.JavaHadoopRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Iterator;
public class SparkUtil {
    public static JavaRDD<Tuple2<String, String>> textFile(JavaSparkContext jsc,
                                                           String path) {
        // With TextInputFormat, hadoopFile returns a JavaHadoopRDD under the hood,
        // so the cast is safe and exposes mapPartitionsWithInputSplit
        JavaHadoopRDD<LongWritable, Text> hadoopRDD
                = (JavaHadoopRDD<LongWritable, Text>) jsc.hadoopFile(path, TextInputFormat.class, LongWritable.class, Text.class);
        JavaRDD<Tuple2<String, String>> tuple2JavaRDD = hadoopRDD.mapPartitionsWithInputSplit((inputSplit, iterator) -> {
            // The split of a TextInputFormat partition is a FileSplit, which knows which file it came from
            FileSplit fileSplit = (FileSplit) inputSplit;
            // Full path of the file backing this partition
            String filePath = fileSplit.getPath().toString();
            // Wrap the original (offset, line) iterator so each element becomes a (filePath, line) tuple
            Iterator<Tuple2<String, String>> newIterator = new Iterator<Tuple2<String, String>>() {
                @Override
                public boolean hasNext() {
                    return iterator.hasNext();
                }
                @Override
                public Tuple2<String, String> next() {
                    // _2 is the Text value holding one line; pair it with the file path
                    return Tuple2.apply(filePath, iterator.next()._2.toString());
                }
            };
            return newIterator;
        }, false);
        return tuple2JavaRDD;
    }
}
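For reference, a minimal driver that calls this helper might look like the sketch below. The class name SparkUtilDemo and the HDFS path are illustrative assumptions, not part of the original code.

package com.upupfeng.util;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.List;

public class SparkUtilDemo {
    public static void main(String[] args) {
        // Master is expected to be supplied by spark-submit (or setMaster for local testing)
        SparkConf conf = new SparkConf().setAppName("TextFileWithPath");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // The path may contain a * wildcard, just like with the native textFile
        JavaRDD<Tuple2<String, String>> lines =
                SparkUtil.textFile(jsc, "hdfs://nameservice/data/input/*");

        // _1 is the full file path, _2 is one line of that file's content
        List<Tuple2<String, String>> sample = lines.take(10);
        for (Tuple2<String, String> t : sample) {
            System.out.println(t._1 + " -> " + t._2);
        }

        jsc.stop();
    }
}

With the file path available in _1, downstream transformations can fill in the path-derived fields before writing the data out.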