I know, I know, on the other side of the Earth you are there with me
I. A Bit of Architecture
1. Spark Architecture
Driver:
1. Schedules the tasks and sends each task to an Executor to be executed.
2. In yarn-cluster mode it also takes on the resource-request role (it is merged into the ApplicationMaster).
3. Code written outside of operators runs on the Driver (see the sketch after this list).
4. Its BlockManagerMaster receives requests from the Executors' BlockManagers and answers them.
Executor:
1. A thread pool that runs the tasks (each task is a thread object).
2. The compute resources (CPU, memory), which the ApplicationMaster requests from the ResourceManager.
3. A BlockManager that manages the Executor's storage (cached RDDs, accumulator and broadcast-variable data, files produced by Shuffle):
   - ConnectionManager: creates connections
   - BlockTransferService: fetches data
   - MemoryStore: manages the data kept in memory (not the one in HBase)
   - DiskStore: manages the data kept on disk
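A minimal sketch of point 3 under Driver, assuming a local run and a made-up object name and data path: code outside the operator runs once on the Driver, while the code inside the operator is shipped to the Executors and runs there.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object DemoDriverVsExecutor {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("DemoDriverVsExecutor")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // runs on the Driver, exactly once
    println("driver side")

    val lineRDD: RDD[String] = sc.textFile("Spark/data/words.txt")
    lineRDD.map(line => {
      // runs inside an Executor, once per record
      println("executor side")
      line
    }).count() // the action that actually submits the job
  }
}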
II. Common Operators
Spark covers most of the common operators found in Scala collections. By the type of object they return they fall into two groups:
Transformation operators:
called on one RDD and return another RDD, i.e., a transformation from RDD to RDD;
they are lazy and need an action operator to trigger execution (see the sketch after this list).
Action operators:
called on an RDD but do not return a new RDD; they return some other data type;
action operators trigger execution, and each action operator triggers one job.
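A quick sketch of the laziness described above, with a made-up object name: the map transformation on its own does nothing, and its body only runs when the count() action submits a job.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object DemoLazy {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("DemoLazy")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val numsRDD: RDD[Int] = sc.parallelize(List(1, 2, 3))
    // transformation only: nothing is printed yet and no job is submitted
    val squaredRDD: RDD[Int] = numsRDD.map(i => {
      println("computing " + i)
      i * i
    })
    // the action triggers one job; only now do the "computing ..." lines appear
    println(squaredRDD.count())
  }
}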
1. Some Transformation Operators
MapPartitions
package com.shujia

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo4MapPartitions {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo4MapPartitions")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val lineRDD: RDD[String] = sc.textFile("Spark/data/words.txt")

    // mapPartitions: the function runs once per partition and receives the whole
    // partition as an iterator, so "map partitions" is printed once per partition
    lineRDD.mapPartitions((iter: Iterator[String]) => {
      println("map partitions")
      iter.flatMap(line => line.split(","))
    }).foreach(println)

    // mapPartitionsWithIndex additionally passes in the partition index
    lineRDD.mapPartitionsWithIndex((index, iter) => {
      println("current partition index: " + index)
      iter.flatMap(line => line.split(","))
    }).foreach(println)

    // map: the function runs once per record, so "map" is printed for every line
    lineRDD.map(line => {
      println("map")
      line.split(",")
    }).foreach(println)
  }
}
ForeachPartition
package com.shujia

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{Connection, DriverManager, PreparedStatement}

object Demo5ForeachPartition {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo5ForeachPartition")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val linesRDD: RDD[String] = sc.textFile("Spark/data/students.txt", 4)
    println(linesRDD.getNumPartitions)

    // foreachPartition lets us open one JDBC connection per partition
    // instead of one per record, which is far cheaper
    linesRDD.foreachPartition(iter => {
      val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student", "root", "123456")
      val st: PreparedStatement = conn.prepareStatement("insert into stu values(?,?,?,?,?)")
      iter.foreach(line => {
        val splits: Array[String] = line.split(",")
        val id: Int = splits(0).toInt
        val name: String = splits(1)
        val age: Int = splits(2).toInt
        val gender: String = splits(3)
        val clazz: String = splits(4)
        st.setInt(1, id)
        st.setString(2, name)
        st.setInt(3, age)
        st.setString(4, gender)
        st.setString(5, clazz)
        st.addBatch() // accumulate the inserts and send them as one batch
      })
      st.executeBatch()
      st.close()
      conn.close()
    })
  }
}
groupByKey (key, (v1, v2, v3, …))
package com.shujia

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo8GroupBy {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo8GroupBy")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val linesRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
    // (class, 1) pairs
    val clazzRDD: RDD[(String, Int)] = linesRDD.map(line => (line.split(",")(4), 1))

    // groupBy keeps the whole (key, value) pair inside each group
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = clazzRDD.groupBy(kv => kv._1)
    groupRDD.foreach(println)
    groupRDD.map {
      case (clazz: String, clazzIter: Iterable[(String, Int)]) =>
        clazz + "," + clazzIter.map(_._2).sum
    }.foreach(println)

    // groupByKey only keeps the values inside each group: (key, (v1, v2, v3, ...))
    clazzRDD
      .groupByKey()
      .map(kv => (kv._1, kv._2.sum))
      .foreach(println)
  }
}
ReduceByKey (key, values combined by an aggregation function)
package com.shujia

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo9ReduceByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo9ReduceByKey")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val linesRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
    val clazzRDD: RDD[(String, Int)] = linesRDD.map(line => (line.split(",")(4), 1))

    // groupByKey + map: shuffles all the values, then sums them after the shuffle
    clazzRDD
      .groupByKey()
      .map(kv => (kv._1, kv._2.sum))
      .foreach(println)

    // reduceByKey: pre-aggregates inside each partition before the shuffle
    clazzRDD.reduceByKey((i: Int, j: Int) => {
      i + j
    }).foreach(println)
    clazzRDD.reduceByKey(_ + _).foreach(println)

    // keep the application alive so the Spark web UI can still be inspected
    while (true) {
    }
  }
}
MapValues
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo12MapValues {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo12MapValues")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[(String, Int)] = sc.parallelize(List(("张三", 1), ("李四", 2), ("王五", 3)))
    // mapValues only transforms the value and leaves the key untouched
    rdd.mapValues(i => i * i).foreach(println)
  }
}
Sort
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo13Sort {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo13Sort")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/stu/students.txt")
    // sort by the age field (index 2) in descending order
    stuRDD
      .sortBy(line => line.split(",")(2), ascending = false)
      .foreach(println)
  }
}
Join
package com.shujia

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo10Join {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo10Join")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
    val scoRDD: RDD[String] = sc.textFile("Spark/data/score.txt")

    // key both RDDs by the student id so they can be joined
    val stuKVRDD: RDD[(String, String)] = stuRDD.map(line => (line.split(",")(0), line.replace(",", "|")))
    val scoKVRDD: RDD[(String, String)] = scoRDD.map(line => (line.split(",")(0), line.replace(",", "|")))
    val joinRDD: RDD[(String, (String, String))] = stuKVRDD.join(scoKVRDD)

    // style 1: take the joined record apart with a pattern match
    joinRDD.map {
      case (id: String, (stu: String, sco: String)) =>
        val stuSplits: Array[String] = stu.split("\\|")
        val name: String = stuSplits(1)
        val clazz: String = stuSplits(4)
        val scoSplits: Array[String] = sco.split("\\|")
        val sid: String = scoSplits(1)
        val score: String = scoSplits(2)
        s"$id,$name,$clazz,$sid,$score"
    }.foreach(println)

    // style 2: access the tuple fields directly
    joinRDD.map(kv => {
      val id: String = kv._1
      val stuScoT2: (String, String) = kv._2
      val stu: String = stuScoT2._1
      val sco: String = stuScoT2._2
      val stuSplits: Array[String] = stu.split("\\|")
      val name: String = stuSplits(1)
      val clazz: String = stuSplits(4)
      val scoSplits: Array[String] = sco.split("\\|")
      val sid: String = scoSplits(1)
      val score: String = scoSplits(2)
      s"$id,$name,$clazz,$sid,$score"
    }).foreach(println)

    stuKVRDD.join(scoKVRDD).foreach(println)
  }
}
2. Some Action Operators
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo14Action {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo14Action")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")

    // foreach / count: each action triggers its own job
    stuRDD.foreach(println)
    println(stuRDD.count())

    // collect pulls the whole RDD back to the Driver as an array
    val stuArr: Array[String] = stuRDD.collect()

    val blackListRDD: RDD[String] = sc.parallelize(List("1500100001", "1500100007", "1500100009"))
    val blacklistArr: Array[String] = blackListRDD.collect()
    stuRDD.filter(line => {
      blacklistArr.contains(line.split(",")(0))
    }).foreach(println)

    // reduce aggregates all the elements down to one value on the Driver
    val sumAge: Int = stuRDD
      .map(line => line.split(",")(2).toInt)
      .reduce((i, j) => i + j)
    println(sumAge)

    // lookup returns all the values for the given key
    val ids: Seq[String] = stuRDD
      .map(line => (line.split(",")(1), line.split(",")(0)))
      .lookup("尚孤风")
    println(ids)

    // take fetches the first n records
    val stuArr2: Array[String] = stuRDD.take(10)
    stuArr2.foreach(println)
  }
}
Partition: ways to change the number of partitions
package core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo20Partition {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo20Partition")
    conf.setMaster("local")
    // default parallelism used by shuffle operators when no partition number is given
    conf.set("spark.default.parallelism", "4")
    val sc: SparkContext = new SparkContext(conf)

    val lineRDD: RDD[String] = sc.textFile("Spark/data/words")
    println("number of partitions of lineRDD: " + lineRDD.getNumPartitions)

    // repartition can increase or decrease the partition number, always with a shuffle
    val reParRDD: RDD[String] = lineRDD.repartition(5)
    println("partitions after repartition: " + reParRDD.getNumPartitions)

    // coalesce only increases the partition number when shuffle = true
    val coalesceRDD: RDD[String] = lineRDD.coalesce(10, shuffle = true)
    println("partitions after coalesce: " + coalesceRDD.getNumPartitions)

    // without a shuffle, coalesce can only reduce the partition number
    val coalesceRDD2: RDD[String] = lineRDD.coalesce(1)
    println("partitions after the second coalesce: " + coalesceRDD2.getNumPartitions)

    // narrow transformations keep the parent's partition number
    val wordsRDD: RDD[String] = lineRDD.flatMap(line => line.split(","))
    println("number of partitions of wordsRDD: " + wordsRDD.getNumPartitions)
    val wordKVRDD: RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
    println("number of partitions of wordKVRDD: " + wordKVRDD.getNumPartitions)

    // shuffle operators can take the partition number as a second argument
    val wordCntRDD: RDD[(String, Int)] = wordKVRDD.reduceByKey(_ + _, 6)
    println("number of partitions of wordCntRDD: " + wordCntRDD.getNumPartitions)
    wordCntRDD.foreach(println)
  }
}
III. Assorted Odds and Ends
1. checkpoint
"Caches" the RDD's data to HDFS. The current job first runs to completion; Spark then walks back through the lineage to the RDD on which checkpoint() was called, marks it, and launches an extra job that recomputes the marked RDD from the beginning and finally writes it to HDFS.
It is mostly used for fault tolerance in Spark Streaming.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo17Cache {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo17Cache")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    // directory (HDFS in production, local here) where checkpointed RDDs are written
    sc.setCheckpointDir("Spark/data/stu/checkpoint")

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
    val mapStuRDD: RDD[String] = stuRDD.map(line => {
      println("========student========")
      line
    })
    // cache before checkpoint so the extra checkpoint job reads from the cache
    // instead of recomputing mapStuRDD from scratch
    mapStuRDD.cache()
    mapStuRDD.checkpoint()

    // first job: count by class
    val clazzKVRDD: RDD[(String, Int)] = mapStuRDD
      .map(line => (line.split(",")(4), 1))
    val clazzCnt: RDD[(String, Int)] = clazzKVRDD
      .reduceByKey((i, j) => i + j)
    clazzCnt.foreach(println)

    // second job: count by gender, reusing the cached mapStuRDD
    val genderKVRDD: RDD[(String, Int)] = mapStuRDD
      .map(line => (line.split(",")(3), 1))
    val genderCnt: RDD[(String, Int)] = genderKVRDD
      .reduceByKey(_ + _)
    genderCnt.foreach(println)

    mapStuRDD.unpersist()
    // keep the application alive so the Spark web UI can still be inspected
    while (true) {
    }
  }
}
2. cache
Unlike checkpoint, which stores the data on HDFS, cache keeps the data locally (in memory, on disk, or off-heap, depending on the storage level you choose). Through persist you can pick that storage level; cache itself is equivalent to MEMORY_ONLY. A storage level is described by: offHeap (whether to use off-heap memory), deserialized (whether to keep the data deserialized; false means serialized storage, which compresses the data but costs extra CPU), and replication (whether to keep extra replicas).
Remember to call unpersist to release the resources once you are done.
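A short sketch of the persist variant mentioned above, with a made-up object name and the data path reused from the earlier examples; StorageLevel.MEMORY_AND_DISK_SER is one of the built-in levels (serialized, spills to disk when memory is full).

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object DemoPersist {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("DemoPersist")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")
    // cache() is shorthand for persist(StorageLevel.MEMORY_ONLY);
    // here we choose a serialized level that can spill to disk
    stuRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

    println(stuRDD.count()) // the first action computes and fills the cache
    println(stuRDD.count()) // the second action reads from the cache
    stuRDD.unpersist()      // release the cached blocks when done
  }
}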
3. Accumulators and Broadcast Variables
Accumulators: in Spark, code inside an operator runs on the Executors while code outside runs on the Driver. An object created outside an operator is not sent into the operator as the original object; instead a copy (replica) of it is shipped to the Executors for the operator to use, much like pass-by-value. Operations performed on the object inside an operator therefore cannot change the original object on the Driver.
For this situation Spark provides the Accumulator, which has to be created through the SparkContext, although its functionality is fairly limited.
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

object Demo18ACC {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo18ACC")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // driver-side objects: operators only receive copies of these,
    // so modifying them inside foreach would not change them here
    var i: Int = 1
    val lb: ListBuffer[String] = ListBuffer[String]()

    val stuRDD: RDD[String] = sc.textFile("Spark/data/students.txt")

    // an accumulator created through the SparkContext can be updated from the Executors
    val acc: LongAccumulator = sc.longAccumulator
    stuRDD.foreach(line => {
      acc.add(1)
    })
    // read the accumulated value back on the Driver
    println(acc.value)
  }
}
Broadcast variables: even though an operator cannot modify a driver-side object, it can still read it through the copy it receives. By default, however, that copy is shipped once per task, which puts a heavy load on memory and the network. A broadcast variable, much like a small-table broadcast, sends the object to every Executor instead, and there are far fewer Executors than tasks. When the operator uses the object again it fetches it from its Executor. Like accumulators, broadcast variables are created through the SparkContext.
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo19Broadcast {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo19Broadcast")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("Spark/data/stu/students.txt")
    val idsList: List[Int] = List(1500100001, 1500100011, 1500100021, 1500100031)
    // broadcast the list once per Executor instead of copying it into every task
    val idsListBro: Broadcast[List[Int]] = sc.broadcast(idsList)

    stuRDD.filter(line => {
      val id: Int = line.split(",")(0).toInt
      // read the broadcast value inside the operator via .value
      idsListBro.value.contains(id)
    }).foreach(println)
  }
}
IV. Task Scheduling
Flow (a sketch of steps 6 to 8 follows this list):
1. The client asks the ResourceManager to start an ApplicationMaster.
2. The ResourceManager picks a NodeManager at random and starts the ApplicationMaster on it.
3. The AM in turn requests a batch of resources (CPU, memory) from the RM to start Executors.
4. The Executors are started on the allocated NodeManagers.
5. The Executors register back with the Driver (on the local machine, or merged into the AM) to signal that they are ready.
6. The Application is scanned; when an action operator is reached, task scheduling begins.
7. A DAG (directed acyclic graph) is built and the jobs are laid out; this is the job of the DAGScheduler.
8. Each job is split into multiple stages at its wide dependencies.
9. The stages are sent, in order, to the TaskScheduler as task sets.
10. The TaskScheduler sends the tasks one by one to the Executors, preferring the nodes where the data already lives.
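A small word-count sketch (object name and data path made up) of steps 6 to 8: the narrow dependencies stay inside one stage, the wide dependency introduced by reduceByKey starts a new stage, and nothing runs until the collect() action submits the job.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object DemoStages {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("DemoStages")
    conf.setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val lineRDD: RDD[String] = sc.textFile("Spark/data/words.txt")
    // flatMap and map are narrow dependencies: they stay in the same stage
    val wordKVRDD: RDD[(String, Int)] = lineRDD
      .flatMap(line => line.split(","))
      .map(word => (word, 1))
    // reduceByKey needs a shuffle (wide dependency): a stage boundary is cut here
    val wordCntRDD: RDD[(String, Int)] = wordKVRDD.reduceByKey(_ + _)
    // collect() is the action: only now does the DAGScheduler build the stages
    // and hand the task sets to the TaskScheduler
    wordCntRDD.collect().foreach(println)
  }
}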
Retry mechanism:
1. If a task fails, the TaskScheduler retries that task three times.
2. If it still fails after the three retries, the failure falls back to the DAGScheduler, which retries four times.
Note: if the failure is caused by "shuffle file not found", execution falls back directly to the previous stage and retries from there.
Speculative execution: if one task is running very slowly, the TaskScheduler sends the same task to another Executor as well and takes whichever result finishes first (the related configuration is sketched below).
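The behaviour above maps onto a handful of configuration keys; a sketch of setting them through SparkConf (the object name is made up and the values are only illustrative, not tuned recommendations):

import org.apache.spark.{SparkConf, SparkContext}

object DemoRetryConf {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("DemoRetryConf")
      .setMaster("local")
      // how many attempts a single task gets before its stage is failed
      .set("spark.task.maxFailures", "4")
      // how many consecutive attempts a stage gets before the job is aborted
      .set("spark.stage.maxConsecutiveAttempts", "4")
      // enable speculative execution of slow tasks
      .set("spark.speculation", "true")
    val sc: SparkContext = new SparkContext(conf)
    // ... job code goes here ...
    sc.stop()
  }
}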
yarn-client mode vs. yarn-cluster mode: one major difference is where the Driver is started; the other is where the logs are printed. In cluster mode the logs are not all printed back on the local (client/Driver) machine, which avoids shipping large amounts of log data across the network.