IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spark 车流量项目实战 -> 正文阅读

[大数据]spark 车流量项目实战

spark 车流量项目实战

1、车流向项目介绍

1.1. 数据采集

数据从哪儿来?

  1. 我们知道数据来来源,比如网站,APP或者工业设备(比如卡口拍摄设备)实现实时数据采集,它首先有非常重要的一点就是所谓的埋点,也就是说,埋点,在网站的哪个页面哪些操作发生时,前端的代码比如javascript或者app android/ios,就通过网络请求Ajax; socket向后端的服务器发送日志数据。
  2. 如果是卡口信息,那么每次拍摄的信息都会传输到服务器端。
  3. 首先就是说网站或者页面设置埋点,那么就是你要跟前端的开发人员约定好,在哪些页面哪些操作发生的时候,网站的话就通过ajax引擎,APP的话就通过Socket网络请求,向后端的服务器发送指定格式的日志数据。卡口数据的话,是和厂商定制数据格式的,数据以指定的格式向服务器发送实时的数据。
  4. 接着通过Flume监控指定的文件夹,转移到HDFS里面去,实际大多数是放在Hive中因为Hive还有计算的能力,还有另外一条流程,实时数据,通常都是从分布式消息队列集群中读取的,比如Kafka,实时的log,实时的写入消息队列中,然后再由我们后端实时数据处理程序(storm、spark streaming),实时从kafka中读取数据,log日志
  5. 数据除了从Flume中来,也有可能直接使用kafka 的producer角色往kafka中直接生产数据
  6. 接下来就是大数据实时计算系统,比如说用storm、spark streaming开发的,可以实时的从kafka中拉取数据,然后对实时的数据进行处理和计算,这里可以有非常复杂的业务逻辑,甚至调用复杂的机器学习,数据挖掘,智能推荐的算法!然后实现实时的车辆调度,实时推荐等等。

1.2. 模块介绍

  • 卡扣流量分析 Spark Core
  • 卡扣车流量转化率 Spark Core
  • 各区域车流量最高top5的道路统计 SparkSQL
  • 稽查布控,道路实时拥堵统计 SparkStreaming

1.2.1. 卡扣流量分析模块介绍

根据使用者(平台使用者)指定的某些条件,筛选出指定的一批卡扣信息(比如根据区域、时间筛选)

检测卡扣状态,对于筛选出来的所有的卡口(不代表一个摄像头)信息统计

  1. 卡口正常数
  2. 异常数
  3. camera的正常数
  4. camera的异常数
  5. camera的详细信息(monitor_id:camera_id)
  6. 车流量最多的TonN卡扣号,延伸获取每一个卡扣的详细信息(Top5 )
  7. 随机抽取N个车辆信息,对这些数据可以进行多维度分析(因为随机抽取出来的N个车辆信息可以很权威的代表整个区域的车辆)
  8. 计算出经常高速通过的TopN卡口 (查看哪些卡扣经常被高速通过,高速,中速,正常,低速 根据三个速度段进行四次排序,高速通过的车辆数相同就比较中速通过的车辆数,以此来推)

1.3. 项目架构介绍

使用架构

J2EE平台,前端页面,在页面中可以指定任务类型,提交任务的参数(比如时间范围,区域设定)平台会接受到用户的提交请求,会调用底层封装的Spark-submit的shell脚本,怎么调用?运行的作业可以获取到用户指定的筛选条件,然后根据筛选条件进行计算。Spark任务的计算结果会写入到数据库中,比如MySQL,Redis等
最后J2EE平台可以通过前端页面,展示结果(表格或者图表的方式展示数据库中的结果)。

1.4. 数据介绍

1.4.1. 基本概念

卡扣号:在一条道路相同位置会有两个卡扣,这两个卡扣的编号是不同的,分别拍摄不同方向的车辆

摄像头编号:每一个卡扣拍摄的是一个方向的车辆,每一个方向都会有多个不同的车道,每一个车道对应一个摄像头,所以卡扣号与摄像头的对应关系是一对多的关系。

