背景
使用sparkContext.textFile(path) 处理hdfs上的文件,path 配置的是*通配符。
返回的RDD是封装了文件每行数据内容的RDD,不包含文件的路径信息,业务中需要根据文件的路径信息填充字段,原生的textFile 方法不能满足需求。
解决
调用spark提供的hadoopFile 方法生成HadoopRDD,然后从RDD的分片信息上获取文件路径信息,将路径信息和文件行内容组装成tuple,供后续处理。
实现
我是使用Java Spark实现的,Scala也同理
package com.upupfeng.util;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.spark.api.java.JavaHadoopRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Iterator;
public class SparkUtil {
public static JavaRDD<Tuple2<String, String>> textFile(JavaSparkContext jsc,
String path) {
JavaHadoopRDD<LongWritable, Text> hadoopRDD
= (JavaHadoopRDD<LongWritable, Text>) jsc.hadoopFile(path, TextInputFormat.class, LongWritable.class, Text.class);
JavaRDD<Tuple2<String, String>> tuple2JavaRDD = hadoopRDD.mapPartitionsWithInputSplit((inputSplit, iterator) -> {
FileSplit fileSplit = (FileSplit) inputSplit;
String filePath = fileSplit.getPath().toString();
Iterator newIterator = new Iterator<Tuple2<String, String>>() {
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public Tuple2<String, String> next() {
return Tuple2.apply(filePath, iterator.next()._2.toString());
}
};
return newIterator;
}, false);
return tuple2JavaRDD;
}
}
|