Spark Examples
1. SparkPi
hadoop@ddai-master:/opt/spark-2.1.0-bin-hadoop2.7$ vim SparkPi
hadoop@ddai-master:/opt/spark-2.1.0-bin-hadoop2.7$ run-example SparkPi 10 > SparkPi.txt
// SparkPi (from the Spark 2.1.0 examples): estimates Pi with a Monte Carlo method
import scala.math.random

import org.apache.spark.sql.SparkSession

object SparkPi {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("Spark Pi")
      .getOrCreate()
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt  // avoid overflow
    val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
      // throw a random point into the square [-1, 1] x [-1, 1]
      val x = random * 2 - 1
      val y = random * 2 - 1
      // count it if it lands inside the unit circle
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / (n - 1))
    spark.stop()
  }
}
hadoop@ddai-master:/opt/spark-2.1.0-bin-hadoop2.7$ more SparkPi.txt
Pi is roughly 3.1427711427711427
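Why the estimate works: each point (x, y) is drawn uniformly from the square [-1, 1] x [-1, 1], and the probability that it lands inside the unit circle is the ratio of the two areas, Pi/4, so 4.0 * count / n converges to Pi. The same idea can be checked without Spark; the snippet below is only a sketch to illustrate the math (the value of n is arbitrary), not part of the shipped example:

import scala.math.random

val n = 1000000
// count random points from the square [-1, 1] x [-1, 1] that land in the unit circle
val count = (1 to n).count { _ =>
  val x = random * 2 - 1
  val y = random * 2 - 1
  x * x + y * y < 1
}
println("Pi is roughly " + 4.0 * count / n)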
2. Word Count
Compile JavaWordCount.java in the directory /opt/spark-2.1.0-bin-hadoop2.7/examples/src/main/java/org/apache/spark/examples.
For details, see: http://www.360doc.com/content/17/0323/17/41381374_639514974.shtml
Run:
hadoop@ddai-master:/opt/spark-2.1.0-bin-hadoop2.7$ bin/spark-submit --master spark://ddai-master:7077 --class org.apache.spark.examples.JavaWordCount --executor-memory 2g examples/jars/spark-examples_2.11-2.1.0.jar /input > WordCount.txt
Code walkthrough
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;

public final class JavaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      System.err.println("Usage: JavaWordCount <file>");
      System.exit(1);
    }

    SparkSession spark = SparkSession
      .builder()
      .appName("JavaWordCount")
      .getOrCreate();

    // read the input file(s) as an RDD of lines
    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();

    // split each line on spaces to get an RDD of words
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String s) {
        return Arrays.asList(SPACE.split(s)).iterator();
      }
    });

    // map each word to the pair (word, 1)
    JavaPairRDD<String, Integer> ones = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<>(s, 1);
        }
      });

    // sum the counts for each word
    JavaPairRDD<String, Integer> counts = ones.reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });

    // collect the results to the driver and print them
    List<Tuple2<String, Integer>> output = counts.collect();
    for (Tuple2<?,?> tuple : output) {
      System.out.println(tuple._1() + ": " + tuple._2());
    }

    spark.stop();
  }
}
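For comparison, the same pipeline is much shorter in Scala. The sketch below is my own minimal equivalent (the object name ScalaWordCount and the lambda-based pipeline are not from the Spark examples), not the shipped example:

import org.apache.spark.sql.SparkSession

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ScalaWordCount").getOrCreate()
    // same steps as JavaWordCount: split lines into words, pair each word with 1, sum by key
    val counts = spark.read.textFile(args(0)).rdd
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.collect().foreach { case (word, n) => println(word + ": " + n) }
    spark.stop()
  }
}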
3. Spark functions
hadoop@ddai-master:~$ hdfs dfs -mkdir /spark
hadoop@ddai-master:~$ hdfs dfs -put spam.data /spark
hadoop@ddai-master:~$ hdfs dfs -text /spark/spam.data
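The transcripts below are run in spark-shell. The first example uses an RDD named inFile whose creation is not shown; presumably it was loaded from the file uploaded above, e.g. (path assumed from the hdfs put command):

val inFile = sc.textFile("/spark/spam.data")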
map: convert each line of text into an array of doubles
scala> val nums = inFile.map(x => x.split(' ').map(_.toDouble))
nums: org.apache.spark.rdd.RDD[Array[Double]] = MapPartitionsRDD[2] at map at <console>:26

scala> nums.first()
res1: Array[Double] = Array(0.0, 0.64, 0.64, 0.0, 0.32, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.64, 0.0, 0.0, 0.0, 0.32, 0.0, 1.29, 1.93, 0.0, 0.96, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.778, 0.0, 0.0, 3.756, 61.0, 278.0, 1.0)
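As a quick follow-up (my own illustration, not part of the original transcript), the parsed rows can be fed into further operations, for example:

val rows = nums.count()             // number of lines in spam.data
val cols = nums.first().length      // number of values per line (58 in the row shown above)
val maxFirst = nums.map(_(0)).max() // largest value in the first column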
collect: return the RDD's elements to the driver as an array (here the RDD is first built from a local list with parallelize)
scala> val rdd = sc.parallelize(List(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> val mapRdd = rdd.map(2*_)
mapRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:26

scala> mapRdd.collect
res2: Array[Int] = Array(2, 4, 6, 8, 10)
filter: keep only the elements that satisfy a predicate
scala> val filterRdd = mapRdd.filter(_>5)
filterRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at filter at <console>:28

scala> filterRdd.collect
res3: Array[Int] = Array(6, 8, 10)
flatMap: split each line into words; combined with map and reduceByKey, each word becomes a (key, value) pair and the pairs are summed into word counts
scala> val rdd = sc.textFile("/input")
rdd: org.apache.spark.rdd.RDD[String] = /input MapPartitionsRDD[30] at textFile at <console>:24

scala> rdd.cache
res12: rdd.type = /input MapPartitionsRDD[30] at textFile at <console>:24

scala> val wordCount = rdd.flatMap(_.split(' ')).map(x=>(x,1)).reduceByKey(_+_)
wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[33] at reduceByKey at <console>:26

scala> wordCount.collect
res13: Array[(String, Int)] = Array((little.,1), (between,1), (station,1), (too,2), (hadoop,3), (a,1), (good,1), (much,1), (is,1), (hello,2), (bye,1), (world,1), (Happiness,1), (and,1), (way,1))

scala> wordCount.saveAsTextFile("/output/w")
union: concatenate two RDDs
scala> val rdd1 = sc.parallelize(List(('a',1),('a',2)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(('b',1),('b',2)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> rdd1 union rdd2
res4: org.apache.spark.rdd.RDD[(Char, Int)] = UnionRDD[8] at union at <console>:29

scala> res4.collect
res7: Array[(Char, Int)] = Array((a,1), (a,2), (b,1), (b,2))
join: inner join of two pair RDDs by key
scala> val rdd1 = sc.parallelize(List(('a',1),('a',2),('b',3),('b',4)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(('a',5),('a',6),('b',7),('b',8)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> val rdd3 = rdd1 join rdd2
rdd3: org.apache.spark.rdd.RDD[(Char, (Int, Int))] = MapPartitionsRDD[13] at join at <console>:28

scala> rdd3.collect
res8: Array[(Char, (Int, Int))] = Array((b,(3,7)), (b,(3,8)), (b,(4,7)), (b,(4,8)), (a,(1,5)), (a,(1,6)), (a,(2,5)), (a,(2,6)))
lookup: return all the values for a given key
scala> var rdd1 = sc.parallelize(List(('a',1),('a',2),('b',3),('b',4)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[14] at parallelize at <console>:24

scala> rdd1.lookup('a')
res9: Seq[Int] = WrappedArray(1, 2)
groupByKey: group the values by key
scala> var wc = sc.textFile("/input").flatMap(_.split(' ')).map((_,1)).groupByKey
wc: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[21] at groupByKey at <console>:24

scala> wc.collect
res10: Array[(String, Iterable[Int])] = Array((little.,CompactBuffer(1)), (between,CompactBuffer(1)), (station,CompactBuffer(1)), (too,CompactBuffer(1, 1)), (hadoop,CompactBuffer(1, 1, 1)), (a,CompactBuffer(1)), (good,CompactBuffer(1)), (much,CompactBuffer(1)), (is,CompactBuffer(1)), (hello,CompactBuffer(1, 1)), (bye,CompactBuffer(1)), (world,CompactBuffer(1)), (Happiness,CompactBuffer(1)), (and,CompactBuffer(1)), (way,CompactBuffer(1)))
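Note that summing each group reproduces the reduceByKey result from the flatMap example above; a minimal sketch to run in the same shell session:

val counts = wc.map { case (word, ones) => (word, ones.sum) }
counts.collect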
sortByKey: sort by key; the argument false means descending order
scala> var wc = sc.textFile("/input").flatMap(_.split(' ')).map((_,1)).sortByKey(false)
wc: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[28] at sortByKey at <console>:24

scala> wc.collect
res11: Array[(String, Int)] = Array((world,1), (way,1), (too,1), (too,1), (station,1), (much,1), (little.,1), (is,1), (hello,1), (hello,1), (hadoop,1), (hadoop,1), (hadoop,1), (good,1), (bye,1), (between,1), (and,1), (a,1), (Happiness,1))
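These operations compose. For example, to list the words sorted by how often they occur (most frequent first), swap key and value before sorting; a minimal sketch, reusing the same /input data as above:

val byFreq = sc.textFile("/input")
  .flatMap(_.split(' '))
  .map((_, 1))
  .reduceByKey(_ + _)
  .map { case (word, n) => (n, word) } // swap so the count becomes the key
  .sortByKey(false)                    // descending by count
byFreq.collect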