1.向Hive的指定分区中写入数据
// Register the result DataFrame as a temporary view so it can be queried by name in Spark SQL.
ResulToOra2.createTempView("tmp")
// Overwrite exactly one daily partition (dt = current day) of the target Hive table.
// NOTE(review): "select *" assumes tmp's column order matches the Hive table schema — confirm.
spark.sql(
s"""
|insert overwrite table pdr.T_PSSC_TRAN_PORTRAIT_CENTER partition(dt='${sdfDay.format(date)}') select * from tmp
|
""".stripMargin)
println("write hive end")
2.写入HDFS
// Export the ES-bound result set to HDFS as a single CSV file
// (coalesce(1) collapses the output to one part-file).
val hdfsPath = s"hdfs://master01:8020/tmp/tran_es_${sdfMonth.format(date)}"
val csvWriter = ResulToES
  .coalesce(1)
  .write
  .format("csv")
  .mode("overwrite")
  .option("encoding", "utf-8")
  .option("header", "true")
csvWriter.save(hdfsPath)
println("write hdfs end")
2.1 这里coalesce(1)的作用是:把输出合并成一个文件;如果数据量太大,建议不要合并成单文件,而是保留多个分区并行写出
3.写入ES
// Elasticsearch connector settings: upsert documents keyed by TRAN_ID
// into a month-suffixed index.
val esIndex = s"pssc_tran_portrait-${sdfMonth.format(date)}"
val cfg: Map[String, String] = Seq(
  "es.nodes" -> FileProperties.ES_NODES,
  "es.port" -> FileProperties.ES_PORT,
  "es.resource" -> esIndex,
  "es.mapping.id" -> "TRAN_ID",
  "es.write.operation" -> "upsert",
  "es.net.http.auth.user" -> "账号",
  "es.net.http.auth.pass" -> "密码"
).toMap
EsSparkSQL.saveToEs(ResulToES, cfg)
println("write es end")
3.1DataFrame写入ES只需要写一个map
3.2导入import org.elasticsearch.spark.sql.EsSparkSQL
3.3EsSparkSQL.saveToEs(ResulToES, cfg)
3.4前3步都是代码中的,ES中也需要写一个模板,下面是我工作中写过的一个模板,可以参考一下
PUT /_template/pssc_tran_portrait_template
{
"index_patterns": "pssc_tran_portrait-*",
"settings": {
"number_of_shards": 3
},
"mappings": {
"_source": {
"enabled": true
},
"properties": {
"10102": {
"type": "keyword",
"store": true
},
"10107": {
"type": "keyword",
"store": true
},
"10108": {
"type": "keyword",
"store": true
},
"10109": {
"type": "keyword",
"store": true
},
"10110": {
"type": "keyword",
"store": true
},
"10111": {
"type": "keyword",
"store": true
},
"10112": {
"type": "keyword",
"store": true
},
"10113": {
"type": "keyword",
"store": true
},
"10114": {
"type": "keyword",
"store": true
},
"10115": {
"type": "keyword",
"store": true
},
"10116": {
"type": "keyword",
"store": true
},
"10117": {
"type": "keyword",
"store": true
},
"10118": {
"type": "keyword",
"store": true
},
"10119": {
"type": "keyword",
"store": true
},
"10120": {
"type": "keyword",
"store": true
},
"10121": {
"type": "keyword",
"store": true
},
"10210": {
"type": "keyword",
"store": true
},
"10213": {
"type": "keyword",
"store": true
},
"10214": {
"type": "keyword",
"store": true
},
"10215": {
"type": "keyword",
"store": true
},
"10216": {
"type": "keyword",
"store": true
},
"10217": {
"type": "keyword",
"store": true
},
"10220": {
"type": "keyword",
"store": true
},
"10221": {
"type": "keyword",
"store": true
},
"30102": {
"type": "keyword",
"store": true
},
"30107": {
"type": "keyword",
"store": true
},
"tran_id": {
"type": "keyword",
"store": true,
"ignore_above": 80
},
"tran_name": {
"type": "keyword",
"store": true
},
"currYearPQ": {
"type": "nested",
"properties": {
"year_month": {
"type": "integer",
"store": true
},
"P_MONTH_MAX": {
"type": "double",
"store": true
},
"Q_MONTH_MAX": {
"type": "double",
"store": true
}
}
},
"lastYearPQ": {
"type": "nested",
"properties": {
"last_year_month": {
"type": "integer",
"store": true
},
"LAST_P_MONTH_MAX": {
"type": "double",
"store": true
},
"LAST_Q_MONTH_MAX": {
"type": "double",
"store": true
}
}
}
}
},
"aliases": {
"search-pssc_tran_portrait": { }
}
}
4.写入oracle
4.1公司中一般对删除Oracle中数据进行封装
/**
 * Deletes rows from the given Oracle table, connecting with the hard-coded
 * ORA_PDR_* settings. Errors are logged and swallowed (best-effort delete).
 *
 * @param tabName   target table name
 * @param condition SQL predicate placed after WHERE (caller-supplied, not parameterized)
 */
