IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 5.6 Spark算子 -> 正文阅读

[大数据]5.6 Spark算子

第一关 map

  • 案例

sc = SparkContext("local", "Simple App")

data = [1,2,3,4,5,6]

rdd = sc.parallelize(data)

print(rdd.collect())

rdd_map = rdd.map(lambda x: x * 2)

print(rdd_map.collect())

????????输出:?

[1, 2, 3, 4, 5, 6]

[2, 4, 6, 8, 10, 12]

  • 编程要求

????????使用 map 算子,将rdd的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作,规则如下:

? ? ? ? 1.偶数转换成该数的平方;

? ? ? ? 2.奇数转换成该数的立方。

  • 代码实现
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","Simple App")
    # 2.创建一个1到5的列表List
    List = [1,2,3,4,5]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(List)
    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())

    """
    使用 map 算子,将 rdd 的数据 (1, 2, 3, 4, 5) 按照下面的规则进行转换操作,规则如下:
    需求:
        偶数转换成该数的平方
        奇数转换成该数的立方
    """

    # 5.使用 map 算子完成以上需求
    rdd_map = rdd.map(lambda x:(x*x if (x%2==0) else x*x*x))
    # 6.使用rdd.collect() 收集完成 map 转换的元素
    print(rdd_map.collect())
    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

第二关 mapPartitions

  • 案例

def f(iterator):

? ?list = []

? ?for x in iterator:

? ?list.append(x*2)

? ?return list

if __name__ == "__main__":

? ?sc = SparkContext("local", "Simple App")

? ?data = [1,2,3,4,5,6]

? ?rdd = sc.parallelize(data)

? ?print(rdd.collect())

? ?partitions = rdd.mapPartitions(f)

? ?print(partitions.collect())

????????输出:

[1, 2, 3, 4, 5, 6]

[2, 4, 6, 8, 10, 12]?

  • 编程要求

????????使用 mapPartitions 算子,将 rdd 的数据 ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的规则进行转换操作,规则如下:

将字符串与该字符串的长度组合成一个元组,例如:

dog --> (dog,3)

salmon --> (salmon,6)

  • 代码实现
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

#********** Begin **********#
def f(iterator):
    list = []
    for x in iterator:
        list.append((x,len(x)))
    return list
#********** End **********#

if __name__ == "__main__":
    #********** Begin **********#
    
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","Simple App")
    # 2. 一个内容为("dog", "salmon", "salmon", "rat", "elephant")的列表List
    data = ["dog", "salmon", "salmon", "rat", "elephant"]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())

    """
    使用 mapPartitions 算子,将 rdd 的数据 ("dog", "salmon", "salmon", "rat", "elephant") 按照下面的规则进行转换操作,规则如下:
    需求:
        将字符串与该字符串的长度组合成一个元组,例如:
        dog  -->  (dog,3)
        salmon   -->  (salmon,6)
    """

    # 5.使用 mapPartitions 算子完成以上需求
    partitions = rdd.mapPartitions(f)
    # 6.使用rdd.collect() 收集完成 mapPartitions 转换的元素
    print(partitions.collect())
    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

第三关 filter

  • 案例

sc = SparkContext("local", "Simple App")

data = [1,2,3,4,5,6]

rdd = sc.parallelize(data)

print(rdd.collect())

rdd_filter = rdd.filter(lambda x: x>2)

print(rdd_filter.collect())

? ? ? ? 输出:

[1, 2, 3, 4, 5, 6]

[3, 4, 5, 6]

?

  • 编程要求

????????使用 filter 算子,将 rdd 中的数据 (1, 2, 3, 4, 5, 6, 7, 8) 按照以下规则进行过滤,规则如下:

????????过滤掉rdd中的所有奇数。

  • 代码实现
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","Simple App")
    # 2.创建一个1到8的列表List
    data = [1,2,3,4,5,6,7,8]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())

    """
    使用 filter 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 7, 8) 按照下面的规则进行转换操作,规则如下:
    需求:
        过滤掉rdd中的奇数
    """
    # 5.使用 filter 算子完成以上需求
    rdd_filter = rdd.filter(lambda x:x%2==0)
    # 6.使用rdd.collect() 收集完成 filter 转换的元素
    print(rdd_filter.collect())
    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

第四关 flatMap

  • 案例

sc = SparkContext("local", "Simple App")

data = [["m"], ["a", "n"]]

rdd = sc.parallelize(data)

print(rdd.collect())

flat_map = rdd.flatMap(lambda x: x)

print(flat_map.collect())

? ? ? ? 输出:

[['m'], ['a', 'n']]

['m', 'a', 'n']?

  • 编程要求