1.4.2. 表

monitor_flow_action表监控到的车流信息表
date日期 单位:天
monitor_id卡口号
camera_id摄像头编号
car车牌
action_time某个摄像头拍摄时间 单位:秒
speed通过卡扣的速度
road_id道路id
area_id区域ID
monitor_camera_info表每个卡扣对应的摄像头编号(标准表)
monitor_id卡扣编号
camera_id摄像头编号

具体内容见建表语句

1.5. 需求分析

  • 按条件筛选卡扣信息
    • 可以指定 不同的条件,时间范围、区域范围、卡扣号等 可以灵活的分析不同区域的卡扣信息
  • 监测卡扣状态
    • 对符合条件的卡扣信息,可以动态的检查每一个卡扣的状态,查看卡扣是否正常工作,也可以查看摄像头
  • 车流量最多的TonN卡扣
    • 查看哪些卡扣的车流量最高,为什么会出现这么高的车流量。分析原因,例如今天出城的车辆非常多,啥原因,今天进城的车辆非常多,啥原因? 集会还是聚集? 这个功能点里面也会拿到具体的车辆的信息,分析一下本地车牌造成的还是外地车牌?
  • 在符合条件的卡扣信息中随机抽取N个车辆信息
    • 随机抽取N辆车的信息,可以权威的代表整个区域的车辆,这时候可以分析这些车的轨迹,看一下在不同的时间点车辆的流动方向。以便于道路的规划。
  • 计算出经常高速通过的TopN卡口
    • 统计出是否存在飙车现象,或者经常进行超速行驶,可以在此处安装违章拍摄设备

2、数据流程

数据处理流程:

在这里插入图片描述

公司有集群没有数据
分布式爬取数据,多节点爬取数据,一般将数据爬取到flume中,或者将数据直接爬取放入HDFS中。
公司有集群有数据 
每天每时每刻在产生数据,数据直接清洗放在HBase或者HDFS中。或者日志数据直接使用flum导入分布式文件中。

一般有了数据之后又分为两个大的方向处理数据:

  1. 假设数据放在了HDFS集群中之后,一般下一步就要清洗数据,可以将数据通过Hive清洗,当然这里Hive一般使用外表,这样做的目的是可以将相同的数据只在HDFS中存入一份,避免过多的重复数据。清洗完成的数据一般又会放入Hive表中或者以结构化的数据放在HDFS上。得到清洗后的数据后一般会使用MR或者使用Spark来对数据进行分析处理,也可以对清洗后的数据使用SparkSQL来进行处理分析。之后,将分析完成的数据放入数据库中,如Redis,Mysql,Oracle中,供前端查询展示。
  2. 如果数据放入了flume中,一般将数据sink到kafka中,不同数据的种类放入不同的topic中。然后对打入kafka中的数据进行流式处理,一般可以使用storm或者SparkStreaming对数据进行清洗,分析处理,然后将结果放到数据库中,如Redis,Mysql,Oracle中,以供前端页面来查询展示。

3、spark任务

如何将Spark任务提交到集群运行?

最次也是脚本化执行Spark任务。

平台化提交Spark任务。流程图如下:

在这里插入图片描述

submit后先将数据存入Mysql中,task当做唯一主键,这样做是为了简化任务执行失败时,可以直接在数据库中查询之前的提交的业务参数,当任务失败后,下次retry时方便执行。

submit后可以使用java调用liunx系统脚本,通过taskId得到系统中的业务参数数据。

注意: 假如使用tomcat实现平台化,那么tomcat应该部署在客户端。

l java代码中如何执行liunx脚本?

Process proc = Runtime.**getRuntime**().exec(“sh 脚本”);
proc.waitFor();

在这里插入图片描述

4、模块功能

在这里插入图片描述

4.1、卡口流量分析

  • 全部使用SparkCore实现。

4.1.1. 卡口状态监控

在这里插入图片描述

