| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> spark05 -> 正文阅读 |
|
[大数据]spark05 |
自定义排序spark中对简单的数据类型可以直接排序,但是对于一些复杂的条件加以利用自定义排序实现 键值对rdd数据分区spark目前支持hash分区和range分区,用户也可以自定义分区,hash分区为当前的默认分区,spark中分区器直接决定了rdd中分区的个数,rdd中每条数据经过shuffle过程属于那个分区和reduce的个数 hashpartitioner spark中非常重要的一个分区器,也是默认分区器,默认用于90%以上的rdd相关api上 功能:依据rdd中的key值的hashcode的值将数据取模后得到该key值对应的下一个rdd的分区id值,支持key值为null的情况,当key为null的时候返回0;该分区器基本上适合所有rdd数据类型进行分区操作,但是需要注意的是,由于java中数组的hashcode基本数组对象本身的,不是基于数组内容的,所以如果rdd的key 是数组类型,那么可能导致数据类容一致的数据key没法分配到一同一个rdd分区中,这个时候最好自定义分区器,采用数组内容进行分区或者将数组的内容转换为集合, 对于给定的key,计算hashcode ,并用于分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的的值就是这个key所属的分区ID rangepartitioner sparkcore中除了hashpartitioner分区器外另一个比较重要的已经实现的分区器,主要用于rdd的数据排序相关API中 比如sortByKey底层使用的数据分区器就是rangepartitioner分区器;该分区实现方式主要是通过两个步骤来实现,第一步,先从整个rdd中抽取出样本数据,将样本数据排序,计算每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds;第二步:判断key在rangebounds中所处的范围,给出key值在下一个rdd中的分区id下标,;分区器要求rdd中的key类型必须是可以排序的, 将一定范围内的数映射到一个分区内,在实现中,分界(rangebounds)算法尤为重要,用到了水塘抽样算法 其实rangepartition的重点是在于构建rangebounds数组对象,主要步骤是: 1.如果分区数量小于2或者rdd中不存在数据的情况下,直接返回一个空的数组,不需要计算range的边界,如果翻去数据大于1的情况下,而且rdd中有数据的情况下,才需要计算数据对象, 2.计算总体数据抽象大小sampleSize,计算规则是:至少每个分区抽取20个数据或者最多1M的数据量 3.根据sampleSize和分区数量计算每个分区的数据抽样样本数量sampleSizePrePartition 4.调用RangeParititioner的sketch函数进行数据抽样,计算拿出每个分区的样本, 5.计算样本的整体占比以及数据量过多的数据分区,防止数据倾斜, 6.对于数据量比较多的rdd分区调用rdd的sample函数api重新进行数据抽取 7.将最终的样本数据通过rangepartitioner的determinebounds函数进行数据排序分配,计算出rangebounds rangepartitioner的sketch函数的作用是对rdd的数据uanzhoa需要的样本数据进行抽取,主要嗲用samplingUtils类的reservoirSampleAndCount方法对每个分区进行数据抽取,抽取后计算出整体所有分区的数量大小;reservoirSampleAndCount方法的抽取方式是先从迭代器中获取样本数量个数据(顺序获取),然后对剩余的数据进行判断,替换之前的样本数据,最终达到数据抽样的效果 总结: 一般而言,使用默认的hashpartitioner即可,rangepartitioner的使用有一定的局限性 自定义分区spark内部提供了hashpartitioner和rangepartitioner两种分区策略,这两种很多情况都是和我们的场景,但是有些情况,spark内部不能符合我们的需求,这时候我们需要自定义分区策略 实现自定义分区器需要继承org.apache.spark.Partitioner类,并实现三个方法 numPartitions:Int:返回创建出来的分区数 getPartition(key:Any):Int:返回给定键的分区编号(0到numPartitions-1) equals():java判断相等性的标准方法,这个方法非常重要,spark需要用到这个方法来检查你的分区器对象是否和其他的分区实例相同,这样的spark才可以判断两个rdd的分区是否相同 总结: 1.分区主要面对kv结构数据,spark内部提供了两个比较重要的分区器hash分区器和range分区器 2.hash分区主要通过key的hashcode来对分区数求余,hash分区可能导致数据切斜问题,range分区是通过水塘抽象的算法来将数据均匀的分配到各个分区中, 3.自定义分区主要通过继承partitioner抽象类来实现,必须实现两个方法: numPartitions和getPartition(key:Any) Accumulator累加器累加器用来对信息进行聚合,通常在向spark传递函数时,比如使用map()函数或者对filter()传条件时,可以使用驱动程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值不会影响驱动器中的对应变量,如果我们想实现所有的分片处理是跟心共享变量的功能,name累加器可以实现我们想要的结果 1.spark提供了一个默认的累加器,只能用于求和, 2.如何使用: 2.1.通过sparkcontext对象.accumulator(0) var sum = sc.accumulator(0) 通过accumulator声明一个累加器,0为初始化的值 2.2通过转换或者行动操作,通过sum+=n来使用 2.3如何取值,在driver程序中,通过sum.value来获取值 3.累加器是懒执行,需要行动触发 总结: 1.累加器的创建 1.1创建一个累加器的实例 1.2通过sc.register()注册一个累加器 1.3通过累加器实例名.add来添加数据 1.4通过累加器实例名.value来获取累加器的值 2.最好不要在转换操作中访问累加器(因为血缘系统的关系可能执行多次),最好在行动操作中访问 作用: 1能够精确统计数据的各种数据,例如: 可以统计出符合userID的记录数,在同一个时间段内产生了多少次购买,可以使用etl进行数据清洗,并使员工accumulator来进行数据的统计 2.作为调试工具,能够观察每个task的信息,通过累加器可以子sparkUI观察到每个task所处理的记录数 broadcast广播变量广播变量用来高效分发较大的数据对象,向所有工作节点发送一个较大的只读值,以供一个或多个spark操作使用,比如,你的应用需要向所有的节点发送一个较大 的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起得来都很顺手, 广播变量的好处,不是每个task一份变量副本,而是变成每个节点的executor才一份副本,这样的话,就可以让变量 副本大大减少 task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的executor对应的blockmanager中, 尝试获取变量副本,如果本地没有,那么就从driver远程拉取变量副本,并保存在本地的blockmanager中, 此后这个executor上的task都会直接使用本地的blockmanager中的副本 HttpBroadcast TorrentBroadcast(默认) BlockManager 管理某个executor对应的内存你和磁盘上的数据,尝试在本地的blockmanager中找map 总结: 广播变量的过程如下, 1)通过对一个类型T 的对象调用sparkContext.broadcast创建出一个broadcast[T]对象,任何可序列化的类型都可以实现, 2)通过value属性访问该对象的值(在java中为value()方法) 3)变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点) 能不能将rdd使用广播广播出去? 不能因为rdd是不存储数据的,可以将rdd的结果广播出去 广播只能定义在driver端,不能在executor端定义 广播变量的好处:对性能有提升广播变量会比数据在集群中处理减少更多的内存,更少的网络传输的消耗 spark文本文件的输入输出1文本文件的输入输出 textfile进行文本文件的读取 :如果传输目录,则将目录下的所有文件都读作rdd saveAsTextfile进行文本文件的输出 将传入的路径作为目录对待,会在那个目录下输出多个文件 json文件输入输出 json文件中每一行就是一个json记录,那么可以通过json文件当做文本文件来读取,,然后利用相关的json库对每一条数据进行json解析 需要在程序中进行手动编码和解码 csv文件输入输出 读取csv数据和读取json数据相似,都需要将文件当做文本文件来读取数据,然后通过将每行进行进行解析实现对csv的读取 sequenceFile文件输入输出 sequenceFile文件是hadoop用来存储二进制形式的kv对设计 saveassequencefile存储文件数据 可以直接调用saveassequencefile(path)保存pairrdd他会帮你写出数据,需要键和值能自动转换为writable类型 总结: spark整个生态系统与hadoop是完全兼容的,所以对于hadoop所支持的文件类型,或者数据库类型,spark也同样支持,另外,由于hadoop的api有新旧两个版本,所以spark为了能够兼容hadoop所有的版本,也提供了 两套创建操作接口,对于外部存储创建操作而言,hadooprdd和newhadooprdd是最为抽象的两个函数接口,主要包含四个参数 1.输入格式(inputformat);制定数据的类型,如textinputformat等 新旧两个版本所引用的版本分别是 org.apache.hadoop.mapred.InputFormat和org.apache.hadoop.mapreduce.InputFormat(NewInputFormat) 2.键类型:指定[k,v]键值对中的k的类型 3.值类型:指定[k,v]键值对中的v的类型 4.分区值:指定有外部存储生成的rdd的partition数量的最小值,如果没有指定,系统会使用默认值,defaultMinSplits jdbcrdd spark提供了 一个rdd来处理对jdbc的连接,但是,十分的鸡肋,这个rdd只能进行查询,不能增删改,很少用,而且必须是查询范围内的sql sparkcore总结1.基础概念rdd分布式弹性数据集 1.1rdd可以看做是一些列的partition所组成 1.2rdd之间有依赖关系 1.3算子是作用在partition上的 1.4分区器是作用在kv形式的rdd上的 1.5partition提供的最佳计算位置,利于数据处理的本地化,计算量移动,而不是移动数据 2.rdd算子分为两个类型,transformation和action,常用算子32个 3.rdd的窄依赖关系的和血统 4.rdd缓存方式cache和checkpoint 5.rdd的dag和stage 6.自定义分区,自定义累加器,广播变量,文件读取和输出 |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 | -2025/1/18 20:56:37- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |