0 背景
因为项目需要实现修改json数据后重新写入,原本解析json使用的是json4s(Scala3貌似可以去使用circe),但是要实现替换json中的值然后重新写回到文件中,用json4s实现起来不方便(没有找到相对应的接口),于是去寻找了一个相对易用的包。找到了org.json,于是就用此包实现了相对应的功能。
1 数据准备(读入数据)
包:
import java.io._
import org.json._
import org.apache.commons.io.FileUtils
import java.io.File
//写入的包
import java.io.FileOutputStream
import java.io.OutputStreamWriter
读入文件并转为jsonObject 对象:
// Load the whole JSON file as UTF-8 text, then parse that text into an org.json JSONObject.
val filePath: String = "/user/kafka.json"
val file: File = new File(filePath)
val content: String = FileUtils.readFileToString(file, "UTF-8")
val jsonObject: JSONObject = new JSONObject(content)
2 解析json文件
例子如下:
{
"description": "This is json File",
"stu.info": [
{
"name":"xiaoming",
"sex":"boy",
"grade":98,
"isOverseasStudent":false
},
{
"name":"xiaohua",
"sex":"gril",
"grade":89,
"isOverseasStudent":true
}],
"school":["USTC", "ZJU", "PK", "QH"]
}
解析代码如下:
// Read a top-level string field (the original discarded this value; bind and print it).
val description = jsonObject.getString("description")
println(description)

// "stu.info" is an array of objects: read typed fields from each element.
// Index-based `for` loops keep each counter local, so the two loops below no longer
// collide on a shared `var i` (declaring `var i` twice in one scope does not compile).
val studInfoJsonArray = jsonObject.getJSONArray("stu.info")
for (idx <- 0 until studInfoJsonArray.length) {
  val stuInfo = studInfoJsonArray.getJSONObject(idx)
  println(stuInfo.getString("name"))
  println(stuInfo.getBoolean("isOverseasStudent"))
}

// "school" is an array of plain strings.
val schoolJsonArray = jsonObject.getJSONArray("school")
for (idx <- 0 until schoolJsonArray.length) {
  println(schoolJsonArray.getString(idx))
}
3 组装json
这里通过把xiaoming的成绩替换为80分、把学校中的PK替换为SJ,来展示组装功能。
// Build the JSON to write back. Parsing `content` again yields an independent deep copy
// of jsonObject, so it is edited in place:
// - already-correct fields such as "description" need no re-put;
// - the originally parsed jsonObject is no longer mutated as a side effect;
// - no names from the parsing section (studInfoJsonArray, schoolJsonArray, i) are redeclared.
val writeJsonObject = new JSONObject(content)

// Set xiaoming's grade to 80.
val writeStuInfoArray = writeJsonObject.getJSONArray("stu.info")
for (idx <- 0 until writeStuInfoArray.length) {
  val stuInfo = writeStuInfoArray.getJSONObject(idx)
  if (stuInfo.getString("name") == "xiaoming") stuInfo.put("grade", 80)
}

// Replace "PK" with "SJ" in the school list; put(index, value) overwrites in place.
val writeSchoolArray = writeJsonObject.getJSONArray("school")
for (idx <- 0 until writeSchoolArray.length) {
  if (writeSchoolArray.getString(idx) == "PK") writeSchoolArray.put(idx, "SJ")
}
4 json写入到文件
4.1 直接写入
// Write the compact, single-line form of writeJsonObject as UTF-8 (overwriting the file).
// The original referenced `orderFilePath`, which is only defined further down, so this
// section names its own output path.
// NOTE(review): path mirrors section 4.2 — adjust as needed.
// close() in `finally` also flushes, so the handle is not leaked even if write() throws.
// `locally` keeps `osw` scoped so it does not clash with the writer in section 4.2.
locally {
  val compactFilePath = "/user/kafka2.json"
  val osw: OutputStreamWriter =
    new OutputStreamWriter(new FileOutputStream(compactFilePath, false), "UTF-8")
  try osw.write(writeJsonObject.toString)
  finally osw.close()
}
写入后的结果如下:
{"description":"This is json File","stu.info":[{"name":"xiaoming","sex":"boy","grade":80,"isOverseasStudent":false},{"name":"xiaohua","sex":"gril","grade":89,"isOverseasStudent":true}],"school":["USTC","ZJU","SJ","QH"]}
4.2 格式化json字符串后写入
格式化json的类(可以根据自己的需求,增改对字符的判断):
import scala.util.control.Breaks.{break, breakable}
/** Minimal pretty-printer for an already-serialized JSON string.
  *
  * It re-lays-out the text only; it does not parse or validate JSON.
  *
  *  - String literals are copied verbatim, including every escape sequence (an escaped
  *    quote does not terminate the string, and escaped newlines/tabs are kept rather
  *    than deleted).
  *  - Whitespace outside string literals is dropped and replaced by this printer's own
  *    newlines and indentation, so already-formatted input is re-indented cleanly.
  *  - A newline follows every structural `{`, `[` and `,`, and precedes every `}` and `]`.
  *    Empty objects/arrays stay compact as `{}` / `[]`.
  *
  * @param isTab indent with one tab per nesting level when true, otherwise one space per level
  */