4.1.1.1. 统计卡口坏摄像头
  def main(args: Array[String]): Unit = {
    //获取数据源
    val sparkSession = ContextUtils.getSparkSession("Hello01MonitorState")
    //读取数据
    MockDataUtil.mock2view(sparkSession)

    //------------------------------统计卡口摄像头通过的车辆的合计----------------------------
    import sparkSession.implicits._
    //开始读取数据
    val dataFrame: DataFrame = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-19' ")
    //开始操作
    val mcRdd: RDD[(String, Int)] = dataFrame.map(ele => Tuple2(ele.getString(1) + ":" + ele.getString(2), 1)).rdd
    //开始进行合并
    val flowRdd: RDD[(String, Int)] = mcRdd.reduceByKey(_ + _)


    //------------------------------统计卡口所有的摄像头----------------------------
    val cameraDataFrame = sparkSession.sql("select * from " + MockDataUtil.MONITOR_CAMERA_ACTION)
    val cameraRdd: RDD[(String, Int)] = cameraDataFrame.map(ele => ((ele.getString(0) + ":" + ele.getString(1)), 1)).rdd


    //------------------------------合并车流量和摄像头RDD----------------------------
    val allRDD: RDD[(String, (Option[Int], Int))] = flowRdd.rightOuterJoin(cameraRdd).filter(ele => ele._2._1.isEmpty)

    allRDD.foreach(println)
  }

在这里插入图片描述

4.1.1.2. 统计每个区域车流量
  def main(args: Array[String]): Unit = {
    //获取数据源
    val sparkSession = ContextUtils.getSparkSession("Hello02MonitorFlowCount")
    //读取数据
    MockDataUtil.mock2view(sparkSession)

    //------------------------------统计卡口通过的车辆的合计----------------------------
    import sparkSession.implicits._
    //开始读取数据
    val dataFrame: DataFrame = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ")
    //开始操作
    val mcRdd: RDD[(String, Int)] = dataFrame.map(ele => Tuple2(ele.getString(1), 1)).rdd
    //开始进行合并
    val flowRdd: RDD[(String, Int)] = mcRdd.reduceByKey(_ + _)


    flowRdd.foreach(println)
  }

在这里插入图片描述

4.1.1.3. 统计每个区域的摄像头
  def main(args: Array[String]): Unit = {
    val sparkSession = ContextUtils.getSparkSession("Hello03MonitorStateAnalyze")
    MockDataUtil.mock2view(sparkSession)
    //---------------------开始操作车流量信息,假设任务编号为1 日期参数为今天
    val flowInfo: RDD[(String, String)] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd.map(row => (row.getString(1), row)).groupByKey().map(ele => {
      val monitorId: String = ele._1
      val cameraIdSet = new mutable.HashSet[String]()
      ele._2.foreach(row => cameraIdSet.add(row.getString(2)))
      //拼接字符串
      val info: String = Constants.FIELD_MONITOR_ID + "=" + monitorId + "|" + Constants.FIELD_AREA_ID + "=浦东新区|" + Constants.FIELD_CAMERA_IDS + "=" + cameraIdSet.mkString("-") + "|" + Constants.FIELD_CAMERA_COUNT + "=" + cameraIdSet.size + "|" + Constants.FIELD_CAR_COUNT + "=" + ele._2.size
      //返回结果
      (monitorId, info)
    })
    //-----------------------开始操作摄像头数据
    val monitorInfo: RDD[(String, String)] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_CAMERA_ACTION).rdd.map(row => (row.getString(0), row.getString(1))).groupByKey().map(ele => {
      val monitorId: String = ele._1
      //拼接字符串
      val info: String = Constants.FIELD_CAMERA_IDS + "=" + ele._2.toList.mkString("-") + "|" + Constants.FIELD_CAMERA_COUNT + "=" + ele._2.size
      //返回结果
      (monitorId, info)
    })
    //-----------------------将数据Join到一起
    monitorInfo.leftOuterJoin(flowInfo).foreach(println)
  }

在这里插入图片描述

4.1.1.5 卡口流量分析 代码解析

