IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 数据处理逻辑 -> 正文阅读

[大数据]数据处理逻辑

数据处理逻辑:

数据读取
数据脱敏
预处理,过滤,字段格式转换
生成点(时间<>站点)和轨迹
生成倒排索引
两两相似性计算

基础知识:

spark.read.format(“csv”)
printSchema(…)
show(…)
count(…)
filter(…)
select(…)
where(…)
dropDuplicates(…)
join( )
udf UserDefinedFunction 用户自定义函数
withColumn 以原有的列为输入,增加一列,
concat_ws(),将两列合并
groupBy agg,分组聚集
map(…)
flatMap(…)

一.数据读取

val inputpath="/data/smartdata/*"
val dfraw= spark.read.format("csv").option("header",value = true).option("encoding","utf-8").load(inputpath)

val dfraw= spark.read.format("csv").option("header",value = true).option("encoding","utf-8").load("/data/smartdata/*")

在这里插入图片描述
在这里插入图片描述

源数据格式
卡号,交易时间,线路站点,交易类型
1990733,2018-10-24T13:37:34.000Z,田贝,地铁入站

此时dfraw是一个DataFrame
小结:
DataFrame的常用操作
①printSchema()
打印DataFrame的模式(Schema)信息
在这里插入图片描述
②select()
选取部分列
在这里插入图片描述

