一、运行spark-shell命令
? ? ? ? 进入spark-2.1.0-bin-hadoop2.4目录下 输入spark-shell命令
?二、Spark Rdd简单操作
? ? ? ? 1.在Linux本地文件加载数据 创建Rdd
collect() | 以数组的形式返回数据集的所有元素 |
val rdd = sc.textFile("file:///root/word.txt")
rdd.collect()
????????2.rdd中简单的运算和排序?
map() | 返回一个新的rdd,rdd由每一个输入元素经过函数转换后组成。 | sortBy() | 返回一个新的rdd,? ?rdd由经过排序后组成 | filter() | 返回一个新的rdd,rdd由经过函数计算后返回值为true的输入元素组成。 | collect() | 以数组的形式返回数据集的所有元素 |
val rdd1 = sc.parallelize(List(5, 4, 6, 7, 3, 2, 8, 9, 1))
//rdd1中的每个元素乘2
val rdd2 = rdd1.map(x => *2)
//对rdd2中的每个元素进行升序(降序)
val rdd3 = rdd2.sortBy(x => x, true(false))
//过滤在rdd3中大于10的元素
val rdd4 = rdd3.filter(x => x >= 10)
将元素以数组的方式显示
rdd4.collect()
? ? ? ? 3.rdd中元素的切分并按照放入同一个数组
flatMap() | 类似于map,但输入元素可以被映射,用于词频拆分 |
val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
val rdd2 = rdd1.flatMap(x => x.split(' '))
rdd2.collect()
????????4.rdd中求交集、并集以及去重操作
union() | 对第一个rdd和第二个rdd求并集后返回一个新的rdd | intersection() | 第一个rdd和第二个rdd求交集后返回一个新的rdd | distinct() | 将rdd中的数据进行去重后返回一个新的rdd | collect() | 以数组的形式返回数据集的所有元素 |
val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
val rdd3 = rdd1.union(rdd2)
val rdd4 = rdd1.intersection(rdd2)
rdd3.distinct.collect()
rdd4.collect()
? ? ? ? 5.rdd中连接操作
join() | 返回相同的key对应的所有元素 | collect() | 以数组的形式返回数据集的所有元素 |
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//将rdd1和rdd2连接
val rdd3 = rdd1.join(rdd2)
rdd3.collect()
? ? ? ? 6.map,reduce相似操作
groupByKey() | 返回一个(l,iterator[数据类型]) 的rdd | reduceByKey() | 在一个(k,v)对的rdd上调用,返回一个新的(k,v)对rdd,用于统计 | collect() | 以数组的形式返回数据集的所有元素 |
val rdd = sc.parallelize(Array("hello java, hello mysql, hello hadoop"))
val rdd2 = rdd.flatMap(x => x.split(" "))
val rdd3 = rdd2.map(x => (x,1))
val rdd4 = rdd3.groupByKey()
val rdd5 = rdd3.reduceByKey((a,b) => a+b))
rdd5.collect()
三、Rdd编程API
|