IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark的RDD行动算子 -> 正文阅读

[大数据]Spark的RDD行动算子

基本概念

行动算子主要是将在数据集上运行计算后的数值返回到驱动程序,从而触发触发作业(Job)的执行。其底层代码调用的就是runJob的方法,底层会创建ActiveJob,并提交执行。

算子介绍

1. reduce

函数定义

def reduce(f: (T, T) => T): T 

说明
聚集RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。

2. collect

函数定义

def collect(): Array[T] 

说明
在驱动程序中,以数组Array 的形式返回数据集的所有元素 。

3. count

函数定义

def count(): Long 

说明
返回RDD 中元素的个数。

4. first

函数定义

def first(): T 

说明
返回RDD 中的第一个元素 。

5. take

函数定义

def take(num: Int): Array[T] 

说明
返回一个由RDD 的前 n 个元素组成的数组。

6. takeOrdered

函数定义

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] 

说明
返回该RDD 排序后的前 n 个元素组成的数组。

案例实操1-6

package com.atguigu.bigdata.spark.core.rdd.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_Operator_Action {

  def main(args: Array[String]): Unit = {

    //TODO 准备环境
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1,2,3,4))


    //TODO - 行动算子
    //reduce
//    val i = rdd.reduce(_ + _)
//
//    println(i)

    //collect:方法会讲不同分区的数据按照分区顺序采集到Driver端内存中,形成数组
//    val ints: Array[Int] = rdd.collect()
//
//    println(ints.mkString(","))

    //统计数据源中的数据的个数
    val cnt = rdd.count()

    println(cnt)
    //first:获取数据源中第一个数据
    val first = rdd.first()
    println(first)

    //take:获取N个数据
    val ints: Array[Int] = rdd.take(3)

    println(ints.mkString(","))


    val rdd1 = sc.makeRDD(List(4,2,3,1))

    //takeOrdered:排序后,获取N个数据
    val ints1: Array[Int] = rdd1.takeOrdered(3)

    println(ints1.mkString(","))

    sc.stop()

  }
}

7. aggregate

函数定义

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U 

说明
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。

8. fold

函数定义

def fold(zeroValue: T)(op: (T, T) => T): T 

说明
折叠操作,aggregate 的简化版操作。

案例实操7-8

package com.atguigu.bigdata.spark.core.rdd.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark03_RDD_Operator_Action {

  def main(args: Array[String]): Unit = {

    //TODO 准备环境
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1,2,3,4),2)


    //TODO - 行动算子
    //aggregateByKey:初始值只会参与分区内的计算
    //aggregate:初始值会参与分区内的计算,并且参与分区间的计算
//    val result: Int = rdd.aggregate(10)(_ + _, _ + _)


    val result: Int = rdd.fold(10)(_ + _)
    println(result)

    sc.stop()

  }
}

9. countByKey

函数定义

def countByKey(): Map[K, Long] 

说明
统计每种key 的个数。

package com.atguigu.bigdata.spark.core.rdd.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark04_RDD_Operator_Action {

  def main(args: Array[String]): Unit = {

    //TODO 准备环境
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1,1,3,4),2)
    val rdd1 = sc.makeRDD(List(("a",1),("a",2),("a",3)))


    //TODO - 行动算子
    val intToLong: collection.Map[Int, Long] = rdd.countByValue()
    println(intToLong)

    val stringToLong: collection.Map[String, Long] = rdd1.countByKey()

    println(stringToLong)


    sc.stop()
  }
}

案例实操

10. save相关算子

函数定义

def saveAsTextFile(path: String): Unit 
def saveAsObjectFile(path: String): Unit 
def saveAsSequenceFile( 
 path: String, 
 codec: Option[Class[_ <: CompressionCodec]] = None): Unit 

说明
将数据保存到不同格式的文件中。

案例实操

package com.atguigu.bigdata.spark.core.rdd.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark05_RDD_Operator_Action {

  def main(args: Array[String]): Unit = {

    //TODO 准备环境
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)


    val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3)))


    //TODO - 行动算子
    rdd.saveAsTextFile("output")
    rdd.saveAsObjectFile("output1")

    //saveAsSequenceFile方法要求数据的格式必须为K-V类型
    rdd.saveAsSequenceFile("output2")

    sc.stop()
  }
}

11. foreach

函数定义

def foreach(f: T => Unit): Unit = withScope { 
 val cleanF = sc.clean(f) 
 sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) 
} 

说明
分布式遍历RDD 中的每一个元素,调用指定函数。

案例实操

package com.atguigu.bigdata.spark.core.rdd.action

import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_Operator_Action {

  def main(args: Array[String]): Unit = {

    //TODO 准备环境
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1,2,3,4) )


    //foreach其实Driver端内存集合循环遍历的方法
    rdd.collect().foreach(println)

    println("*************")

    //foreach其实是Executor端内存数据打印
    rdd.foreach(println)

    //算子:Operator(算子)
    //RDD的方法和Scala集合对象的方法不一样
    //集合对象的方法都是在同一个节点的内存中完成的
    //RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行
    //为了区分不同的处理效果,所以将RDD的方法称之为算子
    //RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在EXecutor端执行

    sc.stop()
  }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-18 12:46:21  更:2021-08-18 12:46:38 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 13:14:06-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码