combineByKey
The most general aggregation function for key-value RDDs. Like aggregate(), combineByKey() allows the user to return a value whose type differs from that of the input.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(conf)
  val rdd = sc.makeRDD(List(("a", 1), ("b", 1), ("c", 1), ("a", 1)), 2)
  // computes (sum, count) per key; the three functions are
  // createCombiner, mergeValue (within a partition), mergeCombiners (across partitions)
  val newRDD: RDD[(String, (Int, Int))] = rdd.combineByKey(
    v => (v, 1),
    (t: (Int, Int), v) => (t._1 + v, t._2 + 1),
    (t1: (Int, Int), t2: (Int, Int)) => (t1._1 + t2._1, t1._2 + t2._2)
  )
  newRDD.collect().foreach(println)
  sc.stop()
}
join
Called on RDDs of type (K, V) and (K, W); returns an RDD of (K, (V, W)) in which all pairs of elements sharing the same key are joined together.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(conf)
  val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 7)))
  val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6), ("a", 8)))
  // "d" has no match and is dropped; "a" matches twice, yielding (a,(1,4)) and (a,(1,8))
  val joinRDD = rdd1.join(rdd2)
  joinRDD.collect().foreach(println)
  sc.stop()
}
leftOuterJoin
Analogous to a left outer join in SQL.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(conf)
  val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
  val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5)))
  // "c" has no match in rdd2 and appears as (c,(3,None))
  val leftJoinRDD = rdd1.leftOuterJoin(rdd2)
  leftJoinRDD.collect().foreach(println)
  sc.stop()
}
cogroup
Called on RDDs of type (K, V) and (K, W); returns an RDD of type (K, (Iterable[V], Iterable[W])).
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc = new SparkContext(conf)
  val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
  val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5)))
  // unlike join, every key appears once, with all its values grouped on each side
  val cogroupRDD = rdd1.cogroup(rdd2)
  cogroupRDD.collect().foreach(println)
  sc.stop()
}
Case study: compute the ad view count per province and keep the top 3 ads for each province. Each line of agent.log is space-separated; field 1 is the province and field 4 the ad id, as the map below assumes.
package operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Rep {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc: SparkContext = new SparkContext(conf)
    val dataRDD: RDD[String] = sc.textFile("datas/agent.log")
    // ((province, ad), 1)
    val mapRDD: RDD[((String, String), Int)] = dataRDD.map(
      line => {
        val datas: Array[String] = line.split(" ")
        ((datas(1), datas(4)), 1)
      }
    )
    // ((province, ad), viewCount)
    val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_ + _)
    // restructure to (province, (ad, viewCount)) so we can group by province
    val newMapRDD: RDD[(String, (String, Int))] = reduceRDD.map {
      case ((prv, ad), sum) => (prv, (ad, sum))
    }
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = newMapRDD.groupByKey()
    // sort each province's ads by count, descending, and keep the top 3
    val resultRDD = groupRDD.mapValues(
      iter => iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
    )
    resultRDD.collect().foreach(println)
    sc.stop()
  }
}
Action operators
reduce
Aggregates all elements of the RDD: first the data within each partition, then the partial results across partitions.
collect
Returns all elements of the dataset to the driver program as an Array.
count
Returns the number of elements in the RDD.
first
Returns the first element of the RDD.
take
Returns an array made up of the first n elements of the RDD.
takeOrdered
Returns an array made up of the first n elements of the RDD after sorting.
def main(args: Array[String]): Unit = {
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc: SparkContext = new SparkContext(conf)
  val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
  val i = rdd.reduce(_ + _)
  println(i) // 10
  val ints = rdd.collect()
  println(ints.mkString(",")) // 1,2,3,4
  val cnt = rdd.count()
  println(cnt) // 4
  val first = rdd.first()
  println(first) // 1
  val take = rdd.take(3)
  println(take.mkString(",")) // 1,2,3
  val takeOrdered = rdd.takeOrdered(3)
  println(takeOrdered.mkString(",")) // 1,2,3
  sc.stop()
}
aggregate
Each partition's data is first aggregated with the initial value inside the partition; the partition results are then aggregated across partitions, where the initial value is applied once more.
def main(args: Array[String]): Unit = {
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc: SparkContext = new SparkContext(conf)
  val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
  // two partitions: (10 + 1 + 2) + (10 + 3 + 4) + 10 = 40
  val result = rdd.aggregate(10)(_ + _, _ + _)
  println(result)
  sc.stop()
}
fold
A fold operation: a simplified form of aggregate in which the intra-partition and inter-partition functions are the same.
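A minimal sketch in the style of the aggregate example above; with the same data and two partitions the result is again 40:
def main(args: Array[String]): Unit = {
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc: SparkContext = new SparkContext(conf)
  val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
  // fold(10)(_ + _) behaves like aggregate(10)(_ + _, _ + _)
  val result = rdd.fold(10)(_ + _)
  println(result) // 40
  sc.stop()
}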
countByValue
Counts the number of occurrences of each distinct value in the RDD.
def main(args: Array[String]): Unit = {
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc: SparkContext = new SparkContext(conf)
  val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
  val intToLong: collection.Map[Int, Long] = rdd.countByValue()
  println(intToLong) // e.g. Map(4 -> 1, 2 -> 1, 1 -> 1, 3 -> 1)
  sc.stop()
}
countByKey
Counts the number of elements for each key in a (K, V) RDD.
def main(args: Array[String]): Unit = {
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc: SparkContext = new SparkContext(conf)
  val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3)))
  val stringToLong: collection.Map[String, Long] = rdd.countByKey()
  println(stringToLong) // Map(a -> 3)
  sc.stop()
}
Implementing wordCount in different ways
def main(args: Array[String]): Unit = {
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc: SparkContext = new SparkContext(conf)
  wordCount8(sc) // pick any of the variants below
  sc.stop()
}
// 1) groupBy, then count each group's size
def wordCount1(sc: SparkContext): Unit = {
  val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
  val words: RDD[String] = rdd.flatMap(_.split(" "))
  val group: RDD[(String, Iterable[String])] = words.groupBy(word => word)
  val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
}

// 2) groupByKey, then count each group's size (shuffles all the 1s before counting)
def wordCount2(sc: SparkContext): Unit = {
  val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
  val words: RDD[String] = rdd.flatMap(_.split(" "))
  val wordOne = words.map((_, 1))
  val group = wordOne.groupByKey()
  val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
}

// 3) aggregateByKey with the same function within and across partitions
def wordCount3(sc: SparkContext): Unit = {
  val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
  val words: RDD[String] = rdd.flatMap(_.split(" "))
  val wordOne = words.map((_, 1))
  val wordCount = wordOne.aggregateByKey(0)(_ + _, _ + _)
}

// 4) reduceByKey, the most direct way
def wordCount4(sc: SparkContext): Unit = {
  val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
  val words: RDD[String] = rdd.flatMap(_.split(" "))
  val wordOne: RDD[(String, Int)] = words.map((_, 1))
  val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_ + _)
}

// 5) combineByKey; createCombiner is the identity here
def wordCount5(sc: SparkContext): Unit = {
  val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
  val words: RDD[String] = rdd.flatMap(_.split(" "))
  val wordOne: RDD[(String, Int)] = words.map((_, 1))
  val wordCount: RDD[(String, Int)] = wordOne.combineByKey(
    v => v,
    (x: Int, y) => x + y,
    (x: Int, y: Int) => x + y
  )
}

// 6) countByKey, an action that returns a Map on the driver
def wordCount6(sc: SparkContext): Unit = {
  val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
  val words: RDD[String] = rdd.flatMap(_.split(" "))
  val wordOne: RDD[(String, Int)] = words.map((_, 1))
  val wordCount: collection.Map[String, Long] = wordOne.countByKey()
}

// 7) countByValue, counting each distinct word directly; no mapping to 1 needed
def wordCount7(sc: SparkContext): Unit = {
  val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
  val words: RDD[String] = rdd.flatMap(_.split(" "))
  val wordCount: collection.Map[String, Long] = words.countByValue()
}
// 8) map each word to a mutable Map and merge the maps (requires: import scala.collection.mutable)
def wordCount8(sc: SparkContext): Unit = {
  val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
  val words: RDD[String] = rdd.flatMap(_.split(" "))
  val mapWord = words.map(
    word => mutable.Map[String, Long]((word, 1))
  )
  // merge the single-word maps pairwise, accumulating counts into map1
  val wordCount = mapWord.reduce(
    (map1, map2) => {
      map2.foreach {
        case (word, count) =>
          val newCount = map1.getOrElse(word, 0L) + count
          map1.update(word, newCount)
      }
      map1
    }
  )
  println(wordCount)
}
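A further variant in the same spirit, sketched here with foldByKey (aggregateByKey with a single function used both within and across partitions):
def wordCount9(sc: SparkContext): Unit = {
  val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
  val words: RDD[String] = rdd.flatMap(_.split(" "))
  val wordOne: RDD[(String, Int)] = words.map((_, 1))
  // the zero value 0 plays the role of aggregateByKey's initial value
  val wordCount: RDD[(String, Int)] = wordOne.foldByKey(0)(_ + _)
}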
save
Saves the data to files in different formats.
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
rdd.map((_,1)).saveAsSequenceFile("output2")
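A minimal runnable sketch wrapping the three calls above; it assumes the output directories do not exist yet, since Spark refuses to overwrite them:
def main(args: Array[String]): Unit = {
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc: SparkContext = new SparkContext(conf)
  val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
  rdd.saveAsTextFile("output")    // plain text, one element per line
  rdd.saveAsObjectFile("output1") // Java-serialized objects
  rdd.map((_, 1)).saveAsSequenceFile("output2") // SequenceFile needs a (K, V) RDD
  sc.stop()
}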
foreach
Traverses every element of the RDD in a distributed fashion, applying the given function.
def main(args: Array[String]): Unit = {
  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
  val sc: SparkContext = new SparkContext(conf)
  val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
  // collect() first pulls the data back to the Driver, so this prints on the Driver, in order
  rdd.collect().foreach(println)
  println("******************")
  // this foreach runs on the Executors; output order across partitions is not guaranteed
  rdd.foreach(println)
  sc.stop()
}
RDD serialization
From the standpoint of the computation, code outside an operator runs on the Driver, while code inside an operator runs on the Executors. In Scala's functional style, an operator often refers to data defined outside of it, which forms a closure. If that outside data cannot be serialized, it cannot be shipped to the Executors, and the job fails. Before executing tasks, Spark therefore checks whether the objects captured by the closure are serializable; this is called the closure check.
package operator.transform.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object serializable02_function {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))
    val search = new Search("hello")
    search.getMatch1(rdd).collect().foreach(println)
    search.getMatch2(rdd).collect().foreach(println)
    sc.stop()
  }
}
class Search(query: String) extends Serializable {
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }

  // passes the method isMatch, which captures `this`, so the whole Search
  // instance must be serializable to be shipped to the Executors
  def getMatch1(rdd: RDD[String]): RDD[String] = {
    rdd.filter(isMatch)
  }

  // the lambda refers to the constructor parameter `query`; in Scala that is a
  // field of `this`, so Search still has to be serializable here
  def getMatch2(rdd: RDD[String]): RDD[String] = {
    rdd.filter(x => x.contains(query))
  }
}
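A common variation, sketched here as a hypothetical extra method getMatch3 on Search: copy the field into a local value first, so the closure captures only a String and the enclosing class would no longer need to be serializable:
def getMatch3(rdd: RDD[String]): RDD[String] = {
  val q = query // local copy: the closure captures only this String, not `this`
  rdd.filter(x => x.contains(q))
}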
Postfix expressions
Converting infix to postfix:
1) Initialize two stacks: an operator stack s1 and a stack s2 for intermediate results;
2) Scan the infix expression from left to right;
3) On an operand, push it onto s2;
4) On an operator, compare its precedence with the operator on top of s1:
   (1) if s1 is empty, or its top is a left parenthesis "(", push the operator onto s1;
   (2) otherwise, if its precedence is higher than that of the operator on top of s1, also push it onto s1;
   (3) otherwise, pop the top operator of s1 onto s2, and go back to (4-1) to compare with the new top of s1;
5) On a parenthesis: (1) a left parenthesis "(" is pushed onto s1 directly; (2) on a right parenthesis ")", pop operators from s1 onto s2 until a left parenthesis is met, then discard the pair of parentheses;
6) Repeat steps 2 through 5 until the right end of the expression;
7) Pop the remaining operators of s1 onto s2;
8) Pop the elements of s2 and output them; the reverse of that output is the postfix expression. (The code below uses a List for s2, so the elements come out in the right order and no final reversal is needed.)
For example, 1+((2+3)*4)-5 converts to 1 2 3 + 4 * + 5 -, which evaluates to 16.
package stack;

import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Stack;

public class PolandNotation {
    public static void main(String[] args) {
        String expression = "1+((2+3)*4)-5";
        List<String> infixExpressionList = toInfixExpressionList(expression);
        System.out.println(expression + "=" + calculator(parseSuffixExpressionList(infixExpressionList)));
    }

    // split an infix expression string into tokens, keeping multi-digit numbers together
    public static List<String> toInfixExpressionList(String s) {
        List<String> ls = new ArrayList<>();
        int i = 0;
        String str;
        char c;
        do {
            if ((c = s.charAt(i)) < '0' || c > '9') {
                // not a digit: a single-character operator or parenthesis
                ls.add("" + c);
                i++;
            } else {
                // a digit: keep reading until the number ends
                str = "";
                while (i < s.length() && (c = s.charAt(i)) >= '0' && c <= '9') {
                    str += c;
                    i++;
                }
                ls.add(str);
            }
        } while (i < s.length());
        return ls;
    }
    // infix token list -> postfix token list (steps 1-8 above)
    public static List<String> parseSuffixExpressionList(List<String> ls) {
        Stack<String> s1 = new Stack<String>();
        List<String> s2 = new ArrayList<>(); // a List works for s2: it is only ever appended to
        for (String item : ls) {
            if (item.matches("\\d+")) {
                s2.add(item); // operand
            } else if (item.equals("(")) {
                s1.push(item);
            } else if (item.equals(")")) {
                // pop operators until the matching "(" and drop the parentheses
                while (!s1.peek().equals("(")) {
                    s2.add(s1.pop());
                }
                s1.pop();
            } else {
                // pop operators of higher or equal precedence (left associativity), then push
                while (s1.size() != 0 && Operation.getValue(s1.peek()) >= Operation.getValue(item)) {
                    s2.add(s1.pop());
                }
                s1.push(item);
            }
        }
        while (s1.size() != 0) {
            s2.add(s1.pop());
        }
        return s2;
    }
    @Test
    public void test1() {
        String suffixExpression = "3 4 + 5 * 6 -";
        List<String> rpnList = getListString(suffixExpression);
        int res = calculator(rpnList);
        System.out.println(res); // (3+4)*5-6 = 29
    }

    public static List<String> getListString(String suffixExpression) {
        String[] split = suffixExpression.split(" ");
        return new ArrayList<String>(Arrays.asList(split));
    }
    // evaluate a postfix expression given as a token list
    public static int calculator(List<String> ls) {
        Stack<String> stack = new Stack<>();
        for (String item : ls) {
            if (item.matches("\\d+")) {
                stack.push(item);
            } else {
                // pop two operands; num1 (popped second) is the left operand
                int num2 = Integer.parseInt(stack.pop());
                int num1 = Integer.parseInt(stack.pop());
                int res = 0;
                if (item.equals("+")) {
                    res = num1 + num2;
                } else if (item.equals("-")) {
                    res = num1 - num2;
                } else if (item.equals("*")) {
                    res = num1 * num2;
                } else if (item.equals("/")) {
                    res = num1 / num2;
                } else {
                    throw new RuntimeException("Invalid operator: " + item);
                }
                stack.push("" + res);
            }
        }
        return Integer.parseInt(stack.pop());
    }
}
// operator precedence: + and - are 1, * and / are 2
class Operation {
    private static final int ADD = 1;
    private static final int SUB = 1;
    private static final int MUL = 2;
    private static final int DIV = 2;

    public static int getValue(String operation) {
        int result = 0;
        switch (operation) {
            case "+":
                result = ADD;
                break;
            case "-":
                result = SUB;
                break;
            case "/":
                result = DIV;
                break;
            case "*":
                result = MUL;
                break;
            default:
                // "(" also lands here with value 0, which keeps it below every
                // operator in the precedence comparison of parseSuffixExpressionList
                System.out.println("Unknown operator: " + operation);
                break;
        }
        return result;
    }
}
Recursion
Important rules that recursion must follow:
1) Executing a method creates a new, protected, independent space (a stack frame).
2) A method's local variables are independent of each other and do not interfere, e.g. the variable n.
3) If a method uses a reference-type variable (such as an array), the referenced data is shared across the recursive calls.
4) The recursion must make progress toward its exit condition; otherwise it recurses infinitely and ends with a StackOverflowError.
5) When a method finishes, or hits a return, it hands its result back to whoever called it, and that invocation is complete.
Eight Queens
package recursion;

public class Queue8 {
    int max = 8;
    // array[n] = the column of the queen placed in row n
    int[] array = new int[max];
    static int count = 0;

    public static void main(String[] args) {
        Queue8 queue8 = new Queue8();
        queue8.check(0);
        System.out.println("There are " + count + " solutions in total"); // 92
    }

    // try every column for the queen of row n, recursing into the next row on success
    private void check(int n) {
        if (n == max) { // all 8 queens placed: record one solution
            print();
            count++;
            return;
        }
        for (int i = 0; i < max; i++) {
            array[n] = i;
            if (judge(n)) {
                check(n + 1);
            }
        }
    }

    // does the queen in row n conflict with any queen in rows 0..n-1?
    private boolean judge(int n) {
        for (int i = 0; i < n; i++) {
            // same column, or same diagonal
            if (array[i] == array[n] || Math.abs(n - i) == Math.abs(array[n] - array[i])) {
                return false;
            }
        }
        return true;
    }

    private void print() {
        for (int i = 0; i < array.length; i++) {
            System.out.print(array[i] + " ");
        }
        System.out.println();
    }
}
Maze
package recursion;

public class MiGong {
    public static void main(String[] args) {
        // 8 rows x 7 columns: 1 = wall, 0 = open
        int[][] map = new int[8][7];
        // top and bottom walls
        for (int i = 0; i < 7; i++) {
            map[0][i] = 1;
            map[7][i] = 1;
        }
        // left and right walls
        for (int i = 0; i < 8; i++) {
            map[i][0] = 1;
            map[i][6] = 1;
        }
        // inner obstacles
        map[3][1] = 1;
        map[3][2] = 1;
        map[4][3] = 1;
        System.out.println("Maze:");
        for (int i = 0; i < 8; i++) {
            for (int j = 0; j < 7; j++) {
                System.out.print(map[i][j] + " ");
            }
            System.out.println();
        }
        setWay(map, 1, 1);
        System.out.println("Solution:");
        for (int i = 0; i < 8; i++) {
            for (int j = 0; j < 7; j++) {
                System.out.print(map[i][j] + " ");
            }
            System.out.println();
        }
    }

    // find a path from (i, j) to the exit (6, 5); marks 2 = on the path, 3 = dead end
    // search order: down -> right -> up -> left
    public static boolean setWay(int[][] map, int i, int j) {
        if (map[6][5] == 2) { // exit already reached
            return true;
        } else {
            if (map[i][j] == 0) { // cell not visited yet
                map[i][j] = 2; // assume the cell is on the path
                if (setWay(map, i + 1, j)) { // down
                    return true;
                } else if (setWay(map, i, j + 1)) { // right
                    return true;
                } else if (setWay(map, i - 1, j)) { // up
                    return true;
                } else if (setWay(map, i, j - 1)) { // left
                    return true;
                } else {
                    map[i][j] = 3; // dead end
                    return false;
                }
            } else {
                return false;
            }
        }
    }

    // the same search with a different order: up -> right -> down -> left
    public static boolean setWay2(int[][] map, int i, int j) {
        if (map[6][5] == 2) {
            return true;
        } else {
            if (map[i][j] == 0) {
                map[i][j] = 2;
                if (setWay2(map, i - 1, j)) { // up
                    return true;
                } else if (setWay2(map, i, j + 1)) { // right
                    return true;
                } else if (setWay2(map, i + 1, j)) { // down
                    return true;
                } else if (setWay2(map, i, j - 1)) { // left
                    return true;
                } else {
                    map[i][j] = 3;
                    return false;
                }
            } else {
                return false;
            }
        }
    }
}