????????使用 flatMap 算子,将rdd的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作,规则如下:

????????合并RDD的元素,例如:

????????([1,2,3],[4,5,6]) --> (1,2,3,4,5,6)

????????([2,3],[4,5],[6]) --> (1,2,3,4,5,6)

  • 代码实现
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
       #********** Begin **********#
       
    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","Simple App")
    # 2.创建一个[[1, 2, 3], [4, 5, 6], [7, 8, 9]] 的列表List
    data = [[1,2,3],[4,5,6],[7,8,9]]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素。
    print(rdd.collect())

    """
        使用 flatMap 算子,将 rdd 的数据 ([1, 2, 3], [4, 5, 6], [7, 8, 9]) 按照下面的规则进行转换操作,规则如下:
        需求:
            合并RDD的元素,例如:
                            ([1,2,3],[4,5,6])  -->  (1,2,3,4,5,6)
                            ([2,3],[4,5],[6])  -->  (1,2,3,4,5,6)
    """

    # 5.使用 filter 算子完成以上需求
    flat_map = rdd.flatMap(lambda x:x)
    # 6.使用rdd.collect() 收集完成 filter 转换的元素
    print(flat_map.collect())
    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

第五关 distinct

  • 案例

sc = SparkContext("local", "Simple App")

data = ["python", "python", "python", "java", "java"]

rdd = sc.parallelize(data)

print(rdd.collect())

distinct = rdd.distinct()

print(distinct.collect())

? ? ? ? 输出:

['python', 'python', 'python', 'java', 'java']

['python', 'java']?

  • 编程要求

????????使用 distinct 算子,将 rdd 中的数据进行去重。

  • 代码实现
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    #********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","Simple App")
    # 2.创建一个内容为(1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1)的列表List
    data = [1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())

    """
       使用 distinct 算子,将 rdd 的数据 (1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1) 按照下面的规则进行转换操作,规则如下:
       需求:
           元素去重,例如:
                        1,2,3,3,2,1  --> 1,2,3
                        1,1,1,1,     --> 1
    """
    # 5.使用 distinct 算子完成以上需求
    distinct = rdd.distinct()
    # 6.使用rdd.collect() 收集完成 distinct 转换的元素
    print(distinct.collect())
    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

第六关 sortBy

  • 案例

sc = SparkContext("local", "Simple App")

data = [("a",1),("a",2),("c",1),("b",1)]

rdd = sc.parallelize(data)

by = rdd.sortBy(lambda x: x)

print(by.collect())

输出:

[('a', 1), ('a', 2), ('b', 1), ('c', 1)]

  • 编程要求

????????使用 sortBy 算子,将 rdd 中的数据进行排序(升序)。

  • 代码实现
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","Simple App")
    # 2.创建一个内容为(1, 3, 5, 7, 9, 8, 6, 4, 2)的列表List
    data = [1, 3, 5, 7, 9, 8, 6, 4, 2]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())


    """
       使用 sortBy 算子,将 rdd 的数据 (1, 3, 5, 7, 9, 8, 6, 4, 2) 按照下面的规则进行转换操作,规则如下:
       需求:
           元素排序,例如:
            5,4,3,1,2  --> 1,2,3,4,5
    """

    # 5.使用 sortBy 算子完成以上需求
    by = rdd.sortBy(lambda x:x)
    # 6.使用rdd.collect() 收集完成 sortBy 转换的元素
    print(by.collect())
    # 7.停止 SparkContext
    sc.stop()

    #********** End **********#

第七关 sortByKey

  • 案例

sc = SparkContext("local", "Simple App")

data = [("a",1),("a",2),("c",1),("b",1)]

rdd = sc.parallelize(data)

key = rdd.sortByKey()

print(key.collect())

输出:

[('a', 1), ('a', 2), ('b', 1), ('c', 1)]

  • 编程要求

????????使用 sortByKey 算子,将 rdd 中的数据进行排序(升序)。

  • 代码实现
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","Simple App")
    # 2.创建一个内容为[(B',1),('A',2),('C',3)]的列表List
    data = [('B',1),('A',2),('C',3)]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())

    """
       使用 sortByKey 算子,将 rdd 的数据 ('B', 1), ('A', 2), ('C', 3) 按照下面的规则进行转换操作,规则如下:
       需求:
           元素排序,例如:
            [(3,3),(2,2),(1,1)]  -->  [(1,1),(2,2),(3,3)]
    """

    # 5.使用 sortByKey 算子完成以上需求
    key = rdd.sortByKey()
    # 6.使用rdd.collect() 收集完成 sortByKey 转换的元素
    print(key.collect())
    # 7.停止 SparkContext
    sc.stop()

    # ********** End **********#

