IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 合并小文件hdfs -> 正文阅读

[大数据]合并小文件hdfs

运行脚本,获取文件数据按照 400M一个block 存储 数据,每个省份 需要分成几块

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode
import java.text.SimpleDateFormat
import java.util.Date
import java.io.{FileSystem => _, _}
import org.apache.hadoop.fs._
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.conf.Configuration
import java.net.URI
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.sql.SaveMode
import java.text.SimpleDateFormat
import java.util.Date
import scala.math._


val map = new java.util.HashMap[String,Int]()
//分别计算各省应该合并到多少个小文件,广东、江苏、浙江这些文件数量比较大的,可以不做合并
//输入目录 如:"********/day=20210620/prov="
val dir_name="********/day=20210701/prov="
//希望缩小到的文件大小(单位:M) 400
val file_size=400

val fs = FileSystem.get(new Configuration())
val prov_list = Array("11","12","13","14","15","21","22","23","31","32","33","34","35","36","37","41","42","43","44","45","46","50","51","52","53","54","61","62","63","64","65")
for (prov_id <- prov_list){
? ? val hdfs_path=s"${dir_name}"+prov_id
? ? def list_sum(list:List[Long])={var sum=0L;list.foreach(sum+=_);sum}
? ? val input_files = fs.listStatus(new Path(hdfs_path))
? ? val list_details ?= input_files.map(m => m.getLen.toLong).toList
? ? val p_nums = ceil(list_sum(list_details).toFloat/1024/1024/file_size).toInt + 1
? ? println("map.put(\""+prov_id+"\","+p_nums+")")
}

然后根据 生成的 map 参数 运行下面的程序
??

#!/bin/bash

sd_hour=$1

if [ "X${sd_hour}" == "X" ]; then
  sd_hour=`date -d "-1 day" +"%Y%m%d%H"`
fi

day=${sd_hour:0:8}
mo2wei=${sd_hour:6:8}
nextday=`date -d "$day 1 day" +"%Y%m%d"`
lastday=`date -d "$day -1 day" +"%Y%m%d"`
month=${sd_hour:0:6}
lastmonth=`date -d "$day -1 month" +"%Y%m"`
last7day=`date -d "$day -7 day" +"%Y%m%d"`


kerberos_conf="**************"

conf="--conf spark.files.ignoreMissingFiles=true  
--conf spark.sql.files.ignoreMissingFiles=true  
--conf spark.files.ignoreCorruptFiles=true 
--conf spark.sql.files.ignoreCorruptFiles=true 
--conf spark.sql.shuffle.partitions=200 
--conf spark.default.parallelism=600 
--conf spark.driver.maxResultSize=5g 
--conf spark.yarn.executor.memoryOverhead=1024 
--conf spark.executor.extraJavaOptions=-Xss4m 
--conf spark.driver.extraJavaOptions=-Xss4m 
--conf spark.port.maxRetries=100"

task_name=**************
driver_memory=10g
num_executors=20
executor_memory=8g
executor_cores=1
master=yarn


shell="/usr/spark-client/bin/spark-shell \
--conf spark.ui.filters= \
--queue ************** \
--conf spark.hadoop.dfs.replication=2 \
--name ${task_name} \
--master ${master} \
--executor-memory ${executor_memory} \
--executor-cores ${executor_cores} \
--num-executors ${num_executors} \
--driver-memory ${driver_memory} \ 
${conf} \
${kerberos_conf} "


echo "*************************** Program run begin, begin time: `date` ***********************************"
eval $shell <<!EOF
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode
import java.text.SimpleDateFormat
import java.util.Date
import java.io.{FileSystem => _, _}
import org.apache.hadoop.fs._
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.conf.Configuration
import java.net.URI
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.sql.SaveMode
import java.text.SimpleDateFormat
import java.util.Date

val map = new java.util.HashMap[String,Int]()
map.put("11",60)
map.put("12",20)
map.put("13",100)
map.put("14",60)
map.put("15",50)
map.put("21",80)
map.put("22",40)
map.put("23",30)
map.put("31",40)
map.put("32",200)
map.put("33",200)
map.put("34",80)
map.put("35",60)
map.put("36",60)
map.put("37",100)
map.put("41",100)
map.put("42",100)
map.put("43",40)
map.put("44",400)
map.put("45",70)
map.put("46",20)
map.put("50",50)
map.put("51",133)
map.put("52",50)
map.put("53",100)
map.put("54",3)
map.put("61",20)
map.put("62",25)
map.put("63",10)
map.put("64",10)
map.put("65",25)

val prov_list = Array("11","12","13","14","15","21","22","23","31","34","35","36","37","41","42","43","45","46","50","51","52","53","54","61","62","63","64","65")
 for (prov_id <- prov_list){
 println("start:"+prov_id+"    "+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()))
 var num = map.get(prov_id)
if(num<20){
spark.read.parquet(s"**************/day=$day/prov="+prov_id).repartition(num).write.mode("overwrite").option("compression","gzip").parquet(s"**************/tmp_day=$day/prov="+prov_id)
}
else{
spark.read.parquet(s"**************/day=$day/prov="+prov_id).coalesce(num).write.mode("overwrite").option("compression","gzip").parquet(s"**************/tmp_day=$day/prov="+prov_id)
}
println("end:"+prov_id+"    "+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()))
}

!EOF
hdfs dfs -mv **************/day=$day/prov=32 **************/tmp_day=$day/
hdfs dfs -mv **************/day=$day/prov=33 **************/tmp_day=$day/
hdfs dfs -mv **************/day=$day/prov=44 **************/tmp_day=$day/
hdfs dfs -mv **************/day=$day  **************/
hdfs dfs -mv **************/tmp_day=$day  **************/day=$day
echo "*************************** Program run over, over time:   `date` ***********************************"
exit 0


??
??

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-25 12:16:45  更:2021-08-25 12:17:49 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 18:56:11-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码