def delAnyTable(tabName: String, condition: String): Unit = {
  var conn: Connection = null
  var delPS: PreparedStatement = null
  // Oracle accepts "DELETE <table>" without the FROM keyword.
  val delSql = s"delete $tabName where $condition "
  println("delSql:" + delSql)
  try {
    Class.forName(FileProperties.oracleDriver)
    conn = DriverManager.getConnection(ORA_PDR_URL, ORA_PDR_CONNP)
    delPS = conn.prepareStatement(delSql)
    delPS.execute()
    // close() is handled once, in finally — the original closed the statement twice.
    println(s"====================删除 $tabName 完毕!")
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    if (delPS != null) {
      delPS.close()
    }
    if (conn != null) {
      conn.close()
    }
  }
}
4.2 这里删除Oracle数据时,因为Oracle的url和配置被写死,只需要传入表名和过滤条件,就可以对表进行删除操作,但无法对不同的Oracle数据库进行删除。你也可以增加参数url和Properties,来达到对不同数据库进行删除操作
/**
 * Deletes rows from the given Oracle table on an arbitrary database.
 * Same as the 2-arg overload, but the JDBC url and connection properties
 * are supplied by the caller. Errors are logged and swallowed.
 *
 * @param tabName   target table name
 * @param condition SQL predicate placed after WHERE (caller-supplied, not parameterized)
 * @param url       JDBC connection url
 * @param connp     JDBC connection properties (user/password etc.)
 */
def delAnyTable(tabName: String, condition: String,url :String,connp :Properties): Unit = {
  var conn: Connection = null
  var delPS: PreparedStatement = null
  // Oracle accepts "DELETE <table>" without the FROM keyword.
  val delSql = s"delete $tabName where $condition "
  println("delSql:" + delSql)
  try {
    Class.forName(FileProperties.oracleDriver)
    conn = DriverManager.getConnection(url, connp)
    delPS = conn.prepareStatement(delSql)
    delPS.execute()
    // close() is handled once, in finally — the original closed the statement twice.
    println(s"====================删除 $tabName 完毕!")
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    if (delPS != null) {
      delPS.close()
    }
    if (conn != null) {
      conn.close()
    }
  }
}
4.3同样对插入Oracle中的数据进行封装
/**
 * Batch-inserts every row of `df` into the Oracle table `tableName`,
 * connecting with the hard-coded ORA_PDR_* settings. Each of the 16
 * partitions opens its own connection; rows are flushed every
 * BATCH_COMMIT_CNT and committed every OVER_COMMIT_CNT.
 */