第八关 mapValues

  • 案例

sc = SparkContext("local", "Simple App")

data = [("a",1),("a",2),("b",1)]

rdd = sc.parallelize(data)

values = rdd.mapValues(lambda x: x + 2)

print(values.collect())

输出:

[('a', 3), ('a', 4), ('b', 3)]

  • 编程要求

????????使用mapValues算子,将rdd的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的规则进行转换操作,规则如下:

????????偶数转换成该数的平方

????????奇数转换成该数的立方

  • 代码实现
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","Simple App")
    # 2.创建一个内容为[("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]的列表List
    data = [("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5)]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())

    """
           使用 mapValues 算子,将 rdd 的数据 ("1", 1), ("2", 2), ("3", 3), ("4", 4), ("5", 5) 按照下面的规则进行转换操作,规则如下:
           需求:
               元素(key,value)的value进行以下操作:
                                                偶数转换成该数的平方
                                                奇数转换成该数的立方
    """

    # 5.使用 mapValues 算子完成以上需求
    values = rdd.mapValues(lambda x:x*x if x%2==0 else x*x*x)
    # 6.使用rdd.collect() 收集完成 mapValues 转换的元素
    print(values.collect())
    # 7.停止 SparkContext
    sc.stop()

    # ********** End **********#

第九关 reduceByKey

  • 案例

sc = SparkContext("local", "Simple App")

data = [("a",1),("a",2),("b",1)]

rdd = sc.parallelize(data)

print(rdd.reduceByKey(lambda x,y:x+y).collect())

????????输出:

[('a', 3), ('b', 1)]

  • 编程要求

????????使用 reduceByKey 算子,将 rdd(key-value类型) 中的数据进行值累加。

????????例如:

????????("soma",4), ("soma",1), ("soma",2) -> ("soma",7)

  • 代码实现
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","Simple App")
    # 2.创建一个内容为[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]的列表List
    data = [("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.使用rdd.collect() 收集 rdd 的元素
    print(rdd.collect())

    """
          使用 reduceByKey 算子,将 rdd 的数据[("python", 1), ("scala", 2), ("python", 3), ("python", 4), ("java", 5)] 按照下面的规则进行转换操作,规则如下:
          需求:
              元素(key-value)的value累加操作,例如:
                                                (1,1),(1,1),(1,2)  --> (1,4)
                                                (1,1),(1,1),(2,2),(2,2)  --> (1,2),(2,4)
    """

    # 5.使用 reduceByKey 算子完成以上需求
    red = rdd.reduceByKey(lambda x,y:x+y)
    # 6.使用rdd.collect() 收集完成 reduceByKey 转换的元素
    print(red.collect())
    # 7.停止 SparkContext
    sc.stop()

    # ********** End **********#

第十关 Action常用算子

count():返回 RDD 的元素个数

first():返回 RDD 的第一个元素(类似于take(1)

take(n):返回一个由数据集的前 n 个元素组成的数组。

reduce():通过func函数聚集 RDD 中的所有元素,该函数应该是可交换的和关联的,以便可以并行正确计算。

collect():在驱动程序中,以数组的形式返回数据集的所有元素。

  • 编程要求

????????使用 count 算子,统计下 rdd 中元素的个数;

????????使用 first 算子,获取 rdd 首个元素;

????????使用 take 算子,获取 rdd 前三个元素;

????????使用 reduce 算子,进行累加操作;

????????使用 collect 算子,收集所有元素。

  • 代码实现
# -*- coding: UTF-8 -*-
from pyspark import SparkContext

if __name__ == "__main__":
    # ********** Begin **********#

    # 1.初始化 SparkContext,该对象是 Spark 程序的入口
    sc = SparkContext("local","Simple App")
    # 2.创建一个内容为[1, 3, 5, 7, 9, 8, 6, 4, 2]的列表List
    data = [1, 3, 5, 7, 9, 8, 6, 4, 2]
    # 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
    # 4.收集rdd的所有元素并print输出
    print(rdd.collect())
    # 5.统计rdd的元素个数并print输出
    print(rdd.count())
    # 6.获取rdd的第一个元素并print输出
    print(rdd.first())
    # 7.获取rdd的前3个元素并print输出
    print(rdd.take(3))
    # 8.聚合rdd的所有元素并print输出
    print(rdd.reduce(lambda x,y:x+y))
    # 9.停止 SparkContext
    sc.stop()

    # ********** End **********#
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-01-03 16:10:38  更:2022-01-03 16:10:51 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 4:04:37-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码