spark 车流量项目实战
1、车流向项目介绍
1.1. 数据采集
数据从哪儿来?
- 我们知道数据来来源,比如网站,APP或者工业设备(比如卡口拍摄设备),实现实时数据采集,它首先有非常重要的一点就是所谓的埋点,也就是说,埋点,在网站的哪个页面哪些操作发生时,前端的代码比如javascript或者app android/ios,就通过网络请求Ajax; socket向后端的服务器发送日志数据。
- 如果是卡口信息,那么每次拍摄的信息都会传输到服务器端。
- 首先就是说网站或者页面设置埋点,那么就是你要跟前端的开发人员约定好,在哪些页面哪些操作发生的时候,网站的话就通过ajax引擎,APP的话就通过Socket网络请求,向后端的服务器发送指定格式的日志数据。卡口数据的话,是和厂商定制数据格式的,数据以指定的格式向服务器发送实时的数据。
- 接着通过Flume监控指定的文件夹,转移到HDFS里面去,实际大多数是放在Hive中因为Hive还有计算的能力,还有另外一条流程,实时数据,通常都是从分布式消息队列集群中读取的,比如Kafka,实时的log,实时的写入消息队列中,然后再由我们后端实时数据处理程序(storm、spark streaming),实时从kafka中读取数据,log日志
- 数据除了从Flume中来,也有可能直接使用kafka 的producer角色往kafka中直接生产数据。
- 接下来就是大数据实时计算系统,比如说用storm、spark streaming开发的,可以实时的从kafka中拉取数据,然后对实时的数据进行处理和计算,这里可以有非常复杂的业务逻辑,甚至调用复杂的机器学习,数据挖掘,智能推荐的算法!然后实现实时的车辆调度,实时推荐等等。
1.2. 模块介绍
- 卡扣流量分析 Spark Core
- 卡扣车流量转化率 Spark Core
- 各区域车流量最高top5的道路统计 SparkSQL
- 稽查布控,道路实时拥堵统计 SparkStreaming
1.2.1. 卡扣流量分析模块介绍
根据使用者(平台使用者)指定的某些条件,筛选出指定的一批卡扣信息(比如根据区域、时间筛选)
检测卡扣状态,对于筛选出来的所有的卡口(不代表一个摄像头)信息统计
- 卡口正常数
- 异常数
- camera的正常数
- camera的异常数
- camera的详细信息(monitor_id:camera_id)
- 车流量最多的TonN卡扣号,延伸获取每一个卡扣的详细信息(Top5 )
- 随机抽取N个车辆信息,对这些数据可以进行多维度分析(因为随机抽取出来的N个车辆信息可以很权威的代表整个区域的车辆)
- 计算出经常高速通过的TopN卡口 (查看哪些卡扣经常被高速通过,高速,中速,正常,低速 根据三个速度段进行四次排序,高速通过的车辆数相同就比较中速通过的车辆数,以此来推)
1.3. 项目架构介绍
使用架构
J2EE平台,前端页面,在页面中可以指定任务类型,提交任务的参数(比如时间范围,区域设定)平台会接受到用户的提交请求,会调用底层封装的Spark-submit的shell脚本,怎么调用?运行的作业可以获取到用户指定的筛选条件,然后根据筛选条件进行计算。Spark任务的计算结果会写入到数据库中,比如MySQL,Redis等 最后J2EE平台可以通过前端页面,展示结果(表格或者图表的方式展示数据库中的结果)。
1.4. 数据介绍
1.4.1. 基本概念
卡扣号:在一条道路相同位置会有两个卡扣,这两个卡扣的编号是不同的,分别拍摄不同方向的车辆
摄像头编号:每一个卡扣拍摄的是一个方向的车辆,每一个方向都会有多个不同的车道,每一个车道对应一个摄像头,所以卡扣号与摄像头的对应关系是一对多的关系。
1.4.2. 表
monitor_flow_action表 | 监控到的车流信息表 |
---|
date | 日期 单位:天 | monitor_id | 卡口号 | camera_id | 摄像头编号 | car | 车牌 | action_time | 某个摄像头拍摄时间 单位:秒 | speed | 通过卡扣的速度 | road_id | 道路id | area_id | 区域ID |
monitor_camera_info表 | 每个卡扣对应的摄像头编号(标准表) |
---|
monitor_id | 卡扣编号 | camera_id | 摄像头编号 |
具体内容见建表语句。
1.5. 需求分析
- 按条件筛选卡扣信息
- 可以指定 不同的条件,时间范围、区域范围、卡扣号等 可以灵活的分析不同区域的卡扣信息
- 监测卡扣状态
- 对符合条件的卡扣信息,可以动态的检查每一个卡扣的状态,查看卡扣是否正常工作,也可以查看摄像头
- 车流量最多的TonN卡扣
- 查看哪些卡扣的车流量最高,为什么会出现这么高的车流量。分析原因,例如今天出城的车辆非常多,啥原因,今天进城的车辆非常多,啥原因? 集会还是聚集? 这个功能点里面也会拿到具体的车辆的信息,分析一下本地车牌造成的还是外地车牌?
- 在符合条件的卡扣信息中随机抽取N个车辆信息
- 随机抽取N辆车的信息,可以权威的代表整个区域的车辆,这时候可以分析这些车的轨迹,看一下在不同的时间点车辆的流动方向。以便于道路的规划。
- 计算出经常高速通过的TopN卡口
- 统计出是否存在飙车现象,或者经常进行超速行驶,可以在此处安装违章拍摄设备
2、数据流程
数据处理流程:
公司有集群没有数据
分布式爬取数据,多节点爬取数据,一般将数据爬取到flume中,或者将数据直接爬取放入HDFS中。
公司有集群有数据
每天每时每刻在产生数据,数据直接清洗放在HBase或者HDFS中。或者日志数据直接使用flum导入分布式文件中。
一般有了数据之后又分为两个大的方向处理数据:
- 假设数据放在了HDFS集群中之后,一般下一步就要清洗数据,可以将数据通过Hive清洗,当然这里Hive一般使用外表,这样做的目的是可以将相同的数据只在HDFS中存入一份,避免过多的重复数据。清洗完成的数据一般又会放入Hive表中或者以结构化的数据放在HDFS上。得到清洗后的数据后一般会使用MR或者使用Spark来对数据进行分析处理,也可以对清洗后的数据使用SparkSQL来进行处理分析。之后,将分析完成的数据放入数据库中,如Redis,Mysql,Oracle中,供前端查询展示。
- 如果数据放入了flume中,一般将数据sink到kafka中,不同数据的种类放入不同的topic中。然后对打入kafka中的数据进行流式处理,一般可以使用storm或者SparkStreaming对数据进行清洗,分析处理,然后将结果放到数据库中,如Redis,Mysql,Oracle中,以供前端页面来查询展示。
3、spark任务
如何将Spark任务提交到集群运行?
最次也是脚本化执行Spark任务。
平台化提交Spark任务。流程图如下:
submit后先将数据存入Mysql中,task当做唯一主键,这样做是为了简化任务执行失败时,可以直接在数据库中查询之前的提交的业务参数,当任务失败后,下次retry时方便执行。
submit后可以使用java调用liunx系统脚本,通过taskId得到系统中的业务参数数据。
注意: 假如使用tomcat实现平台化,那么tomcat应该部署在客户端。
l java代码中如何执行liunx脚本?
Process proc = Runtime.**getRuntime**().exec(“sh 脚本”);
proc.waitFor();
4、模块功能
4.1、卡口流量分析
4.1.1. 卡口状态监控
4.1.1.1. 统计卡口坏摄像头
def main(args: Array[String]): Unit = {
val sparkSession = ContextUtils.getSparkSession("Hello01MonitorState")
MockDataUtil.mock2view(sparkSession)
import sparkSession.implicits._
val dataFrame: DataFrame = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-19' ")
val mcRdd: RDD[(String, Int)] = dataFrame.map(ele => Tuple2(ele.getString(1) + ":" + ele.getString(2), 1)).rdd
val flowRdd: RDD[(String, Int)] = mcRdd.reduceByKey(_ + _)
val cameraDataFrame = sparkSession.sql("select * from " + MockDataUtil.MONITOR_CAMERA_ACTION)
val cameraRdd: RDD[(String, Int)] = cameraDataFrame.map(ele => ((ele.getString(0) + ":" + ele.getString(1)), 1)).rdd
val allRDD: RDD[(String, (Option[Int], Int))] = flowRdd.rightOuterJoin(cameraRdd).filter(ele => ele._2._1.isEmpty)
allRDD.foreach(println)
}
4.1.1.2. 统计每个区域车流量
def main(args: Array[String]): Unit = {
val sparkSession = ContextUtils.getSparkSession("Hello02MonitorFlowCount")
MockDataUtil.mock2view(sparkSession)
import sparkSession.implicits._
val dataFrame: DataFrame = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ")
val mcRdd: RDD[(String, Int)] = dataFrame.map(ele => Tuple2(ele.getString(1), 1)).rdd
val flowRdd: RDD[(String, Int)] = mcRdd.reduceByKey(_ + _)
flowRdd.foreach(println)
}
4.1.1.3. 统计每个区域的摄像头
def main(args: Array[String]): Unit = {
val sparkSession = ContextUtils.getSparkSession("Hello03MonitorStateAnalyze")
MockDataUtil.mock2view(sparkSession)
val flowInfo: RDD[(String, String)] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd.map(row => (row.getString(1), row)).groupByKey().map(ele => {
val monitorId: String = ele._1
val cameraIdSet = new mutable.HashSet[String]()
ele._2.foreach(row => cameraIdSet.add(row.getString(2)))
val info: String = Constants.FIELD_MONITOR_ID + "=" + monitorId + "|" + Constants.FIELD_AREA_ID + "=浦东新区|" + Constants.FIELD_CAMERA_IDS + "=" + cameraIdSet.mkString("-") + "|" + Constants.FIELD_CAMERA_COUNT + "=" + cameraIdSet.size + "|" + Constants.FIELD_CAR_COUNT + "=" + ele._2.size
(monitorId, info)
})
val monitorInfo: RDD[(String, String)] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_CAMERA_ACTION).rdd.map(row => (row.getString(0), row.getString(1))).groupByKey().map(ele => {
val monitorId: String = ele._1
val info: String = Constants.FIELD_CAMERA_IDS + "=" + ele._2.toList.mkString("-") + "|" + Constants.FIELD_CAMERA_COUNT + "=" + ele._2.size
(monitorId, info)
})
monitorInfo.leftOuterJoin(flowInfo).foreach(println)
}
4.1.1.5 卡口流量分析 代码解析
4.1.2. 区域车流量Top3及其速度
def main(args: Array[String]): Unit = {
val sparkSession = ContextUtils.getSparkSession("AreaTop3Road")
MockDataUtil.mock2view(sparkSession)
val fRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd
fRdd.map(row => (row.getString(7) + "_" + row.getString(6) + "&" + (Math.random() * 30 + 10).toInt, 1)).reduceByKey(_ + _).map(ele => {
val area_road_random = ele._1
val count = ele._2
(area_road_random.split("_")(0), area_road_random.split("_")(1).split("&")(0) + "_" + count)
})
.groupByKey()
.map(ele => {
val map = new mutable.HashMap[String, Int]()
ele._2.foreach(e => {
val key = e.split("_")(0)
val value = e.split("_")(1).toInt
map.put(key, map.get(key).getOrElse(0) + value)
})
"区划【" + ele._1 + "】车辆最多的三条道路分别为:" + map.toList.sortBy(_._2).takeRight(3).reverse.mkString("-")
})
.foreach(println)
}
def main(args: Array[String]): Unit = {
val sparkSession = ContextUtils.getSparkSession("AreaTop3Speed")
MockDataUtil.mock2view(sparkSession)
val sRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd
sRdd.map(e=>{
((e.getString(7),e.getString(6)),e.getString(5).toInt)
})
.groupByKey()
.map(e=>{
val list: List[Int] = e._2.toList
val i: Int = list.sum/list.size
(e._1._1,(e._1._2,i))
})
.groupByKey()
.map(e=>{
val tuples = e._2.toList.sortBy(_._2).reverse.take(3)
var strBui: StringBuilder = new StringBuilder
for (i <- tuples ){
val str: String = i._1 + "-均速度为:" + i._2
strBui.append(">>>"+str)
}
(e._1,strBui)
})
.foreach(println)
}
4.1.3. 区域中高低速数量
object Hello04MonitorTopNSpeed {
def main(args: Array[String]): Unit = {
val sparkSession = ContextUtils.getSparkSession("Hello04MonitorTopNSpeed")
MockDataUtil.mock2view(sparkSession)
val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-20' ").rdd
val monitor2speedRDD: RDD[(String, Iterable[String])] = flowRdd.map(row => (row.getString(1), row.getString(5))).groupByKey()
val speedCount2monitorRDD: RDD[(SpeedCount, String)] = monitor2speedRDD.map(ele => {
val monitorId: String = ele._1
var high = 0;
var normal = 0;
var low = 0;
ele._2.foreach(speed => {
if (speed.toInt > 100) {
high += 1
} else if (speed.toInt > 60) {
normal += 1
} else {
low += 1
}
})
(SpeedCount(high, normal, low), monitorId)
})
speedCount2monitorRDD.sortByKey(false).map(x => (x._2, x._1)).foreach(println)
}
}
case class SpeedCount(high: Int, normal: Int, low: Int) extends Ordered[SpeedCount] with KryoRegistrator {
override def compare(that: SpeedCount): Int = {
var result = this.high - that.high
if (result == 0) {
result = this.normal - that.normal
if (result == 0) {
result = this.low - that.low
}
}
return result
}
override def registerClasses(kryo: Kryo): Unit = {
kryo.register(SpeedCount.getClass)
}
}
4.1.3. 指定卡口对应卡口车辆轨迹
def main(args: Array[String]): Unit = {
val sparkSession = ContextUtils.getSparkSession("Hello04MonitorTopNSpeed")
MockDataUtil.mock2view(sparkSession)
val area01Rdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' and area_id = '01' ").rdd
val area02Rdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' and area_id = '02' ").rdd
val area01CarRdd = area01Rdd.map(row => (row.getString(3), row.getString(7))).groupByKey()
val area02CarRdd = area02Rdd.map(row => (row.getString(3), row.getString(7))).groupByKey()
area01CarRdd.join(area02CarRdd).foreach(println)
}
4.2、行车轨迹
4.2.1. 车辆行车轨迹
def main(args: Array[String]): Unit = {
val sparkSession = ContextUtils.getSparkSession("AreaCar")
MockDataUtil.mock2view(sparkSession)
val c1Rdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd
val carRdd: RDD[(String, StringBuilder)] = c1Rdd.map(e => { (e.getString(3), (e.getString(4), e.getString(6), e.getString(2))) })
.groupByKey()
.map(e => {
val tuples: List[(String, String, String)] = e._2.toList.sortBy(_._1)
val list = new StringBuilder
for (i <- tuples) {
val str: String = i._2 + ":" + i._3
list.append(str + "-")
}
(e._1, list)
})
}
4.2.2. 车辆套牌
def main(args: Array[String]): Unit = {
val sparkSession = ContextUtils.getSparkSession("AreaCar")
MockDataUtil.mock2view(sparkSession)
val deckRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd
deckRdd.map(e => {
val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
(e.getString(3), (dateFormat.parse(e.getString(4)),e.getString(1)))
}).groupByKey(1)
.map(e => {
val list: List[(util.Date, String)] = e._2.toList.sortBy(x=>x._1)
var bool = false
var d: util.Date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-23 00:00:00")
var mid="?"
for (i <- list) {
if (d.getTime - i._1.getTime < 600000 && i._2!=mid )
bool = true
d = i._1
mid=i._2
}
(e._1, bool)
})
.filter(f => f._2)
.foreach(println)
}
4.2.3. 车辆抽样–蓄水池抽样算法
def main(args: Array[String]): Unit = {
val sparkSession = ContextUtils.getSparkSession("Hello04MonitorTopNSpeed")
MockDataUtil.mock2view(sparkSession)
val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-21' ").rdd
val hourRDD: RDD[(String, Row)] = flowRdd.map(row => (DateUtils.getDateHour(row.getString(4)), row))
val flowAllCount: Long = hourRDD.count()
val broadcastFlowAllCount: Broadcast[Long] = sparkSession.sparkContext.broadcast(flowAllCount)
val hourRatio: collection.Map[String, Double] = hourRDD.countByKey().map(e => {
(e._1, e._2 * 1.0 / broadcastFlowAllCount.value)
})
val broadcastHourRatio: Broadcast[collection.Map[String, Double]] = sparkSession.sparkContext.broadcast(hourRatio)
val sampleRDD: RDD[Row] = hourRDD.groupByKey().flatMap(ele => {
val hour: String = ele._1
val list: List[Row] = ele._2.iterator.toList
val sampleRatio: Double = broadcastHourRatio.value.get(hour).getOrElse(0)
val sampleNum: Long = Math.round(sampleRatio * 100)
val sampleList: ListBuffer[Row] = new ListBuffer[Row]()
sampleList.appendAll(list.take(sampleNum.toInt))
for (i <- sampleNum until list.size) {
val num = (Math.random() * list.size).toInt
if (num < sampleNum) {
sampleList.update(num, list(i.toInt))
}
}
sampleList
})
sampleRDD.foreach(println)
}
4.2.4. 道路转换率
def main(args: Array[String]): Unit = {
val sparkSession = ContextUtils.getSparkSession("Hello07MonitorConvertRatio")
MockDataUtil.mock2view(sparkSession)
val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd
val monitorCountMap: collection.Map[String, Long] = flowRdd.map(row => (row.getString(1), row)).countByKey()
val sortRDD: RDD[(String, List[Row])] = flowRdd.map(row => (row.getString(3), row)).groupByKey().map(ele => (ele._1, ele._2.iterator.toList.sortBy(_.getString(4))))
val m2mMap: collection.Map[String, Long] = sortRDD.flatMap(ele => {
val map: mutable.HashMap[String, Int] = mutable.HashMap[String, Int]()
val list: List[Row] = ele._2.toList
for (i <- 0 until list.size; j <- i + 1 until list.size) {
val key = list(i).getString(1) + "->" + list(j).getString(1)
map.put(key, map.get(key).getOrElse(0) + 1);
}
map.toList
}).countByKey()
m2mMap.foreach(ele => {
println("卡口[" + ele._1 + "]的转换率为:" + ele._2.toDouble / monitorCountMap.get(ele._1.split("->")(0)).get)
})
}
4.3、区域道路流量Top3
4.3.1 RDD解决
def main(args: Array[String]): Unit = {
val sparkSession = ContextUtils.getSparkSession("Hello07MonitorConvertRatio")
MockDataUtil.mock2view(sparkSession)
val flowRdd: RDD[Row] = sparkSession.sql("select * from " + MockDataUtil.MONITOR_FLOW_ACTION + " where date = '2021-08-23' ").rdd
flowRdd.map(row => (row.getString(7) + "_" + row.getString(6) + "&" + (Math.random() * 30 + 10).toInt, 1)).reduceByKey(_ + _).map(ele => {
val area_road_random = ele._1
val count = ele._2
(area_road_random.split("_")(0), area_road_random.split("_")(1).split("&")(0) + "_" + count)
}).groupByKey().map(ele => {
val map = new mutable.HashMap[String, Int]()
ele._2.foreach(e => {
val key = e.split("_")(0)
val value = e.split("_")(1).toInt
map.put(key, map.get(key).getOrElse(0) + value)
})
"区划【" + ele._1 + "】车辆最多的三条道路分别为:" + map.toList.sortBy(_._2).takeRight(3).reverse.mkString("-")
}).foreach(println)
}
4.3.2. Java连接Hive—SQL解决
-
打开hive手动创建database 为traffic 。 -
生产模拟数据,将模拟数据提交到linux上。 -
将data2hive代码 打包,提交到客户端运行。 -
查询hive中数据库为traffic中monitor_flow_action和monitor_camera_info两张数据库表是否导入数据。
若出现mysql数据库乱码的问题
在安装mysql的linux节点路径/etc/my.cnf中加入:
- 在[client]下添加
default-character-set=utf8 - 在[mysqld]下添加
default-character-set=utf8
如图:
运行项目中AreaTop3RoadFlowAnalyze代码
4.4、Streaming 实时
4.4.1. 道路实时拥堵情况 --kafka
public class MockRealTimeData extends Thread {
private static final Random random = new Random();
private static final String[] locations = new String[]{"鲁", "沪", "沪", "沪", "沪", "京", "京", "深", "京", "京"};
private static final String topic = "RoadRealTimeLog";
private KafkaProducer<String, String> producer;
public MockRealTimeData() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.100.101:9092,192.168.100.102:9092,192.168.100.103:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "all");
properties.put("retries", 1);
properties.put("batch.size", 16384);
producer = new KafkaProducer<String, String>(properties);
}
public void run() {
while (true) {
String date = DateUtils.getTodayDate();
String baseActionTime = date + " " + StringUtils.fullFill(random.nextInt(24) + "");
baseActionTime = date + " " + StringUtils.fullFill((Integer.parseInt(baseActionTime.split(" ")[1]) + 1) + "");
String actionTime = baseActionTime + ":" + StringUtils.fullFill(random.nextInt(60) + "") + ":" + StringUtils.fullFill(random.nextInt(60) + "");
String monitorId = StringUtils.fullFill(4, random.nextInt(9) + "");
String car = locations[random.nextInt(10)] + (char) (65 + random.nextInt(26)) + StringUtils.fullFill(5, random.nextInt(99999) + "");
String speed = random.nextInt(260) + "";
String roadId = random.nextInt(50) + 1 + "";
String cameraId = StringUtils.fullFill(5, random.nextInt(9999) + "");
String areaId = StringUtils.fullFill(2, random.nextInt(8) + "");
ProducerRecord<String, String> banRecordBlue = new ProducerRecord<>(topic, "traffic_" + monitorId, date + "\t" + monitorId + "\t" + cameraId + "\t" + car + "\t" + actionTime + "\t" + speed + "\t" + roadId + "\t" + areaId);
producer.send(banRecordBlue);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
MockRealTimeData mockRealTimeData = new MockRealTimeData();
mockRealTimeData.start();
}
}
object Hello10RealRoadState {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("Hello10RealRoadState").setMaster("local[2]")
val streamingContext = new StreamingContext(sparkConf, Seconds(2))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "traffic",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("RoadRealTimeLog")
val linesDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils createDirectStream(streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
linesDStream.map(_.value())
.window(Seconds(10), Seconds(10))
.map(ele => (ele.split("\t")(1), ele.split("\t")(5).toInt))
.groupByKey()
.map(ele => {
(ele._1, ele._2.toList.sum.toDouble / ele._2.size)
})
.foreachRDD(rdd => {
rdd.foreach(ele => {
println(ele._1 + "--" + ele._2)
})
})
streamingContext.start()
streamingContext.awaitTermination()
}
}
4.4.2. 动态改变广播变量(布控)
|