def toOracleInsert(df: DataFrame, tableName: String) {
  val schema: Array[String] = getType(df)
  val fields = df.schema.fieldNames.mkString(",")
  // One "?" placeholder per column.
  val placeholders = schema.map(_ => "?").mkString(",")
  val sql = s"""insert into $tableName ($fields) values($placeholders)"""
  df.repartition(16).foreachPartition(row => {
    var conn: Connection = null
    var ps: PreparedStatement = null
    try {
      Class.forName(FileProperties.oracleDriver)
      conn = DriverManager.getConnection(ORA_PDR_URL, ORA_PDR_CONNP)
      ps = conn.prepareStatement(sql)
      conn.setAutoCommit(false)
      var iCnt: Int = 0
      row.foreach(data => {
        iCnt += 1
        ExecPS(data, ps, schema)
        ps.addBatch()
        // iCnt is always >= 1 here, so no extra "!= 0" guard is needed.
        if (iCnt % FileProperties.BATCH_COMMIT_CNT == 0) {
          println(s"============================$iCnt")
          ps.executeBatch()
          ps.clearBatch()
        }
        if (iCnt % FileProperties.OVER_COMMIT_CNT == 0) {
          conn.commit()
        }
      })
      println(s"============================$iCnt")
      ps.executeBatch()
      ps.clearBatch()
      conn.commit()
    } catch {
      // Oracle's JDBC driver commits pending work when a connection is
      // closed, so roll back explicitly before finally closes it — otherwise
      // a failed partition could silently persist a partial batch.
      case e: Exception =>
        if (conn != null) conn.rollback()
        throw e
    } finally {
      if (ps != null) {
        ps.close()
      }
      if (conn != null) {
        conn.close()
      }
    }
  })
}
4.4 同样也可以增加参数,达到对不同Oracle数据库进行插入操作
/**
 * Batch-inserts every row of `df` into the Oracle table `tableName` on an
 * arbitrary database: same as the 2-arg overload, but the JDBC url and
 * connection properties are supplied by the caller.
 */
def toOracleInsert(df: DataFrame, tableName: String,url :String,connp :Properties) {
  val schema = getType(df)
  val fields = df.schema.fieldNames.mkString(",")
  // One "?" placeholder per column.
  val placeholders = schema.map(_ => "?").mkString(",")
  val sql = s"""insert into $tableName ($fields) values($placeholders)"""
  df.repartition(16).foreachPartition(row => {
    var conn: Connection = null
    var ps: PreparedStatement = null
    try {
      Class.forName(FileProperties.oracleDriver)
      conn = DriverManager.getConnection(url, connp)
      ps = conn.prepareStatement(sql)
      conn.setAutoCommit(false)
      var iCnt: Int = 0
      row.foreach(data => {
        iCnt += 1
        ExecPS(data, ps, schema)
        ps.addBatch()
        // iCnt is always >= 1 here, so no extra "!= 0" guard is needed.
        if (iCnt % FileProperties.BATCH_COMMIT_CNT == 0) {
          println(s"============================$iCnt")
          ps.executeBatch()
          ps.clearBatch()
        }
        if (iCnt % FileProperties.OVER_COMMIT_CNT == 0) {
          conn.commit()
        }
      })
      println(s"============================$iCnt")
      ps.executeBatch()
      ps.clearBatch()
      conn.commit()
    } catch {
      // Oracle's JDBC driver commits pending work when a connection is
      // closed, so roll back explicitly before finally closes it — otherwise
      // a failed partition could silently persist a partial batch.
      case e: Exception =>
        if (conn != null) conn.rollback()
        throw e
    } finally {
      if (ps != null) {
        ps.close()
      }
      if (conn != null) {
        conn.close()
      }
    }
  })
}
4.5 以上的方法一般同时调用
// Replace the current month's rows: delete them, then reload from the Spark result.
// NOTE(review): the STAT_DATE value is interpolated unquoted into the SQL — fine for
// a NUMBER column, but a VARCHAR2/DATE column would need quoting; confirm the schema.
ResaveToOracle.delAnyTable("T_PSSC_TRAN_PORTRAIT_CENTER", s"STAT_DATE=${sdfMonth.format(date)}")
ResaveToOracle.toOracleInsert(ResulToOra2, "T_PSSC_TRAN_PORTRAIT_CENTER")
5.总结
以上都是企业实战,支持回写。