在这里插入图片描述

4.1.2. 区域车流量Top3及其速度

  • 区域车流量top3
  def main(args: Array[String]): Unit = {

    val sparkSession = ContextUtils.getSparkSession("AreaTop3Road")
    MockDataUtil.mock2view(sparkSession)

    //开始计算
    val fRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd


    fRdd.map(row => (row.getString(7) + "_" + row.getString(6) + "&" + (Math.random() * 30 + 10).toInt, 1)).reduceByKey(_ + _).map(ele => {
      val area_road_random = ele._1
      val count = ele._2
      (area_road_random.split("_")(0), area_road_random.split("_")(1).split("&")(0) + "_" + count)
    })
      .groupByKey()
      .map(ele => {
      val map = new mutable.HashMap[String, Int]()
      ele._2.foreach(e => {
        val key = e.split("_")(0)
        val value = e.split("_")(1).toInt
        map.put(key, map.get(key).getOrElse(0) + value)
      })
      "区划【" + ele._1 + "】车辆最多的三条道路分别为:" + map.toList.sortBy(_._2).takeRight(3).reverse.mkString("-")
    })
      .foreach(println)

  }

在这里插入图片描述

  • 区域各路速度
  def main(args: Array[String]): Unit = {
    val sparkSession = ContextUtils.getSparkSession("AreaTop3Speed")
    MockDataUtil.mock2view(sparkSession)

    val sRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd

    sRdd.map(e=>{
      ((e.getString(7),e.getString(6)),e.getString(5).toInt)
    })
      .groupByKey()
      .map(e=>{
        val list: List[Int] = e._2.toList
        val i: Int = list.sum/list.size
        (e._1._1,(e._1._2,i))
      })
      .groupByKey()
      .map(e=>{
        val tuples = e._2.toList.sortBy(_._2).reverse.take(3)
        var strBui: StringBuilder = new StringBuilder
        for (i <- tuples ){
          val str: String = i._1 + "-均速度为:" + i._2
          strBui.append(">>>"+str)
        }
        (e._1,strBui)
      })
      .foreach(println)
  }

在这里插入图片描述

4.1.3. 区域中高低速数量

在这里插入图片描述

object Hello04MonitorTopNSpeed {
  def main(args: Array[String]): Unit = {
    val sparkSession = ContextUtils.getSparkSession("Hello04MonitorTopNSpeed")
    MockDataUtil.mock2view(sparkSession)
    //---------------------开始操作车流量信息,假设任务编号为1 日期参数为今天
    val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-20' ").rdd

    val monitor2speedRDD: RDD[(String, Iterable[String])] = flowRdd.map(row => (row.getString(1), row.getString(5))).groupByKey()

    val speedCount2monitorRDD: RDD[(SpeedCount, String)] = monitor2speedRDD.map(ele => {
      //获取卡口号
      val monitorId: String = ele._1
      //声明一个Map[0,60,100,120]
      var high = 0;
      var normal = 0;
      var low = 0;
      //获取所有的速度的车辆技术
      ele._2.foreach(speed => {
        //判断速度
        if (speed.toInt > 100) {
          high += 1
        } else if (speed.toInt > 60) {
          normal += 1
        } else {
          low += 1
        }
      })
      //创建速度对象
      (SpeedCount(high, normal, low), monitorId)
    })

    speedCount2monitorRDD.sortByKey(false).map(x => (x._2, x._1)).foreach(println)

  }
}

case class SpeedCount(high: Int, normal: Int, low: Int) extends Ordered[SpeedCount] with KryoRegistrator {
  override def compare(that: SpeedCount): Int = {
    var result = this.high - that.high
    if (result == 0) {
      result = this.normal - that.normal
      if (result == 0) {
        result = this.low - that.low
      }
    }
    return result
  }

  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(SpeedCount.getClass)
  }
}

在这里插入图片描述

4.1.3. 指定卡口对应卡口车辆轨迹

在这里插入图片描述

