# -*- coding: UTF-8 -*-
from pyspark import SparkConf, SparkContext
import time
class Spark05Marager(object):
    def __init__(self, master, app_name):
        '''
        Constructor.
        :param master: Spark master URL
        :param app_name: application name
        '''
        self.__master = master
        self.__app_name = app_name
        self.__sc = self.__createSc()
    def __createSc(self):
        '''
        Create the Spark context.
        :return: the SparkContext, or None if creation fails
        '''
        try:
            conf = SparkConf().setMaster(self.__master).setAppName(self.__app_name)
            sc = SparkContext(conf=conf)
            return sc
        except Exception as e:
            print(e)  # log the failure instead of swallowing it silently
            return None
    ################################ The following functions demonstrate RDD operators ################################
    def my_map(self):
        '''
        map: apply a function to every element and return a new RDD
        :return:
        '''
        data = [1, 2, 3, 4, 5, 6, 7, 8, 9]
        rdd1 = self.__sc.parallelize(data)
        rdd2 = rdd1.map(lambda x: x + 1)
        print(rdd2.collect())  # expected: [2, 3, 4, 5, 6, 7, 8, 9, 10]
        time.sleep(30)  # keep the application alive briefly so the Spark web UI can be inspected
    def my_map2(self):
        '''
        map: turn each element into a (word, 1) pair
        :return:
        '''
        data = ['dog', 'cat', 'lion', 'tiger']
        rdd1 = self.__sc.parallelize(data)
        rdd2 = rdd1.map(lambda x: (x, 1))
        print(rdd2.collect())
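        # Expected output (parallelize and collect preserve element order):
        # [('dog', 1), ('cat', 1), ('lion', 1), ('tiger', 1)]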
    def my_filter(self):
        '''
        filter: keep only the even numbers
        :return:
        '''
        data = [1, 2, 3, 4, 5, 6, 7, 8, 9]
        rdd1 = self.__sc.parallelize(data)
        rdd2 = rdd1.filter(lambda x: x % 2 == 0)
        print(rdd2.collect())
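        # Expected output: [2, 4, 6, 8]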
def my_flatMap(self):
#flatMap
data=["Hello Word","Hello Spark","woelect hadoop"]
rdd1=self.__sc.parallelize(data)
rdd2=rdd1.flatMap(lambda str:str.split(" "))
print(rdd2.collect())
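        # Expected output:
        # ['Hello', 'World', 'Hello', 'Spark', 'woelect', 'hadoop']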
    def my_groupBykey(self):
        # groupByKey: group the (word, 1) pairs by word
        data = ["Hello World", "Hello Spark", "woelect hadoop"]
        rdd1 = self.__sc.parallelize(data)
        mapRdd = rdd1.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
        rdd2 = mapRdd.groupByKey().map(lambda x: {x[0]: list(x[1])})
        cont_rdd = mapRdd.groupByKey().map(lambda x: {x[0]: sum(list(x[1]))})
        print(rdd2.collect())
        print(cont_rdd.collect())
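        # Note: key order in the collected output may vary across partitions.
        # A sketch of the same per-word count without building the Python lists explicitly,
        # using mapValues on the grouped values (same mapRdd as above):
        # cont_rdd = mapRdd.groupByKey().mapValues(sum)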
    def my_reduceByKey(self):
        # reduceByKey: sum the 1s per word to get a word count
        data = ["Hello World", "Hello Spark", "woelect hadoop"]
        rdd1 = self.__sc.parallelize(data)
        mapRdd = rdd1.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
        print(mapRdd.collect())
        reduceRdd = mapRdd.reduceByKey(lambda a, b: a + b)
        print(reduceRdd.collect())
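        # Expected word counts (pair order may vary across partitions):
        # [('Hello', 2), ('World', 1), ('Spark', 1), ('woelect', 1), ('hadoop', 1)]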
    def wc_pricate(self):
        # word count sorted by frequency, descending, using sortByKey
        data = ["Hello World", "Hello Spark", "woelect hadoop", "woelect C#"]
        rdd1 = self.__sc.parallelize(data)
        mapRdd = rdd1.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
        print(mapRdd.collect())
        reduceRdd = mapRdd.reduceByKey(lambda a, b: a + b)
        print(reduceRdd.collect())
        # swap to (count, word) so sortByKey sorts by count, descending
        reducemaprdd = reduceRdd.map(lambda x: (x[1], x[0])).sortByKey(False)
        print(reducemaprdd.collect())
        # swap back to (word, count)
        reducemaprdd2 = reducemaprdd.map(lambda x: (x[1], x[0]))
        print(reducemaprdd2.collect())
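        # A possible alternative to the swap + sortByKey + swap pattern above,
        # sketched with RDD.sortBy, which sorts directly on the count field:
        # sorted_rdd = reduceRdd.sortBy(lambda x: x[1], ascending=False)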
def test():
    master = "local[2]"
    app_name = "myspark05"
    sparmgr = Spark05Marager(master, app_name)
    # sparmgr.my_map()
    # sparmgr.my_map2()
    # sparmgr.my_filter()
    # sparmgr.my_flatMap()
    # sparmgr.my_groupBykey()
    # sparmgr.my_reduceByKey()
    sparmgr.wc_pricate()


if __name__ == "__main__":
    test()