一、启动spark
1.先启动zookeeper(三台虚拟机都要启动)
zkServer.sh start
2.启动hadoop
start-all.sh
3.启动spark
在spark的根目录下输入
sbin/start-all.sh
spark-shell
二、Spark Rdd的简单操作
1.从文件系统加载数据创建RDD
(1)从Linux本地文件系统加载数据创建RDD——textFile(path)
val rdd = sc.textFile("file:///root/word.txt")
rdd.collect() //查看命令
(2)从HDFS中加载数据创建RDD
val rdd = sc.textFile("/spark/test/word.txt")
rdd.collect()
scala> val rdd = sc.textFile("/spark/test/word.txt") rdd: org.apache.spark.rdd.RDD[String] = /spark/test/word.txt MapPartitionsRDD[60] at textFile at :24 scala> rdd.collect() res27: Array[String] = Array(hello java, hello hadoop, hello mysql)
2.通过集合创建RDD——parallelize()
从一个已经存在的集合、数组,通过SparkContext对象调用parallelize方法创建RDD。
val array =Array(1,2,3,4,5)
val arrRdd = sc.parallelize(array)
arrRdd.collect()
scala> val array =Array(1,2,3,4,5) array: Array[Int] = Array(1, 2, 3, 4, 5) scala> val arrRdd = sc.parallelize(array) arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[61] at parallelize at :26 scala> arrRdd.collect() res29: Array[Int] = Array(1, 2, 3, 4, 5)
3.RDD的处理
一些RDD编程API
| 命令 | 含义 |
|---|---|
| map() | 返回一个新的rdd,由每个输入元素经()中的函数转换后组成 |
| filter() | 过滤,返回一个新的rdd,由经()中的函数计算后返回值为true的元素组成 |
| flatMap() | 类似于map,但每个输入元素可以被映射为0个或多个输出元素,用于词频拆分 |
| union() | 相当于数学中集合的并集(不去重) |
| intersection() | 相当于数学中集合的交集 |
| distinct() | 去重操作后返回一个新的rdd |
| groupByKey() | 在一个(k,v)对的rdd上调用,返回一个(k,Iterable[v])的rdd |
| reduceByKey() | 在一个(k,v)对的rdd上调用,返回一个新的(k,v)对rdd,用于词频统计 |
| sortByKey() | 在一个(k,v)对的rdd上调用,参数为true时按key从小到大排序,false为从大到小排序 |
| join() | 在(K,V)和(K,W)两个rdd上调用,返回相同的key对应的所有元素,如(K,(V,W)) |
(1)实例1
通过并行化生成rdd
val rdd1 =List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10)
val rdd2 =sc.parallelize(rdd1)
scala> val rdd1 =List(5,6,4,7,3,8,2,9,1,10) rdd1: List[Int] = List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10) scala> val rdd2 =sc.parallelize(rdd1) rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :26 scala> rdd2.collect() res3: Array[Int] = Array(5, 6, 4, 7, 3, 8, 2, 9, 1, 10)
对rdd2里的每一个元素乘2,然后排序
val rdd3=rdd2.map(x=>x*2)
rdd3.collect()
scala> val rdd3=rdd2.map(x=>x*2) rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at :28 scala> rdd3.collect() res4: Array[Int] = Array(10, 12, 8, 14, 6, 16, 4, 18, 2, 20)
val rdd4=rdd3.sortBy(x=>x,true)
rdd4.collect()
scala> val rdd4=rdd3.sortBy(x=>x,true) rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at sortBy at :30 scala> rdd4.collect() res5: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
(2)实例2
val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
rdd1.collect()
scala> val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j")) rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at :24 scala> rdd1.collect() res6: Array[String] = Array(a b c, d e f, h i j)
将rdd1里面的每一个元素先切分再压平
val rdd2 = rdd1.flatMap(x=>x.split(" "))
rdd2.collect
scala> val rdd2 = rdd1.flatMap(x=>x.split(" ")) rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at flatMap at :26 scala> rdd2.collect() res7: Array[String] = Array(a, b, c, d, e, f, h, i, j)
(3)实例3
统计word.txt中每个单词出现的次数
val rdd = sc.textFile("/spark/test/word.txt") //从HDFS中加载数据创建RDD
val rdd1=rdd.flatMap(x=>x.split(" ")) //将rdd用空格分开
val rdd2=rdd1.map(x=>(x,1)) //将不同的单词(k,v)=(k,1)
rdd2.collect()
val rdd3=rdd2.groupByKey() //相同的k放到一起
rdd3.collect()
val rdd4=rdd2.reduceByKey((a,b)=>a+b) //将单词进行数量统计
rdd4.collect()
scala> val rdd = sc.textFile("/spark/test/word.txt") rdd: org.apache.spark.rdd.RDD[String] = /spark/test/word.txt MapPartitionsRDD[10] at textFile at :24 scala> val rdd1=rdd.flatMap(x=>x.split(" ")) rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at flatMap at :26 scala> val rdd2=rdd1.map(x=>(x,1)) rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at map at :28 scala> rdd2.collect() res9: Array[(String, Int)] = Array((hello,1), (java,1), (hello,1), (hadoop,1), (hello,1), (mysql,1)) scala> val rdd3=rdd2.groupByKey() rdd3: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[13] at groupByKey at :30 scala> rdd3.collect() res10: Array[(String, Iterable[Int])] = Array((hadoop,CompactBuffer(1)), (mysql,CompactBuffer(1)), (hello,CompactBuffer(1, 1, 1)), (java,CompactBuffer(1))) scala> val rdd4=rdd2.reduceByKey((a,b)=>a+b) rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at reduceByKey at :30 scala> rdd4.collect() res11: Array[(String, Int)] = Array((hadoop,1), (mysql,1), (hello,3), (java,1))
(4)实例4
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
val rdd3 = rdd1.join(rdd2) //求join
val rdd4 = rdd1.union(rdd2) //求并集
val rdd5 = rdd4.groupByKey() //按key分组
rdd5.collect //查看
res11: Array[(String, Int)] = Array((hadoop,1), (mysql,1), (hello,3), (java,1)) scala> val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[15] at parallelize at :24 scala> val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2))) rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[16] at parallelize at :24 scala> val rdd3 = rdd1.join(rdd2) rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[19] at join at :28 scala> val rdd4 = rdd1.union(rdd2) rdd4: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[20] at union at :28 scala> rdd3.collect() res12: Array[(String, (Int, Int))] = Array((tom,(1,1)), (jerry,(3,2))) scala> rdd4.collect() res13: Array[(String, Int)] = Array((tom,1), (jerry,3), (kitty,2), (jerry,2), (tom,1), (shuke,2)) scala> val rdd5 = rdd4.groupByKey() rdd5: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[21] at groupByKey at :30 scala> rdd5.collect() res14: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 1)), (jerry,CompactBuffer(3, 2)), (shuke,CompactBuffer(2)), (kitty,CompactBuffer(2)))
(5)实例5
val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
val rdd3 = rdd1.union(rdd2) //求并集
rdd3.collect()
val rdd4 = rdd1.intersection(rdd2) //求交集
rdd4.collect()
val rdd5 = rdd4.distinct() //去重
rdd5.collect //查看
scala> val rdd1 = sc.parallelize(List(5, 6, 4, 3)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at :24 scala> val rdd2 = sc.parallelize(List(1, 2, 3, 4)) rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at :24 scala> val rdd3 = rdd1.union(rdd2) rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[24] at union at :28 scala> rdd3.collect() res15: Array[Int] = Array(5, 6, 4, 3, 1, 2, 3, 4) scala> val rdd4 = rdd1.intersection(rdd2) rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[30] at intersection at :28 scala> rdd4.collect() res16: Array[Int] = Array(4, 3) scala> val rdd5 = rdd4.distinct() rdd5: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at distinct at :30 scala> rdd5.collect() res17: Array[Int] = Array(4, 3)
|