Data processing pipeline:
I. Data loading
II. Data masking
III. Preprocessing: filtering and field format conversion
IV. Generating points (time<>station) and trajectories
V. Building the inverted index
VI. Pairwise similarity calculation
Background: commonly used Spark SQL operations
spark.read.format("csv"): create a DataFrame from CSV files
printSchema(), show(), count()
filter(), select(), where(), dropDuplicates(), join()
udf / UserDefinedFunction: user-defined functions
withColumn: add a new column computed from existing columns
concat_ws(): concatenate two columns into one
groupBy + agg: grouping and aggregation
map(), flatMap()
一. Data Loading
val inputpath = "/data/smartdata/*"
val dfraw = spark.read.format("csv").option("header", value = true).option("encoding", "utf-8").load(inputpath)
Source data format: 卡号 (card ID), 交易时间 (transaction time), 线路站点 (station), 交易类型 (transaction type), e.g.
1990733,2018-10-24T13:37:34.000Z,田贝,地铁入站
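Since header = true is set but inferSchema is not, every column should be loaded as a string; a quick check (expected output sketched as comments):
dfraw.printSchema()
// root
//  |-- 卡号: string (nullable = true)
//  |-- 交易时间: string (nullable = true)
//  |-- 线路站点: string (nullable = true)
//  |-- 交易类型: string (nullable = true)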
At this point dfraw is a DataFrame.
Summary: common DataFrame operations
① printSchema(): print the DataFrame's schema
② select(): pick a subset of columns, optionally renaming them
③ filter(): conditional query, e.g. dfraw.filter(dfraw("交易类型") === "地铁入站").show()
④ groupBy(): group the records, e.g. dfraw.groupBy("线路站点").count().show
⑤ sort(): ordering, e.g. dfraw.sort(dfraw("交易时间").desc, dfraw("线路站点").asc).show sorts by transaction time descending, breaking ties by station ascending
⑥ Creating a DataFrame: spark.read.json("hello.json"), spark.read.parquet("hello.parquet"), spark.read.csv("hello.csv"), or equivalently spark.read.format("json").load("people.json"), spark.read.format("parquet").load("people.parquet"), spark.read.format("csv").load("people.csv")
⑦ Saving a DataFrame: dfraw.write.json("hello.json"), dfraw.write.parquet("hello.parquet"), dfraw.write.csv("hello.csv"), or equivalently dfraw.write.format("json").save("hello.json"), dfraw.write.format("parquet").save("hello.parquet"), dfraw.write.format("csv").save("hello.csv"). You can also save only selected columns: dfraw.select("name", "age").write.format("csv").save("file:///user/local/spark/mycode/newhello.csv"), where newhello.csv is a directory name. To save as plain text: dfraw.rdd.saveAsTextFile("file:///user/local/spark/mycode/newhello.txt")
二. Data Masking
To avoid leaking personal information, the card IDs are transformed.
1. Extract all card IDs and deduplicate them, producing a new DataFrame
val dfcardid = dfraw.select("卡号").distinct()
dfcardid.show(10)
dfraw.count
dfcardid.count
Note: a Dataset is a distributed collection of data.
2. Convert to an RDD, pair each card ID with a sequential index (zipWithIndex), and convert the result back to a DataFrame
// Row.toString gives "[cardid]"; strip the brackets, then pair each card ID with a Long index
val dfcardid01 = dfcardid.rdd.map(x => x.toString().substring(1, x.toString().length - 1)).zipWithIndex()
val dfcardid02 = dfcardid01.map(x => (x._1.toString, x._2.toString)).toDF("cardid", "idx")
dfcardid01.take(10).foreach(println)
dfcardid02.show(10)
3. Normalize the index format: left-pad with zeros to a fixed width
import java.text.DecimalFormat
// Left-pad the numeric index to 9 digits
def reformat(idx: String): String = {
  val f = new DecimalFormat("000000000")
  val rs = f.format(idx.toLong)
  rs
}
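Called directly in the shell, the helper behaves like this:
reformat("42")      // "000000042"
reformat("1990733") // "001990733"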
val addCol_reformat=udf(reformat _)
val dfcardid03=dfcardid02.withColumn("index",addCol_reformat(dfcardid02("idx")))
dfcardid03.show(10)
4. Join the two DataFrames and keep the rows where the card IDs match (see the equi-join sketch after the show() calls below)
val dfcardid04 =dfraw.join(dfcardid03)
val dfcardid05=dfcardid04.filter($"卡号"===$"cardid")
dfcardid04.show(10)
dfcardid05.show(10)
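Note that join() is called above without a join condition, so Spark plans a cartesian product (which may require spark.sql.crossJoin.enabled depending on the Spark version) and the filter then keeps the matching rows. A more direct sketch (same column names as above; the result name dfcardid04b is hypothetical) joins on the card ID directly:
// join on the card ID instead of cross join + filter
val dfcardid04b = dfraw.join(dfcardid03, dfraw("卡号") === dfcardid03("cardid"))
dfcardid04b.show(10)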
5. Select the required columns
val dfcardid06=dfcardid05.select("index","交易时间","交易类型","线路站点")
dfcardid06.count
dfraw.count
三. Preprocessing
1. Rename the Chinese column names to English
val schemas= Seq("cardid","time","type","rawstation")
val df01=dfcardid06.toDF(schemas: _*)
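The schemas: _* syntax expands the Seq into the varargs expected by toDF; writing the names inline is equivalent:
val df01 = dfcardid06.toDF("cardid", "time", "type", "rawstation")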
dfraw.printSchema
df01.printSchema
df01.show(10)
2. Filter: keep only metro entry (地铁入站) and exit (地铁出站) records
val df02=df01.filter(col("type").contains("地铁入站") || col("type").contains("地铁出站"))
// select three fields; drop rows where the station is null; remove duplicate records
var df03=df02.select("cardid","time","rawstation").where("rawstation is not null").dropDuplicates("cardid","time","rawstation")
var df03A=df03.where("time like '2018-10%'")
df02.show(10)
df03.show(10)
df03A.show(10)
3. Normalize station names
// Append "站" when missing, and map a few known aliases / mis-encoded names to their canonical form
def replaceStationName(station: String): String = {
  var dststation = station
  if (!station.endsWith("站"))
    dststation = station + "站"
  if (dststation.equals("马鞍山站"))
    dststation = "马安山站"
  else if (dststation.equals("深圳大学站"))
    dststation = "深大站"
  else if (dststation.equals("?I岭站"))   // mis-encoded station name as it appears in the raw data
    dststation = "孖岭站"
  dststation
}
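A few direct calls illustrate the mapping:
replaceStationName("田贝")       // "田贝站"
replaceStationName("马鞍山")     // "马安山站"
replaceStationName("深圳大学站") // "深大站"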
val addCol_replaceStation = udf(replaceStationName _)
val df04=df03.withColumn("station",addCol_replaceStation(df03("rawstation")))
df04.show(10)
4. Time format conversion. The time column is named time before the conversion and captime after it.
Original format (time): yyyy-MM-dd'T'HH:mm:ss.000Z, e.g. 2018-10-24T13:37:34.000Z, 2018-10-19T08:35:25.000Z
New format (captime): yyyy-MM-dd HH:mm:ss, e.g. 2018-10-19 08:35:25, 2018-10-24 13:37:34
// Keep the date part and the HH:mm:ss part of the ISO timestamp (no time-zone conversion is performed)
def replacegjTime(captime: String): String = {
  val str1 = captime.substring(0, 10)   // "2018-10-24"
  val str2 = captime.substring(11)      // "13:37:34.000Z"
  val str3 = str2.substring(0, 8)       // "13:37:34"
  str1 + " " + str3
}
//"yyyy-MM-ddTHH:mm:ss.000Z" → "yyyy-mm-dd hh:mm:ss"
val df05=df04.select("cardid","time","station")
val addCol_replacegjTime = udf(replacegjTime _)
var df05R=df05.withColumn("captime",addCol_replacegjTime(df05("time"))).select("cardid","captime","station")
df05.show(10)
df05R.show(10)
An alternative conversion, for raw data whose time column is named captime and formatted as yyyyMMddHHmmss; the converted column is named time:
import java.text.SimpleDateFormat
// Parse "yyyyMMddHHmmss" and re-format it as "yyyy-MM-dd HH:mm:ss"
def replacegjTime(captime: String): String = {
  val gjsdf = new SimpleDateFormat("yyyyMMddHHmmss")
  val dstsdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val dt = gjsdf.parse(captime)
  dstsdf.format(dt.getTime())
}
//"yyyyMMddHHmmss" → "yyyy-MM-dd HH:mm:ss"
// This variant applies when the source DataFrame has a raw captime column (df04 here is only a placeholder)
val df05 = df04.select("cardid", "captime", "station")
val addCol_replacegjTime = udf(replacegjTime _)
var df05R = df05.withColumn("time", addCol_replacegjTime(df05("captime"))).select("cardid", "time", "station")
df05.show(10)
df05R.show(10)
四. Generating Points (time<>station) and Trajectories
- Merge the two columns (captime and station) into a single point column
import org.apache.spark.sql.types.StringType
var separator = "<>"
val df06 = df05R.select(df05R("cardid"), concat_ws(separator, df05R("captime"), df05R("station")).cast(StringType).as("timelocation"))
df05R.show(10)
df06.show(10)
df06.limit(5).collect.foreach(println)
- cardid | timelocation: group each card's points into one list per card
val df07=df06.groupBy("cardid").agg(collect_set("timelocation"))
df07.limit(5).collect.foreach(println)
val df08=df07.withColumnRenamed("collect_set(timelocation)", "arrayPoint")
// makeArrayString (defined below) must be declared before this line when running in the shell
val addCol_mkArrayString = udf(makeArrayString _)
val df09 = df08.withColumn("strPointList", addCol_mkArrayString(df08("arrayPoint"))).select("cardid", "strPointList")
df07.show(10)
df08.show(10)
df09.show(10)
// Concatenate the collected point array into one comma-separated string.
// collect_set() values arrive in a UDF as Seq (WrappedArray), not Array, so declare the parameter as Seq[String].
def makeArrayString(arrayPoint: Seq[String]): String = {
  var str: String = ""
  for (i <- arrayPoint) {
    str = str + "," + i   // note: the result keeps a leading comma
  }
  str
}
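A direct call shows the shape of the output (sample points are illustrative):
makeArrayString(Seq("2018-10-24 13:30:00<>田贝站", "2018-10-24 14:00:00<>老街站"))
// ",2018-10-24 13:30:00<>田贝站,2018-10-24 14:00:00<>老街站"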
df07.limit(5).collect.foreach(println)
df08.limit(5).collect.foreach(println)
df09.limit(5).collect.foreach(println)
- Drop cards with too few points (cardid | strPointList)
import spark.implicits._
import scala.collection.mutable.WrappedArray
var df10 = df09.map { line =>
  var cardid = line.getAs[String]("cardid")
  var pointList = line.getAs[String]("strPointList")
  var len = pointList.split(",").length - 1   // minus 1 for the leading comma produced by makeArrayString
  (cardid, len, pointList)
}.toDF("cardid", "len", "strPointList")
val df10A=df10.where("len > 4").select("cardid","strPointList")
//var df10=df09.map{row =>
// var points=row(1).toString.split(",").length
// (row(0).toString,points,row(1).toString)
// }
//var df10A0=df10A.filter($"_2" > SZT_LEN_THRESHOLD)
// .toDF("cardid","len","sampledPointList")
// .select("cardid","sampledPointList")
- Sampling: snap each point's time to a 30-minute boundary (e.g. 11:30, 12:00, 12:30, 13:00)
val addCol_conductSample = udf(conductSample _)
val df11=df10A.withColumn("sampledPointList",addCol_conductSample(df10A("strPointList")))
.select("cardid","sampledPointList")
The conductSample() helper and its time utilities:
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.LinkedHashMap
import scala.collection.mutable.Map
// Seconds to add so the time lands on the next 30-minute boundary (a minimal completion; the original body was unfinished)
def calculateDistance_30(minute: Int, second: Int): Int = {
  val offset = minute * 60 + second
  if (offset > 1800) 3600 - offset else 1800 - offset
}
// Parse "yyyy-MM-dd HH:mm:ss" into epoch seconds (assumed granularity, so the distance above can be added directly)
def timeTransStr2Long(strTime: String): Long = {
  new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(strTime).getTime() / 1000
}
// Format epoch seconds back into "yyyy-MM-dd HH:mm:ss"
def timeTransLong2Str(norLTime: Long): String = {
  new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date(norLTime * 1000))
}
def conductSample(srcList:String):String={
var time2stationMap = LinkedHashMap[String, String]()
var pointList =srcList.split(",") // 2018-08-04 17:02:28<>老街站
for(point <- pointList){
var len=point.split("<>").length
if(len==2){
var timestation=point.split("<>")// time(2018-08-04 17:02:28) station(老街站)
var strTime=timestation(0)
var station=timestation(1)
var strHMS=strTime.split(" ")(1);//17:02:28
var arrayHMS=strHMS.split(":")
var hour=arrayHMS(0).toInt//17
var minute=arrayHMS(1).toInt//02
var second=arrayHMS(2).toInt//28
var distance:Int=calculateDistance_30(minute,second)
var rawLTime= timeTransStr2Long(strTime)
var norLTime= rawLTime+distance
var dstTime= timeTransLong2Str(norLTime)
if( !time2stationMap.contains(dstTime) ){
time2stationMap+=(dstTime -> station)
}//end if
}// end if
}// end for
var dstList=new ArrayBuffer[String](time2stationMap.size)
time2stationMap.foreach(x => dstList += (x._1 + "<>" + x._2))
return dstList.mkString(",")
}// end conductSample()
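With the helper completions above, a direct call behaves roughly like this (the input points are illustrative):
conductSample("2018-08-04 17:02:28<>老街站,2018-08-04 17:20:10<>老街站,2018-08-04 18:05:40<>田贝站")
// "2018-08-04 17:30:00<>老街站,2018-08-04 18:30:00<>田贝站"
// the two 17:xx points collapse into one 17:30 slot; only the first station seen per time slot is kept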
五. Building the Inverted Index
- flatMap(): emit one (cardid, point, trajectory) tuple per point of each trajectory
var df12=df11.flatMap{ row =>
// cardid,sampledPointList
var resList = List[(String, String, String)]();
val cardid=row(0).toString//cardid
val pointList=row(1).toString//sampledPointList
val points=pointList.split(",")//point array
val trajectory=cardid+"|"+pointList
for(i <-0 to points.length-1){
val tuple=(cardid,points(i),trajectory)
resList::=tuple
}// end for
resList
}.toDF("cardid","point","trajectory")
- groupBy().agg(): for every point, collect all trajectories that pass through it
var df13=df12.select("point","trajectory").dropDuplicates("point","trajectory")
var df14=df13.groupBy("point").agg(collect_list("trajectory").as("trajectory_group"))
df14.limit(5).collect.foreach(println)
六. Pairwise Similarity Calculation
- flatMap(): within each point's trajectory group, compare every pair of trajectories
var df15=df14.flatMap{ line =>
var resList = List[(String, String, Int, Int, Int, String)]();
var groups=line.getAs[WrappedArray[String]]("trajectory_group")
for(i <-0 to groups.length-1){
var a=groups(i).split('|')(0)
var a_plist=groups(i).split('|')(1)
for(j<-i+1 to groups.length-1){
var b=groups(j).split('|')(0)
var b_plist=groups(j).split('|')(1)
var restuple=simCalculation(a, b, a_plist, b_plist)
resList::=restuple
}//end for
}//end for
resList
}.toDF("A","B","Alen","Blen","Clen","comPoints").filter($"Clen">3)
- simCalculation(): count the points two trajectories have in common
def simCalculation(ua: String, ub: String, srcTraj: String, dstTraj: String): (String, String, Int, Int, Int, String) = {
  var src_array: Array[String] = srcTraj.split(",").distinct
  var dst_array: Array[String] = dstTraj.split(",").distinct
  var m = src_array.length                                    // number of distinct points of A
  var n = dst_array.length                                    // number of distinct points of B
  var com_array = src_array.toSet.intersect(dst_array.toSet)  // points shared by A and B
  val c = com_array.size
  val comPoint = com_array.toArray.mkString(",")
  return (ua, ub, m, n, c, comPoint)
}
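A small check with made-up point strings (p1..p4 stand in for time<>station points):
simCalculation("000000001", "000000002", "p1,p2,p3", "p2,p3,p4")
// ("000000001", "000000002", 3, 3, 2, "p2,p3")  -- the order inside comPoint is not guaranteed (set intersection)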