IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 使用Java编写Spark Streaming来做大数据处理(二) -> 正文阅读

[大数据]使用Java编写Spark Streaming来做大数据处理(二)

本篇将从实际编程中遇到的一些问题,总结的部分经验来理解spark

一、spark提交脚本的参数选择

注:仅从实际常用需求阐述,并不包含所有参数
(1)程序跑的太慢
相信每个人最容易碰到的情况就是资源分配不够,程序迟迟运行不完,而此时只能苦苦等待(不是),所以我们在分配资源时一定要合理,个人认为如果资源充足,就多分配一些
举个例子,我们知道,一般默认一个核运行两到三个task,一个task一般几百兆到一个G,所以默认一个核一般跑2-3个G

--driver-memory 4G --executor-memory 40g --num-executors 8 --executor-cores 10 \

所以我们对于要处理的数据大小和其中的join等操作次数要有了解,进而才能根据数据大小合理的分配资源
假设我们要处理的csv文件为800M,如果join另一个800M的文件,那么就是64G.而一个核运行2-3G,所以这里分配60核就可以
(2)程序内容已经跑出来,但是迟迟不结束
查看日志后发现,执行程序超时
这里需要修改的参数为

--conf spark.executor.heartbeatInterval=540s \

之前太高,现在调整成60

二、使程序一直跑下去

(1)如何一直跑
1.既然要一直跑,也就是说需要反复调用程序,也就是说要使用循环,这里我们使用while循环来完成

boolean flag = true;
while (flag) {

}

2.然后为了让程序能够按照我们需要的循环时间来进行,我们传入两个参数,分别代表开始时间和结束时间

String BeginTime = args[0];
String EndTime = args[1];

3.此时我们可以简单的进行循环,然后BeginTime 递增,

   // 获取下一个时间点,更新BeginTime
           此处代码部分不做示范

4.但是由于一些需求它要求读取按小时为单位的数据(例如从hive数据库中读取),此时我们需要保证所读取的数据是完整的一个小时的,而非未满一个小时的数据,因此,这里我们先获取到一个当前时间

//此处读取一个配置文件,其中包含了当前数据库更新的最新时间
 String[] quotaTime = Util.getConf("/lujing/conf.csv");

采用一个if判断(此处判断执行时间BeginTime与当前时间差距70以上)

if (DateUtils.getMinBetweenDate(quotaTime[0], BeginTime) > 70) {2)中的内容放在这里
//当开始时间等于结束时间,那么循环结束
 if (BeginTime==EndTime){
                    flag=false;
                     }
     //根据需求更新下一执行时间,(分钟,小时,天,月),下方为示例代码,具体方法不做说明
	BeginTime = getNextTime(BeginTime, 60, "minute").substring(0, 10);
}else {
	//如果还未满足条件,那么休眠一分钟
                try {
                    Thread.sleep(1000 * 60);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

    /**
     * 睡眠
     *
     * @param millisecond
     */
    public static void sleep(int millisecond) {
        try {
            Thread.sleep(millisecond);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

(2)文件的追加

if (指定文件存在) {
//读取旧文件获取
JavaPairRDD<String, String> old =jsc.textFile(旧文件路径).filter(new Function<String, Boolean>() {
                        @Override
                        public Boolean call(String v1) throws Exception {
                            return v1.split(",", -1).length == 2;
                        }
                    }).mapToPair(new PairFunction<String, String, String>() {
                        @Override
                        public Tuple2<String, String> call(String s) throws Exception {
                            return new Tuple2<>(s.split(",", -1)[0], s.split(",", -1)[1]);
                        }
                    }).persist(StorageLevel.MEMORY_AND_DISK());
//现有数据和旧文件中的数据join取差集以追加
 JavaRDD<String> result= 现有数据.mapToPair(new PairFunction<String, String, String>() {
                        @Override
                        //获取数据饼返回键值对(二元组类型)
                        public Tuple2<String, String> call(String s) throws Exception {
                            return new Tuple2<>(s.split(",", -1)[0], s.split(",", -1)[1]);
                        }
                    }).leftOuterJoin(old).filter(new Function<Tuple2<String, Tuple2<String, Optional<String>>>, Boolean>() {
                        @Override
                        //过滤掉没有匹配到的,取差集
                        public Boolean call(Tuple2<String, Tuple2<String, Optional<String>>> v1) throws Exception {
                            return !v1._2()._2().isPresent();
                        }
                    }).map(new Function<Tuple2<String, Tuple2<String, Optional<String>>>, String>() {
                        @Override
                        //返回Sring类型的结果
                        public String call(Tuple2<String, Tuple2<String, Optional<String>>> v1) throws Exception {
                            return v1._1 + "," + v1._2._1();

                        }
                    });
                    HdfsUtil.writeRddFile(result, "输出文件路径");
}else{
 HdfsUtil.writeRddFile(现有数据, "输出文件路径");
}

(3)何时获取,何时关闭
1.获取配置文件(即最新时间)要放在循环内每次获取,
2.关闭JavaSparkContext和要在循环结束以后

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-25 12:16:45  更:2021-08-25 12:18:29 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 17:14:15-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码