IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spark(三) -> 正文阅读

[大数据]spark(三)

一、RDD的 Action 操作

1、reduce(func)

通过func函数聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。

2、collect

以数组的形式返回 RDD 中的所有元素.
所有的数据都会被拉到 driver 端, 所以要慎用

package com.atguigu.bigdata.spark.core.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Action {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        val rdd = sc.makeRDD(List(1,2,3,4))

        // TODO - 行动算子
        // 所谓的行动算子,其实就是触发作业(Job)执行的方法
        // 底层代码调用的是环境对象的runJob方法
        // 底层代码中会创建ActiveJob,并提交执行。
        rdd.collect()

        sc.stop()

    }
}

3、count()

返回 RDD 中元素的个数

4、take(n)

返回 RDD 中前 n 个元素组成的数组.
take 的数据也会拉到 driver 端, 应该只对小数据集使用

5、first

返回 RDD 中的第一个元素. 类似于take(1).

6、takeOrdered(n, [ordering])

返回排序后的前 n 个元素, 默认是升序排列
数据也会拉到 driver 端

package com.atguigu.bigdata.spark.core.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_Operator_Action {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        val rdd = sc.makeRDD(List(1,2,3,4))

        // TODO - 行动算子

        // reduce
        //val i: Int = rdd.reduce(_+_)
        //println(i)

        // collect : 方法会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组
        //val ints: Array[Int] = rdd.collect()
        //println(ints.mkString(","))

        // count : 数据源中数据的个数
        val cnt = rdd.count()
        println(cnt)

        // first : 获取数据源中数据的第一个
        val first = rdd.first()
        println(first)

        // take : 获取N个数据
        val ints: Array[Int] = rdd.take(3)
        println(ints.mkString(","))

        // takeOrdered : 数据排序后,取N个数据
        val rdd1 = sc.makeRDD(List(4,2,3,1))
        val ints1: Array[Int] = rdd1.takeOrdered(3)
        println(ints1.mkString(","))

        sc.stop()

    }
}

7、aggregate

aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致

8、 fold

折叠操作,aggregate的简化操作,seqop和combop一样的时候,可以使用fold

package com.atguigu.bigdata.spark.core.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark03_RDD_Operator_Action {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        val rdd = sc.makeRDD(List(1,2,3,4),2)

        // TODO - 行动算子

        //10 + 13 + 17 = 40
        // aggregateByKey : 初始值只会参与分区内计算
        // aggregate : 初始值会参与分区内计算,并且和参与分区间计算
        //val result = rdd.aggregate(10)(_+_, _+_)
        val result = rdd.fold(10)(_+_)

        println(result)

        sc.stop()

    }
}

9、 saveAsTextFile(path)

作用:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用toString方法,将它装换为文件中的文本

10、saveAsSequenceFile(path)

作用:将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统

11、saveAsObjectFile(path)

作用:用于将 RDD 中的元素序列化成对象,存储到文件中。

package com.atguigu.bigdata.spark.core.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark05_RDD_Operator_Action {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        //val rdd = sc.makeRDD(List(1,1,1,4),2)
        val rdd = sc.makeRDD(List(
            ("a", 1),("a", 2),("a", 3)
        ))

        // TODO - 行动算子
        rdd.saveAsTextFile("output")
        rdd.saveAsObjectFile("output1")
        // saveAsSequenceFile方法要求数据的格式必须为K-V类型
        rdd.saveAsSequenceFile("output2")

        sc.stop()

    }
}

12、countByKey()

作用:针对(K,V)类型的 RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
应用: 可以用来查看数据是否倾斜

package com.atguigu.bigdata.spark.core.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark04_RDD_Operator_Action {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        //val rdd = sc.makeRDD(List(1,1,1,4),2)
        val rdd = sc.makeRDD(List(
            ("a", 1),("a", 2),("a", 3)
        ))

        // TODO - 行动算子

       //val intToLong: collection.Map[Int, Long] = rdd.countByValue()
        //println(intToLong)
        val stringToLong: collection.Map[String, Long] = rdd.countByKey()
        println(stringToLong)

        sc.stop()

    }
}

13、foreach(func)

作用: 针对 RDD 中的每个元素都执行一次func
每个函数是在 Executor 上执行的, 不是在 driver 端执行的.

package com.atguigu.bigdata.spark.core.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_Operator_Action {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        val rdd = sc.makeRDD(List(1,2,3,4))

        // foreach 其实是Driver端内存集合的循环遍历方法
        rdd.collect().foreach(println)
        println("******************")
        // foreach 其实是Executor端内存数据打印
        rdd.foreach(println)

        // 算子 : Operator(操作)
        //         RDD的方法和Scala集合对象的方法不一样
        //         集合对象的方法都是在同一个节点的内存中完成的。
        //         RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行
        //         为了区分不同的处理效果,所以将RDD的方法称之为算子。
        //        RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在Executor端执行。

        sc.stop()

    }
}

package com.atguigu.bigdata.spark.core.rdd.operator.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark07_RDD_Operator_Action {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        val rdd = sc.makeRDD(List[Int]())

        val user = new User()

        // SparkException: Task not serializable
        // NotSerializableException: com.atguigu.bigdata.spark.core.rdd.operator.action.Spark07_RDD_Operator_Action$User

        // RDD算子中传递的函数是会包含闭包操作,那么就会进行检测功能
        // 闭包检测
        rdd.foreach(
            num => {
                println("age = " + (user.age + num))
            }
        )

        sc.stop()

    }
    //class User extends Serializable {
    // 样例类在编译时,会自动混入序列化特质(实现可序列化接口)
    //case class User() {
    class User {
        var age : Int = 30
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-15 15:56:05  更:2021-11-15 15:58:14 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 21:51:39-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码