def main(args: Array[String]): Unit = {
    val sparkSession = ContextUtils.getSparkSession("Hello04MonitorTopNSpeed")
    MockDataUtil.mock2view(sparkSession)
    //获取数据
    val area01Rdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' and area_id = '01' ").rdd
    val area02Rdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' and area_id = '02' ").rdd

    val area01CarRdd = area01Rdd.map(row => (row.getString(3), row.getString(7))).groupByKey()
    val area02CarRdd = area02Rdd.map(row => (row.getString(3), row.getString(7))).groupByKey()

    area01CarRdd.join(area02CarRdd).foreach(println)

  }

在这里插入图片描述

4.2、行车轨迹

4.2.1. 车辆行车轨迹

在这里插入图片描述

 def main(args: Array[String]): Unit = {    
 	val sparkSession = ContextUtils.getSparkSession("AreaCar")    
 	MockDataUtil.mock2view(sparkSession)
	 //查询 车子行驶轨迹 跟车分析    
 	val c1Rdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd    
 	val carRdd: RDD[(String, StringBuilder)] = c1Rdd.map(e => {      (e.getString(3), (e.getString(4), e.getString(6), e.getString(2)))    })
 		.groupByKey()      
 		.map(e => {        
 			val tuples: List[(String, String, String)] = e._2.toList.sortBy(_._1)        
 			val list = new StringBuilder        
 			for (i <- tuples) {          
 				//println(i)          
 				val str: String = i._2 + ":" + i._3          
 				list.append(str + "-")        
 			}        
 			(e._1, list)      
 	})    
 	//carRdd.foreach(println) 
}

在这里插入图片描述

4.2.2. 车辆套牌

在这里插入图片描述

  def main(args: Array[String]): Unit = {

    val sparkSession = ContextUtils.getSparkSession("AreaCar")
    MockDataUtil.mock2view(sparkSession)
//假设任何的卡口距离都是 10分钟车程 ,如果同一分钟出现在不同的卡口就怀疑是套牌
    
    val deckRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd
    deckRdd.map(e => {
        val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        (e.getString(3), (dateFormat.parse(e.getString(4)),e.getString(1)))
    }).groupByKey(1)
      .map(e => {
          val list: List[(util.Date, String)] = e._2.toList.sortBy(x=>x._1)
          var bool = false
          var d: util.Date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-23 00:00:00")
          var mid="?"
          for (i <- list) {
              if (d.getTime - i._1.getTime < 600000 && i._2!=mid )
              bool = true
              d = i._1
              mid=i._2
          }
          (e._1, bool)
      })
      .filter(f => f._2)
      .foreach(println)
  }

在这里插入图片描述

4.2.3. 车辆抽样–蓄水池抽样算法

在这里插入图片描述

    def main(args: Array[String]): Unit = {
    
      val sparkSession = ContextUtils.getSparkSession("Hello04MonitorTopNSpeed")
      MockDataUtil.mock2view(sparkSession)
    
      //获取数据
      val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-21' ").rdd
      //yyyy-MM-dd_HH , row
      val hourRDD: RDD[(String, Row)] = flowRdd.map(row => (DateUtils.getDateHour(row.getString(4)), row))
    
      //车流量的总数,并进行广播
      val flowAllCount: Long = hourRDD.count()
      val broadcastFlowAllCount: Broadcast[Long] = sparkSession.sparkContext.broadcast(flowAllCount)
    
      //计算每个小时的比例 并进行广播
      val hourRatio: collection.Map[String, Double] = hourRDD.countByKey().map(e => {
          (e._1, e._2 * 1.0 / broadcastFlowAllCount.value)
      })
      val broadcastHourRatio: Broadcast[collection.Map[String, Double]] = sparkSession.sparkContext.broadcast(hourRatio)
    
      //开始进行抽样
      val sampleRDD: RDD[Row] = hourRDD.groupByKey().flatMap(ele => {
          val hour: String = ele._1
          val list: List[Row] = ele._2.iterator.toList
          
          //计算本时段要抽样的数据量
          val sampleRatio: Double = broadcastHourRatio.value.get(hour).getOrElse(0)
          val sampleNum: Long = Math.round(sampleRatio * 100)
          //开始进行取样(蓄水池抽样)
          val sampleList: ListBuffer[Row] = new ListBuffer[Row]()
          sampleList.appendAll(list.take(sampleNum.toInt))
          for (i <- sampleNum until list.size) {
              //随机生成一个数字
              val num = (Math.random() * list.size).toInt
              if (num < sampleNum) {
                  sampleList.update(num, list(i.toInt))
              }
          }
          sampleList
      })
      sampleRDD.foreach(println)
  }

