IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark KV类型算子案例详解二 -> 正文阅读

[大数据]Spark KV类型算子案例详解二

6.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _06TestAggregateByKey_exercise {
? ? def main(args: Array[String]): Unit = {
? ? ? ? val conf = new SparkConf().setAppName("test").setMaster("local[*]")
? ? ? ? val sc = new SparkContext(conf)
? ? ? ? /**
? ? ? ? ?* 使用aggreateByKey计算每个key出现的次数,与value之和 ? 从而可以计算平均值
? ? ? ? ?*/
? ? ? ? val rdd1: RDD[(String,Int)] = sc.makeRDD(List(("a",1), ("a",2),("b",3), ("a",2), ("b",4),("b",5)), 2)

? ? ? ? /**
? ? ? ? ?* 从需求分析中,可知,返回的数据应该是次数与value和,那么能存这样的数据,元组是比较合适的
? ? ? ? ?*/
? ? ? ? val result: RDD[(String, (Int, Int))] = rdd1.aggregateByKey((0, 0))(
? ? ? ? ? ? (x, y) => (x._1 + 1, x._2 + y),
? ? ? ? ? ? (x, y) => (x._1 + y._1, x._2 + y._2)
? ? ? ? )
? ? ? ? //继续求平均值
? ? ? ? val result1: RDD[(String, Double)] = result.map(x => {
? ? ? ? ? ? var t = x._2
? ? ? ? ? ? var avg = t._2 / t._1.toDouble
? ? ? ? ? ? (x._1, avg)
? ? ? ? })


? ? ? ? result1.collect().foreach(println)
? ? ? ? // (b,4.0)
? ? ? ? //(a,1.6666666666666667)
? ? }
}


7.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

//作用: 将kv对形式的RDD的v映射成别的类型
object _07MapValueDemo {
? ? def main(args: Array[String]): Unit = {
? ? ? ? val conf = new SparkConf().setAppName("test").setMaster("local[*]")
? ? ? ? val sc = new SparkContext(conf)

? ? ? ? val rdd: RDD[(String,Int)] = sc.makeRDD(List(("a",1), ("a",2),("b",3), ("a",2), ("b",4),("b",5)), 2)

? ? ? ? //需求,按key先分组 ,再求和
? ? ? ? val value: RDD[(String, Iterable[Int])] = rdd.groupByKey()
? ? ? ? val value1: RDD[(String, Int)] = value.mapValues(_.sum)
? ? ? ? value1.collect().foreach(println)
? ? ? ? //(b,12)
? ? ? ? //(a,5)

? ? ? ? println("*****************")
? ? ? ? //将rdd*10进行输出
? ? ? ? val value2: RDD[(String, Int)] = rdd.mapValues(_ * 10)
? ? ? ? value2.collect().foreach(println)
? ? ? ? //(a,10)
? ? ? ? //(a,20)
? ? ? ? //(b,30)
? ? ? ? //(a,20)
? ? ? ? //(b,40)
? ? ? ? //(b,50)
? ? }

}


8.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
// ? ? ? ? * ? 第一个参数:就是第一个value的转换操作,使之当成默认值
// ? ? ? ? * ? 第二个参数:用于指定分区内的计算逻辑:
// ? ? ? ? * ? 第三个参数:用于指定分区间的计算逻辑
object _08ConbineByKeyDemo {
? ? def main(args: Array[String]): Unit = {
? ? ? ? val conf = new SparkConf().setAppName("test").setMaster("local[*]")
? ? ? ? val sc = new SparkContext(conf)

? ? ? ? val rdd: RDD[(String,Int)] = sc.makeRDD(List(("a",1), ("a",2),("b",3), ("a",2), ("b",4),("b",5)), 2)
? ? ? ? // ?由于第一个参数是一个函数,而不是一个普通的值,因此对于其他两个参数来说,是动态获取的,那么应该指定一下
? ? ? ? val result: RDD[(String, Int)] = rdd.combineByKey(x => x, (x: Int, y: Int) => math.max(x, y), (x: Int, y: Int) => x + y)
? ? ? ? result.collect().foreach(println)
? ? ? ? sc.stop()
? ? ? ? // (b,8)
? ? ? ? //(a,4)


? ? }
}


9.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _09ReduceAggregateFlodCombineByKeyDemo {
? ? ? ? def main(args: Array[String]): Unit = {
? ? ? ? ? ? val conf = new SparkConf().setAppName("test").setMaster("local[*]")
? ? ? ? ? ? val sc = new SparkContext(conf)

? ? ? ? ? ? val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("a", 2), ("b", 4), ("b", 5)), 2)

