IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark的RDD转换算子-key-value型-aggregateByKey、foldByKey、combineByKey、sortByKey、join、cogroup -> 正文阅读

[大数据]Spark的RDD转换算子-key-value型-aggregateByKey、foldByKey、combineByKey、sortByKey、join、cogroup

Spark的RDD转换算子-key-value型-aggregateByKey、foldByKey、combineByKey、sortByKey、join、cogroup

一、aggregateByKey

函数签名

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, 
  combOp: (U, U) => U): RDD[(K, U)] 

将数据根据不同的规则进行分区内计算和分区间计算

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) 
val dataRDD2 = dataRDD1.aggregateByKey(0)(_+_,_+_) 

取出每个分区内相同 key 的最大值然后分区间相加

// TODO : 取出每个分区内相同 key 的最大值然后分区间相加 
// aggregateByKey 算子是函数柯里化,存在两个参数列表 
// 1. 第一个参数列表中的参数表示初始值 
// 2. 第二个参数列表中含有两个参数 
//    2.1 第一个参数表示分区内的计算规则 
//    2.2 第二个参数表示分区间的计算规则 
val rdd = 
    sc.makeRDD(List( 
        ("a",1),("a",2),("c",3), 
        ("b",4),("c",5),("c",6) 
    ),2) 
// 0:("a",1),("a",2),("c",3) => (a,10)(c,10) 
//                                         => (a,10)(b,10)(c,20) 
// 1:("b",4),("c",5),("c",6) => (b,10)(c,10) 
 
val resultRDD = 
    rdd.aggregateByKey(10)( 
        (x, y) => math.max(x,y), 
        (x, y) => x + y 
    ) 
 
resultRDD.collect().foreach(println)

考虑:

如果要求分区内和分区间的计算规则相同怎么办?

二、foldByKey

函数签名

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] 

当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) 
val dataRDD2 = dataRDD1.foldByKey(0)(_+_)

三、combineByKey

函数签名

def combineByKey[C]( 
  createCombiner: V => C, 
  mergeValue: (C, V) => C, 
  mergeCombiners: (C, C) => C): RDD[(K, C)] 

最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

传入的3个参数:

第一个参数:将相同key的第一个数值进行结构转换

第二个参数:分区内的计算规则

第三个参数:分区间的计算规则

小练习:将数据 List((“a”, 88), (“b”, 95), (“a”, 91), (“b”, 93), (“a”, 95), (“b”, 98))求每个 key 的平均值

val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), 
("a", 95), ("b", 98)) 

val input: RDD[(String, Int)] = sc.makeRDD(list, 2) 

val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey( 
    (_, 1), // 将相同key的第一个数值进行结构转换
    // 分区内的计算规则
    (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), 
	//分区间的计算规则
    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) 
)

四、sortByKey

函数签名

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) 
  : RDD[(K, V)] 

在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) 
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true) 
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false) 

五、join

函数签名

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] 

两个不同类型的数据源的数据,相同的key的value会连在一起,行成元组,如果两个数据源中key没有匹配上,那么数据不会出现在结果中。如果两个数据源中的key有多个相同的,会一次匹配,出现笛卡尔乘积,数据量几何增长,会有内存风险

val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c"))) 
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6))) 
rdd.join(rdd1).collect().foreach(println) 

扩展:

lefuOuterJoin和rightOuterJoin

上面两种就是左连接和右连接,无匹配的时候就是None

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) 
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) 
 
//左连接
val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2) 

六、cogroup

函数签名

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] 

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3))) 
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3))) 

val value: RDD[(String, (Iterable[Int], Iterable[Int]))] =  

dataRDD1.cogroup(dataRDD2)
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-31 15:30:56  更:2021-08-31 15:32:08 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 16:48:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码