在这里插入图片描述

4.2.4. 道路转换率

在这里插入图片描述

  def main(args: Array[String]): Unit = {
     
    //创建会话
    val sparkSession = ContextUtils.getSparkSession("Hello07MonitorConvertRatio")
    MockDataUtil.mock2view(sparkSession)
    //开始计算
    val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd
     
    //计算每个卡口的总通车量
    val monitorCountMap: collection.Map[String, Long] = flowRdd.map(row => (row.getString(1), row)).countByKey()
       
    //计算卡口到卡口的通行率
    val sortRDD: RDD[(String, List[Row])] = flowRdd.map(row => (row.getString(3), row)).groupByKey().map(ele => (ele._1, ele._2.iterator.toList.sortBy(_.getString(4))))
    val m2mMap: collection.Map[String, Long] = sortRDD.flatMap(ele => {
        //存放映射关系
        val map: mutable.HashMap[String, Int] = mutable.HashMap[String, Int]()
        val list: List[Row] = ele._2.toList
        for (i <- 0 until list.size; j <- i + 1 until list.size) {
            //拼接Key
            val key = list(i).getString(1) + "->" + list(j).getString(1)
            map.put(key, map.get(key).getOrElse(0) + 1);
        }

        //返回结果
        map.toList
    }).countByKey()
     
    //开始进行计算
    m2mMap.foreach(ele => {
        println("卡口[" + ele._1 + "]的转换率为:" + ele._2.toDouble / monitorCountMap.get(ele._1.split("->")(0)).get)
    })
}

在这里插入图片描述

4.3、区域道路流量Top3

  • 数据倾斜问题
    • key添加后缀扩组,减小数据倾斜

4.3.1 RDD解决

  def main(args: Array[String]): Unit = {
    
    //创建会话
    val sparkSession = ContextUtils.getSparkSession("Hello07MonitorConvertRatio")
    MockDataUtil.mock2view(sparkSession)
    //开始计算
    val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd
    
    //开始计算
    flowRdd.map(row => (row.getString(7) + "_" + row.getString(6) + "&" + (Math.random() * 30 + 10).toInt, 1)).reduceByKey(_ + _).map(ele => {
        val area_road_random = ele._1
        val count = ele._2
        (area_road_random.split("_")(0), area_road_random.split("_")(1).split("&")(0) + "_" + count)
    }).groupByKey().map(ele => {
        val map = new mutable.HashMap[String, Int]()
        ele._2.foreach(e => {
            val key = e.split("_")(0)
            val value = e.split("_")(1).toInt
            map.put(key, map.get(key).getOrElse(0) + value)
        })
        "区划【" + ele._1 + "】车辆最多的三条道路分别为:" + map.toList.sortBy(_._2).takeRight(3).reverse.mkString("-")
    }).foreach(println)
}

4.3.2. Java连接Hive—SQL解决

在这里插入图片描述

  • hive中导入模拟数据

  1. 打开hive手动创建database 为traffic 。

  2. 生产模拟数据,将模拟数据提交到linux上。

  3. 将data2hive代码 打包,提交到客户端运行。

  4. 查询hive中数据库为traffic中monitor_flow_action和monitor_camera_info两张数据库表是否导入数据。

若出现mysql数据库乱码的问题

在安装mysql的linux节点路径/etc/my.cnf中加入:

  • 在[client]下添加
    default-character-set=utf8
  • 在[mysqld]下添加
    default-character-set=utf8