class JsonFormatTool(isTab: Boolean = false) {

  /** Returns `strJson` re-indented; nesting depth is tracked by open/close brackets. */
  def stringToJSON(strJson: String): String = {
    val out = new StringBuilder
    var inString = false
    var escaped = false // previous char inside a string was an unconsumed backslash
    var depth = 0
    var i = 0
    while (i < strJson.length) {
      val c = strJson.charAt(i)
      if (inString) {
        // Inside a literal: copy everything, only watch for the unescaped closing quote.
        out.append(c)
        if (escaped) escaped = false
        else if (c == '\\') escaped = true
        else if (c == '"') inString = false
      } else c match {
        case '"' =>
          inString = true
          out.append(c)
        case '{' | '[' =>
          val close = if (c == '{') '}' else ']'
          val next = nextNonWhitespace(strJson, i + 1)
          if (next < strJson.length && strJson.charAt(next) == close) {
            // Empty container: keep it on one line and skip past its closing bracket.
            out.append(c).append(close)
            i = next
          } else {
            depth += 1
            out.append(c).append('\n').append(getSpaceOrTab(depth))
          }
        case '}' | ']' =>
          depth -= 1
          out.append('\n').append(getSpaceOrTab(depth)).append(c)
        case ',' =>
          out.append(c).append('\n').append(getSpaceOrTab(depth))
        case ws if ws.isWhitespace =>
          () // layout whitespace is replaced by this printer's own indentation
        case other =>
          out.append(other)
      }
      i += 1
    }
    out.toString
  }

  /** Indentation for `tabNum` levels; a non-positive level yields the empty string. */
  def getSpaceOrTab(tabNum: Int): String = {
    val unit = if (isTab) "\t" else " "
    unit * math.max(tabNum, 0)
  }

