Disclaimer: the dataset will not be released, so please do not ask me for it. Only the code is published here, for learning purposes.
The data format is as follows (two sample records):
草鱼 15.00 2022/4/1 北京某某综合市场 北京 朝阳
黄鳝 37.00 2022/4/1 北京朝阳路综合市场 北京 朝阳
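Each record is a whitespace-separated line, and some records are missing a field, which is why the code below repeatedly branches on the number of fields. A minimal parsing sketch follows; the ProductRecord case class, the parse helper, and the field names are my own illustrative assumptions, reading the six columns as product, price, date, market, province, city:

// Parsing sketch only; names and the helper itself are assumptions, not part of the original code.
object ParseSketch {
  case class ProductRecord(product: String, price: Double, date: String,
                           market: String, province: String, city: String)

  // A complete record has exactly 6 whitespace-separated fields.
  def parse(line: String): Option[ProductRecord] = {
    val fields = line.split("\\s+")
    if (fields.length == 6)
      Some(ProductRecord(fields(0), fields(1).toDouble, fields(2),
        fields(3), fields(4), fields(5)))
    else
      None // incomplete records (missing fields) are handled case by case in the code below
  }

  def main(args: Array[String]): Unit = {
    println(parse("草鱼 15.00 2022/4/1 北京某某综合市场 北京 朝阳"))
  }
}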
Part 1: Agricultural Product Market Analysis
1. Count the total number of agricultural product markets in each province
package AAAAA

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Atest01 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Atest01")
    val sc = new SparkContext(sparkConf)
    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")

    // Keep only records that contain market and province fields
    val filterRDD: RDD[String] = dataRDD.filter { data =>
      val fields: Array[String] = data.split("\\s+")
      fields.length > 4
    }

    // Extract distinct (market, province) pairs; records with a missing field have only 5 columns
    val splitRDD: RDD[(String, String)] = filterRDD.map { line =>
      val datas: Array[String] = line.split("\\s+")
      if (datas.length == 6) {
        (datas(3), datas(4))
      } else {
        (datas(2), datas(3))
      }
    }.distinct()

    // Count distinct markets per province, sorted by count in descending order
    val resultRDD: RDD[(String, Int)] = splitRDD.map {
      case (market, province) => (province, 1)
    }.reduceByKey(_ + _).sortBy(_._2, false)

    resultRDD.collect().foreach(println)
    sc.stop()
  }
}
2. Find the provinces for which agricultural product market data is missing
package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Atest02 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Atest02")
    val sc = new SparkContext(sparkConf)
    val dataRDD = sc.textFile("datas/product.txt")

    // Lines with fewer than 3 fields carry no market information;
    // they identify the provinces whose market data is missing
    val filterRDD: RDD[String] = dataRDD.filter { data =>
      val fields: Array[String] = data.split("\\s+")
      fields.length < 3
    }

    filterRDD.foreach(println)
    sc.stop()
  }
}
3. For each province, list the top 3 agricultural product markets by number of product types
package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Atest03 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Atest03")
    val sc = new SparkContext(sparkConf)
    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")

    // Keep only records that contain market and province fields
    val filterRDD: RDD[String] = dataRDD.filter { data =>
      val fields: Array[String] = data.split("\\s+")
      fields.length > 4
    }

    // Count product records per (market, province) pair
    val mapRDD: RDD[((String, String), Int)] = filterRDD.map { line =>
      val datas: Array[String] = line.split("\\s+")
      if (datas.length == 6) {
        ((datas(3), datas(4)), 1)
      } else {
        ((datas(2), datas(3)), 1)
      }
    }.reduceByKey(_ + _)

    // Group markets by province
    val groupRDD: RDD[(String, Iterable[(String, (String, Int))])] = mapRDD.map {
      case ((market, province), count) => (province, (market, count))
    }.groupBy(_._1)

    // Within each province keep the 3 markets with the most products
    val resultRDD = groupRDD.mapValues { iter =>
      iter.toList.sortBy(_._2._2)(Ordering.Int.reverse).take(3)
    }.flatMap(_._2).groupByKey()

    resultRDD.foreach(println)
    sc.stop()
  }
}
4. Count the total number of markets in Shandong and Shanxi provinces that sell potatoes
package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Atest04 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Atest04")
    val sc = new SparkContext(sparkConf)
    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")

    // Keep potato records whose province is 山东 or 山西,
    // handling both 6-field and 5-field (one field missing) records
    val filterRDD: RDD[String] = dataRDD.filter { data =>
      val fields: Array[String] = data.split("\\s+")
      (fields.length == 6 && (fields(4) == "山东" || fields(4) == "山西") && fields(0) == "土豆") ||
        (fields.length == 5 && (fields(3) == "山东" || fields(3) == "山西") && fields(0) == "土豆")
    }

    println(filterRDD.count())
    sc.stop()
  }
}
Part 2: Agricultural Product Analysis
1. Count the total number of agricultural product types in each province
package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AAtest01 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("AAtest01")
    val sc = new SparkContext(sparkConf)
    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")

    // Keep only records that contain product and province fields
    val filterRDD: RDD[String] = dataRDD.filter { data =>
      val fields: Array[String] = data.split("\\s+")
      fields.length > 4
    }

    // Extract distinct (province, product) pairs
    val mapRDD: RDD[(String, String)] = filterRDD.map { line =>
      val datas: Array[String] = line.split("\\s+")
      if (datas.length == 6) {
        (datas(4), datas(0))
      } else {
        (datas(3), datas(0))
      }
    }.distinct()

    // Count distinct product types per province
    val resultRDD: RDD[(String, Int)] = mapRDD.map {
      case (province, product) => (province, 1)
    }.reduceByKey(_ + _)

    resultRDD.foreach(println)
    sc.stop()
  }
}
2. Find the markets that sell cherries, showing each market's province and city
package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AAtest02 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("AAtest02")
    val sc = new SparkContext(sparkConf)
    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")

    // Keep only cherry records
    val filterRDD: RDD[String] = dataRDD.filter { data =>
      val fields: Array[String] = data.split("\\s+")
      fields(0) == "樱桃"
    }

    // Output (product, market, province, city) for each cherry record
    val resultRDD: RDD[(String, String, String, String)] = filterRDD.map { line =>
      val datas: Array[String] = line.split("\\s+")
      (datas(0), datas(3), datas(4), datas(5))
    }

    resultRDD.foreach(println)
    sc.stop()
  }
}
3. Calculate the proportion of markets in Shandong province that sell clams, relative to all markets in the province
package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AAtest03 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("AAtest03")
    val sc = new SparkContext(sparkConf)
    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")

    // Number of markets per province (reuses the logic from Atest01)
    val provinceMarketCount: RDD[(String, Int)] = provinceMarketSum(dataRDD)

    // Number of records selling clams in Shandong;
    // the length guard protects the index against incomplete lines
    val cnt: Int = dataRDD.filter { data =>
      val fields: Array[String] = data.split("\\s+")
      fields.length > 4 && fields(0) == "蛤蜊" && fields(4) == "山东"
    }.count().toInt

    // Total number of markets in Shandong
    val sum: Int = provinceMarketCount.filter {
      data => data._1.contains("山东")
    }.map(_._2).first()

    println(cnt * 1.0 / sum)
    sc.stop()
  }

  // Count distinct markets per province, sorted by count in descending order
  def provinceMarketSum(dataRDD: RDD[String]): RDD[(String, Int)] = {
    val filterRDD: RDD[String] = dataRDD.filter { data =>
      val fields: Array[String] = data.split("\\s+")
      fields.length > 4
    }
    val splitRDD: RDD[(String, String)] = filterRDD.map { line =>
      val datas: Array[String] = line.split("\\s+")
      if (datas.length == 6) {
        (datas(3), datas(4))
      } else {
        (datas(2), datas(3))
      }
    }.distinct()
    splitRDD.map {
      case (market, province) => (province, 1)
    }.reduceByKey(_ + _).sortBy(_._2, false)
  }
}
4. Calculate the price trend of each agricultural product in Shanxi province, i.e. compute a mean price per product.
The mean price of a product is defined as PAVG = (PM1 + PM2 + ... + PMn - max(P) - min(P)) / (N - 2), where P is a price, Mn denotes the n-th agricultural product market, PMn is the product's price at market Mn, max(P) is the highest price, min(P) is the lowest price, and N is the number of markets.
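For illustration (these prices are my own, not from the dataset): if a product is sold at four Shanxi markets for 10.00, 12.00, 14.00, and 20.00, then PAVG = (10 + 12 + 14 + 20 - 20 - 10) / (4 - 2) = 13.00. When a product has only one or two prices, the code below falls back to a plain average.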
package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AAtest04 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("AAtest04")
    val sc = new SparkContext(sparkConf)
    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")

    // Keep only records from Shanxi province
    val filterRDD: RDD[String] = dataRDD.filter { data =>
      val fields: Array[String] = data.split("\\s+")
      fields.length > 4 && fields(4) == "山西"
    }

    // Group prices by product, then apply the trimmed mean: drop the maximum and
    // minimum when there are more than 2 prices, otherwise use a plain average
    val avgPriceRDD: RDD[(String, String)] = filterRDD.map { data =>
      val datas: Array[String] = data.split("\\s+")
      datas(0) -> datas(1).toDouble
    }.groupByKey()
      .map { data =>
        if (data._2.size > 2) {
          (data._1, ((data._2.sum - data._2.max - data._2.min) / (data._2.size - 2)).formatted("%.2f"))
        } else {
          (data._1, (data._2.sum / data._2.size).formatted("%.2f"))
        }
      }

    avgPriceRDD.foreach(println)
    sc.stop()
  }
}
5. Find the agricultural product types shared by the top 3 provinces (ranked by number of product types)
package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AAtest05 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("AAtest05")
    val sc = new SparkContext(sparkConf)
    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")

    // Keep only records that contain product and province fields
    val filterRDD: RDD[String] = dataRDD.filter { data =>
      val fields: Array[String] = data.split("\\s+")
      fields.length > 4
    }

    // Extract distinct (province, product) pairs
    val mapRDD: RDD[(String, String)] = filterRDD.map { line =>
      val datas: Array[String] = line.split("\\s+")
      if (datas.length == 6) {
        (datas(4), datas(0))
      } else {
        (datas(3), datas(0))
      }
    }.distinct()

    // Top 3 provinces by number of product types, collected to the driver as an Array
    val top3Provinces: Array[String] = mapRDD.map {
      case (province, product) => (province, 1)
    }.reduceByKey(_ + _).sortBy(_._2, false).take(3).map(_._1)

    // Keep only the products of the top 3 provinces
    val groupRDD: RDD[(String, Iterable[String])] = mapRDD.groupByKey()
    val top3FilterRDD: RDD[(String, Iterable[String])] = groupRDD.filter {
      data => top3Provinces.contains(data._1)
    }

    // A product shared by all three provinces appears exactly 3 times
    val listRDD: RDD[String] = top3FilterRDD.flatMap(_._2.toList)
    val commonRDD: RDD[String] = listRDD.map {
      data => (data, 1)
    }.reduceByKey(_ + _)
      .filter(_._2 == 3)
      .map(_._1)

    commonRDD.foreach(println)
    sc.stop()
  }
}
6. Rank the top 5 provinces by number of agricultural product types
package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AAtest06 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("AAtest06")
    val sc = new SparkContext(sparkConf)
    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")

    // Keep only records that contain product and province fields
    val filterRDD: RDD[String] = dataRDD.filter { data =>
      val fields: Array[String] = data.split("\\s+")
      fields.length > 4
    }

    // Extract distinct (province, product) pairs
    val mapRDD: RDD[(String, String)] = filterRDD.map { line =>
      val datas: Array[String] = line.split("\\s+")
      if (datas.length == 6) {
        (datas(4), datas(0))
      } else {
        (datas(3), datas(0))
      }
    }.distinct()

    // Top 5 provinces by number of product types
    val top5Provinces: Array[String] = mapRDD.map {
      case (province, product) => (province, 1)
    }.reduceByKey(_ + _).sortBy(_._2, false).take(5).map(_._1)

    top5Provinces.foreach(println)
    sc.stop()
  }
}
7. For the three northeastern provinces, sort agricultural products by their highest price in descending order and list the top 10
package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AAtest07 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("AAtest07")
    val sc = new SparkContext(sparkConf)
    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")

    // Keep only complete records from the three northeastern provinces
    val filterRDD: RDD[String] = dataRDD.filter { data =>
      val fields: Array[String] = data.split("\\s+")
      fields.length == 6 && (fields(4) == "辽宁" || fields(4) == "吉林" || fields(4) == "黑龙江")
    }

    // Group prices by product
    val groupedRDD: RDD[Iterable[(String, Double)]] = filterRDD.map { data =>
      val datas: Array[String] = data.split("\\s+")
      (datas(0), datas(1).toDouble)
    }.groupBy(_._1).map(_._2)

    // Keep each product's highest price, then take the overall top 10
    val top10: Array[(String, Double)] = groupedRDD.flatMap { data =>
      data.toList.sortBy(_._2)(Ordering.Double.reverse).take(1)
    }.sortBy(_._2, false).take(10)

    top10.foreach(println)
    sc.stop()
  }
}