map
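map：一对一转换，每个输入元素恰好产生一个输出元素。下面的示例把 String 转为 String（追加后缀），以及把每一行 String 转为一个 Iterable<Tuple2<String, Integer>>（该行中每个单词对应一个 (单词, 1) 元组）。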
/**
 * Demonstrates {@code map}: a one-to-one transformation where every input element yields
 * exactly one output element (here either a String, or an Iterable of (word, 1) tuples).
 */
public class test {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("jiangtao_demo").setMaster("local");
        // JavaSparkContext is Closeable; try-with-resources stops the context even if a job throws.
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            // Typed (not raw) RDD so the results of map(...) are checked by the compiler.
            JavaRDD<String> lines =
                    jsc.parallelize(Arrays.asList("pandas pip", "numpy", "pip", "pip", "pip"));

            // String -> String: append a suffix to every line.
            JavaRDD<String> mapResult = lines.map(new Function<String, String>() {
                @Override
                public String call(String o) throws Exception {
                    return o.concat("newTail");
                }
            });

            // String -> Iterable<Tuple2<String, Integer>>: still one output element per line,
            // but that element is a whole list of (word, 1) tuples (compare with flatMapToPair).
            JavaRDD<Iterable<Tuple2<String, Integer>>> maprdd =
                    lines.map(new Function<String, Iterable<Tuple2<String, Integer>>>() {
                        @Override
                        public Iterable<Tuple2<String, Integer>> call(String line) throws Exception {
                            String[] fields = line.split(" ");
                            ArrayList<Tuple2<String, Integer>> al = new ArrayList<>(fields.length);
                            for (String field : fields) {
                                al.add(new Tuple2<>(field, 1));
                            }
                            return al;
                        }
                    });

            // RDDs are lazy: without an action mapResult would never be computed.
            // A lambda (not System.out::println) is used because the method reference would
            // capture the non-serializable PrintStream. With setMaster("local") the output
            // appears in this JVM's stdout.
            mapResult.foreach(s -> System.out.println(s));
            maprdd.foreach(s -> System.out.println(s));
        }
    }
}
flatMap
flatMap 将 <String> 类型转为 <String>，是一对多转换：函数为每个输入元素返回一个 Iterator，其中的所有元素会被展开到结果 RDD 中。例如某一行有两个单词，使用该方法可以把这一行拆成两个元素，每个元素一个单词。
/**
 * Demonstrates {@code flatMap}: a one-to-many transformation. Each line is split on single
 * spaces and every resulting word becomes its own element of the output RDD.
 */
public class test {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("jiangtao_demo").setMaster("local");
        // JavaSparkContext is Closeable; try-with-resources stops the context even if a job throws.
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            // Typed (not raw) RDD so flatMap(...) returns a checked JavaRDD<String>.
            JavaRDD<String> lines =
                    jsc.parallelize(Arrays.asList("pandas pip", "numpy", "pip", "pip", "pip"));
            JavaRDD<String> flatMapResult = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    // The returned iterator's elements are flattened into the result RDD.
                    return Arrays.asList(line.split(" ")).iterator();
                }
            });
            // Lambda rather than System.out::println: the method reference would capture the
            // non-serializable PrintStream. With setMaster("local") output appears in this JVM.
            flatMapResult.foreach(s -> System.out.println(s));
        }
    }
}
flatMapToPair
flatMapToPair 将 <String> 类型转为 <String, Integer> 键值对：先把每一行拆分为单词，再把每个单词转为 (单词, 1)，得到键值对类型的 RDD（JavaPairRDD）。
/**
 * Demonstrates {@code flatMapToPair}: splits each line on single spaces and emits one
 * {@code (word, 1)} pair per word, producing a {@link JavaPairRDD}.
 */
public class test {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("jiangtao_demo").setMaster("local");
        // JavaSparkContext is Closeable; try-with-resources stops the context even if a job throws.
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            // Typed (not raw) RDD so flatMapToPair(...) returns a checked JavaPairRDD.
            JavaRDD<String> lines =
                    jsc.parallelize(Arrays.asList("pandas pip", "numpy", "pip", "pip", "pip"));
            JavaPairRDD<String, Integer> result1 = lines.flatMapToPair(
                    new PairFlatMapFunction<String, String, Integer>() {
                        @Override
                        public Iterator<Tuple2<String, Integer>> call(String line) throws Exception {
                            String[] fields = line.split(" ");
                            ArrayList<Tuple2<String, Integer>> al = new ArrayList<>(fields.length);
                            for (String field : fields) {
                                al.add(new Tuple2<>(field, 1));
                            }
                            // Every tuple in the iterator becomes a separate pair in result1.
                            return al.iterator();
                        }
                    });
            // Lambda rather than System.out::println: the method reference would capture the
            // non-serializable PrintStream. With setMaster("local") output appears in this JVM.
            result1.foreach(s -> System.out.println(s));
        }
    }
}