IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark简述二 -> 正文阅读

[大数据]Spark简述二

将程序打成jar包上传任务


import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo2WordCountSubmit {
  /**
    * 1.去除setMaster("local")
    * 2.修改文件的输入输出路径(提交到集群默认是从HDFS获取数据,需要修改成HDFS的路径)
    * 3.在HDFS中创建目录
    * hdfs dfs -mkdir -p /spark/data/works
    * 4.将数据上传至HDFS
    * hdfs dfs -put words.txt /spark/data/works
    * 5.将程序打成jar包
    * 6.将jar包上传至虚拟机,然后通过spark-submit提交(两种提交方式,一种本地打印日志,一种本地不打印)
    * spark-submit --class Demo2WordCountSubmit --master yarn-client /opt/modules/hadoop-2.7.6/share/hadoop/mapreduce/Spark-1.0-SNAPSHOT.jar //这里client模式,本地会有反馈
    * spark-submit --class Demo2WordCountSubmit --master yarn-cluster /opt/modules/hadoop-2.7.6/share/hadoop/mapreduce/Spark-1.0-SNAPSHOT.jar //这里cluster模式,提交上去就行了,就不用管了
    *
    */

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("Demo2WordCountSubmit")
    val sc = new SparkContext()

    val linesRDD: RDD[String] = sc.textFile("/spark/data/works/WordCount.txt")

    val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(","))

    val wordRDD: RDD[(String, Iterable[String])] = wordsRDD.groupBy(word=>word)

    val countRDD: RDD[(String, Int)] = wordRDD.map(
      word => {
        val id: String = word._1
        val words: Iterable[String] = word._2
        val size: Int = words.size
        (id, size)
      }
    )

    //使用javaApi判断输出路径是否存在,存在即删除
    val hdfsconf: Configuration = new Configuration()
    hdfsconf.set("fs.defaultFS","hdfs://master:9000")
    val fs: FileSystem = FileSystem.get(hdfsconf)
    //判断输出路径是否存在
    if(fs.exists(new Path("/spark/data/wordCount"))){
      fs.delete(new Path("/spark/data/wordCount"),true)
    }

    countRDD.saveAsTextFile("/spark/data/wordCount")

  }
}

On Yarn两种模式对比(client模式和cluster模式)

client模式(本地有反馈)运行过程中,给它杀掉进程,发现直接就挂掉了
cluster模式(本地无反馈)运行过程中,给它杀掉进程,程序会照常运行(只要集群还在,就会跑完)

这地方的节点可能会被很多人用,很多人提交到同一个节点,可能会造成网卡风暴
Yarn-client
在这里插入图片描述
在这里插入图片描述

本地不启动Driver程序
这里的Driver端不再是本机了,而是一台nodemanager上的applicationmaster
这里的任务就不知道在哪打印了,因为这里是任意一台nodemanager上的任务
Yarn-cluster
在这里插入图片描述

将每条数据写到MySQL,对比每种方式的不同(为什么有了foreach还需要有foreachPartition)

一:
我们按照原本的想法,获取每一条数据,之后一条条存入MySQL中,发现会出现一个错误

def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Demo5MapPartition")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt")

    Class.forName("com.mysql.jdbc.Driver")

    val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?characterEncoding=utf-8", "root", "123456")
    val statement: PreparedStatement = conn.prepareStatement("insert into students values(?,?,?,?,?)")

    stuRDD.foreach(line=>{
      val strings: Array[String] = line.split(",")
      val id: Int = strings(0).toInt
      val name: String = strings(1)
      val age: Int = strings(2).toInt
      val gender: String = strings(3)
      val clazz: String = strings(4)
      statement.setInt(1,id)
      statement.setString(2,name)
      statement.setInt(3,age)
      statement.setString(4,gender)
      statement.setString(5,clazz)
      statement.execute()
    })

  }

任务没有被序列化:
在这里插入图片描述
这是为什么呢:
是因为我们在将任务提交到集群中运行的时候,会有网络IO,所以要经过一个序列化,而我们算子之外的代码都是在Driver端运行的,算子内的代码都是通过Driver端发送到Executor端执行,这里我们的连接是在master端做的,但是运行计算是在node1、node2端运行的,类似MySQL这种网络连接不能被序列化,然后传递给node1、node2执行

我们就需要在算子内部创建连接

二:
解决:把整个代码放进master运行,不发送给node1和node2了

def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Demo5MapPartition")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt")

    stuRDD.foreach(line=>{
      Class.forName("com.mysql.jdbc.Driver")

      val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?characterEncoding=utf-8", "root", "123456")
      val statement: PreparedStatement = conn.prepareStatement("insert into students values(?,?,?,?,?)")

      val strings: Array[String] = line.split(",")
      val id: Int = strings(0).toInt
      val name: String = strings(1)
      val age: Int = strings(2).toInt
      val gender: String = strings(3)
      val clazz: String = strings(4)
      statement.setInt(1,id)
      statement.setString(2,name)
      statement.setInt(3,age)
      statement.setString(4,gender)
      statement.setString(5,clazz)
      statement.execute()
    })
  }

