1. WordCount
Data
hadoop mapreduce yarn
hdfs hadoop mapreduce
mapreduce yarn lagou
lagou
lagou
Example: Scala version
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName(this.getClass.getCanonicalName)
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val lines: RDD[String] = sc.textFile("file:///Users/guchenfei/IdeaProjects/Demo/src/main/data/wc.txt")
val words: RDD[String] = lines.flatMap(line => line.split("\\s+"))
val wordsMap: RDD[(String, Int)] = words.map((_, 1))
val result: RDD[(String, Int)] = wordsMap.reduceByKey(_ + _)
result.foreach(println)
sc.stop()
}
}
Packaging and testing on the cluster. Note: do not set the master inside the program above:
// conf.setMaster("local[*]")
Take the input file path from a command-line argument instead:
if (args.length == 0) return
val lines: RDD[String] = sc.textFile(args(0))
Submit the job in standalone mode:
[root@Linux121 guchenfei]# spark-submit --class WordCount --deploy-mode cluster ~/original-Demo-1.0-SNAPSHOT.jar /wcinput/wc.txt
22/02/28 22:13:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Java version
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class JavaWordCount {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("JavaWordCount");
sparkConf.setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
javaSparkContext.setLogLevel("WARN");
JavaRDD<String> stringJavaRDD = javaSparkContext.textFile("file:///Users/guchenfei/IdeaProjects/Demo/src/main/data/wc.txt");
JavaRDD<String> words = stringJavaRDD.flatMap(line -> Arrays.stream((line.split("\\s+"))).iterator());
JavaPairRDD<String, Integer> wordMap = words.mapToPair(word -> new Tuple2<>(word, 1));
JavaPairRDD<String, Integer> results = wordMap.reduceByKey((x, y) -> x + y);
results.foreach(elem -> System.out.println(elem));
javaSparkContext.stop();
}
}
2. Computing Pi
import org.apache.spark.{SparkConf, SparkContext}

import scala.math.random

object SparkPi {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName(this.getClass.getCanonicalName)
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val slices = if(args.length > 0 ) args(0).toInt else 100
val N = 100000000
val count = sc.makeRDD(1 to N,slices).map(idx => {
val (x,y) = (random,random)
if (x*x + y*y <= 1) 1 else 0
}).reduce(_+_)
println(s"Pi is roughly ${4.0 * count/N}")
}
}
Pi is roughly 3.14165928
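Why this works: each sample (x, y) is drawn uniformly from the unit square [0, 1) x [0, 1), and the probability that it falls inside the quarter circle x*x + y*y <= 1 is exactly π/4. So count / N estimates π/4, and 4.0 * count / N estimates π; with N = 100,000,000 samples the run above lands within about 1e-4 of the true value.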
3. Ad Click Statistics
Data format
1562085629823 Hebei Shijiazhuang 643 3
1562085629828 Hubei Wuhan 69 1
1562085629829 Henan Luoyang 718 6
1562085629830 Hebei Zhangjiakou 606 1
1562085629830 Jiangsu Nanjing 42 4
1562085629831 Hunan Changsha 351 3
1562085629831 Hebei Shijiazhuang 419 4
1562085629832 Hubei Jingzhou 535 1
1562085629832 Henan Zhengzhou 176 8
1562085629833 Henan Luoyang 333 4
1562085629833 Hunan Xiangtan 51 2
1562085629834 Jiangsu Suzhou 540 9
1562085629834 Hebei Zhangjiakou 71 4
1562085629835 Hunan Xiangtan 297 4
1562085629835 Jiangsu Nanjing 160 0
1562085629836 Henan Luoyang 319 1
1562085629836 Hubei Jingzhou 190 5
1562085629837 Hunan Xiangtan 729 5
1562085629839 Jiangsu Suzhou 57 6
1562085629839 Jiangsu Suzhou 104 9
1562085629840 Jiangsu Suzhou 102 7
1562085629840 Henan Zhengzhou 552 0
1562085629841 Hunan Xiangtan 136 1
1562085629842 Hunan Changsha 183 6
1562085629843 Hunan Xiangtan 143 8
1562085629844 Jiangsu Nanjing 990 9
1562085629844 Hunan Xiangtan 172 0
1562085629845 Hunan Changsha 957 9
1562085629845 Hubei Wuhan 466 5
1562085629846 Hunan Changsha 658 3
1562085629846 Hunan Xiangtan 342 4
1562085629847 Hunan Changsha 315 1
1562085629847 Jiangsu Suzhou 352 3
1562085629847 Henan Luoyang 966 5
1562085629848 Hebei Shijiazhuang 776 3
1562085629848 Hunan Changsha 824 2
1562085629849 Hunan Xiangtan 545 2
1562085629849 Hebei Shijiazhuang 403 2
1562085629850 Hebei Zhangjiakou 336 3
1562085629851 Henan Luoyang 181 2
1562085629851 Henan Luoyang 765 1
1562085629852 Hunan Xiangtan 871 6
1562085629852 Hubei Jingzhou 800 2
1562085629852 Henan Luoyang 67 7
1562085629853 Hubei Wuhan 997 3
1562085629854 Hebei Zhangjiakou 53 0
1562085629854 Henan Zhengzhou 24 0
1562085629854 Hunan Changsha 771 4
1562085629855 Hebei Shijiazhuang 641 2
1562085629856 Jiangsu Suzhou 141 8
1562085629856 Hebei Zhangjiakou 753 8
1562085629857 Hubei Wuhan 348 3
1562085629859 Henan Luoyang 235 2
1562085629859 Hubei Wuhan 790 6
1562085629860 Hebei Zhangjiakou 379 8
1562085629861 Henan Luoyang 712 6
1562085629862 Henan Luoyang 793 2
1562085629862 Hubei Wuhan 946 7
1562085629863 Hubei Wuhan 223 3
1562085629863 Hunan Changsha 255 6
1562085629864 Hebei Shijiazhuang 702 8
1562085629864 Hebei Shijiazhuang 681 8
1562085629865 Hubei Wuhan 781 7
1562085629865 Hunan Changsha 928 7
1562085629866 Henan Zhengzhou 137 8
1562085629866 Hebei Zhangjiakou 388 2
1562085629867 Hunan Changsha 741 9
1562085629867 Hebei Shijiazhuang 142 2
1562085629867 Jiangsu Suzhou 564 1
1562085629868 Jiangsu Suzhou 379 2
1562085629868 Hubei Wuhan 153 1
1562085629869 Henan Zhengzhou 120 5
1562085629869 Hunan Changsha 29 4
1562085629869 Hubei Wuhan 502 8
1562085629870 Hubei Jingzhou 73 0
1562085629871 Hebei Shijiazhuang 63 5
1562085629871 Henan Zhengzhou 863 9
1562085629872 Henan Zhengzhou 246 8
1562085629872 Hubei Jingzhou 973 8
1562085629873 Henan Zhengzhou 891 6
1562085629873 Hebei Shijiazhuang 577 2
1562085629873 Hunan Changsha 76 0
1562085629875 Henan Luoyang 509 5
1562085629875 Hunan Xiangtan 605 8
1562085629876 Hunan Xiangtan 20 7
1562085629877 Hebei Zhangjiakou 264 0
1562085629878 Jiangsu Suzhou 307 1
1562085629879 Hunan Changsha 757 2
1562085629879 Hebei Zhangjiakou 552 9
1562085629879 Hunan Xiangtan 283 9
1562085629880 Jiangsu Nanjing 110 2
1562085629880 Jiangsu Nanjing 60 5
1562085629881 Henan Luoyang 799 9
1562085629881 Hunan Xiangtan 717 3
1562085629882 Hunan Xiangtan 543 3
1562085629882 Hubei Jingzhou 987 6
1562085629883 Jiangsu Suzhou 631 1
1562085629884 Hubei Jingzhou 934 3
1562085629884 Henan Luoyang 728 3
1562085629885 Hunan Changsha 209 2
1562085629886 Jiangsu Suzhou 499 9
1562085629886 Hubei Jingzhou 951 8
1562085629887 Jiangsu Nanjing 39 9
1562085629887 Hebei Zhangjiakou 375 4
1562085629888 Hunan Changsha 530 8
1562085629888 Henan Luoyang 201 1
1562085629889 Hebei Shijiazhuang 30 3
1562085629889 Hubei Wuhan 431 6
1562085629890 Hebei Zhangjiakou 918 7
1562085629890 Hubei Wuhan 220 3
1562085629892 Hunan Xiangtan 565 5
1562085629892 Hunan Changsha 911 6
1562085629893 Henan Zhengzhou 525 7
1562085629894 Hebei Zhangjiakou 737 6
1562085629895 Hunan Xiangtan 125 2
1562085629896 Hubei Wuhan 434 8
1562085629896 Jiangsu Nanjing 346 6
1562085629897 Henan Zhengzhou 647 1
1562085629898 Henan Luoyang 215 4
1562085629898 Hubei Wuhan 263 9
Based on the data above, compute:
1. The Top 3 most-clicked ads in each province
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Adstat {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName(this.getClass.getCanonicalName)
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val lines: RDD[String] = sc.textFile("file:///Users/guchenfei/IdeaProjects/Demo/src/main/data/advert.log")
// Extract (province, adId) from each log line
val stat1RDD: RDD[(String, String)] = lines.map {
line =>
val fields: Array[String] = line.split("\\s+")
(fields(1), fields(4))
}
// Count clicks per (province, adId), then keep the 3 most-clicked ads in each province
val reduce1RDD: RDD[((String, String), Int)] = stat1RDD.map { case (province, adid) => ((province, adid), 1) }.reduceByKey(_ + _)
reduce1RDD.map { case ((province, adid), count) => (province, (adid, count)) }.groupByKey()
.mapValues(buffer => buffer.toList.sortWith(_._2 > _._2).take(3).map(_._1).mkString(":"))
.foreach(println)
}
}
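A note not in the original: groupByKey ships every (adId, count) record for a province to a single task before sorting. A sketch of the same Top 3 with aggregateByKey, which keeps at most three entries per province during the shuffle (a drop-in replacement for the groupByKey lines inside main above):

// Keep a running Top-3 list per province instead of grouping all records first
reduce1RDD.map { case ((province, adid), count) => (province, (adid, count)) }
.aggregateByKey(List.empty[(String, Int)])(
(acc, v) => (v :: acc).sortBy(-_._2).take(3),
(a, b) => (a ++ b).sortBy(-_._2).take(3)
)
.mapValues(_.map(_._1).mkString(":"))
.foreach(println)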
2. The Top 3 most-clicked ads per province and hour
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime

object Adstat {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName(this.getClass.getCanonicalName)
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val lines: RDD[String] = sc.textFile("file:///Users/guchenfei/IdeaProjects/Demo/src/main/data/advert.log")
// Key by (hour, province, adId), count clicks, then take the Top 3 ads per (province, hour)
lines.map(line => {
val fields: Array[String] = line.split("\\s+")
((getHour(fields(0)), fields(1), fields(4)), 1)
}).reduceByKey(_ + _).map { case ((hour, province, adid), count) => ((province, hour), (adid, count)) }
.groupByKey().mapValues(buf=> buf.toList.sortWith(_._2 > _._2).take(3).map(_._1))
.collect().foreach(println)
}
def getHour(string: String): Int = {
val time = new DateTime(string.toLong)
time.getHourOfDay
}
}
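getHour above depends on joda-time's DateTime. If you would rather avoid the extra dependency, a JDK-only sketch (assuming, as the code above does, that the first field is epoch milliseconds interpreted in the system default time zone):

import java.time.{Instant, ZoneId}

// Same contract as getHour: epoch-millisecond string -> hour of day (0-23)
def getHourJdk(string: String): Int =
Instant.ofEpochMilli(string.toLong).atZone(ZoneId.systemDefault()).getHour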
4. Common Friends
Data
100, 200 300 400 500 600
200, 100 300 400
300, 100 200 400 500
400, 100 200 300
500, 100 300
600, 100
1. Method 1
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FindFriends {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName(this.getClass.getCanonicalName)
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val lines: RDD[String] = sc.textFile("file:///Users/guchenfei/IdeaProjects/Demo/src/main/data/fields.dat")
val friendsRDD: RDD[(String, Array[String])] = lines.map(line => {
val fields: Array[String] = line.split(",")
val userId: String = fields(0).trim
val friends: Array[String] = fields(1).trim.split("\\s+")
(userId, friends)
})
// Pair every two distinct users (cartesian product) and intersect their friend lists
friendsRDD.cartesian(friendsRDD).filter { case ((id1, friends1), (id2, friends2)) => id1 < id2 }
.map{case((id1, friend1), (id2, friend2)) => ((id1,id2),(friend1.intersect(friend2).toBuffer))}
.sortByKey()
.collect().foreach(println)
}
}
((100,200),ArrayBuffer(300, 400))
((100,300),ArrayBuffer(200, 400, 500))
((100,400),ArrayBuffer(200, 300))
((100,500),ArrayBuffer(300))
((100,600),ArrayBuffer())
((200,300),ArrayBuffer(100, 400))
((200,400),ArrayBuffer(100, 300))
((200,500),ArrayBuffer(100, 300))
((200,600),ArrayBuffer(100))
((300,400),ArrayBuffer(100, 200))
((300,500),ArrayBuffer(100))
((300,600),ArrayBuffer(100))
((400,500),ArrayBuffer(100, 300))
((400,600),ArrayBuffer(100))
((500,600),ArrayBuffer(100))
2. Method 2
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName(this.getClass.getCanonicalName)
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val lines: RDD[String] = sc.textFile("file:///Users/guchenfei/IdeaProjects/Demo/src/main/data/fields.dat")
val friendsRDD: RDD[(String, Array[String])] = lines.map(line => {
val fields: Array[String] = line.split(",")
val userId: String = fields(0).trim
val friends: Array[String] = fields(1).trim.split("\\s+")
(userId, friends)
})
// Every 2-combination of a user's friend list has that user as a common friend
friendsRDD.flatMapValues(friends => friends.combinations(2)).map { case (k, v) => (v.mkString(" & "), Set(k)) }
.reduceByKey(_ | _)
.sortByKey()
.collect().foreach(println)
}
5. Super WordCount
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SuperWordCount {
// Words to drop and punctuation characters to strip before counting
private val stopWord = "in on to from by a an the is are were was i we you your he his".split("\\s+")
private val punctuation = "[\\)\\.,:;`!\\?]"
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName(this.getClass.getCanonicalName)
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val lines: RDD[String] = sc.textFile("/Users/guchenfei/IdeaProjects/Demo/src/main/data/article.txt")
lines.flatMap(_.split("\\s+")).map(_.toLowerCase())
.map(_.replaceAll(punctuation,""))
.filter(word => !stopWord.contains(word)&& word.trim.nonEmpty)
.map((_,1))
.reduceByKey(_+_)
.sortBy(_._2,false)
.collect().foreach(println)
}
}
Writing the computed results to MySQL from Scala with plain JDBC:
import java.sql.{Connection, DriverManager, PreparedStatement}
object JDBCDemo {
def main(args: Array[String]): Unit = {
// Demo data: each word paired with its index, which stands in for a count
val str = "hadoop spark java scala hbase hive sqoop hue tez atlas datax griffin zk kafka"
val result = str.split("\\s+").zipWithIndex
val userName = "root"
val password = "123456"
val url = "jdbc:mysql://Linux123:3306/ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
var conn: Connection = null
var stmt: PreparedStatement = null
val sql = "insert into wordcount values(?,?)"
try {
conn = DriverManager.getConnection(url,userName,password)
stmt = conn.prepareStatement(sql)
result.foreach{
case(word,count)=>
stmt.setString(1,word)
stmt.setInt(2,count)
stmt.executeUpdate()
}
}catch {
case e:Exception => e.printStackTrace()
}finally {
if (stmt!=null) stmt.close()
if (conn!=null)conn.close()
}
}
}
mysql> select * from wordcount;
+---------+-------+
| word | count |
+---------+-------+
| hadoop | 0 |
| spark | 1 |
| java | 2 |
| scala | 3 |
| hbase | 4 |
| hive | 5 |
| sqoop | 6 |
| hue | 7 |
| tez | 8 |
| atlas | 9 |
| datax | 10 |
| griffin | 11 |
| zk | 12 |
| kafka | 13 |
+---------+-------+
14 rows in set (0.00 sec)
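The insert assumes the wordcount table already exists. A one-off creation sketch, reusing url, userName and password from JDBCDemo above (the column types are assumptions, not taken from these notes):

import java.sql.DriverManager

// Hypothetical DDL for the demo table; adjust the types and lengths as needed
val ddl =
"""CREATE TABLE IF NOT EXISTS wordcount (
| word VARCHAR(100),
| count INT
|)""".stripMargin
val conn = DriverManager.getConnection(url, userName, password)
try conn.createStatement().execute(ddl) finally conn.close()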
Putting it together: writing the computed RDD results to MySQL
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import java.sql.{Connection, DriverManager, PreparedStatement}
object SuperWordCount2 {
private val stopWord = "in on to from by a an the is are were was i we you your he his".split("\\s+")
private val punctuation = "[\\)\\.,:;`!\\?]"
val userName = "root"
val password = "123456"
val url = "jdbc:mysql://Linux123:3306/ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
// Write one partition's records to MySQL over a single JDBC connection
def saveMysql(iterator: Iterator[(String, Int)]): Unit = {
var conn: Connection = null
var stmt: PreparedStatement = null
val sql = "insert into wordcount values(?,?)"
try {
conn = DriverManager.getConnection(url,userName,password)
stmt = conn.prepareStatement(sql)
iterator.foreach{
case(word,count)=>
stmt.setString(1,word)
stmt.setInt(2,count)
stmt.executeUpdate()
}
}catch {
case e:Exception => e.printStackTrace()
}finally {
if (stmt!=null) stmt.close()
if (conn!=null)conn.close()
}
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName(this.getClass.getCanonicalName)
conf.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val lines: RDD[String] = sc.textFile("/Users/guchenfei/IdeaProjects/Demo/src/main/data/article.txt")
val resultRDD: RDD[(String, Int)] = lines.flatMap(_.split("\\s+")).map(_.toLowerCase())
.map(_.replaceAll(punctuation, ""))
.filter(word => !stopWord.contains(word) && word.trim.nonEmpty)
.map((_, 1))
.reduceByKey(_ + _)
.sortBy(_._2, false)
resultRDD.saveAsTextFile("file:///Users/guchenfei/IdeaProjects/Demo/src/main/data/result")
resultRDD.foreachPartition(saveMysql(_))
}
}
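saveMysql issues one executeUpdate per record. For larger result sets, the standard JDBC batch API (addBatch/executeBatch) cuts down round trips; a sketch of a variant inside the same object, reusing url, userName and password (batchSize is an assumed tuning knob, not from the original notes):

// Batched variant of saveMysql: buffer rows and flush every batchSize records
def saveMysqlBatch(iterator: Iterator[(String, Int)], batchSize: Int = 500): Unit = {
var conn: Connection = null
var stmt: PreparedStatement = null
try {
conn = DriverManager.getConnection(url, userName, password)
stmt = conn.prepareStatement("insert into wordcount values(?,?)")
var pending = 0
iterator.foreach { case (word, count) =>
stmt.setString(1, word)
stmt.setInt(2, count)
stmt.addBatch()
pending += 1
if (pending >= batchSize) { stmt.executeBatch(); pending = 0 }
}
if (pending > 0) stmt.executeBatch()
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (stmt != null) stmt.close()
if (conn != null) conn.close()
}
}

Swap it in with resultRDD.foreachPartition(it => saveMysqlBatch(it)).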