? ? ? ? ? ? rdd.reduceByKey(_+_)
? ? ? ? ? ? rdd.aggregateByKey(0)(_+_,_+_)
? ? ? ? ? ? rdd.foldByKey(0)(_+_)
? ? ? ? ? ? val value: RDD[(String, Int)] = rdd.combineByKey(x=>x, (x: Int, y: Int) => math.max(x, y), (x: Int, y: Int) => x + y)
? ? ? ? ? ? value.collect().foreach(println)

? ? ? ? ? ? //(b,8)
? ? ? ? ? ? //(a,4)
? ? ? ? }
? ? }


10.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _10JoinDemo {
? ? ? ? def main(args: Array[String]): Unit = {
? ? ? ? ? ? val conf = new SparkConf().setAppName("test").setMaster("local[*]")
? ? ? ? ? ? val sc = new SparkContext(conf)

? ? ? ? ? ? val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 4),("d",5)), 2)
? ? ? ? ? ? val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("b", 6), ("b", 7), ("c", 8),("e",9)), 2)
? ? ? ? ? ? /**
? ? ? ? ? ? ?* join/leftOuterJoin/rightOuterJoin算子
? ? ? ? ? ? ?* 作用:就是让两个pairRDD进行内连接/左外连接/右外连接
? ? ? ? ? ? ?* ? ? ?通过key连接
? ? ? ? ? ? ?*
? ? ? ? ? ? ?*/
? ? ? ? ? ? val value1: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
? ? ? ? ? ? val value2: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
? ? ? ? ? ? val value3: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2)
? ? ? ? ? ? value1.collect().foreach(println)
? ? ? ? ? ? println("----------------------------")
? ? ? ? ? ? value2.collect().foreach(println)
? ? ? ? ? ? println("----------------------------")
? ? ? ? ? ? value3.collect().foreach(println)

? ? ? ? ? ? //(b,(3,6))
? ? ? ? ? ? //(b,(3,7))
? ? ? ? ? ? //(a,(1,5))
? ? ? ? ? ? //(a,(2,5))
? ? ? ? ? ? //(c,(4,8))
? ? ? ? ? ? //----------------------------
? ? ? ? ? ? //(d,(5,None))
? ? ? ? ? ? //(b,(3,Some(6)))
? ? ? ? ? ? //(b,(3,Some(7)))
? ? ? ? ? ? //(a,(1,Some(5)))
? ? ? ? ? ? //(a,(2,Some(5)))
? ? ? ? ? ? //(c,(4,Some(8)))
? ? ? ? ? ? //----------------------------
? ? ? ? ? ? //(b,(Some(3),6))
? ? ? ? ? ? //(b,(Some(3),7))
? ? ? ? ? ? //(e,(None,9))
? ? ? ? ? ? //(a,(Some(1),5))
? ? ? ? ? ? //(a,(Some(2),5))
? ? ? ? ? ? //(c,(Some(4),8))
? ? ? ? }
? ? }

11.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object _11CogroupDemo {
? ? ? ? def main(args: Array[String]): Unit = {
? ? ? ? ? ? val conf = new SparkConf().setAppName("test").setMaster("local[*]")
? ? ? ? ? ? val sc = new SparkContext(conf)

? ? ? ? ? ? val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 4),("d",5)), 2)
? ? ? ? ? ? val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 5), ("b", 6), ("b", 7), ("c", 8),("e",9)), 2)

? ? ? ? ? ? /**
? ? ? ? ? ? ?*
? ? ? ? ? ? ?* cogroup:
? ? ? ? ? ? ?*
? ? ? ? ? ? ?* 作用:
? ? ? ? ? ? ?* ? ?相当于两个rdd先各自分组(groupByKey),再进行全外jion。
? ? ? ? ? ? ?* ? ?参考输出结果
? ? ? ? ? ? ?*
? ? ? ? ? ? ?* (a,(List(1,2),List(5)))
? ? ? ? ? ? ?* (b,(List(3),List(6,7)))
? ? ? ? ? ? ?* (c,(List(4),List(8)))
? ? ? ? ? ? ?*
? ? ? ? ? ? ?*/
? ? ? ? ? ? val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
? ? ? ? ? ? value.collect().foreach(println)
? ? ? ? ? ? // (d,(CompactBuffer(5),CompactBuffer()))
? ? ? ? ? ? //(b,(CompactBuffer(3),CompactBuffer(6, 7)))
? ? ? ? ? ? //(e,(CompactBuffer(),CompactBuffer(9)))
? ? ? ? ? ? //(a,(CompactBuffer(1, 2),CompactBuffer(5)))
? ? ? ? ? ? //(c,(CompactBuffer(4),CompactBuffer(8)))
? ? ? ? }
? ? }

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-21 12:15:18  更:2021-10-21 12:15:22 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 6:03:55-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码