The data for the three tables used below is shown in part, for reference only.
customers.csv:
"1","Richard","Hernandez","XXXXXXXXX","XXXXXXXXX","6303 Heather Plaza","Brownsville","TX","78521"
"2","Mary","Barrett","XXXXXXXXX","XXXXXXXXX","9526 Noble Embers Ridge","Littleton","CO","80126"
"3","Ann","Smith","XXXXXXXXX","XXXXXXXXX","3422 Blue Pioneer Bend","Caguas","PR","00725"
"4","Mary","Jones","XXXXXXXXX","XXXXXXXXX","8324 Little Common","San Marcos","CA","92069"
"5","Robert","Hudson","XXXXXXXXX","XXXXXXXXX","10 Crystal River Mall ","Caguas","PR","00725"
"6","Mary","Smith","XXXXXXXXX","XXXXXXXXX","3151 Sleepy Quail Promenade","Passaic","NJ","07055"
"7","Melissa","Wilcox","XXXXXXXXX","XXXXXXXXX","9453 High Concession","Caguas","PR","00725"
"8","Megan","Smith","XXXXXXXXX","XXXXXXXXX","3047 Foggy Forest Plaza","Lawrence","MA","01841"
"9","Mary","Perez","XXXXXXXXX","XXXXXXXXX","3616 Quaking Street","Caguas","PR","00725"
"10","Melissa","Smith","XXXXXXXXX","XXXXXXXXX","8598 Harvest Beacon Plaza","Stafford","VA","22554"
"11","Mary","Huffman","XXXXXXXXX","XXXXXXXXX","3169 Stony Woods","Caguas","PR","00725"
"12","Christopher","Smith","XXXXXXXXX","XXXXXXXXX","5594 Jagged Embers By-pass","San Antonio","TX","78227"
"13","Mary","Baldwin","XXXXXXXXX","XXXXXXXXX","7922 Iron Oak Gardens","Caguas","PR","00725"
"14","Katherine","Smith","XXXXXXXXX","XXXXXXXXX","5666 Hazy Pony Square","Pico Rivera","CA","90660"
"15","Jane","Luna","XXXXXXXXX","XXXXXXXXX","673 Burning Glen","Fontana","CA","92336"
"16","Tiffany","Smith","XXXXXXXXX","XXXXXXXXX","6651 Iron Port","Caguas","PR","00725"
"17","Mary","Robinson","XXXXXXXXX","XXXXXXXXX","1325 Noble Pike","Taylor","MI","48180"
"18","Robert","Smith","XXXXXXXXX","XXXXXXXXX","2734 Hazy Butterfly Circle","Martinez","CA","94553"
"19","Stephanie","Mitchell","XXXXXXXXX","XXXXXXXXX","3543 Red Treasure Bay","Caguas","PR","00725"
"20","Mary","Ellis","XXXXXXXXX","XXXXXXXXX","4703 Old Route","West New York","NJ","07093"
orders.csv:
"1","2013-07-25 00:00:00","11599","CLOSED"
"2","2013-07-25 00:00:00","256","PENDING_PAYMENT"
"3","2013-07-25 00:00:00","12111","COMPLETE"
"4","2013-07-25 00:00:00","8827","CLOSED"
"5","2013-07-25 00:00:00","11318","COMPLETE"
"6","2013-07-25 00:00:00","7130","COMPLETE"
"7","2013-07-25 00:00:00","4530","COMPLETE"
"8","2013-07-25 00:00:00","2911","PROCESSING"
"9","2013-07-25 00:00:00","5657","PENDING_PAYMENT"
"10","2013-07-25 00:00:00","5648","PENDING_PAYMENT"
"11","2013-07-25 00:00:00","918","PAYMENT_REVIEW"
"12","2013-07-25 00:00:00","1837","CLOSED"
"13","2013-07-25 00:00:00","9149","PENDING_PAYMENT"
"14","2013-07-25 00:00:00","9842","PROCESSING"
"15","2013-07-25 00:00:00","2568","COMPLETE"
"16","2013-07-25 00:00:00","7276","PENDING_PAYMENT"
"17","2013-07-25 00:00:00","2667","COMPLETE"
"18","2013-07-25 00:00:00","1205","CLOSED"
"19","2013-07-25 00:00:00","9488","PENDING_PAYMENT"
"20","2013-07-25 00:00:00","9198","PROCESSING"
"21","2013-07-25 00:00:00","2711","PENDING"
"22","2013-07-25 00:00:00","333","COMPLETE"
order_items.csv:
1,1,971,1,26.87,26.87
2,1,217,3,23.61,7.87
3,1,98,2,33.1,16.55
4,2,998,1,86.39,86.39
5,2,813,1,100.93,100.93
6,2,171,2,191.08,95.54
7,2,639,1,23.8,23.8
8,3,273,3,21.36,7.12
9,4,285,1,48.57,48.57
10,4,628,3,40.53,13.51
11,4,1,3,109.62,36.54
12,4,354,3,95.43,31.81
13,5,426,3,203.07,67.69
14,5,534,3,251.04001,83.68
15,5,688,1,102.22,102.22
16,5,539,2,16.04,8.02
17,6,608,2,26.68,13.34
18,6,535,1,91.71,91.71
19,6,276,2,131.6,65.8
20,7,36,2,52.34,26.17
21,7,40,2,97.28,48.64
22,7,451,1,63.39,63.39
23,8,156,1,104.62,104.62
24,8,105,1,9.88,9.88
25,8,165,3,114.72,38.24
26,8,428,2,182.72,91.36
27,8,734,3,283.44,94.48
28,9,917,3,73.8,24.6
29,10,613,1,25.84,25.84
30,11,949,2,166.74,83.37
31,12,694,2,149.6,74.8
32,12,914,1,72.57,72.57
33,12,999,3,306.66,102.22
34,13,746,1,79.34,79.34
35,13,483,3,144.24,48.08
36,14,254,3,247.98001,82.66
37,14,263,3,66.12,22.04
import java.math.MathContext
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import scala.math.BigDecimal.RoundingMode
/*
SparkSession: RDD operators and SQL practice
*/
object App2 {
val HDFS_ROOT="hdfs://single01:9000"
//SparkSession supports both RDD-oriented and SQL-oriented computation
def sparkSubmit(fs:(SparkSession=>Unit)*):Unit={
val spark:SparkSession=SparkSession
.builder()
.appName("spark_sql_01")
.master("local[*]")
//enable dynamic partitioning (disabled by default)
// .config("hive.exec.dynamic.partition","true")
//allow all partitions to be dynamic; otherwise at least one static partition is required
// .config("hive.exec.dynamic.partition.mode","nonstrict")
// .enableHiveSupport()
.getOrCreate()
fs.foreach(f=>f(spark))
spark.close()
}
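//usage: sparkSubmit(join) or sparkSubmit(join, testCount); each function is handed the same SparkSession in turn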
//write.config("spark.sql.parquet.writeLegacyFormat",true).mode(SaveMode.Append).saveAsTable(table)
//write.mode(SaveMode.Append).insertInto(table)
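//A sketch (an assumption, not part of the exercise) of how the two write variants above are usually spelled out,
//given some DataFrame df and an existing Hive table (requires enableHiveSupport() and import org.apache.spark.sql.SaveMode):
// spark.conf.set("spark.sql.parquet.writeLegacyFormat","true")
// df.write.mode(SaveMode.Append).saveAsTable("target_table")  //or df.write.mode(SaveMode.Append).insertInto("target_table")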
//intersection, union, difference
def unionIntersectionSub(spark:SparkSession):Unit={
val sc:SparkContext=spark.sparkContext
//makeRDD builds test data from a Seq, which may hold case classes or tuples
val rdd1: RDD[(Int, String)] = sc.makeRDD(Seq((1, "henry"), (2, "pola"), (4, "ariel"), (7, "jack"), (10, "rose")))
val rdd2: RDD[(Int, String)] = sc.makeRDD(Seq((3, "mary"), (2, "pola"), (5, "sherry"), (7, "jack"), (10, "rose")))
//union
rdd1.union(rdd2).foreach(println)
//intersection
rdd1.intersection(rdd2).foreach(println)
//difference (subtract)
rdd1.subtract(rdd2).foreach(println)
rdd2.subtract(rdd1).foreach(println)
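//expected output (element order varies across partitions): union keeps duplicates and prints all 10 pairs,
//intersection prints (2,pola) (7,jack) (10,rose), rdd1.subtract(rdd2) prints (1,henry) (4,ariel),
//and rdd2.subtract(rdd1) prints (3,mary) (5,sherry)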
}
//RDD persistence
def persistCheckpoint(sparkSession: SparkSession):Unit={
//lineage: the chain of dependencies Spark keeps so an RDD can be recomputed
//rdd1 stands in for an RDD with an expensive computation; null is only a placeholder, this method is a sketch and not meant to run
sparkSession.sparkContext.setCheckpointDir("hdfs://single01:9000/spark/checkpoint/app_name")
val rdd1:RDD[String]=null
rdd1.cache()//equivalent to persist(StorageLevel.MEMORY_ONLY); an RDD's storage level can only be assigned once
rdd1.persist(StorageLevel.MEMORY_AND_DISK_SER_2)//spill to disk when memory is insufficient, serialized, with one extra replica
rdd1.checkpoint()
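//checkpoint() only takes effect once an action materializes rdd1: the data is then written to the checkpoint
//directory and the lineage before that point is truncated, whereas cache()/persist() keep the full lineage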
val checkpointed: Boolean = rdd1.isCheckpointed
val file: Option[String] = rdd1.getCheckpointFile
//rdd2.union(rdd1)
//rdd3.join(rdd1)
//rdd1.map
}
/*
SQL practice: joining the tables above (as RDDs)
1. Create case classes from the table schemas on Hadoop
2. Create an implicit class based on the table data and the case classes, extending String so each data line converts easily to a case class instance
3. Preprocess the RDDs according to the SQL requirements below
4. Run the SQL-style exercises on the RDDs
*/
// 1. Create case classes from the table schemas on Hadoop ***********************************************************
// # customers: id, first name, last name, email, password, street address, city, state, zip code
case class Customer(id:Int,fname:String,lname:String,email:String,
password:String,street:String,city:String,state:String,zipcode:String)
// # orders: id, order date, customer id, order status
case class Order(id:Int,date:String,customer_id:Int,status:String)
// # order items: id, order id, product id, quantity, subtotal, unit price
case class OrderItem(id:Int,order_id:Int,product_id:Int,
quantity:Short,subtotal:Float,product_price:Float)
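//e.g. the order_items line "2,1,217,3,23.61,7.87" becomes OrderItem(2,1,217,3,23.61,7.87)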
/*
Implicit class: extends an existing type with extra methods (used below to extend String)
Implicit function: 1. type conversion; combined with a trait it can also mix behavior in dynamically,
into either a class or a single instance (a small sketch of both follows after the implicit class below)
*/
// 2. Create an implicit class extending String, so each data line converts to a case class instance ****************
implicit class StringTo(line:String){
private val cut=(line:String)=>{
line.split( ",").map(_.replaceAll("\"",""))
}
def toCustomer:Customer={
val arr: Array[String] = cut(line)
Customer(arr(0).toInt,arr(1),arr(2),arr(3),arr(4),arr(5),arr(6),arr(7),arr(8))
}
def toOrder:Order={
val arr: Array[String] = cut(line)
Order(arr(0).toInt,arr(1),arr(2).toInt,arr(3))
}
def toOrderItem:OrderItem={
val arr: Array[String] = cut(line)
OrderItem(arr(0).toInt,arr(1).toInt,arr(2).toInt,arr(3).toShort,arr(4).toFloat,arr(5).toFloat)
}
}
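//A minimal illustration (an assumption, not part of the original exercise) of the two alternatives mentioned above:
//an implicit conversion function and a trait mixed in dynamically per instance; Describable/orderToPair/mixinDemo are made-up names
import scala.language.implicitConversions
trait Describable { def describe: String = this.toString }
//implicit conversion function: lets an Order be used where a (customer_id, status) pair is expected
implicit def orderToPair(o: Order): (Int, String) = (o.customer_id, o.status)
//dynamic mix-in: attach the trait to one instance without touching the Order class itself
def mixinDemo(): Unit = {
val o = new Order(1, "2013-07-25 00:00:00", 11599, "CLOSED") with Describable
println(o.describe)
}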
//3. Preprocess the RDDs according to the SQL requirements below *****************************************************
def join(sparkSession: SparkSession)={
val sc=sparkSession.sparkContext
val rddCus: RDD[(Int,String)] = sc.textFile(s"${HDFS_ROOT}/spark/resource/electronic_business/customers.csv", 3)
.map(_.toCustomer)//convert each line to a Customer
.map(cus=>(cus.id,s"${cus.lname} ${cus.fname}"))
val rddOrd: RDD[Order] = sc.textFile(s"${HDFS_ROOT}/spark/resource/electronic_business/orders.csv", 3)
.map(_.toOrder)//convert each line to an Order
val rddOI: RDD[(Int,Float)] = sc.textFile(s"${HDFS_ROOT}/spark/resource/electronic_business/order_items.csv", 3)
.map(_.toOrderItem)//convert each line to an OrderItem
.map(oi=>(oi.order_id,oi.subtotal))
val rddOrd1: RDD[(Int, Int)] = rddOrd
.filter(_.status.equalsIgnoreCase("complete"))//filter rows first (shrink the data early)
.map(o => (o.id, o.customer_id))//prune columns (shrink the data early)
// 4. SQL-style exercises on the RDDs ********************************************************************************
// Who is the biggest buyer? (who spent the most money)
println(rddOrd1
//inner join on order id (each order pairs with each of its matching order items)
.join(rddOI)
// (o.customer_id,oi.subtotal)
.map(_._2)
//aggregate by key: values with the same key are summed, combining within partitions first
.reduceByKey(_ + _)
//(customer_id,(sum(subtotal),customer_name))
.join(rddCus)
//ascending by amount (three lowest)
// .sortBy(_._2._1)
//descending by amount (top three)
.sortBy(_._2._1,false)
.take(3).mkString("\n"))
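//each printed line has the form (customer_id,(total,"lname fname"))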
//right outer join wraps the left side in an Option; left outer join wraps the right side
/* println(rddOrd1
.rightOuterJoin(rddOI)
//(order_id,(Option(customer_id),subtotal)); customer_id is an Option because some orders were filtered out, so unwrap it and fill in a default
.map(tp2 => {
val opt: Option[Int] = tp2._2._1
(if (opt.isEmpty) 0 else opt.get, tp2._2._2)
})
//(customer_id,subtotal)
.reduceByKey(_ + _)
.join(rddCus)
//descending by total amount
.sortBy(_._2._1, false)
//filter to check whether rightOuterJoin produced any unmatched rows
// (when opt.isEmpty the customer_id is set to 0, which has no matching customer_name after the join)
.filter(_._2._2.equals("null"))
.take(20).mkString("\n"))*/
//right outer join wraps the left side in an Option; left outer join wraps the right side
/* println(rddOrd1
.rightOuterJoin(rddOI)
//(order_id,(Option(customer_id),subtotal))
// customer_id is an Option because some orders were filtered out, so unwrap it and fill in a default
.map(tp2 => {
val opt: Option[Int] = tp2._2._1
(if (opt.isEmpty) 0 else opt.get, tp2._2._2)
})
.reduceByKey(_ + _)
.leftOuterJoin(rddCus)
.map(tp2=>{
val opt: Option[String] = tp2._2._2
// BigDecimal(tp2._2._1,MathContext.DECIMAL32).setScale(2,RoundingMode.HALF_UP).floatValue)
(tp2._1,(tp2._2._1.formatted("%.2f"),if(opt.isEmpty) "Nobody" else opt.get))
})
//descending by total amount
.sortBy(_._2._1, false)
//filter to inspect the unmatched rows
.filter(_._2._2.equals("Nobody"))
.take(20).mkString("\n"))*/
}
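//A minimal DataFrame/SQL sketch (an assumption, not part of the original exercise) of the same "biggest buyer"
//query above; the function name joinWithSql and the temp view names are made up, while the paths and case classes
//are the ones defined in this object
def joinWithSql(spark: SparkSession): Unit = {
val sc = spark.sparkContext
import spark.implicits._
//register the three CSV files as temporary views, reusing the implicit StringTo conversions above
sc.textFile(s"${HDFS_ROOT}/spark/resource/electronic_business/customers.csv").map(_.toCustomer).toDF().createOrReplaceTempView("customers")
sc.textFile(s"${HDFS_ROOT}/spark/resource/electronic_business/orders.csv").map(_.toOrder).toDF().createOrReplaceTempView("orders")
sc.textFile(s"${HDFS_ROOT}/spark/resource/electronic_business/order_items.csv").map(_.toOrderItem).toDF().createOrReplaceTempView("order_items")
spark.sql(
"""
|SELECT c.id, concat(c.lname, ' ', c.fname) AS customer_name, round(sum(oi.subtotal), 2) AS total
|FROM orders o
|JOIN order_items oi ON oi.order_id = o.id
|JOIN customers c ON c.id = o.customer_id
|WHERE upper(o.status) = 'COMPLETE'
|GROUP BY c.id, c.lname, c.fname
|ORDER BY total DESC
|LIMIT 3
|""".stripMargin).show()
}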
/*
count       : number of elements in the RDD
countByKey  : number of elements per key (returns Map[K, Long])
countByValue: number of occurrences of each distinct element, here of each (key, value) pair (returns Map[(K, V), Long])
*/
def testCount(sparkSession: SparkSession)={
val value: RDD[(Int, Int)] = sparkSession
.sparkContext
.makeRDD(Seq((1, 2), (1, 3), (1, 2), (2, 3), (3, 4)), 1)
println(value.count())
println(value.countByKey())
println(value.countByValue())
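//expected output for this input: count() is 5, countByKey() is Map(1 -> 3, 2 -> 1, 3 -> 1),
//and countByValue() is Map((1,2) -> 2, (1,3) -> 1, (2,3) -> 1, (3,4) -> 1) (map entry order may differ)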
}
def groupTest(spark: SparkSession)={
val sc:SparkContext=spark.sparkContext
val rdd1: RDD[(Int, String)] = sc.makeRDD(Seq((1, "henry"), (2, "pola"), (4, "ariel"), (7, "jack"), (10, "rose")))
val rdd2: RDD[(Int, String)] = sc.makeRDD(Seq((3, "mary"), (2, "pola"), (5, "sherry"), (7, "jack"), (10, "rose")))
//group by key across both RDDs: each key maps to a pair of iterables, one per source RDD
// cogroup can take a partition count, groupWith cannot; otherwise they behave the same
/* rdd1.groupWith(rdd2)
.mapValues(tp2=>s"${tp2._1.mkString(",")}_${tp2._2.mkString(",")}")
.foreach(println)*/
rdd1.cogroup(rdd2)
.mapValues(tp2=>s"${tp2._1.mkString(",")}_${tp2._2.mkString(",")}")
.foreach(println)
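//expected output (order varies): (1,henry_) (4,ariel_) (3,_mary) (5,_sherry) (2,pola_pola) (7,jack_jack) (10,rose_rose)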
}
def main(args: Array[String]): Unit = {
// sparkSubmit(unionIntersectionSub)
// sparkSubmit(join)
// sparkSubmit(testCount)
sparkSubmit(groupTest)
}
}