  /** Index of the first non-whitespace char at or after `from`, or `s.length` if none. */
  private def nextNonWhitespace(s: String, from: Int): Int = {
    var j = from
    while (j < s.length && s.charAt(j).isWhitespace) j += 1
    j
  }
}
使用json格式化工具类,写入文件:
// Add the table schema (stored as a JSON-encoded string value), pretty-print the whole
// object and write it to orderFilePath as UTF-8, overwriting the file.
val tableSchema = "{\"id\": \"int\", \"trans_code\": \"string\", \"account_id\": \"string\", \"pay_channel_code\": \"string\", \"total_trans_price\": \"double\", \"total_trans_cost\": \"double\", \"create_time\": \"string\", \"update_time\": \"string\", \"_hoodie_is_deleted\": \"boolean\", \"hudi_delta_streamer_update_time\": \"string\", \"hudi_delta_streamer_ingest_date\": \"string\"}"
writeJsonObject.put("table.schema", tableSchema)
val orderFilePath = "/user/kafka2.json"
val tool = new JsonFormatTool()
val jsonString = tool.stringToJSON(writeJsonObject.toString)
// close() in `finally` also flushes, so the handle is released even if write() throws.
// `locally` keeps `osw` scoped so it does not clash with the writer in section 4.1.
locally {
  val osw: OutputStreamWriter =
    new OutputStreamWriter(new FileOutputStream(orderFilePath, false), "UTF-8")
  try osw.write(jsonString)
  finally osw.close()
}
写入后的结果为:
{
"description":"This is json File",
"stu.info":[
{
"name":"xiaoming",
"sex":"boy",
"grade":80,
"isOverseasStudent":false
},
{
"name":"xiaohua",
"sex":"gril",
"grade":89,
"isOverseasStudent":true
}
],
"school":[
"USTC",
"ZJU",
"SJ",
"QH"
],
"table.schema":"{\"id\": \"int\", \"trans_code\": \"string\", \"account_id\": \"string\", \"pay_channel_code\": \"string\", \"total_trans_price\": \"double\", \"total_trans_cost\": \"double\", \"create_time\": \"string\", \"update_time\": \"string\", \"_hoodie_is_deleted\": \"boolean\", \"hudi_delta_streamer_update_time\": \"string\", \"hudi_delta_streamer_ingest_date\": \"string\"}"
}
5 使用json4s
包:
import org.json4s._
import scala.io.Source
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
读取文件并转换为json对象:
// Read the Kafka->Hudi config file and parse it with json4s.
// The Source is closed in `finally` (the original leaked the file handle), and the
// encoding is pinned to UTF-8 instead of relying on the platform default charset.
val paramKafakaHudiFilePath = "/software/member/config/kafkaHudi.json"
val kafakaHudiFileStr = {
  val source = Source.fromFile(paramKafakaHudiFilePath, "UTF-8")
  try source.getLines().mkString("\n")
  finally source.close()
}
val jsonData = parse(kafakaHudiFileStr)
解析json对象:
// Extract the top-level "description" string.
// NOTE(review): despite its name, sparkSessionMaster holds the description text.
// An explicit match replaces the refutable `val JString(x) = ...` pattern, which warns
// under Scala 3 and fails with an opaque MatchError. This version fails with a message
// naming the key and the value actually found.
val sparkSessionMaster: String = (jsonData \ "description") match {
  case JString(value) => value
  case other =>
    throw new IllegalArgumentException(s"expected a string at key 'description', got: $other")
}
解析jsonArray中的对象:
// Collect one tuple of 9 strings per JSON object in jsonData that has every key below
// with a string value (topic, path, table name, key/precombine/partition fields,
// table type, write operation, schema).
// NOTE(review): `JObject(hoodieParam) <- jsonData` presumably visits every JObject in
// the tree (root included) via json4s JValue traversal — confirm.
// Objects lacking any key, or holding a non-string value for it, appear to be skipped
// silently rather than reported.
val kafkaHudiList = for{
JObject(hoodieParam) <- jsonData
// Each generator below keeps only the field of hoodieParam with this exact key whose
// value is a JString, and binds that string (hoodieParam is presumably the object's field list).
JField("hoodie.deltastreamer.source.kafka.topic", JString(topic)) <- hoodieParam
JField("hoodie.deltastreamer.write.file.path", JString(savePath)) <- hoodieParam
JField("hoodie.datasource.write.table.name", JString(tableName)) <- hoodieParam
JField("hoodie.datasource.write.recordkey.field", JString(recordkey)) <- hoodieParam
JField("hoodie.datasource.write.precombine.field", JString(precombine)) <- hoodieParam
JField("hoodie.datasource.write.partitionpath.field", JString(partitionpath)) <- hoodieParam
JField("hoodie.datasource.write.table.type", JString(writeTableType)) <- hoodieParam
JField("hoodie.datasource.write.operation", JString(writeOperation)) <- hoodieParam
// This pattern variable shadows any outer `tableSchema` val inside the comprehension.
JField("table.schema", JString(tableSchema)) <- hoodieParam
} yield (topic, savePath, tableName, recordkey, precombine, partitionpath, writeTableType, writeOperation,tableSchema)
解析的json字符串:
{
"description": "Write Kafka multi topic data to Hudi table",
"hoodie.param": [
{
"hoodie.deltastreamer.source.kafka.topic": "jk_test_model5.jk_test2.jk34.output",
"hoodie.deltastreamer.write.file.path": "/test_hudi/jk_test/sub_trans17",
"hoodie.datasource.write.table.name": "tb_sub_trans_detail_mor",
"hoodie.datasource.write.recordkey.field": "id",
"hoodie.datasource.write.precombine.field": "hudi_delta_streamer_update_time",
"hoodie.datasource.write.partitionpath.field": "hudi_delta_streamer_ingest_date",
"table.schema": "{\"id\": \"int\", \"trans_code\": \"string\", \"account_id\": \"string\", \"client_trans_code\": \"string\", \"com_name\": \"string\", \"trans_number\": \"int\", \"trans_price\": \"double\", \"trans_cost\": \"double\", \"total_trans_price\": \"double\", \"total_trans_cost\": \"double\", \"create_time\": \"string\", \"update_time\": \"string\", \"_hoodie_is_deleted\": \"boolean\", \"hudi_delta_streamer_update_time\": \"string\", \"hudi_delta_streamer_ingest_date\": \"string\"}",
"hoodie.datasource.write.table.type": "MERGE_ON_READ",
"hoodie.datasource.write.operation": "upsert"
}]
}
|