本篇将从实际编程中遇到的一些问题,总结的部分经验来理解spark
一、spark提交脚本的参数选择
注:仅从实际常用需求阐述,并不包含所有参数 (1)程序跑的太慢 相信每个人最容易碰到的情况就是资源分配不够,程序迟迟运行不完,而此时只能苦苦等待(不是),所以我们在分配资源时一定要合理,个人认为如果资源充足,就多分配一些 举个例子,我们知道,一般默认一个核运行两到三个task,一个task一般几百兆到一个G,所以默认一个核一般跑2-3个G
--driver-memory 4G --executor-memory 40g --num-executors 8 --executor-cores 10 \
所以我们对于要处理的数据大小和其中的join等操作次数要有了解,进而才能根据数据大小合理的分配资源 假设我们要处理的csv文件为800M,如果join另一个800M的文件,那么就是64G.而一个核运行2-3G,所以这里分配60核就可以 (2)程序内容已经跑出来,但是迟迟不结束 查看日志后发现,执行程序超时 这里需要修改的参数为
--conf spark.executor.heartbeatInterval=540s \
之前太高,现在调整成60
二、使程序一直跑下去
(1)如何一直跑 1.既然要一直跑,也就是说需要反复调用程序,也就是说要使用循环,这里我们使用while循环来完成
boolean flag = true;
while (flag) {
}
2.然后为了让程序能够按照我们需要的循环时间来进行,我们传入两个参数,分别代表开始时间和结束时间
String BeginTime = args[0];
String EndTime = args[1];
3.此时我们可以简单的进行循环,然后BeginTime 递增,
此处代码部分不做示范
4.但是由于一些需求它要求读取按小时为单位的数据(例如从hive数据库中读取),此时我们需要保证所读取的数据是完整的一个小时的,而非未满一个小时的数据,因此,这里我们先获取到一个当前时间
String[] quotaTime = Util.getConf("/lujing/conf.csv");
采用一个if判断(此处判断执行时间BeginTime与当前时间差距70以上)
if (DateUtils.getMinBetweenDate(quotaTime[0], BeginTime) > 70) {
(2)中的内容放在这里
if (BeginTime==EndTime){
flag=false;
}
BeginTime = getNextTime(BeginTime, 60, "minute").substring(0, 10);
}else {
try {
Thread.sleep(1000 * 60);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void sleep(int millisecond) {
try {
Thread.sleep(millisecond);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
(2)文件的追加
if (指定文件存在) {
JavaPairRDD<String, String> old =jsc.textFile(旧文件路径).filter(new Function<String, Boolean>() {
@Override
public Boolean call(String v1) throws Exception {
return v1.split(",", -1).length == 2;
}
}).mapToPair(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) throws Exception {
return new Tuple2<>(s.split(",", -1)[0], s.split(",", -1)[1]);
}
}).persist(StorageLevel.MEMORY_AND_DISK());
JavaRDD<String> result= 现有数据.mapToPair(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) throws Exception {
return new Tuple2<>(s.split(",", -1)[0], s.split(",", -1)[1]);
}
}).leftOuterJoin(old).filter(new Function<Tuple2<String, Tuple2<String, Optional<String>>>, Boolean>() {
@Override
public Boolean call(Tuple2<String, Tuple2<String, Optional<String>>> v1) throws Exception {
return !v1._2()._2().isPresent();
}
}).map(new Function<Tuple2<String, Tuple2<String, Optional<String>>>, String>() {
@Override
public String call(Tuple2<String, Tuple2<String, Optional<String>>> v1) throws Exception {
return v1._1 + "," + v1._2._1();
}
});
HdfsUtil.writeRddFile(result, "输出文件路径");
}else{
HdfsUtil.writeRddFile(现有数据, "输出文件路径");
}
(3)何时获取,何时关闭 1.获取配置文件(即最新时间)要放在循环内每次获取, 2.关闭JavaSparkContext和要在循环结束以后
|