Spark常用算子分析与应用
1 Value型转换算子
? 1) map o 类比于mapreduce中的map操作,给定一个输入通过map函数映到成一个新的元素输出 ? case_1 val first = sc.parallelize(List(“Hello”,“World”,“哈哈哈”,“大数据”),2) val second= first.map(_.length) second.collect ? case_2 val first = sc.parallelize(1 to 5,2) first.map(1 to _).collect ? 2) flatMap o 给定一个输入,将返回的所有结果打平成一个一维集合结构 ? case_1 val first = sc.parallelize(1 to 5,2) first.flatMap(1 to _).collect ? case_2 val first = sc.parallelize(List(“one”,“two”,“three”),2) first.flatMap(x => List(x,x,x)).collect ? case_3 val first = sc.parallelize(List(“one”,“two”,“three”),2) first.flatMap(x => List(x+"_1",x+"_2",x+"_3")).collect ? 3) mapPartitions o 以分区为单位进行计算处理,而map是以每个元素为单位进行计算处理。 o 当在map过程中需要频繁创建额外对象时,如文件输出流操作、jdbc操作、Socket操作等时,当用mapPartitions算子 ? case_1 val rdd=sc.parallelize(Seq(1,2,3,4,5),3) var rdd2=rdd.mapPartitions(partition=>{ //在此处加入jdbc等一次初始化多次使用的代码 partition.map(num => num * num) } ) rdd2.max ? case_2 val rdd=sc.parallelize(Seq(1,2,3,4,5),3) var rdd2=rdd.mapPartitions(partition=>{ //在此处加入jdbc等一次初始化多次使用的代码 partition.flatMap(1 to ) } ) rdd2.count ? 4) glom 以分区为单位,将每个分区的值形成一个数组 val a = sc.parallelize(Seq(“one”,“two”,“three”,“four”,“five”,“six”,“seven”),3) a.glom.collect ? 5) union算子 o 将两个RDD合并成一个RDD,并不去重 o 会发生多分区合并成一个分区的情况 val a = sc.parallelize(1 to 4, 2) val b = sc.parallelize(3 to 6, 2) a.union(b).collect (a ++ b).collect (a union b).collect ? 6) groupBy算子 输入分区与输出分区多对多型 val a = sc.parallelize(Seq(1,3,4,5,9,100,200), 3) a.groupBy(x => { if (x > 10) “>10” else “<=10” }).collect ? 7) filter算子 输出分区为输入分区子集型 val a = sc.parallelize(1 to 21, 3) val b = a.filter( % 4 == 0) b.collect ? 8 ) distinct算子 输出分区为输入分区子集型,全局去重 ? case_1 val a = sc.parallelize(1 to 4, 2) val b = sc.parallelize(3 to 6, 2) a.union(b).distinct().collect ? case_2 val c = sc.parallelize(List(“张三”, “李四”, “李四”, “王五”), 2) c.distinct.collect ? 9) cache算子 o cache 将 RDD 元素从磁盘缓存到内存。 相当于 persist(MEMORY_ONLY) 函数的功能。 o 主要应用在当RDD数据反复被使用的场景下 ? case_1 val a = sc.parallelize(1 to 4, 2) val b = sc.parallelize(3 to 6, 2)
a.union(b).count a.union(b).distinct().collect ? case_2 val a = sc.parallelize(1 to 4, 2) val b = sc.parallelize(3 to 6, 2) val c=a.union(b).cache c.count c.distinct().collect
2 Key-Value型转换算子
其处理的数据是Key-Value型 ? 1)mapValues算子 o 输入分区与输出分区一对一 o 针对(Key,Value)型数据中的 Value 进行 Map 操作,而不对 Key 进行处理。 val first = sc.parallelize(List((“张一”,1),(“张二”,2),(“张三”,3),(“张四”,4)),2) val second= first.mapValues(x=>x+1) second.collect ? 2)combineByKey算子 o 定义 def combineByKey[C]( createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(String, C)] ? createCombiner:对每个分区内的同组元素如何聚合,形成一个累加器 ? mergeValue:将前边的累加器与新遇到的值进行合并的方法 ? mergeCombiners:每个分区都是独立处理,故同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,用方法将各个分区的结果进行合并。 o case_1 val first = sc.parallelize(List((“张一”,1),(“李一”,1),(“张一”,2),(“张一”,3),(“李一”,3),(“李三”,3),(“张四”,4)),2) val second= first.combineByKey(List(), (x:List[Int], y:Int) => y :: x, (x:List[Int], y:List[Int]) => x ::: y) second.collect ? 3)reduceByKey算子 o 按key聚合后对组进行归约处理,如求和、连接等操作 val first = sc.parallelize(List(“小米”, “华为”, “小米”, “小米”, “华为”, “苹果”), 2) val second = first.map(x => (x, 1)) second.reduceByKey( + _).collect ? 4)join算子 o 对Key-Value结构的RDD进行按Key的join操作,最后将V部分做flatmap打平操作。 val first = sc.parallelize(List((“张一”,11),(“李二”,12)),2) val second = sc.parallelize(List((“张一”,21),(“李二”,22),(“王五”,23)),2) first.join(second).collect
3 行动算子(Action)
? 此种算子会触发SparkContext提交作业。触发了RDD DAG 的执行。 ? 1) 无输出型:不落地到文件或是hdfs的作用 o foreach算子 val first = sc.parallelize(List(“小米”, “华为”, “小米”, “小米”, “华为”, “苹果”), 2) first.foreach(println ) ? 2) HDFS输出型 o saveAsTextFile算子 val first = sc.parallelize(List(“小米”, “华为”, “小米”, “小米”, “华为”, “苹果”), 2) //指定本地保存的目录 first.saveAsTextFile(“file:///home/spark/text”) //指定hdfs保存的目录,默认路径是hdfs系统中/user/当前用户路径下 first.saveAsTextFile(“spark_shell_output_1”) ? Scala集合和数据类型 o 3) collect算子 ? 相当于toArray操作,将分布式RDD返回成为一个scala array数组结果,实际是Driver所在的机器节点,再针对该结果操作 val first = sc.parallelize(List(“小米”, “华为”, “小米”, “小米”, “华为”, “苹果”), 2) first.collect ? 4) collectAsMap算子 o 相当于toMap操作,将分布式RDD的kv对形式返回成为一个的scala map集合,实际是Driver所在的机器节点,再针对该结果操作 val first = sc.parallelize(List((“张一”,1),(“李一”,1),(“张一”,2),(“张一”,3),(“李一”,3),(“李三”,3),(“张四”,4)),2) first.collectAsMap ? 5)lookup算子 o 对(Key,Value)型的RDD操作,返回指定Key对应的元素形成的Seq。 val first = sc.parallelize(List(“小米”, “华为”, “华米”, “大米”, “苹果”,“米老鼠”), 2) val second=first.map(x=>({if(x.contains(“米”)) “有米” else “无米”},x)) second.lookup(“有米”) ? 6) reduce算子 o 先对两个元素进行reduce函数操作,然后将结果和迭代器取出的下一个元素进行reduce函数操作,直到迭代器遍历完所有元素,得到最后结果。 //求value型列表的和 val a = sc.parallelize(1 to 10, 2) a.reduce( + ) //求key-value型列表的value的和 val a = sc.parallelize(List((“one”,1),(“two”,2),(“three”,3),(“four”,4)), 2) a.reduce((x,y)=>(“sum”,x.2 + y.2)).2 ? 7) fold算子 o fold算子签名: def fold(zeroValue: T)(op: (T, T) => T): T o 其实就是先对rdd分区的每一个分区进行op函数,在调用op函数过程中将zeroValue参与计算,最后在对所有分区的结果调用op函数,同理此处zeroValue再次参与计算。 //和是41,公式=(1+2+3+4+5+6+10)+10 sc.parallelize(List(1, 2, 3, 4, 5, 6), 1).fold(10)(+) //和是51,公式=(1+2+3+10)+(4+5+6+10)+10=51 sc.parallelize(List(1, 2, 3, 4, 5, 6), 2).fold(10)(+) //和是61,公式=(1+2+10)+(3+4+10)+(5+6+10)+10=61 sc.parallelize(List(1, 2, 3, 4, 5, 6), 3).fold(10)(+)
|