将程序打成jar包上传任务
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo2WordCountSubmit {
/**
* 1.去除setMaster("local")
* 2.修改文件的输入输出路径(提交到集群默认是从HDFS获取数据,需要修改成HDFS的路径)
* 3.在HDFS中创建目录
* hdfs dfs -mkdir -p /spark/data/works
* 4.将数据上传至HDFS
* hdfs dfs -put words.txt /spark/data/works
* 5.将程序打成jar包
* 6.将jar包上传至虚拟机,然后通过spark-submit提交(两种提交方式,一种本地打印日志,一种本地不打印)
* spark-submit --class Demo2WordCountSubmit --master yarn-client /opt/modules/hadoop-2.7.6/share/hadoop/mapreduce/Spark-1.0-SNAPSHOT.jar //这里client模式,本地会有反馈
* spark-submit --class Demo2WordCountSubmit --master yarn-cluster /opt/modules/hadoop-2.7.6/share/hadoop/mapreduce/Spark-1.0-SNAPSHOT.jar //这里cluster模式,提交上去就行了,就不用管了
*
*/
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("Demo2WordCountSubmit")
val sc = new SparkContext()
val linesRDD: RDD[String] = sc.textFile("/spark/data/works/WordCount.txt")
val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(","))
val wordRDD: RDD[(String, Iterable[String])] = wordsRDD.groupBy(word=>word)
val countRDD: RDD[(String, Int)] = wordRDD.map(
word => {
val id: String = word._1
val words: Iterable[String] = word._2
val size: Int = words.size
(id, size)
}
)
//使用javaApi判断输出路径是否存在,存在即删除
val hdfsconf: Configuration = new Configuration()
hdfsconf.set("fs.defaultFS","hdfs://master:9000")
val fs: FileSystem = FileSystem.get(hdfsconf)
//判断输出路径是否存在
if(fs.exists(new Path("/spark/data/wordCount"))){
fs.delete(new Path("/spark/data/wordCount"),true)
}
countRDD.saveAsTextFile("/spark/data/wordCount")
}
}
On Yarn两种模式对比(client模式和cluster模式)
client模式(本地有反馈)运行过程中,给它杀掉进程,发现直接就挂掉了 cluster模式(本地无反馈)运行过程中,给它杀掉进程,程序会照常运行(只要集群还在,就会跑完)
这地方的节点可能会被很多人用,很多人提交到同一个节点,可能会造成网卡风暴 Yarn-client
本地不启动Driver程序 这里的Driver端不再是本机了,而是一台nodemanager上的applicationmaster 这里的任务就不知道在哪打印了,因为这里是任意一台nodemanager上的任务 Yarn-cluster
将每条数据写到MySQL,对比每种方式的不同(为什么有了foreach还需要有foreachPartition)
一: 我们按照原本的想法,获取每一条数据,之后一条条存入MySQL中,发现会出现一个错误
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Demo5MapPartition")
val sc: SparkContext = new SparkContext(conf)
val stuRDD: RDD[String] = sc.textFile("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt")
Class.forName("com.mysql.jdbc.Driver")
val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?characterEncoding=utf-8", "root", "123456")
val statement: PreparedStatement = conn.prepareStatement("insert into students values(?,?,?,?,?)")
stuRDD.foreach(line=>{
val strings: Array[String] = line.split(",")
val id: Int = strings(0).toInt
val name: String = strings(1)
val age: Int = strings(2).toInt
val gender: String = strings(3)
val clazz: String = strings(4)
statement.setInt(1,id)
statement.setString(2,name)
statement.setInt(3,age)
statement.setString(4,gender)
statement.setString(5,clazz)
statement.execute()
})
}
任务没有被序列化: 这是为什么呢: 是因为我们在将任务提交到集群中运行的时候,会有网络IO,所以要经过一个序列化,而我们算子之外的代码都是在Driver端运行的,算子内的代码都是通过Driver端发送到Executor端执行,这里我们的连接是在master端做的,但是运行计算是在node1、node2端运行的,类似MySQL这种网络连接不能被序列化,然后传递给node1、node2执行
我们就需要在算子内部创建连接
二: 解决:把整个代码放进master运行,不发送给node1和node2了
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Demo5MapPartition")
val sc: SparkContext = new SparkContext(conf)
val stuRDD: RDD[String] = sc.textFile("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt")
stuRDD.foreach(line=>{
Class.forName("com.mysql.jdbc.Driver")
val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?characterEncoding=utf-8", "root", "123456")
val statement: PreparedStatement = conn.prepareStatement("insert into students values(?,?,?,?,?)")
val strings: Array[String] = line.split(",")
val id: Int = strings(0).toInt
val name: String = strings(1)
val age: Int = strings(2).toInt
val gender: String = strings(3)
val clazz: String = strings(4)
statement.setInt(1,id)
statement.setString(2,name)
statement.setInt(3,age)
statement.setString(4,gender)
statement.setString(5,clazz)
statement.execute()
})
}
这时我发现数据已经可以存进去一部分了,但是又会出现连接过多这个错误 三: 我们在每一个foreach里面,没创建一次连接,使用完之后都关闭一次连接
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Demo5MapPartition")
val sc: SparkContext = new SparkContext(conf)
val stuRDD: RDD[String] = sc.textFile("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt")
stuRDD.foreach(line=>{
Class.forName("com.mysql.jdbc.Driver")
val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?characterEncoding=utf-8", "root", "123456")
val statement: PreparedStatement = conn.prepareStatement("insert into students values(?,?,?,?,?)")
val strings: Array[String] = line.split(",")
val id: Int = strings(0).toInt
val name: String = strings(1)
val age: Int = strings(2).toInt
val gender: String = strings(3)
val clazz: String = strings(4)
statement.setInt(1,id)
statement.setString(2,name)
statement.setInt(3,age)
statement.setString(4,gender)
statement.setString(5,clazz)
statement.execute()
conn.close()
statement.close()
})
}
发现数据可以存入MySQL数据库 四:频繁的创建和关闭连接非常耗时 这里每一个分区都会创建一个task,每一个task都会创建一次连接 这里我们就可以使用foreachPartition,并且每个分区执行完所有的插入之后再关闭连接
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Demo5MapPartition")
val sc: SparkContext = new SparkContext(conf)
val stuRDD: RDD[String] = sc.textFile("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt",6)
stuRDD.foreachPartition(line=>{
Class.forName("com.mysql.jdbc.Driver")
val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?characterEncoding=utf-8", "root", "123456")
val statement: PreparedStatement = conn.prepareStatement("insert into students values(?,?,?,?,?)")
line.foreach(
line=>{
val strings: Array[String] = line.split(",")
val id: Int = strings(0).toInt
val name: String = strings(1)
val age: Int = strings(2).toInt
val gender: String = strings(3)
val clazz: String = strings(4)
statement.setInt(1,id)
statement.setString(2,name)
statement.setInt(3,age)
statement.setString(4,gender)
statement.setString(5,clazz)
statement.execute()
})
conn.close()
statement.close()
})
五:这里可以使用批处理的方式,再次提升效率
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setMaster("local")
conf.setAppName("Demo5MapPartition")
val sc: SparkContext = new SparkContext(conf)
val stuRDD: RDD[String] = sc.textFile("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt",6)
stuRDD.foreachPartition(line=>{
Class.forName("com.mysql.jdbc.Driver")
val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?characterEncoding=utf-8", "root", "123456")
val statement: PreparedStatement = conn.prepareStatement("insert into students values(?,?,?,?,?)")
line.foreach(
line=>{
val strings: Array[String] = line.split(",")
val id: Int = strings(0).toInt
val name: String = strings(1)
val age: Int = strings(2).toInt
val gender: String = strings(3)
val clazz: String = strings(4)
statement.setInt(1,id)
statement.setString(2,name)
statement.setInt(3,age)
statement.setString(4,gender)
statement.setString(5,clazz)
statement.execute()
statement.addBatch()
})
statement.executeBatch()
conn.close()
statement.close()
})
}
我们这里一步一步的分析之后发现,在有了foreach的时候,我们可以使用其插入数据提高效率,再紧接着我们又发现了一些问题,比如创建链接,网络IO,我们又发现使用foreachPartition可以接着提升效率,而foreachPartition适合批量“写出”数据到外部,mapPartition适合批量“读取”数据到内部
记一些有意思的算子
转换算子:将一个RDD变成另一个RDD,是懒执行的,必须要操作算子触发才能执行 操作算子:不难将一个RDD转换成另一个RDD,每一个操作算子都会触发一个job(在sparkUI界面可以看到) 一个spark程序可以包含很多个job,可以通过算子的返回值来判断该算子是转换算子还是操作算子
MapPartition
相当于按分区去处理数据
//相当于拿到每个分区的数据来计算
stuRDD.mapPartitions(
rdd=>{
println("mapPartition")
rdd.map(
line=>{
val name: String = line.split(",")(1)
name
}
)
}).foreach(println)
foreachPartition
相当于拿到每一个分区的数据来进行计算
stuRDD.foreachPartition(
rdd=>{
rdd.map(
line=> {
val name: String = line.split(",")(1)
name
}).take(10).foreach(println)
}
)
reduceByKey和groupByKey的区别
reduceByKey 在map端会进行一个预聚合,提高效率,但是只能做一些简单的操作,groupByKey虽然效率没有这么高,但是可以做更多复杂的操作
感谢阅读,我是啊帅和和,一位大数据专业大四学生,祝你快乐。
|