First, run the script below to scan each province's directory and, assuming roughly 400 MB per output file (one block), work out how many files each province's data should be merged into.
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode
import java.text.SimpleDateFormat
import java.util.Date
import java.io.{FileSystem => _, _}
import org.apache.hadoop.fs._
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.conf.Configuration
import java.net.URI
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import scala.math._
val map = new java.util.HashMap[String,Int]()
//Work out how many files each province should be merged into; provinces with very large volumes (Guangdong, Jiangsu, Zhejiang) can be left unmerged
//Input directory, e.g. "********/day=20210620/prov="
val dir_name="********/day=20210701/prov="
//Target size of each merged file, in MB
val file_size=400
val fs = FileSystem.get(new Configuration())
val prov_list = Array("11","12","13","14","15","21","22","23","31","32","33","34","35","36","37","41","42","43","44","45","46","50","51","52","53","54","61","62","63","64","65")
for (prov_id <- prov_list){
    val hdfs_path = dir_name + prov_id
    def list_sum(list: List[Long]) = { var sum = 0L; list.foreach(sum += _); sum }
    val input_files = fs.listStatus(new Path(hdfs_path))
    val list_details = input_files.map(m => m.getLen.toLong).toList
    // total bytes -> MB, divided by the 400 MB target, rounded up, plus one as a margin
    val p_nums = ceil(list_sum(list_details).toFloat/1024/1024/file_size).toInt + 1
    // print a ready-to-paste map.put line for the merge script below
    println("map.put(\""+prov_id+"\","+p_nums+")")
}
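For example, a province whose parquet files total roughly 20 GB comes out as ceil(20480 / 400) + 1 = 53 target files. A minimal sketch of that calculation on its own; the 20 GB figure is purely an illustrative assumption:

import scala.math.ceil

// Hypothetical numbers for illustration only
val totalBytes  = 20480L * 1024 * 1024   // assumed total size of one province's directory (~20 GB)
val fileSizeMB  = 400                    // target size per merged file, as in the script above
val targetFiles = ceil(totalBytes.toFloat / 1024 / 1024 / fileSizeMB).toInt + 1
println(targetFiles)                     // ceil(51.2) + 1 = 53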
Then paste the printed map.put(...) lines into the merge script below and run it.
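Each province is merged with the same pattern: read the day's parquet files, reduce them to the target file count, and write gzip-compressed parquet into a temporary tmp_day directory. A minimal sketch of that step, assuming spark is an active SparkSession; mergeProvince and the path parameters are illustrative names, not part of the original script:

import org.apache.spark.sql.SparkSession

// Sketch only: the 20-file threshold mirrors the script's choice between
// repartition (full shuffle, evenly sized files) and coalesce (no shuffle, cheaper).
def mergeProvince(spark: SparkSession, inPath: String, outPath: String, num: Int): Unit = {
  val df = spark.read.parquet(inPath)
  val merged = if (num < 20) df.repartition(num) else df.coalesce(num)
  merged.write.mode("overwrite").option("compression", "gzip").parquet(outPath)
}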
#!/bin/bash
sd_hour=$1
if [ "X${sd_hour}" == "X" ]; then
sd_hour=`date -d "-1 day" +"%Y%m%d%H"`
fi
day=${sd_hour:0:8}
mo2wei=${sd_hour:6:8}
nextday=`date -d "$day 1 day" +"%Y%m%d"`
lastday=`date -d "$day -1 day" +"%Y%m%d"`
month=${sd_hour:0:6}
lastmonth=`date -d "$day -1 month" +"%Y%m"`
last7day=`date -d "$day -7 day" +"%Y%m%d"`
kerberos_conf="**************"
conf="--conf spark.files.ignoreMissingFiles=true
--conf spark.sql.files.ignoreMissingFiles=true
--conf spark.files.ignoreCorruptFiles=true
--conf spark.sql.files.ignoreCorruptFiles=true
--conf spark.sql.shuffle.partitions=200
--conf spark.default.parallelism=600
--conf spark.driver.maxResultSize=5g
--conf spark.yarn.executor.memoryOverhead=1024
--conf spark.executor.extraJavaOptions=-Xss4m
--conf spark.driver.extraJavaOptions=-Xss4m
--conf spark.port.maxRetries=100"
task_name=**************
driver_memory=10g
num_executors=20
executor_memory=8g
executor_cores=1
master=yarn
shell="/usr/spark-client/bin/spark-shell \
--conf spark.ui.filters= \
--queue ************** \
--conf spark.hadoop.dfs.replication=2 \
--name ${task_name} \
--master ${master} \
--executor-memory ${executor_memory} \
--executor-cores ${executor_cores} \
--num-executors ${num_executors} \
--driver-memory ${driver_memory} \
${conf} \
${kerberos_conf} "
echo "*************************** Program run begin, begin time: `date` ***********************************"
eval $shell <<!EOF
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode
import java.text.SimpleDateFormat
import java.util.Date
import java.io.{FileSystem => _, _}
import org.apache.hadoop.fs._
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.conf.Configuration
import java.net.URI
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
// Target file counts per province, pasted from the output of the sizing script above
val map = new java.util.HashMap[String,Int]()
map.put("11",60)
map.put("12",20)
map.put("13",100)
map.put("14",60)
map.put("15",50)
map.put("21",80)
map.put("22",40)
map.put("23",30)
map.put("31",40)
map.put("32",200)
map.put("33",200)
map.put("34",80)
map.put("35",60)
map.put("36",60)
map.put("37",100)
map.put("41",100)
map.put("42",100)
map.put("43",40)
map.put("44",400)
map.put("45",70)
map.put("46",20)
map.put("50",50)
map.put("51",133)
map.put("52",50)
map.put("53",100)
map.put("54",3)
map.put("61",20)
map.put("62",25)
map.put("63",10)
map.put("64",10)
map.put("65",25)
// 32 (Jiangsu), 33 (Zhejiang) and 44 (Guangdong) are excluded here; they are moved as-is by the hdfs mv steps after the heredoc
val prov_list = Array("11","12","13","14","15","21","22","23","31","34","35","36","37","41","42","43","45","46","50","51","52","53","54","61","62","63","64","65")
for (prov_id <- prov_list){
  println("start:"+prov_id+" "+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()))
  var num = map.get(prov_id)
  // Small target counts get a full shuffle (repartition) so the output files are evenly sized;
  // larger counts use coalesce, which avoids the shuffle.
  if (num < 20) {
    spark.read.parquet(s"**************/day=$day/prov="+prov_id).repartition(num).write.mode("overwrite").option("compression","gzip").parquet(s"**************/tmp_day=$day/prov="+prov_id)
  } else {
    spark.read.parquet(s"**************/day=$day/prov="+prov_id).coalesce(num).write.mode("overwrite").option("compression","gzip").parquet(s"**************/tmp_day=$day/prov="+prov_id)
  }
  println("end:"+prov_id+" "+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()))
}
!EOF
# 32 (Jiangsu), 33 (Zhejiang) and 44 (Guangdong) already have large files; move them over unmerged
hdfs dfs -mv **************/day=$day/prov=32 **************/tmp_day=$day/
hdfs dfs -mv **************/day=$day/prov=33 **************/tmp_day=$day/
hdfs dfs -mv **************/day=$day/prov=44 **************/tmp_day=$day/
# Move the original day=$day directory out of the way, then promote the merged tmp_day directory in its place
hdfs dfs -mv **************/day=$day **************/
hdfs dfs -mv **************/tmp_day=$day **************/day=$day
echo "*************************** Program run over, over time: `date` ***********************************"
exit 0