This article walks through using SparkContext to read data from HDFS, analyze it, and write the results to MySQL in batches.
1. Batch-loading data into MySQL with Scala
1.1. In the same directory as the Maven project, create a config directory, and under it create the configuration file config/datasource.properties with the following contents:
mysql.driver=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://192.168.43.200:3306/test?useSSL=true&useUnicode=true&characterEncoding=utf-8
mysql.username=root
mysql.password=ok
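Before wiring this file into the project, it can be worth sanity-checking the credentials with a throwaway JDBC snippet. This is only a minimal sketch: the relative path and the ConnCheck name are assumptions, and the MySQL driver jar must already be on the classpath.
import java.io.FileInputStream
import java.sql.DriverManager
import java.util.Properties
object ConnCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // adjust the path to wherever step 1.1 put the file
    props.load(new FileInputStream("config/datasource.properties"))
    Class.forName(props.getProperty("mysql.driver")) // load the JDBC driver
    val con = DriverManager.getConnection(
      props.getProperty("mysql.url"),
      props.getProperty("mysql.username"),
      props.getProperty("mysql.password"))
    println(s"connected: ${!con.isClosed}")
    con.close()
  }
}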
1.2. Create the package structure /util/common and /util/mysql (the classes below live in cn.kgc.util.common and cn.kgc.util.mysql, as the later imports show).
1.3. Under common, write the configuration helper (Config.scala):
import java.io._
import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import cn.kgc.util.common.Config._
class Config{
private var path:String = _
protected def _getPath:String = path
protected def _init(conf:Properties)(configPath:String)(f:()=>Unit = ()=>{})(createIfNotExist:Boolean=false):Unit = {
path = Config.init(conf)(configPath)(f)(createIfNotExist)
}
protected def _serialize(map:ConcurrentMap[String,Int], path:String="config/serialize.log"):String = {
val _path = s"${configRoot()}/$path"
serialize(map)(_path)
}
protected def _deserialize(path:String = "config/serialize.log"):ConcurrentMap[String,Int] = {
val _path = s"${configRoot()}/$path"
deserialize(_path)
}
protected def _close(closes:AutoCloseable*) = {
close(closes)
}
}
object Config {
protected def close(closes:Seq[AutoCloseable]):Unit
= closes.filterNot(null == _).foreach(close => {
try {
close.close()
} catch {
case e:Throwable => e.printStackTrace()
}
})
protected def configRoot()={
val path = Thread.currentThread()
.getContextClassLoader
.getResource("")
.getPath
new File(path)
.getParentFile
.getParentFile
.getParentFile
.getAbsolutePath
}
protected def init(conf: Properties)(configPath:String)(f:()=>Unit)(createIfNotExist:Boolean):String = {
var path = ""
try {
path = configRoot()
path = s"$path/$configPath"
if(createIfNotExist){
val file = new File(path)
val dir = file.getParentFile
if(!dir.exists()){
dir.mkdirs()
}
if(!file.exists()){
file.createNewFile()
}
}
conf.load(new FileReader(path))
f()
} catch {
case e: Throwable => {
println(e.getMessage)
System.exit(-1)
}
}
path
}
//Serialization: write the map to a file
protected def serialize(map:ConcurrentMap[String,Int])(path:String):String ={
if(null != map && !map.isEmpty){
var fis:FileOutputStream = null
var oos:ObjectOutputStream = null
try {
fis = new FileOutputStream(path)
oos = new ObjectOutputStream(fis)
oos.writeObject(map)
} catch {
case e:Throwable => e.printStackTrace()
} finally {
close(Seq(oos,fis))
}
}
path
}
//Deserialization: read the map back from a file
protected def deserialize(path:String):ConcurrentMap[String,Int] = {
var fis : FileInputStream = null
var ois : ObjectInputStream = null
try {
fis = new FileInputStream(path)
ois = new ObjectInputStream(fis)
ois.readObject().asInstanceOf[ConcurrentMap[String,Int]]
} catch {
case e:Throwable => {
println(e.getMessage)
new ConcurrentHashMap[String,Int]()
}
} finally {
close(Seq(ois,fis))
}
}
}
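The _serialize/_deserialize helpers are not exercised anywhere else in this article. Here is a minimal round-trip sketch, assuming the config directory from step 1.1 exists and that Config sits in cn.kgc.util.common as the later imports suggest (ConfigDemo and its key/value are made up):
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import cn.kgc.util.common.Config
// hypothetical subclass used only to reach the protected helpers
object ConfigDemo extends Config {
  def main(args: Array[String]): Unit = {
    val map = new ConcurrentHashMap[String, Int]()
    map.put("visits", 42)
    // written to <configRoot>/config/serialize.log by default
    val where = _serialize(map)
    val back: ConcurrentMap[String, Int] = _deserialize()
    println(s"restored visits=${back.get("visits")} from $where")
  }
}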
1.4. Create the Batch trait (used for batch processing):
trait Batch {
def init(sql:String, batchSize:Int):Unit
def addBatch(params:Any*):Unit
def close:Long
}
1.5. Under mysql, create the MySqlDao class (MySqlDao.scala):
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import java.util.Properties
import cn.kgc.util.common.Config
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
/**
* scala -> mysql
*/
class MySqlDao(path:String) extends Config {
private val _conf:Properties = new Properties()
private var driver:String = _
private var url:String = _
private var username:String = _
private var password:String = _
init(path)
//Parse the configuration file, e.g. E:/Projects/scala/config/datasource.properties
private def init(path:String):Unit = {
_init(_conf)(path)(()=>{
driver = _conf.getProperty("mysql.driver")
url = _conf.getProperty("mysql.url")
username = _conf.getProperty("mysql.username")
password = _conf.getProperty("mysql.password")
if(null==driver || null==url || null==username || null==password){
throw new NullPointerException("Null for driver or url or username or password")
}
Class.forName(driver) //load the JDBC driver
})()
}
//Get a connection
private def getCon:Connection
= DriverManager.getConnection(url,username,password)
private def getPst(con:Connection, sql:String, params:Seq[Any]=Seq())(implicit batched:Boolean = false):PreparedStatement
= {
val pst = con.prepareStatement(sql)
if(!batched){
if(!params.isEmpty){
params.zipWithIndex.foreach(tp2=>{
pst.setObject(tp2._2 + 1, tp2._1)
})
}
}
pst
}
//Execute insert/delete/update statements
def update(sql:String, params:Any*):Option[Int] = {
var opt:Option[Int] = None
var con:Connection = null
var pst:PreparedStatement = null
try{
con = getCon
pst = getPst(con, sql, params)
opt = Some(pst.executeUpdate())
}catch{
case e:Throwable => println(e.getMessage)
}finally{
_close(pst, con)
}
opt
}
//Batch execution
def exeBatch(sql:String,params:ArrayBuffer[Any])(f:()=>Int):Either[String,Int] = {
var rtn:Either[String,Int] = Left("UNKNOWN")
var con:Connection = null
var pst:PreparedStatement = null
//number of ? placeholders in the SQL, i.e. the number of columns
val FIELD_COUNT:Int = sql.split("\\?").size - 1
val ONE_BATCH:Int = f()*FIELD_COUNT
//execute the accumulated batch and clear it
def exe():Int = {
val sum = pst.executeBatch.sum
pst.clearBatch
sum
}
//fill and execute a single batch
def fill(seq:Seq[Any]):Int = {
val ixs:Seq[Int] = 0 until FIELD_COUNT
//slide over the values one row at a time
seq
.sliding(FIELD_COUNT,FIELD_COUNT)
.foreach(seq=>{
//bind the parameters of the current row
ixs.foreach(ix=>pst.setObject(ix+1, seq(ix)))
//add the current row to the current batch
pst.addBatch()
})
exe()
}
implicit val batched:Boolean = true
try {
con = getCon
pst = getPst(con,sql)
rtn = Right(
params
//slide over the values one batch at a time
.sliding(ONE_BATCH, ONE_BATCH)
//execute each batch and return the number of rows it affected
.map(fill)
//sum the affected rows over all batches
.sum
)
} catch {
case e:Throwable => rtn = Left(e.getMessage)
} finally {
_close(pst, con)
}
rtn
}
//Anonymous inner-class implementation of the Batch trait
def createBatch():Batch = {
new Batch {
private var con:Connection = null
private var pst:PreparedStatement = null
private var _batchSize:Int = 0
private var batchCount = 0
private var affectedRows:Long = 0
override def init(sql:String, batchSize:Int): Unit = {
con = getCon
pst = getPst(con, sql)
_batchSize = batchSize
}
private def exe(forced:Boolean = false): Unit = {
if (forced || batchCount % _batchSize == 0) {
affectedRows += pst.executeBatch().sum
pst.clearBatch()
}
}
override def addBatch(params: Any*): Unit = {
params
.zipWithIndex
.foreach(tp2 => pst.setObject(tp2._2 + 1, tp2._1))
pst.addBatch()
batchCount += 1
exe()
}
override def close: Long = {
exe(true)
_close(pst, con)
affectedRows
}
}
}
//Execute a query
//Class[T] is a static type resolved at compile time; ClassTag[T] carries the type at runtime
def select[T:ClassTag](f:Array[String]=>T)(sql:String, params:Any*)(implicit c:ClassTag[T]):Option[Array[T]] = {
var opt:Option[Array[T]] = None
var con:Connection = null
var pst:PreparedStatement = null
var rst:ResultSet = null
try{
con = getCon
pst = getPst(con, sql, params)
rst = pst.executeQuery
val buffer = ArrayBuffer[T]()
val colCount = rst.getMetaData.getColumnCount
val arr = new Array[String](colCount)
val seq = for(i<-0 until colCount) yield i
while (rst.next()) {
seq.foreach(ix=>arr(ix)=rst.getString(ix+1))
buffer += f(arr)
}
opt = Some(buffer.toArray(c))
} catch {
case e:Throwable => e.printStackTrace()
} finally {
_close(rst, pst, con)
}
opt
}
}
//Companion object
object MySqlDao{
def apply(path:String): MySqlDao = new MySqlDao(path)
}
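The test class in 1.6 uses createBatch(); the exeBatch variant is not demonstrated anywhere in this article. Here is a minimal sketch of how it could be called (the ExeBatchDemo name and sample rows are made up; it targets the same score_kb16 table created in the next step). exeBatch expects one flat buffer in which every group of values, one per ? placeholder, forms a row, and the extra argument list supplies the rows-per-batch size:
import scala.collection.mutable.ArrayBuffer
object ExeBatchDemo {
  def main(args: Array[String]): Unit = {
    val dao = MySqlDao("config/datasource.properties")
    val sql = "insert into score_kb16(stu_name,stu_gender,java_score,mysql_score) values(?,?,?,?)"
    // 4 placeholders per row, so every 4 consecutive values form one row
    val rows = ArrayBuffer[Any](
      "tom", "m", 88, 90,
      "jerry", "m", 76, 81)
    dao.exeBatch(sql, rows)(() => 100) match { // flush at most 100 rows per batch
      case Right(n) => println(s"inserted $n rows")
      case Left(msg) => println(s"failed: $msg")
    }
  }
}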
1.6. Test class (batch insert)
Before running the test class, create the MySQL table first, then use the test class to import the data:
score_kb16(stu_name, stu_gender, java_score, mysql_score)
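The table can be created with any MySQL client; below is a minimal sketch using the update method defined above (the CreateScoreTable name and the column types are assumptions, adjust as needed):
object CreateScoreTable {
  def main(args: Array[String]): Unit = {
    val dao = MySqlDao("config/datasource.properties")
    // column names come from the text above; the types are guesses
    val ddl =
      """create table if not exists score_kb16(
        |  stu_name    varchar(20),
        |  stu_gender  char(1),
        |  java_score  int,
        |  mysql_score int
        |)""".stripMargin
    dao.update(ddl) match {
      case Some(_) => println("score_kb16 is ready")
      case None => println("failed to create score_kb16")
    }
  }
}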
import scala.util.Random
object Test {
def main(args: Array[String]): Unit = {
val batch:Batch = new MySqlDao("config/datasource.properties").createBatch()
val sql:String ="insert into score_kb16(stu_name,stu_gender,java_score,mysql_score) values(?,?,?,?)"
batch.init(sql,100)
Array(
("henry","m",55+Random.nextInt(45),55+Random.nextInt(45)),
("ariel","f",55+Random.nextInt(45),55+Random.nextInt(45)),
("pola","f",55+Random.nextInt(45),55+Random.nextInt(45)),
("jack","m",55+Random.nextInt(45),55+Random.nextInt(45)),
("rose","f",55+Random.nextInt(45),55+Random.nextInt(45))
).foreach(tp4=>batch.addBatch(tp4._1,tp4._2,tp4._3,tp4._4))
val affectedRows:Long =batch.close
println(affectedRows)
}
}
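To double-check the insert, the select method defined in 1.5 can be used. A minimal sketch (the Score case class, the QueryTest object and the 60-point filter are made up for illustration):
case class Score(name: String, gender: String, java: Int, mysql: Int)
object QueryTest {
  def main(args: Array[String]): Unit = {
    val dao = MySqlDao("config/datasource.properties")
    // map each row (delivered as Array[String]) onto the hypothetical Score case class
    val rows = dao.select(arr => Score(arr(0), arr(1), arr(2).toInt, arr(3).toInt))(
      "select stu_name,stu_gender,java_score,mysql_score from score_kb16 where java_score>=?", 60)
    rows.getOrElse(Array.empty[Score]).foreach(println)
  }
}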
1.7. Once the data inserts successfully, the project can be packaged as a jar; whenever you need to batch-load data into MySQL later, you can reuse it.
In IDEA, double-click the Maven install goal to build the jar into your local repository. Open pom.xml and note its groupId, artifactId and version, then declare them as a dependency in the pom.xml of any Maven project that wants to use this jar.
2. Create a new Maven project and a Scala object; use SparkContext to read data from HDFS, do SQL-style exercises on it, and batch-load the results into MySQL
import cn.kgc.util.mysql.{Batch, MySqlDao}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
/*
SparkContext application
*/
object App {
def main(args: Array[String]): Unit = {
val config: SparkConf = new SparkConf()
.setAppName("spark_rdd_03")//设置任务名称
.setMaster("local[*]")//设置Master,本地模式,“ * ” 表示cup资源有多少,用多少;也可以用local[2],表示2个核
val sc = new SparkContext(config)
//For each user: count the distinct movies watched, find the top-3 most re-watched movieIds, and find the personal hot viewing window (within each 3-hour window, the sub-window (1,2), (2,3) or (1,2,3) whose share of the 3-hour total is >80%)
//data: tags.csv
//case class
case class Tag(userId: Int, movieId: Int, tag: String, timestamp: Long)
//parse the lines with regular expressions
val r1 = "(.*?),(.*?),(.*?),(.*?)".r //matches normal lines
val r2 = "(.*?),(.*?),(\".*?,.*?\"+),(.*?)".r //matches lines whose quoted tag field contains commas
//partial function that parses both line shapes (the quoted shape must be tried first)
val pf: PartialFunction[String, (Int, Tag)] = {
case r2(a, b, c, d) => (a.toInt, Tag(a.toInt, b.toInt, c, d.toLong))
case r1(a, b, c, d) => (a.toInt, Tag(a.toInt, b.toInt, c, d.toLong))
}
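//Why two patterns (illustrative lines, not taken from the real file):
//  15,1955,dentist,1193435061            -> plain line, handled by r1
//  65,592,"dark hero, gun fu",1368150078 -> the tag itself contains a comma, so the CSV quotes it;
//                                           r2 must be tried first, otherwise r1 would split the tag apart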
//fill in the missing hours of each day
def fillIfNot24(arr:Array[(String,(Int,Int))]):Iterator[(String,(Int,Int,Int))]={
val map: Map[Int, Int] = arr.map(_._2)
.groupBy(_._1) //group by hour
.map(tp2 => (tp2._1, tp2._2.map(_._2).distinct.size))//(hour, number of distinct movies watched in that hour)
val seqToTp3=(seq:IndexedSeq[(Int,Int)])=>(seq(0)._2,seq(1)._2,seq(2)._2)
(0 to 23 )
//fill in any hours missing from that day with a count of 0
.map(hour=>(hour,map.getOrElse(hour,0)))
.sliding(3,1)
.map(seq=>{
val h3: String = seq.map(_._1).mkString("_")
//(3 consecutive hours joined by "_", distinct-movie count of each hour in the window; step is 1)
(h3,seqToTp3(seq))
})
}
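//Example of what fillIfNot24 emits for one day: the windows "0_1_2", "1_2_3", ..., "21_22_23",
//each paired with the distinct-movie counts of its three hours (missing hours count as 0).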
val int3OutTp2=(map:Map[String,(Int,Int,Int)])=>{
val fstHourSum: Int = map.map(_._2._1).sum
val sndHourSum: Int = map.map(_._2._2).sum
val trdHourSum: Int = map.map(_._2._3).sum
val sum =fstHourSum+sndHourSum+trdHourSum
val h3 = map.map(_._1).toArray.apply(0)
val h3s= h3.split("_")
var accSum =0.0
val buffer = new ArrayBuffer[Int](1)
import scala.util.control.Breaks._
var lastIx = -1
//find the indexes of the hours within the 3-hour window that together reach the threshold (>80%)
breakable(
Array(fstHourSum, sndHourSum, trdHourSum)
.zipWithIndex
//sort by view count in descending order
.sortWith(_._1>_._1)
.foreach(tp2=>{
if(Math.abs(lastIx-tp2._2)<=1){
accSum +=tp2._1
buffer.append(tp2._2)
}
//break out once 80% is reached
if(accSum/sum>=0.8){
lastIx = -1
break
}
lastIx=tp2._2
})
)
//sort the qualifying hour indexes in ascending order, then join them
(sum,if(lastIx != -1) h3 else buffer
.sortWith(_<_)
.map(h3s(_))
.mkString("_"))
}
implicit class LongExpand(v:Long){ //implicit class enriching Long timestamps
def toDate()="%tF".format(v) //extract the date yyyy-MM-dd (%tF treats the Long as milliseconds since the epoch; if the source stores seconds, multiply by 1000 first)
def toHour()="%tT".format(v).substring(0,2).toInt //extract the hour
}
/*
The MySQL table must be created in advance:
show databases;
use test;
show tables;
create table tags_user_analysis(
userId int primary key ,
uqMovieCount int,
top3Movie varchar(100),
hotWatchHours varchar(8)
);
*/
//Batch upload: MySqlDao comes from the jar built in part 1 and handles batch writes to MySQL
val batch:Batch = new MySqlDao("config/datasource.properties").createBatch()
val sql:String = "insert into tags_user_analysis(userId,uqMovieCount,top3Movie,hotWatchHours) values(?,?,?,?)"
batch.init(sql,100)
// tags_user_analysis(userId, uqMovieCount, top3Movie, hotWatchHours)
sc
.textFile("hdfs://single01:9000/spark/resource/src/tags.csv", 3)
.mapPartitionsWithIndex((index, it) => { //process each partition together with its index
val data = if (index == 0) it.drop(1) else it //drop the header row from the first partition
data.collect(pf) //parse each remaining line into (userId, Tag)
})
//shuffle into 3 partitions, aggregating records of the same userId by key
.groupByKey(3)
.mapValues(it=>{
val arr = it.toArray //materialize the iterator into an array (reused for all three metrics)
val uqMovieCount: Int = arr.map(_.movieId).distinct.size //metric 1: number of distinct movies watched
val top3MovieId: String = arr //metric 2: top-3 most re-watched movieIds
.map(tag => (tag.movieId, 1))
.groupBy(_._1).map(tp2 => (tp2._1, tp2._2.size))//(movieId, number of times it was watched)
.toArray
.sortWith(_._2 > _._2)
.take(3)
.mkString("-")
val tp2: (Int, String) = arr
.map(tag => (tag.timestamp.toDate, (tag.timestamp.toHour, tag.movieId)))
//group by day
.groupBy(_._1)
.flatMap(tp2 => fillIfNot24(tp2._2)) //(String,(Int,Int,Int)): one day's 3-hour windows
//group all days by their 3-hour window
.groupBy(_._1)
.mapValues(int3OutTp2)
.map(_._2)
.toArray
.sortWith(_._1 > _._1)
.apply(0)
(uqMovieCount,top3MovieId,tp2._2)
})
// .foreach(println)
// optionally, write the processed data to HDFS instead:
// .saveAsTextFile(s"hdfs://single01:9000/spark/source/src/tags_${System.currentTimeMillis().toDate()}_${System.currentTimeMillis()}")
//write the processed data into the MySQL table tags_user_analysis
.coalesce(1,false)
.collect()
.foreach(tp2 => batch.addBatch(tp2._1, tp2._2._1, tp2._2._2, tp2._2._3))
val affectedRows:Long = batch.close
println(s"AFFECTED ROWS : $affectedRows")
sc.stop()
}
}
That's it, we're done!