如图:

在这里插入图片描述

运行项目中AreaTop3RoadFlowAnalyze代码

在这里插入图片描述

4.4、Streaming 实时

4.4.1. 道路实时拥堵情况 --kafka

  • 生产者
public class MockRealTimeData extends Thread {

    private static final Random random = new Random();
    private static final String[] locations = new String[]{"鲁", "沪", "沪", "沪", "沪", "京", "京", "深", "京", "京"};
    private static final String topic = "RoadRealTimeLog";
    private KafkaProducer<String, String> producer;

    
    public MockRealTimeData() {
        
        //创建配置文件列表
        Properties properties = new Properties();
        // kafka地址,多个地址用逗号分割
        properties.put("bootstrap.servers", "192.168.100.101:9092,192.168.100.102:9092,192.168.100.103:9092");
        //设置写出数据的格式
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //写出的应答方式
        properties.put("acks", "all");
        //错误重试
        properties.put("retries", 1);
        //批量写出
        properties.put("batch.size", 16384);
        //创建生产者对象
        producer = new KafkaProducer<String, String>(properties);
    }
    
    public void run() {
        while (true) {
            String date = DateUtils.getTodayDate();
            String baseActionTime = date + " " + StringUtils.fullFill(random.nextInt(24) + "");
            baseActionTime = date + " " + StringUtils.fullFill((Integer.parseInt(baseActionTime.split(" ")[1]) + 1) + "");
            String actionTime = baseActionTime + ":" + StringUtils.fullFill(random.nextInt(60) + "") + ":" + StringUtils.fullFill(random.nextInt(60) + "");
            String monitorId = StringUtils.fullFill(4, random.nextInt(9) + "");
            String car = locations[random.nextInt(10)] + (char) (65 + random.nextInt(26)) + StringUtils.fullFill(5, random.nextInt(99999) + "");
            String speed = random.nextInt(260) + "";
            String roadId = random.nextInt(50) + 1 + "";
            String cameraId = StringUtils.fullFill(5, random.nextInt(9999) + "");
            String areaId = StringUtils.fullFill(2, random.nextInt(8) + "");
            
            //封装消息对象
            ProducerRecord<String, String> banRecordBlue = new ProducerRecord<>(topic, "traffic_" + monitorId, date + "\t" + monitorId + "\t" + cameraId + "\t" + car + "\t" + actionTime + "\t" + speed + "\t" + roadId + "\t" + areaId);
            //发送消息
            producer.send(banRecordBlue);
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 启动Kafka Producer
     *
     * @param args
     */
    public static void main(String[] args) {
        MockRealTimeData mockRealTimeData = new MockRealTimeData();
        mockRealTimeData.start();
    }
}
  • 消费者
object Hello10RealRoadState {
    def main(args: Array[String]): Unit = {

      //创建Conf
        val sparkConf = new SparkConf().setAppName("Hello10RealRoadState").setMaster("local[2]")
        val streamingContext = new StreamingContext(sparkConf, Seconds(2))

        //创建Kafka读取数据
        //配置信息
        val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "traffic",
        "auto.offset.reset" -> "earliest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        val topics = Array("RoadRealTimeLog")
        //开始创建Kafka
        val linesDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils createDirectStream(streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

       //窗口函数
       linesDStream.map(_.value())
            .window(Seconds(10), Seconds(10))
            .map(ele => (ele.split("\t")(1), ele.split("\t")(5).toInt))
            .groupByKey()
            .map(ele => {
                (ele._1, ele._2.toList.sum.toDouble / ele._2.size)
            })
            .foreachRDD(rdd => {
                rdd.foreach(ele => {
                    println(ele._1 + "--" + ele._2)
                })
            })

        //启动任务
        streamingContext.start()
        streamingContext.awaitTermination()  
    }
}

在这里插入图片描述

4.4.2. 动态改变广播变量(布控)

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-29 09:09:56  更:2021-08-29 09:28:01 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 15:55:31-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码