对列名称进行重命名
在这里插入图片描述
③filter()
条件查询
dfraw.filter(dfraw(“交易类型”)==“地铁入站”).show()
在这里插入图片描述
④groupBy()
对记录进行分组
dfraw.groupBy(“线路站点”).count().show
在这里插入图片描述
⑤sort()
排序
dfraw.sort(dfraw(“交易时间”).desc,dfraw(“线路站点”).asc).show
根据交易时间进行降序排序,如果相同就按照线路站点升序排序
在这里插入图片描述
⑥创建
spark.read.json(“hello.json”)
spark.read.parquet(“hello.parquet”)
spark.read.csv(“hello.csv”)
或者
spark.read.format(“json”).load(“people.json”)
spark.read.format(“parquet”).load(“people.parquet”)
spark.read.format(“csv”).load(“people.csv”)
⑦保存
spark.write.json(“hello.json”)
spark.write.parquet(“hello.parquet”)
spark.write.csv(“hello.csv”)
或者
spark.write.format(“json”).save(“hello.json”)
spark.write.format(“parquet”).save(“hello.parquet”)
spark.write.format(“csv”).save(“hello.csv”)
还可以只选择其中的几列进行保存
dfraw.select(“name”,“age”).write.format(“csv”).save(“file:///user/local/spark/mycode/newhello.csv”)
此处的newhello.csv为目录名
保存成文本文件
dfraw.rdd.saveAsTextFile(“file:///user/local/spark/mycode/newhello.txt”)

二.数据脱敏

为防止个人信息泄露,对卡号进行变换
1.提取所有卡号并去重,形成一个新RDD

val dfcardid = dfraw.select("卡号").distinct()
dfcardid.show(10)
dfraw.count
dfcardid.count

在这里插入图片描述

小结:
DataSet是分布式的数据集合
在这里插入图片描述
在这里插入图片描述

2.在新RDD中增加一列,为卡号对应的序号,将RDD转换为DataFrame

val  dfcardid01 =dfcardid.rdd.map(x => x.toString().substring(1,x.toString().length - 1)).zipWithIndex()
val dfcardid02 = dfcardid01.map(x =>(x._1.toString,x._2.toString)).toDF("cardid","idx")

dfcardid01.take(10).foreach(println)
dfcardid02.show(10)

在这里插入图片描述
在这里插入图片描述

3.统一序号格式,位数不足则在前补充0

import java.text.DecimalFormat
def reformat(idx:String):String = {
    val f = new DecimalFormat("000000000")
    val rs =f.format(idx.toLong)
    rs
}
val addCol_reformat=udf(reformat _)
val dfcardid03=dfcardid02.withColumn("index",addCol_reformat(dfcardid02("idx")))
dfcardid03.show(10)

在这里插入图片描述

4.将两个DataFrame连接(join),筛选卡号相同的行

val dfcardid04 =dfraw.join(dfcardid03)
val dfcardid05=dfcardid04.filter($"卡号"===$"cardid")
dfcardid04.show(10)
dfcardid05.show(10)

在这里插入图片描述
dfcardid05.show(10)
在这里插入图片描述

5.选择所需要的列

val dfcardid06=dfcardid05.select("index","交易时间","交易类型","线路站点")
dfcardid06.count
dfraw.count

在这里插入图片描述
dfcardid06.count
在这里插入图片描述
dfraw.count
在这里插入图片描述
dfcardid.count
在这里插入图片描述
dfstation.count
在这里插入图片描述

三.预处理

1.将中文字段转化为英文字段

val schemas=   Seq("cardid","time","type","rawstation")
val df01=dfcardid06.toDF(schemas: _*)

dfraw.printSchema
df01.printSchema
df01.show(10)

在这里插入图片描述
在这里插入图片描述

2.过滤:入站出站记录

val df02=df01.filter(col("type").contains("地铁入站") || 		  		                      col("type").contains("地铁出站"))
//选择三个字段;过滤站点为空的字段;去除重复记录
var df03=df02.select("cardid","time","rawstation").where("rawstation is not null").dropDuplicates("cardid","time","rawstation")

var df03A=df03.where("time like '2018-10%'")

df02.show(10)
df03.show(10)
df03A.show(10)

在这里插入图片描述
在这里插入图片描述
df02.show(5)
在这里插入图片描述
df03.show(5)
在这里插入图片描述
df03A.show(5)
在这里插入图片描述

3.统一站点名称

def replaceStationName(station:String):String ={
      var dststation=station
      if(!station.endsWith("站"))
        dststation = station + "站"
      else
        dststation = dststation

      if(dststation.equals("马鞍山站"))
        dststation="马安山站"
      else if(dststation.equals("深圳大学站"))
        dststation="深大站"
      else if(dststation.equals("?I岭站"))
        dststation="孖岭站"
      else {  }
      dststation
   }
   
val addCol_replaceStation = udf(replaceStationName _)
val df04=df03.withColumn("station",addCol_replaceStation(df03("rawstation")))

df04.show(10)

在这里插入图片描述
在这里插入图片描述

4.时间格式转化
其中时间列名,改变前是time,改变后是captime
原时间格式:time
yyyy-MM-ddTHH:mm:ss.000Z
2018-10-24T13:37:34.000Z
2018-10-19T08:35:25.000Z

新时间格式:captime
yyyy-MM-dd HH:mm:ss
2018-10-19 08:35:25
2018-10-24 13:37:34

def replacegjTime(captime:String):String =
   {
    val oldstr = captime
    val str1 = oldstr.substring(0,10)
    val str2 = oldstr.substring(11)
    val str3 = str2.substring(0,8)
    val str4 = str1 + " " +str3
    str4
   }

//"yyyy-MM-ddTHH:mm:ss.000Z""yyyy-mm-dd hh:mm:ss"
val df05=df04.select("cardid","time","station")
val addCol_replacegjTime = udf(replacegjTime _)
var df05R=df05.withColumn("captime",addCol_replacegjTime(df05("time"))).select("cardid","captime","station")

df05.show(10)
df05R.show(10)

在这里插入图片描述
在这里插入图片描述

另一种格式转换:
其中时间列名,改变前是captime,改变后是time

def replacegjTime(captime:String):String =
   {
      val gjString=captime
      val gjsdf = new SimpleDateFormat("yyyyMMddHHmmss");
      val dstsdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      val dt =gjsdf.parse(gjString)
      val longtime=dt.getTime()
      val dstString=dstsdf.format(longtime);
      dstString
   }
   
//"yyyymmddhhmmss""yyyy-mm-dd hh:mm:ss"
val df05=df04.select("cardid","time","station")
val addCol_replacegjTime = udf(replacegjTime _)
var df05R=df05.withColumn("time",addCol_replacegjTime(df05("captime"))).select("cardid","time","station")

df05.show(10)
df05R.show(10)

四.生成点(时间<>站点)和轨迹

  1. 将两列合并
var separator = "<>"
val df06=df05R.select(df05R("cardid"),concat_ws(separator, df05R("captime"),    	  df05R("station")).cast(StringType).as("timelocation"))

df05R.show(10)
df06.show(10)
df06.limit(5).collect.foreach(println)

在这里插入图片描述
在这里插入图片描述
df06

在这里插入图片描述

  1. cardid|timelocation
val df07=df06.groupBy("cardid").agg(collect_set("timelocation"))
df07.limit(5).collect.foreach(println)

val df08=df07.withColumnRenamed("collect_set(timelocation)", "arrayPoint")
val addCol_mkArrayString = udf(makeArrayString _)
val df09=df08.withColumn("strPointList",addCol_mkArrayString(df08("arrayPoint"))).select("cardid","strPointList")

df07.show(10)
df08.show(10)
df09.show(10)

val df09=df08.withColumn("strPointList",addCol_mkArrayString(df08("arrayPoint"))).select("cardid","strPointList")

def makeArrayString(arrayPoint:Array[String]):String =
{
  var str:String = ""
  val arr = arrayPoint 
  val count =0
  for(i <- arr){
 		str = str + "," + i.toString
  }
  str
  }

在这里插入图片描述
df07.limit(5).collect.foreach(println)
在这里插入图片描述
在这里插入图片描述
df08.limit(5).collect.foreach(println)
在这里插入图片描述

df09.limit(5).collect.foreach(println)
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

  1. cardid | sampledPointList
import spark.implicits._
import scala.collection.mutable.WrappedArray
var df10=df09.map{ line =>
         var cardid=line.getAs[String]("cardid")
         var pointList=line.getAs[String]("strPointList")
         var len=pointList.split(",").length-1
          (cardid,len,pointList)
}.toDF("cardid","len","strPointList")
val df10A=df10.where("len > 4").select("cardid","strPointList")
//var df10=df09.map{row =>
//              var points=row(1).toString.split(",").length
 //              (row(0).toString,points,row(1).toString)
 // }
//var df10A0=df10A.filter($"_2" > SZT_LEN_THRESHOLD)
//			.toDF("cardid","len","sampledPointList")
//			.select("cardid","sampledPointList")

在这里插入图片描述

  1. Sampling to 11:30,12:00,12:30,13:00
val addCol_conductSample = udf(conductSample _)
val df11=df10A.withColumn("sampledPointList",addCol_conductSample(df10A("strPointList")))
		 .select("cardid","sampledPointList")

conductSample( )


import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.LinkedHashMap
import scala.collection.mutable.Map
def calculateDistance_30(minute:Int,second:Int):Int={
	if((minute*60+second)>1800){
	return 
	}
		}
def timeTransStr2Long(strTime:String):Long={

}
def timeTransLong2Str(norLTime:Any):String={

}

def conductSample(srcList:String):String={
     var time2stationMap = LinkedHashMap[String, String]()
     var pointList =srcList.split(",")  // 2018-08-04 17:02:28<>老街站
     for(point <- pointList){
         var len=point.split("<>").length
         if(len==2){
           var timestation=point.split("<>")// time(2018-08-04 17:02:28) station(老街站)
           var strTime=timestation(0)
           var station=timestation(1)
           var strHMS=strTime.split(" ")(1);//17:02:28
           var arrayHMS=strHMS.split(":")
           var hour=arrayHMS(0).toInt//17
           var minute=arrayHMS(1).toInt//02
           var second=arrayHMS(2).toInt//28
           var distance:Int=calculateDistance_30(minute,second)
           var rawLTime= timeTransStr2Long(strTime)
           var norLTime= rawLTime+distance
           var dstTime= timeTransLong2Str(norLTime)
           if( !time2stationMap.contains(dstTime) ){
               time2stationMap+=(dstTime -> station)
           }//end if
          }// end if
      }// end for
   var dstList=new ArrayBuffer[String](time2stationMap.size)
   time2stationMap.map(x => dstList+=(x._1+"<>"+x._2))
   return dstList.mkString(",")
}// end conductSample()
 

在这里插入图片描述

五.生成倒排索引

  1. flatMap( )
var df12=df11.flatMap{ row =>
      // cardid,sampledPointList
     var resList = List[(String, String, String)]();
     val cardid=row(0).toString//cardid
     val pointList=row(1).toString//sampledPointList
     val points=pointList.split(",")//point array
     val trajectory=cardid+"|"+pointList
     for(i <-0 to points.length-1){
          val tuple=(cardid,points(i),trajectory)
          resList::=tuple
      }// end for
     resList
}.toDF("cardid","point","trajectory")

  1. groupBy( ).agg( )
var df13=df12.select("point","trajectory").dropDuplicates("point","trajectory")
var df14=df13.groupBy("point").agg(collect_list("trajectory").as("trajectory_group"))
df14.limit(5).collect.foreach(println)

六.两两相似性计算

  1. flatMap()
var df15=df14.flatMap{ line =>
    var resList = List[(String, String, Int, Int, Int, String)]();
    var groups=line.getAs[WrappedArray[String]]("trajectory_group")
    for(i <-0 to groups.length-1){
       var a=groups(i).split('|')(0)
       var a_plist=groups(i).split('|')(1)
       for(j<-i+1 to groups.length-1){
          var b=groups(j).split('|')(0)
          var b_plist=groups(j).split('|')(1)
          var restuple=simCalculation(a, b, a_plist, b_plist)
               resList::=restuple
       }//end for
    }//end for
    resList
}.toDF("A","B","Alen","Blen","Clen","comPoints").filter($"Clen">3)
  1. simCalculation( )
def simCalculation(ua:String, ub:String, srcTraj:String, 	dstTraj:String):(String,String,Int,Int,Int,String)=
{
      var src_array:Array[String] = srcTraj.split(",").distinct
      var dst_array:Array[String] = dstTraj.split(",").distinct
      var m=src_array.length
      var n=dst_array.length

      var com_array=src_array.toSet.intersect(dst_array.toSet)
      val c=com_array.size
      val comPoint=com_array.toArray.mkString(",")
      return  (ua,ub,m,n,c,comPoint)
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-28 11:56:15  更:2022-04-28 12:00:24 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 8:47:43-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码