这时我发现数据已经可以存进去一部分了,但是又会出现连接过多这个错误
在这里插入图片描述
在这里插入图片描述
三:
我们在每一个foreach里面,没创建一次连接,使用完之后都关闭一次连接

def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Demo5MapPartition")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt")

    stuRDD.foreach(line=>{
      Class.forName("com.mysql.jdbc.Driver")

      val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?characterEncoding=utf-8", "root", "123456")
      val statement: PreparedStatement = conn.prepareStatement("insert into students values(?,?,?,?,?)")

      val strings: Array[String] = line.split(",")
      val id: Int = strings(0).toInt
      val name: String = strings(1)
      val age: Int = strings(2).toInt
      val gender: String = strings(3)
      val clazz: String = strings(4)
      statement.setInt(1,id)
      statement.setString(2,name)
      statement.setInt(3,age)
      statement.setString(4,gender)
      statement.setString(5,clazz)
      statement.execute()

      conn.close()
      statement.close()
    })
  }

发现数据可以存入MySQL数据库
在这里插入图片描述
四:频繁的创建和关闭连接非常耗时
这里每一个分区都会创建一个task,每一个task都会创建一次连接
这里我们就可以使用foreachPartition,并且每个分区执行完所有的插入之后再关闭连接

def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Demo5MapPartition")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt",6)

    stuRDD.foreachPartition(line=>{
      Class.forName("com.mysql.jdbc.Driver")

      val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?characterEncoding=utf-8", "root", "123456")
      val statement: PreparedStatement = conn.prepareStatement("insert into students values(?,?,?,?,?)")

      line.foreach(
        line=>{
          val strings: Array[String] = line.split(",")
          val id: Int = strings(0).toInt
          val name: String = strings(1)
          val age: Int = strings(2).toInt
          val gender: String = strings(3)
          val clazz: String = strings(4)
          statement.setInt(1,id)
          statement.setString(2,name)
          statement.setInt(3,age)
          statement.setString(4,gender)
          statement.setString(5,clazz)
          statement.execute()
      })
      conn.close()
      statement.close()

    })

五:这里可以使用批处理的方式,再次提升效率

def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Demo5MapPartition")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt",6)

    stuRDD.foreachPartition(line=>{
      Class.forName("com.mysql.jdbc.Driver")

      val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?characterEncoding=utf-8", "root", "123456")
      val statement: PreparedStatement = conn.prepareStatement("insert into students values(?,?,?,?,?)")

      line.foreach(
        line=>{
          val strings: Array[String] = line.split(",")
          val id: Int = strings(0).toInt
          val name: String = strings(1)
          val age: Int = strings(2).toInt
          val gender: String = strings(3)
          val clazz: String = strings(4)
          statement.setInt(1,id)
          statement.setString(2,name)
          statement.setInt(3,age)
          statement.setString(4,gender)
          statement.setString(5,clazz)
          statement.execute()
          statement.addBatch()
      })
      statement.executeBatch()
      conn.close()
      statement.close()

    })
  }

我们这里一步一步的分析之后发现,在有了foreach的时候,我们可以使用其插入数据提高效率,再紧接着我们又发现了一些问题,比如创建链接,网络IO,我们又发现使用foreachPartition可以接着提升效率,而foreachPartition适合批量“写出”数据到外部,mapPartition适合批量“读取”数据到内部

记一些有意思的算子

转换算子:将一个RDD变成另一个RDD,是懒执行的,必须要操作算子触发才能执行
操作算子:不难将一个RDD转换成另一个RDD,每一个操作算子都会触发一个job(在sparkUI界面可以看到)
一个spark程序可以包含很多个job,可以通过算子的返回值来判断该算子是转换算子还是操作算子

MapPartition

相当于按分区去处理数据

//相当于拿到每个分区的数据来计算
    stuRDD.mapPartitions(
      rdd=>{
        println("mapPartition")
        rdd.map(
          line=>{
            val name: String = line.split(",")(1)
            name
          }
        )

    }).foreach(println)

foreachPartition

相当于拿到每一个分区的数据来进行计算

stuRDD.foreachPartition(
      rdd=>{
        rdd.map(
          line=> {
            val name: String = line.split(",")(1)
            name
        }).take(10).foreach(println)
      }
    )

reduceByKey和groupByKey的区别

reduceByKey
在map端会进行一个预聚合,提高效率,但是只能做一些简单的操作,groupByKey虽然效率没有这么高,但是可以做更多复杂的操作
在这里插入图片描述

感谢阅读,我是啊帅和和,一位大数据专业大四学生,祝你快乐。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-11 12:46:40  更:2021-11-11 12:46:52 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 0:30:13-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码