Importing a large MySQL table into Hive with time-based partitioning, implemented with Spark and Scala
Note: the code below covers both importing MySQL data into Hive by partition and writing Hive data back to MySQL, all written in Scala.
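The core of the time-based partitioning is that DateTypeConn splits the [min, max] range of the time column into equal slices and turns each slice into a WHERE predicate, so spark.read.jdbc issues one query per slice in parallel. A minimal sketch of what those predicates look like (the dates, partition count, and variable names here are illustrative only; the real code derives the slices from min/max of the column):

    // Hypothetical predicates for a createTime column spanning two days, split into 2 slices
    val predicates = Array(
      "createTime >= '2020-01-01 00:00:00' and createTime <= '2020-01-02 00:00:00'",
      "createTime > '2020-01-02 00:00:00' and createTime <= '2020-01-03 00:00:00'"
    )
    // One Spark partition (and one MySQL query) is created per predicate.
    val df = spark.read.jdbc(url, tablename, predicates, prop)

The full implementation follows.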
package datasource

import java.sql.{Connection, DriverManager}
import java.text.SimpleDateFormat
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import scala.collection.mutable.ArrayBuffer
object DataBaseRead {

  def main(args: Array[String]): Unit = {
    val url = "jdbc:mysql://localhost:3306/test"
    val tablename = "stu1"
    val user = "root"
    val password = "root"
    val column = "createTime"   // column the import is partitioned on
    val coltype = "date"        // "long" for numeric columns, "date" for date/timestamp columns
    val partition = "50"        // number of parallel JDBC partitions
    ReadSql(url, tablename, user, password, column, coltype, partition)
  }
  /**
   * Reads the MySQL table in parallel, partitioned on `column`, and writes it into a
   * Hive table of the same name.
   */
  def ReadSql(url: String, tablename: String, user: String, password: String,
              column: String, coltype: String, partition: String): Unit = {
    val spark: SparkSession = getSpark
    val prop = new Properties()
    prop.put("driver", "com.mysql.jdbc.Driver")
    prop.put("url", url)
    prop.put("dbtable", tablename)
    prop.put("user", user)
    prop.put("password", password)

    if (coltype.toLowerCase() == "long") {
      // Numeric partition column: let Spark split [min, max] into `partition` ranges.
      val bounds = LongTypeConn("com.mysql.jdbc.Driver", url, user, password, column, tablename)
      val lowerNum = bounds(0)
      val upperNum = bounds(1)
      val longFrame = spark.read.jdbc(
        prop.getProperty("url"),
        prop.getProperty("dbtable"),
        column, lowerNum, upperNum,
        partition.toInt, prop
      )
      // longFrame is only read here; it can be written to Hive the same way as the date branch below.
    } else if (coltype.toLowerCase() == "date") {
      // Date/timestamp partition column: build one WHERE predicate per time slice and read them in parallel.
      val predicates: Array[String] =
        DateTypeConn("com.mysql.jdbc.Driver", url, user, password, column, tablename, partition.toInt).toArray
      predicates.foreach(println)
      val dateFrame = spark.read.jdbc(
        prop.getProperty("url"),
        prop.getProperty("dbtable"),
        predicates, prop
      )
      dateFrame.createTempView("temp")
      // Map Spark SQL type names to Hive column types and build the CREATE TABLE statement.
      val columns = dateFrame.dtypes.map { case (name, dataType) =>
        val hiveType = dataType match {
          case "IntegerType"                  => "int"
          case "LongType"                     => "bigint"
          case "DoubleType"                   => "double"
          case "DateType"                     => "date"
          case "StringType" | "TimestampType" => "string"
          case other                          => other.toLowerCase // fallback; adjust for other types as needed
        }
        name + " " + hiveType
      }.mkString(",\n\t")
      val createSql = "create table if not exists " + tablename + "(\n\t" + columns + "\n\t)" +
        "\nrow format delimited fields terminated by ','"
      println(createSql)
      spark.sql(createSql)
      spark.sql("insert into table " + tablename + " select * from temp")
    }
    spark.close()
  }
  /** Returns the min and max of a numeric partition column, used as lower/upper bounds for spark.read.jdbc. */
  def LongTypeConn(driver: String, url: String, user: String, password: String,
                   column: String, tablename: String): ArrayBuffer[Long] = {
    var conn: Connection = null
    val array = new ArrayBuffer[Long]()
    try {
      Class.forName(driver)
      conn = DriverManager.getConnection(url, user, password)
      val stat = conn.createStatement()
      val rs = stat.executeQuery("select min(" + column + ") as minNum, max(" + column + ") as maxNum from " + tablename)
      while (rs.next()) {
        array.append(rs.getLong("minNum"))
        array.append(rs.getLong("maxNum"))
      }
      array
    } catch {
      case e: Exception =>
        e.printStackTrace()
        array
    } finally {
      if (conn != null) conn.close()
    }
  }
  /**
   * Splits the [min, max] range of a date/timestamp column into `partition` equal time slices
   * and returns one JDBC WHERE predicate per slice.
   */
  def DateTypeConn(driver: String, url: String, user: String, password: String,
                   column: String, tablename: String, partition: Int): ArrayBuffer[String] = {
    var conn: Connection = null
    val array = new ArrayBuffer[String]()
    val resArray = ArrayBuffer[(String, String)]()
    val lastArray = ArrayBuffer[String]()
    try {
      Class.forName(driver)
      conn = DriverManager.getConnection(url, user, password)
      val stat = conn.createStatement()
      val rs = stat.executeQuery("select min(" + column + ") as minNum, max(" + column + ") as maxNum from " + tablename)
      while (rs.next()) {
        array.append(rs.getString("minNum"))
        array.append(rs.getString("maxNum"))
      }
      // Pick the date format according to how MySQL returns the value.
      val sf =
        if (array(0).contains("-")) new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        else new SimpleDateFormat("yyyyMMdd HH:mm:ss")
      val firstTime = sf.parse(array(0)).getTime
      var minTime = firstTime
      val maxTime = sf.parse(array(1)).getTime
      val subNum = (maxTime - minTime) / partition.toLong
      var midNum = minTime
      // Build `partition` consecutive (lower, upper) time ranges; the last one is capped at maxTime.
      for (i <- 0 until partition) {
        minTime = midNum
        midNum = midNum + subNum
        if (i == partition - 1) {
          resArray.append(sf.format(minTime) -> sf.format(maxTime))
        } else {
          resArray.append(sf.format(minTime) -> sf.format(midNum))
        }
      }
      // The first slice uses ">=" on the lower bound so the minimum row is not dropped.
      for (elem <- resArray) {
        if (elem._1 == sf.format(firstTime)) {
          lastArray += column + " >= '" + elem._1 + "' and " + column + " <= '" + elem._2 + "'"
        } else {
          lastArray += column + " > '" + elem._1 + "' and " + column + " <= '" + elem._2 + "'"
        }
      }
      lastArray
    } catch {
      case e: Exception =>
        e.printStackTrace()
        lastArray
    } finally {
      if (conn != null) conn.close()
    }
  }
  /** Builds a local SparkSession with Hive support enabled. */
  def getSpark: SparkSession = {
    val conf = new SparkConf()
      .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
      .set("spark.sql.crossJoin.enabled", "true")
      .setAppName("mysql in hive")
      .setMaster("local[*]")
    SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
  }
  /**
   * Writes a DataFrame back to MySQL. With "overwrite" the target table is truncated first
   * (so its schema and indexes are preserved), then the data is appended.
   */
  def saveASMysqlTable(dataFrame: DataFrame, tableName: String, saveMode: String): Unit = {
    val prop = new Properties
    prop.setProperty("user", "root")
    prop.setProperty("password", "root")
    prop.setProperty("driver", "com.mysql.jdbc.Driver")
    prop.setProperty("url", "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true&useUnicode=true&characterEncoding=utf-8")
    if (saveMode.equalsIgnoreCase("overwrite")) {
      var conn: Connection = null
      try {
        conn = DriverManager.getConnection(
          prop.getProperty("url"),
          prop.getProperty("user"),
          prop.getProperty("password")
        )
        val stmt = conn.createStatement
        stmt.execute(s"truncate table $tableName")
      } catch {
        case e: Exception =>
          println("MySQL Error:")
          e.printStackTrace()
      } finally {
        if (conn != null) conn.close()
      }
    }
    dataFrame.write.mode(SaveMode.Append).jdbc(prop.getProperty("url"), tableName, prop)
  }
}
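For the Hive-to-MySQL direction mentioned in the note above, saveASMysqlTable can be applied to any DataFrame, for example one read from a Hive table. A minimal usage sketch (the Hive table name and the overwrite mode here are illustrative assumptions):

    import datasource.DataBaseRead._

    val spark = getSpark
    // Illustrative Hive table name; replace with the table to export.
    val hiveDf = spark.sql("select * from stu1")
    // "overwrite" truncates the MySQL target first (keeping its schema), then appends the Hive rows.
    saveASMysqlTable(hiveDf, "stu1", "overwrite")
    spark.close()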