IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark农产品分析 -> 正文阅读

[大数据]Spark农产品分析

先声明:不会公布数据,不要问我要数据,仅公布代码以供学习

数据格式如下

草鱼 15.00 2022/4/1 北京某某综合市场 北京 朝阳
黄鳝 37.00 2022/4/1 北京朝阳路综合市场 北京 朝阳

一、农产品市场分析

1.统计每个省份的农产品市场总数

package AAAAA

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Atest01 {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("quesion1")
    val sc = new SparkContext(sparConf)
    //TODO 每个省份的市场总数
    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")
    val filterRDD: RDD[String] = dataRDD.filter {
      data => {
        val len: Array[String] = data.split("\\s+")
        len.length > 4
      }
    }
    val splitRDD: RDD[(String, String)] = filterRDD.map {
      action => {
        val datas: Array[String] = action.split("\\s+")
        if(datas.length == 6){
          (datas(3), datas(4))//(市场, 省份)
        }else {
          (datas(2), datas(3)) //缺少价格的脏数据
        }
      }
    }.distinct()

    val resultRDD: RDD[(String, Int)] = splitRDD.map {
      case (market, privice) => {
        (privice, 1) //(省份,1)
      }
    }.reduceByKey(_ + _).sortBy(_._2, false)
    resultRDD.collect().foreach(println)

    sc.stop()
  }

}

2. 统计数据中缺失农产品市场的省份有哪些

package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Atest02 {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("quesion1")
    val sc = new SparkContext(sparConf)

    val dataRDD = sc.textFile("datas/product.txt")
    val filetRDD: RDD[String] = dataRDD.filter {
      data => {
        val len: Array[String] = data.split("\\s+")
        len.length < 3
      }
    }
    filetRDD.foreach(println)

    sc.stop()
  }

}

3. 根据农产品种类数量,统计每个省份排名前3名的农产品市场

package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Atest03 { //
  def main(args: Array[String]): Unit = { 
    //TODO 统计每个省份排名前3名的农产品市场
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("quesion1")
    val sc = new SparkContext(sparConf)

    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")
    val filterRDD: RDD[String] = dataRDD.filter {
      data => {
        val len: Array[String] = data.split("\\s+")
        len.length > 4
      }
    }
    val mapRDD: RDD[((String, String), Int)] = filterRDD.map {
      action => {
        val datas: Array[String] = action.split("\\s+")
        if (datas.length == 6) {
          ((datas(3), datas(4)), 1) //(市场, 省份)
        } else {
          ((datas(2), datas(3)), 1) //缺少价格的脏数据
        }
      }
    }.reduceByKey(_ + _)
    val groupRDD: RDD[(String, Iterable[(String, (String, Int))])] = mapRDD.map {
      case ((market, province), count) => {
        (province, (market, count))
      }
    }.groupBy(_._1)
    val resultRDD = groupRDD.mapValues(
      iter => {
        iter.toList.sortBy(_._2._2)(Ordering.Int.reverse).take(3)
      }
    ).flatMap(_._2).groupByKey()

    resultRDD.foreach(println)

    sc.stop()
  }

}

4.统计山东和山西两省售卖土豆的农产品市场总数

package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Atest04 {  //d)	统计山东和山西两省售卖土豆的农产品市场总数
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("quesion1")
    val sc = new SparkContext(sparConf)

    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")
    val filterRDD: RDD[String] = dataRDD.filter {
      data => {
        val len: Array[String] = data.split("\\s+")
         (len.length == 6 && (len(4) == "山东" || len(4) == "山西") && len(0) == "土豆") || (len.length == 5 && (len(3) == "山东" || len(3) == "山西") && len(0) == "土豆")
      }
    }
    println(filterRDD.count())

    sc.stop()

  }
}

二、农产品分析

1.统计每个省份的农产品种类总数

