IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spark-day04-依赖关系-持久化-分区器-数据结构 -> 正文阅读

[大数据]spark-day04-依赖关系-持久化-分区器-数据结构

一:依赖关系

1:依赖和血缘关系介绍

????????rdd.todebugstring:打印血缘关系

? ? ? ? rdd.dependencies:打印依赖关系

2:保存血缘关系

?3:OneToOne依赖---窄依赖

?4:shuffle依赖--宽依赖

? ? ? ? 新的RDD的一个分区的数据依赖于旧的RDD多个分区的数据,这个依赖称之为shuffle依赖。

5:窄依赖的任务

?6:宽依赖的任务

?7:任务分类

1: 一个main方法里面可能有多个行动算子,比如collect,所以会有多个job

2:一个job可能会有多个阶段,比如上图宽依赖

3:一个阶段可能会有多个task,比如上图一个阶段中的多个分区

?二:持久化

1:RDD自身并不会保存数据,重复读取对象

?2:引入持久化进行优化(文件、内存均可)

3:持久化操作必须在行动算子执行时完成的。不然没有数据,没办法进行持久化。?

4:RDD对象的持久化操作并不一定是为了重用,在数据执行较长,或数据比较重要的场合也可以采用持久化操作。

5:CheckPoint检查点

所谓的检查点,就是通过将RDD中间结果写入磁盘。

由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后出现问题,可以从检查点开始重做血缘,减少了开销。

对RDD进行checkpoint操作并不会马上被执行,必须执行action操作才能触发。

6: 缓存和检查点的区别

1:cache缓存只是将数据保存起来,不切断血缘依赖。checkpoint检查点切断血缘依赖。

2:cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。checkpoint的数据通常存储在hdfs等容错、高可用的文件系统,可靠性高。

3:建议对checkpoin的rdd使用cache缓存,这样checkpoint的job只需从cache缓存中读取数据即可,否则需要再从头计算一次rdd

cache:将数据临时存储在内存中进行数据重用

? ? ? ? ? ? ? ? 会在血缘关系中添加新的依赖。一旦出现问题,可以重头读取数据。

persist:将数据临时存储在磁盘文件中进行数据重用

? ? ? ? ? ? ? ? 涉及到磁盘IO,性能较低,但是数据安全

? ? ? ? ? ? ? ? 如果作业执行完毕,临时保存的数据文件就会丢失

checkpoint:将数据长久的保存在磁盘文件中进行数据重用

? ? ? ? ? ? ? ? 涉及到磁盘IO,性能较低,但是数据安全

? ? ? ? ? ? ? ? 为了保证数据安全,所以一般情况下,会独立执行作业

? ? ? ? ? ? ? ? 为了能够提高效率,一般情况下,是需要和cache联合使用

? ? ? ? ? ? ? ? 执行过程中,会切断血缘关系,重新建立新的血缘关系。因为保存的数据比较安全,所以就是数据源的保存地址发生了改变。导致血缘关系发生改变。

三:分区器

1:自定义分区器:根据设置的规则,将同一规则的数据放在同一分区内

package com.atguigu.bigdata.spark.rdd.part

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object Spark01_RDD_Part {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(
      ("nba","************"),
      ("cba","************"),
      ("wnba","************"),
      ("nba","************")
    ),3)

    val value = rdd.partitionBy(new MyPartitioner)
    value.saveAsTextFile("output")
    sc.stop()
  }

  class MyPartitioner extends Partitioner{
    //分区数量
    override def numPartitions: Int = 3

    //根据数据的key值,返回数据的分区索引(从0开始)
    override def getPartition(key: Any): Int = {
      key match {
        case "nba" => 0
        case "wnba" => 1
        case _ => 2
      }
    }
  }
}

四:文件的读取与保存

1:保存

package com.atguigu.bigdata.spark.rdd.IO

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_IO_Save {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(
      List(
        ("a",1),
        ("b",2),
        ("c",3)
      )
    )
    rdd.saveAsTextFile("output1")
    rdd.saveAsObjectFile("output2")
    rdd.saveAsSequenceFile("output3")

    sc.stop()
  }
}

2:读取

package com.atguigu.bigdata.spark.rdd.IO

import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_IO_Load {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.textFile("output1")
    println(rdd.collect().mkString(","))

    val rdd1 = sc.objectFile[(String,Int)]("output2")
    println(rdd1.collect().mkString(","))

    val rdd2 = sc.sequenceFile[String,Int]("output3")
    println(rdd2.collect().mkString(","))

    sc.stop()
  }
}

