第一关 map
sc = SparkContext("local", "Simple App")
data = [1,2,3,4,5,6]
rdd = sc.parallelize(data)
print(rdd.collect())
rdd_map = rdd.map(lambda x: x * 2)
print(rdd_map.collect())
????????输出:?
[1, 2, 3, 4, 5, 6]
[2, 4, 6, 8, 10, 12]
????????使用 map 算子,将rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作,规则如下:
? ? ? ? 1.偶数转换成该数的平方;
? ? ? ? 2.奇数转换成该数的立方。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local","Simple App")
# 2.创建一个1到5的列表List
List = [1,2,3,4,5]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(List)
# 4.使用rdd.collect() 收集 rdd 的元素。
print(rdd.collect())
"""
使用 map 算子,将 rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作,规则如下:
需求:
偶数转换成该数的平方
奇数转换成该数的立方
"""
# 5.使用 map 算子完成以上需求
rdd_map = rdd.map(lambda x:(x*x if (x%2==0) else x*x*x))
# 6.使用rdd.collect() 收集完成 map 转换的元素
print(rdd_map.collect())
# 7.停止 SparkContext
sc.stop()
#********** End **********#
第二关 mapPartitions
def f(iterator):
? ?list = []
? ?for x in iterator:
? ?list.append(x*2)
? ?return list
if __name__ == "__main__":
? ?sc = SparkContext("local", "Simple App")
? ?data = [1,2,3,4,5,6]
? ?rdd = sc.parallelize(data)
? ?print(rdd.collect())
? ?partitions = rdd.mapPartitions(f)
? ?print(partitions.collect())
????????输出:
[1, 2, 3, 4, 5, 6]
[2, 4, 6, 8, 10, 12] ?
????????使用 mapPartitions 算子,将 rdd 的数据 ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的规则进行转换操作,规则如下:
将字符串与该字符串的长度组合成一个元组,例如:
dog --> (dog,3)
salmon --> (salmon,6)
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
#********** Begin **********#
def f(iterator):
list = []
for x in iterator:
list.append((x,len(x)))
return list
#********** End **********#
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local","Simple App")
# 2. 一个内容为("dog", "salmon", "salmon", "rat", "elephant")的列表List
data = ["dog", "salmon", "salmon", "rat", "elephant"]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素。
print(rdd.collect())
"""
使用 mapPartitions 算子,将 rdd 的数据 ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的规则进行转换操作,规则如下:
需求:
将字符串与该字符串的长度组合成一个元组,例如:
dog --> (dog,3)
salmon --> (salmon,6)
"""
# 5.使用 mapPartitions 算子完成以上需求
partitions = rdd.mapPartitions(f)
# 6.使用rdd.collect() 收集完成 mapPartitions 转换的元素
print(partitions.collect())
# 7.停止 SparkContext
sc.stop()
#********** End **********#
第三关 filter
sc = SparkContext("local", "Simple App")
data = [1,2,3,4,5,6]
rdd = sc.parallelize(data)
print(rdd.collect())
rdd_filter = rdd.filter(lambda x: x>2)
print(rdd_filter.collect())
? ? ? ? 输出:
[1, 2, 3, 4, 5, 6]
[3, 4, 5, 6]
?
????????使用 filter 算子,将 rdd 中的数据 (1, 2, 3, 4, 5, 6, 7, 8) 按照以下规则进行过滤,规则如下:
????????过滤掉rdd 中的所有奇数。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local","Simple App")
# 2.创建一个1到8的列表List
data = [1,2,3,4,5,6,7,8]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素。
print(rdd.collect())
"""
使用 filter 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的规则进行转换操作,规则如下:
需求:
过滤掉rdd中的奇数
"""
# 5.使用 filter 算子完成以上需求
rdd_filter = rdd.filter(lambda x:x%2==0)
# 6.使用rdd.collect() 收集完成 filter 转换的元素
print(rdd_filter.collect())
# 7.停止 SparkContext
sc.stop()
#********** End **********#
第四关 flatMap
sc = SparkContext("local", "Simple App")
data = [["m"], ["a", "n"]]
rdd = sc.parallelize(data)
print(rdd.collect())
flat_map = rdd.flatMap(lambda x: x)
print(flat_map.collect())
? ? ? ? 输出:
[['m'], ['a', 'n']]
['m', 'a', 'n'] ?
????????使用 flatMap 算子,将rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作,规则如下:
????????合并RDD 的元素,例如:
????????([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)
????????([2,3],[4,5],[6]) --> (1,2,3,4,5,6)
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local","Simple App")
# 2.创建一个[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表List
data = [[1,2,3],[4,5,6],[7,8,9]]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素。
print(rdd.collect())
"""
使用 flatMap 算子,将 rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作,规则如下:
需求:
合并RDD的元素,例如:
([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)
([2,3],[4,5],[6]) --> (1,2,3,4,5,6)
"""
# 5.使用 filter 算子完成以上需求
flat_map = rdd.flatMap(lambda x:x)
# 6.使用rdd.collect() 收集完成 filter 转换的元素
print(flat_map.collect())
# 7.停止 SparkContext
sc.stop()
#********** End **********#
第五关 distinct
sc = SparkContext("local", "Simple App")
data = ["python", "python", "python", "java", "java"]
rdd = sc.parallelize(data)
print(rdd.collect())
distinct = rdd.distinct()
print(distinct.collect())
? ? ? ? 输出:
['python', 'python', 'python', 'java', 'java']
['python', 'java'] ?
????????使用 distinct 算子,将 rdd 中的数据进行去重。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
#********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local","Simple App")
# 2.创建一个内容为(1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1)的列表List
data = [1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素
print(rdd.collect())
"""
使用 distinct 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1) 按照下面的规则进行转换操作,规则如下:
需求:
元素去重,例如:
1,2,3,3,2,1 --> 1,2,3
1,1,1,1, --> 1
"""
# 5.使用 distinct 算子完成以上需求
distinct = rdd.distinct()
# 6.使用rdd.collect() 收集完成 distinct 转换的元素
print(distinct.collect())
# 7.停止 SparkContext
sc.stop()
#********** End **********#
第六关 sortBy
sc = SparkContext("local", "Simple App")
data = [("a",1),("a",2),("c",1),("b",1)]
rdd = sc.parallelize(data)
by = rdd.sortBy(lambda x: x)
print(by.collect())
输出:
[('a', 1), ('a', 2), ('b', 1), ('c', 1)]
????????使用 sortBy 算子,将 rdd 中的数据进行排序(升序)。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local","Simple App")
# 2.创建一个内容为(1, 3, 5, 7, 9, 8, 6, 4, 2)的列表List
data = [1, 3, 5, 7, 9, 8, 6, 4, 2]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素
print(rdd.collect())
"""
使用 sortBy 算子,将 rdd 的数据 (1, 3, 5, 7, 9, 8, 6, 4, 2) 按照下面的规则进行转换操作,规则如下:
需求:
元素排序,例如:
5,4,3,1,2 --> 1,2,3,4,5
"""
# 5.使用 sortBy 算子完成以上需求
by = rdd.sortBy(lambda x:x)
# 6.使用rdd.collect() 收集完成 sortBy 转换的元素
print(by.collect())
# 7.停止 SparkContext
sc.stop()
#********** End **********#
第七关 sortByKey
sc = SparkContext("local", "Simple App")
data = [("a",1),("a",2),("c",1),("b",1)]
rdd = sc.parallelize(data)
key = rdd.sortByKey()
print(key.collect())
输出:
[('a', 1), ('a', 2), ('b', 1), ('c', 1)]
????????使用 sortByKey 算子,将 rdd 中的数据进行排序(升序)。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local","Simple App")
# 2.创建一个内容为[(B',1),('A',2),('C',3)]的列表List
data = [('B',1),('A',2),('C',3)]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素
print(rdd.collect())
"""
使用 sortByKey 算子,将 rdd 的数据 ('B', 1), ('A', 2), ('C', 3) 按照下面的规则进行转换操作,规则如下:
需求:
元素排序,例如:
[(3,3),(2,2),(1,1)] --> [(1,1),(2,2),(3,3)]
"""
# 5.使用 sortByKey 算子完成以上需求
key = rdd.sortByKey()
# 6.使用rdd.collect() 收集完成 sortByKey 转换的元素
print(key.collect())
# 7.停止 SparkContext
sc.stop()
# ********** End **********#
第八关 mapValues
sc = SparkContext("local", "Simple App")
data = [("a",1),("a",2),("b",1)]
rdd = sc.parallelize(data)
values = rdd.mapValues(lambda x: x + 2)
print(values.collect())
输出:
[('a', 3), ('a', 4), ('b', 3)]
????????使用mapValues 算子,将rdd 的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的规则进行转换操作,规则如下:
????????偶数转换成该数的平方
????????奇数转换成该数的立方
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local","Simple App")
# 2.创建一个内容为[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表List
data = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素
print(rdd.collect())
"""
使用 mapValues 算子,将 rdd 的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的规则进行转换操作,规则如下:
需求:
元素(key,value)的value进行以下操作:
偶数转换成该数的平方
奇数转换成该数的立方
"""
# 5.使用 mapValues 算子完成以上需求
values = rdd.mapValues(lambda x:x*x if x%2==0 else x*x*x)
# 6.使用rdd.collect() 收集完成 mapValues 转换的元素
print(values.collect())
# 7.停止 SparkContext
sc.stop()
# ********** End **********#
第九关 reduceByKey
sc = SparkContext("local", "Simple App")
data = [("a",1),("a",2),("b",1)]
rdd = sc.parallelize(data)
print(rdd.reduceByKey(lambda x,y:x+y).collect())
????????输出:
[('a', 3), ('b', 1)]
????????使用 reduceByKey 算子,将 rdd(key-value类型) 中的数据进行值累加。
????????例如:
????????("soma",4), ("soma",1), ("soma",2) -> ("soma",7)
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local","Simple App")
# 2.创建一个内容为[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表List
data = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.使用rdd.collect() 收集 rdd 的元素
print(rdd.collect())
"""
使用 reduceByKey 算子,将 rdd 的数据[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的规则进行转换操作,规则如下:
需求:
元素(key-value)的value累加操作,例如:
(1,1),(1,1),(1,2) --> (1,4)
(1,1),(1,1),(2,2),(2,2) --> (1,2),(2,4)
"""
# 5.使用 reduceByKey 算子完成以上需求
red = rdd.reduceByKey(lambda x,y:x+y)
# 6.使用rdd.collect() 收集完成 reduceByKey 转换的元素
print(red.collect())
# 7.停止 SparkContext
sc.stop()
# ********** End **********#
第十关 Action常用算子
count() :返回 RDD 的元素个数
first() :返回 RDD 的第一个元素(类似于take(1) )
take(n) :返回一个由数据集的前 n 个元素组成的数组。
reduce() :通过func 函数聚集 RDD 中的所有元素,该函数应该是可交换的和关联的,以便可以并行正确计算。
collect() :在驱动程序中,以数组的形式返回数据集的所有元素。
????????使用 count 算子,统计下 rdd 中元素的个数;
????????使用 first 算子,获取 rdd 首个元素;
????????使用 take 算子,获取 rdd 前三个元素;
????????使用 reduce 算子,进行累加操作;
????????使用 collect 算子,收集所有元素。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext
if __name__ == "__main__":
# ********** Begin **********#
# 1.初始化 SparkContext,该对象是 Spark 程序的入口
sc = SparkContext("local","Simple App")
# 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表List
data = [1, 3, 5, 7, 9, 8, 6, 4, 2]
# 3.通过 SparkContext 并行化创建 rdd
rdd = sc.parallelize(data)
# 4.收集rdd的所有元素并print输出
print(rdd.collect())
# 5.统计rdd的元素个数并print输出
print(rdd.count())
# 6.获取rdd的第一个元素并print输出
print(rdd.first())
# 7.获取rdd的前3个元素并print输出
print(rdd.take(3))
# 8.聚合rdd的所有元素并print输出
print(rdd.reduce(lambda x,y:x+y))
# 9.停止 SparkContext
sc.stop()
# ********** End **********#
|