package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AAtest01 {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("quesion1")
    val sc = new SparkContext(sparConf)

    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")
    val filterRDD: RDD[String] = dataRDD.filter {
      data => {
        val len: Array[String] = data.split("\\s+")
        len.length > 4
      }
    }
    val mapRDD: RDD[(String, String)] = filterRDD.map {
      action => {
        val datas: Array[String] = action.split("\\s+")
        if (datas.length == 6) {
          (datas(4), datas(0)) //(省份, 产品)
        } else {
          (datas(3), datas(0)) //缺少价格的脏数据
        }
      }
    }.distinct()
    val resultRDD: RDD[(String, Int)] = mapRDD.map {
      case (province, product) => {
        (province, 1)
      }
    }.reduceByKey(_ + _)
    resultRDD.foreach(println)

    sc.stop()
  }
}

2.统计哪些农产品市场在售卖樱桃,要标明农产品市场所在省份与城市

package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AAtest02 {
  def main(args: Array[String]): Unit = {
    //TODO 统计哪些农产品市场在售卖樱桃,要标明农产品市场所在省份与城市
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("quesion1")
    val sc = new SparkContext(sparConf)

    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")
    val filterRDD: RDD[String] = dataRDD.filter {
      data => {
        val len: Array[String] = data.split("\\s+")
        len(0) == "樱桃"
      }
    }
    val resultRDD: RDD[(String, String, String, String)] = filterRDD.map {
      data => {
        val datas: Array[String] = data.split("\\s+")
        (datas(0), datas(3), datas(4), datas(5))
      }
    }
    resultRDD.foreach(println)
    sc.stop()
  }
}

3. 统计山东省售卖蛤蜊的农产品市场占全省农产品市场的比例

package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AAtest03 {
  def main(args: Array[String]): Unit = {
    //TODO 	统计山东省售卖蛤蜊的农产品市场占全省农产品市场的比例
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("quesion1")
    val sc = new SparkContext(sparConf)
    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")

    val ProvinceProduct: RDD[(String, Int)] = ProvinceProductSum(dataRDD)

    val cnt: Int = dataRDD.filter {
      data => {
        val len: Array[String] = data.split("\\s+")
        len(0) == "蛤蜊" && len(4) == "山东"
      }
    }.count().toInt

    val sum: Int = ProvinceProduct.filter {
      data => data._1.contains("山东")
    }.map(_._2).first()

    println(cnt*1.0 / sum)
    sc.stop()


  }

  def ProvinceProductSum(dataRDD: RDD[String]): RDD[(String, Int)] = {
    val filterRDD: RDD[String] = dataRDD.filter {
      data => {
        val len: Array[String] = data.split("\\s+")
        len.length > 4
      }
    }
    val splitRDD: RDD[(String, String)] = filterRDD.map {
      action => {
        val datas: Array[String] = action.split("\\s+")
        if(datas.length == 6){
          (datas(3), datas(4))//(市场, 省份)
        }else {
          (datas(2), datas(3)) //缺少价格的脏数据
        }
      }
    }.distinct()

    splitRDD.map {
      case (market, privice) => {
        (privice, 1) //(省份,1)
      }
    }.reduceByKey(_ + _).sortBy(_._2, false)
  }

}

4.计算山西省的每种农产品的价格波动趋势,即计算价格均值。

某种农产品的价格均值计算公式:PAVG = (PM1+PM2+…+PMn-max§-min§)/(N-2),
其中,P表示价格,Mn表示market,即农产品市场.
PM1表示M1农产品市场的该产品价格,max§表示价格最大值,min§价格最小值。

package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AAtest04 {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("quesion1")
    val sc = new SparkContext(sparConf)

    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")

    val filterRDD: RDD[String] = dataRDD.filter {
      data => {
        val len: Array[String] = data.split("\\s+")
        if(len.length > 4 && len(4) == "山西")  true
        else false
      }
    }
    val mapRDD: RDD[(String, String)] = filterRDD.map {
      data => {
        val datas: Array[String] = data.split("\\s+")
        datas(0) -> datas(1).toDouble //(产品, 价格)
      }
    }.groupByKey()
      .map {
        data => {
          if (data._2.size > 2) {
            (data._1, ((data._2.sum - data._2.max - data._2.min) / (data._2.size - 2)).formatted("%.2f"))
          } else {
            (data._1, (data._2.sum / data._2.size).formatted("%.2f"))
          }
        }
      }
    mapRDD.foreach(println)

    sc.stop()

  }

}