五:数据结构--累加器(分布式的共享只写变量)

1:概念

累加器用来将executor端变量信息聚合到driver端。在driver程序中定义的变量,在executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回driver端进行merge

package com.atguigu.bigdata.spark.acc

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_Acc {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1,2,3,4))

    //获取系统累加器,spark默认提供了简单数据聚合的累加器
    val sumAcc = sc.longAccumulator("sum")
    rdd.foreach(
      num => {
        sumAcc.add(num)
      }
    )

    println(sumAcc.value)
    sc.stop()
  }
}

?2:累加器的少加和多加

package com.atguigu.bigdata.spark.acc

import org.apache.spark.{SparkConf, SparkContext}

object Spark02_Acc {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1,2,3,4))

    //获取系统累加器,spark默认提供了简单数据聚合的累加器
    val sumAcc = sc.longAccumulator("sum")
    val mapRDD = rdd.map(
      num => {
        sumAcc.add(num)
        num
      }
    )

    //少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
    mapRDD.collect()
    mapRDD.collect()
    //多加:多次执行
    println(sumAcc.value)
    sc.stop()
  }
}

3:自定义累加器

package com.atguigu.bigdata.spark.acc

import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

object Spark03_Acc {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List("hello","spark","hello"))

    //累加器:word count
    //创建累加器对象
    val wcAcc = new MyAccumulator()
    //向spark进行注册
    sc.register(wcAcc,"wordCountAcc")
    rdd.foreach(
      word => {
        //数据的累加(使用累加器)
        wcAcc.add(word)
      }
    )
    println(wcAcc.value)
    sc.stop()
  }

  /*
 自定义累加器
 1.继承:AccumulatorV2 定义泛型
    IN:累加器输入的数据类型
    OUT:累加器返回的数据类型

    2.重写方法
   */
  class MyAccumulator extends AccumulatorV2[String,Map[String,Long]] {

    private var wcMap = Map[String,Long]()

    //判断是否初始状态
    override def isZero: Boolean = {
      wcMap.isEmpty
    }

    override def copy(): AccumulatorV2[String, Map[String, Long]] = {
      new MyAccumulator()
    }

    override def reset(): Unit = {
      wcMap.clear()
    }

    //获取累加器需要计算的值
    override def add(word: String): Unit = {
      val newCnt = wcMap.getOrElse(word,0L) + 1
      wcMap.updated(word,newCnt)
    }

    //driver合并多个累加器
    override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit = {
      val map1 = this.wcMap
      val map2 = other.value
      map2.foreach{
        case (word,count) => {
          val newCount = map1.getOrElse(word,0L) + count
          map1.updated(word,newCount)
        }
      }
    }

    //累加器结果
    override def value: Map[String, Long] = {
      wcMap
    }
  }

}

六:广播变量

? ? ? ? Task的量,是由driver的分区数决定的,和executor的个数无关

? ? ? ? 转换为

? ? ? ?只能访问不能修改

? ? ? ? spark中的广播变量就可以将闭包的数据保存到executor的内存中,不能进行更改。

package com.atguigu.bigdata.spark.acc

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

object Spark04_Bc {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd1 = sc.makeRDD(List(
      ("a",1),("b",2),("c",3)
    ))

    /*val rdd2 = sc.makeRDD(List(
      ("a",4),("b",5),("c",6)
    ))

    //join会导致数据量几何增长,并且会影响shuffle大的性能,不推荐使用
    val value:RDD[(String,(Int,Int))] = rdd1.join(rdd2)

    value.collect().foreach(println)*/

    /*val map = mutable.Map(("a",4),("b",5),("c",6))
    rdd1.map{
      case (w,c) => {
        val l:Int = map.getOrElse(w,0)
        (w,(c,l))
      }
    }.collect().foreach(println)*/

    val map = mutable.Map(("a",4),("b",5),("c",6))
    //封装广播变量
    val bc:Broadcast[mutable.Map[String,Int]] = sc.broadcast(map)
    rdd1.map{
      case (w,c) => {
        //访问广播变量
        val l:Int = bc.value.getOrElse(w,0)
        (w,(c,l))
      }
    }.collect().foreach(println)

    sc.stop()
  }

}

?

? ? ? ??

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-16 13:11:15  更:2022-02-16 13:11:21 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 11:58:06-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码