一、 RDD基本概念
RDD——分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错,位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度
二、运行spark-shell命令
执行spark-shell命令就可以进入Spark-Shell交互式环境
[root@hadoop1 spark-2.1.0-bin-hadoop2.4]# spark-shell
?三、Spark Rdd简单操作
1.从文件系统加载数据创建RDD----textFile
(1)从Linux本地文件系统加载数据创建RDD
scala> val rdd = sc.textFile("file:///root/word.txt")
scala> rdd.collect()
(2)从HDFS中加载数据创建RDD?
在主机hadoop1中创建文件目录,并将word.txt传进去
[root@hadoop1 ~]# hadoop fs -mkdir -p /spark/test
[root@hadoop1 ~]# hadoop fs -put word.txt /spark/test
在HDFS上的/spark/test的目录下有个word.txt文件,通过加载HDFS的数据创建RDD
scala> val rdd = sc.textFile("/spark/test/word.txt")
scala> rdd.collect()
执行上述代码后,从返回结果中看出RDD创建完成,其中参数也可以是? hdfs://hadoop1:9000/spark/test/word.txt? ? 最终效果是一致的
2.通过并行集合创建RDD----parallelize
从一个已经存在的集合、数组,通过sparkContext对象调用parallelize方法创建RDD
scala> val array = Array(1,2,3,4,5)
array: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val arrRdd = sc.parallelize(array)
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> arrRdd.collect()
res0: Array[Int] = Array(1, 2, 3, 4, 5)
由? .collect()? 方法已知创建成功
3. RDD编程API
Transformation(转换) RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。
常用的Transformation:
命令 | ?含义 | map() | 返回一个新的rdd,由()转换后组成 | filter() | 过滤,由()函数计算后返回值为true的元素组成 | flatMap() | 类似于map,但输入元素可以被映射,用于词频拆分 | union() | 相当于数学中集合的并集 | intersection() | 相当于数学中集合的交集 | distinct() | 去重操作后返回一个新的rdd | groupByKey() | 返回一个(l,iterator[数据类型]) 的rdd | reduceByKey() | 在一个(k,v)对的rdd上调用,返回一个新的(k,v)对rdd,用于词频统计 | sortByKey | 在一个(k,v)对的rdd上调用,第二个值为true时按从小到大排序,false为从大到小排序 | join() | 返回相同的key对应的所有元素,如(K,(V,W)) |
4.练习
练习1:
//通过并行化生成rdd
scala> val list = List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10)
list: List[Int] = List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10)
scala> val rdd = sc.parallelize(list)
//对rdd1里的每一个元素乘2
scala> val rdd1 = rdd.map(x=>x*2)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:28
//然后排序
scala> val rdd2 = rdd1.sortBy(x=>x,true)----正序
scala> rdd2.collect()
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
scala> val rdd3 = rdd1.sortBy(x=>x,false)----倒序
scala> rdd3.collect()
res3: Array[Int] = Array(20, 18, 16, 14, 12, 10, 8, 6, 4, 2)
//过滤出大于等于十的元素
scala> val rdd4 = rdd2.filter(x=>x>=10)
//将元素以数组的方式在客户端显示
scala> rdd4.collect()
res4: Array[Int] = Array(10, 12, 14, 16, 18, 20)
练习2:
//拆分arr每一个元素
scala> val arr = Array("a b c", "d e f", "h i j")
scala> val rdd5 = sc.parallelize(arr)
scala> val rdd6 = rdd5.map(x=>x.split(" "))
scala> val rdd7 = rdd5.flatMap(x=>x.split(" "))
//查看
scala> rdd6.collect()
res6: Array[Array[String]] = Array(Array(a, b, c), Array(d, e, f), Array(h, i, j))
scala> rdd7.collect()
res7: Array[String] = Array(a, b, c, d, e, f, h, i, j)
练习3:
scala> val list1 = List(5, 6, 4, 3)
scala> val list2 = List(1, 2, 3, 4)
scala> val rdd1 = sc.parallelize(list1)
scala> val rdd2 = sc.parallelize(list2)
//求并集
scala> val rdd3 = rdd1.union(rdd2)
scala> rdd3.collect()
res14: Array[Int] = Array(5, 6, 4, 3, 1, 2, 3, 4)
//求交集
scala> val rdd4 = rdd1.intersection(rdd2)
scala> rdd4.collect()
res15: Array[Int] = Array(4, 3)
//去重
scala> val rdd5 = rdd4.distinct()
//查看
scala> rdd5.collect()
res16: Array[Int] = Array(4, 6, 2, 1, 3, 5)
练习4:
scala> val wordRdd = sc.textFile("/spark/test/word.txt")
scala> wordRdd.collect()
res8: Array[String] = Array(hello java, hello hadoop, java mysql, hello hadoop)
//拆分元素
scala> val rdd8 = wordRdd.flatMap(x=>x.split(" "))
scala> rdd8.collect()
res9: Array[String] = Array(hello, java, hello, hadoop, java, mysql, hello, hadoop)
//按(key,1)做相同操作
scala> val rdd9 = rdd8.map(x=>(x,1))
scala> rdd9.collect()
res10: Array[(String, Int)] = Array((hello,1), (java,1), (hello,1), (hadoop,1), (java,1), (mysql,1), (hello,1), (hadoop,1))
//分组
scala> val rdd10 = rdd9.groupByKey()
scala> rdd10.collect()
res11: Array[(String, Iterable[Int])] = Array((hadoop,CompactBuffer(1, 1)), (mysql,CompactBuffer(1)), (hello,CompactBuffer(1, 1, 1)), (java,CompactBuffer(1, 1)))
//词频统计
scala> val rdd11 = rdd9.reduceByKey((a,b)=>a+b)
scala> rdd11.collect()
res12: Array[(String, Int)] = Array((hadoop,2), (mysql,1), (hello,3), (java,2))
//按字母排序返回一个(K,V)的RDD
scala> val rdd12 = rdd9.sortByKey()
scala> rdd12.collect()
res13: Array[(String, Int)] = Array((hadoop,1), (hadoop,1), (hello,1), (hello,1), (hello,1), (java,1), (java,1), (mysql,1))
练习5:
scala> val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
scala> val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求jion
scala> val rdd3 = rdd1.join(rdd2)
scala> rdd3.collect()
res17: Array[(String, (Int, Int))] = Array((tom,(1,1)), (jerry,(3,2)))
//求并集
scala> val rdd4 = rdd1.union(rdd2)
scala> rdd4.collect()
res18: Array[(String, Int)] = Array((tom,1), (jerry,3), (kitty,2), (jerry,2), (tom,1), (shuke,2))
//按key进行分组
scala> val rdd5 = rdd4.groupByKey()
rdd5: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[41] at groupByKey at <console>:30
//查看
scala> rdd5.collect()
res19: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 1)), (jerry,CompactBuffer(3, 2)), (shuke,CompactBuffer(2)), (kitty,CompactBuffer(2)))
|