5.统计排名前3的省份共同拥有的农产品类型

package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AAtest05 {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("quesion1")
    val sc = new SparkContext(sparConf)

    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")
    val filterRDD: RDD[String] = dataRDD.filter {
      data => {
        val len: Array[String] = data.split("\\s+")
        len.length > 4
      }
    }
    val mapRDD: RDD[(String, String)] = filterRDD.map {
      action => {
        val datas: Array[String] = action.split("\\s+")
        if (datas.length == 6) {
          (datas(4), datas(0)) //(省份, 产品)
        } else {
          (datas(3), datas(0)) //缺少价格的脏数据
        }
      }
    }.distinct()
    val Top3RDD = mapRDD.map {
      case (province, product) => {
        (province, 1)
      }
    }.reduceByKey(_ + _).sortBy(_._2, false).take(3).map(_._1)

    val gourpRDD: RDD[(String, Iterable[String])] = mapRDD.groupByKey()

    val Top3FilterRDD: RDD[(String, Iterable[String])] = gourpRDD.filter {
      data => Top3RDD.contains(data._1)
    }
    val listRDD: RDD[String] = Top3FilterRDD.flatMap {
      data => {
        data._2.toList
      }
    }
    val commonRDD: RDD[String] = listRDD.map {
      data => {
        (data, 1)
      }
    }.reduceByKey(_ + _)
      .filter {
        data => {
          data._2 == 3
        }
      }
      .map(_._1)

    commonRDD.foreach(println)
    sc.stop()
  }

}

6. 根据农产品类型数量,统计排名前5名的省份

package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AAtest06 {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("quesion1")
    val sc = new SparkContext(sparConf)

    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")
    val filterRDD: RDD[String] = dataRDD.filter {
      data => {
        val len: Array[String] = data.split("\\s+")
        len.length > 4
      }
    }
    val mapRDD: RDD[(String, String)] = filterRDD.map {
      action => {
        val datas: Array[String] = action.split("\\s+")
        if (datas.length == 6) {
          (datas(4), datas(0)) //(省份, 产品)
        } else {
          (datas(3), datas(0)) //缺少价格的脏数据
        }
      }
    }.distinct()
    val resultRDD: Array[String] = mapRDD.map {
      case (province, product) => {
        (province, 1)
      }
    }.reduceByKey(_ + _).sortBy(_._2,false).take(5).map(_._1)

    resultRDD.foreach(println)

    sc.stop()
  }

}

7.东北三省农产品最高价格降序排列,统计前十名的农产品有哪些

package AAAAA

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AAtest07 {
  def main(args: Array[String]): Unit = {
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("quesion1")
    val sc = new SparkContext(sparConf)

    val dataRDD: RDD[String] = sc.textFile("datas/product.txt")
    val filterRDD: RDD[String] = dataRDD.filter {
      data => {
        val len: Array[String] = data.split("\\s+")
        len.length == 6 && (len(4) == "辽宁" || len(4) == "吉林" || len(4) == "黑龙江")
      }
    }
    val mapRDD: RDD[Iterable[(String, Double)]] = filterRDD.map {
      data => {
        val datas: Array[String] = data.split("\\s+")
        (datas(0), datas(1).toDouble) //(产品,价格)
      }
    }.groupBy(_._1).map(_._2) //CompactBuffer((生菜,2.3), (生菜,3.5), (生菜,2.5), (生菜,3.5), (生菜,2.8), (生菜,1.2), (生菜,4.0))
//    mapRDD.foreach(println)

    val aloneRDD: Array[(String, Double)] = mapRDD.flatMap {
      data => {
        data.toList.sortBy(_._2)(Ordering.Double.reverse).take(1) //选取第一个,就是当前菜品的最高价格
      }
    }.sortBy(_._2, false).take(10)
    aloneRDD.foreach(println)

    sc.stop()
  }

}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-06-08 19:06:49  更:2022-06-08 19:08:29 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/29 7:25:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码