IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 2021.12.14Json复杂处理 -> 正文阅读

[大数据]2021.12.14Json复杂处理

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val jsonSchema = new StructType().add("battery_level", LongType).add("c02_level",LongType).add("cca3",StringType).add("cn", StringType).add("device_id",LongType).add("device_type", StringType).add("signal", LongType).add("ip",StringType).add("temp", LongType).add("timestamp", TimestampType)

scala> case class DeviceData (id: Int, device: String)

val eventsDS = Seq (
  (0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }"""),
  (1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cca3": "NOR", "cn": "Norway", "temp": 30, "signal": 18, "battery_level": 6, "c02_level": 1413, "timestamp" :1475600498 }"""),
  (2, """{"device_id": 2, "device_type": "sensor-ipad", "ip": "88.36.5.1", "cca3": "ITA", "cn": "Italy", "temp": 18, "signal": 25, "battery_level": 5, "c02_level": 1372, "timestamp" :1475600500 }"""),
  (3, """{"device_id": 3, "device_type": "sensor-inest", "ip": "66.39.173.154", "cca3": "USA", "cn": "United States", "temp": 47, "signal": 12, "battery_level": 1, "c02_level": 1447, "timestamp" :1475600502 }"""),
  (4, """{"device_id": 4, "device_type": "sensor-ipad", "ip": "203.82.41.9", "cca3": "PHL", "cn": "Philippines", "temp": 29, "signal": 11, "battery_level": 0, "c02_level": 983, "timestamp" :1475600504 }"""),
  (5, """{"device_id": 5, "device_type": "sensor-istick", "ip": "204.116.105.67", "cca3": "USA", "cn": "United States", "temp": 50, "signal": 16, "battery_level": 8, "c02_level": 1574, "timestamp" :1475600506 }"""),
  (6, """{"device_id": 6, "device_type": "sensor-ipad", "ip": "220.173.179.1", "cca3": "CHN", "cn": "China", "temp": 21, "signal": 18, "battery_level": 9, "c02_level": 1249, "timestamp" :1475600508 }"""),
  (7, """{"device_id": 7, "device_type": "sensor-ipad", "ip": "118.23.68.227", "cca3": "JPN", "cn": "Japan", "temp": 27, "signal": 15, "battery_level": 0, "c02_level": 1531, "timestamp" :1475600512 }"""),
  (8 ,""" {"device_id": 8, "device_type": "sensor-inest", "ip": "208.109.163.218", "cca3": "USA", "cn": "United States", "temp": 40, "signal": 16, "battery_level": 9, "c02_level": 1208, "timestamp" :1475600514 }"""),
  (9,"""{"device_id": 9, "device_type": "sensor-ipad", "ip": "88.213.191.34", "cca3": "ITA", "cn": "Italy", "temp": 19, "signal": 11, "battery_level": 0, "c02_level": 1171, "timestamp" :1475600516 }"""),
  (10,"""{"device_id": 10, "device_type": "sensor-igauge", "ip": "68.28.91.22", "cca3": "USA", "cn": "United States", "temp": 32, "signal": 26, "battery_level": 7, "c02_level": 886, "timestamp" :1475600518 }"""),
  (11,"""{"device_id": 11, "device_type": "sensor-ipad", "ip": "59.144.114.250", "cca3": "IND", "cn": "India", "temp": 46, "signal": 25, "battery_level": 4, "c02_level": 863, "timestamp" :1475600520 }"""),
  (12, """{"device_id": 12, "device_type": "sensor-igauge", "ip": "193.156.90.200", "cca3": "NOR", "cn": "Norway", "temp": 18, "signal": 26, "battery_level": 8, "c02_level": 1220, "timestamp" :1475600522 }"""),
  (13, """{"device_id": 13, "device_type": "sensor-ipad", "ip": "67.185.72.1", "cca3": "USA", "cn": "United States", "temp": 34, "signal": 20, "battery_level": 8, "c02_level": 1504, "timestamp" :1475600524 }"""),
  (14, """{"device_id": 14, "device_type": "sensor-inest", "ip": "68.85.85.106", "cca3": "USA", "cn": "United States", "temp": 39, "signal": 17, "battery_level": 8, "c02_level": 831, "timestamp" :1475600526 }"""),
  (15, """{"device_id": 15, "device_type": "sensor-ipad", "ip": "161.188.212.254", "cca3": "USA", "cn": "United States", "temp": 27, "signal": 26, "battery_level": 5, "c02_level": 1378, "timestamp" :1475600528 }"""),
  (16, """{"device_id": 16, "device_type": "sensor-igauge", "ip": "221.3.128.242", "cca3": "CHN", "cn": "China", "temp": 10, "signal": 24, "battery_level": 6, "c02_level": 1423, "timestamp" :1475600530 }"""),
  (17, """{"device_id": 17, "device_type": "sensor-ipad", "ip": "64.124.180.215", "cca3": "USA", "cn": "United States", "temp": 38, "signal": 17, "battery_level": 9, "c02_level": 1304, "timestamp" :1475600532 }"""),
  (18, """{"device_id": 18, "device_type": "sensor-igauge", "ip": "66.153.162.66", "cca3": "USA", "cn": "United States", "temp": 26, "signal": 10, "battery_level": 0, "c02_level": 902, "timestamp" :1475600534 }"""),
  (19, """{"device_id": 19, "device_type": "sensor-ipad", "ip": "193.200.142.254", "cca3": "AUT", "cn": "Austria", "temp": 32, "signal": 27, "battery_level": 5, "c02_level": 1282, "timestamp" :1475600536 }""")).toDF("id", "device").as[DeviceData]
  
scala> eventsDS.printSchema
root
 |-- id: integer (nullable = false)
 |-- device: string (nullable = true)

scala> eventsDS.show(1,false)  

|id |device      |                                                                                                                                                                                         |
|0  |{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cca3": "USA", "cn": "United States", "temp": 25, "signal": 23, "battery_level": 8, "c02_level": 917, "timestamp" :1475600496 }|




// op.log
scala> val optionRDD = sc.textFile("file:///opt/tmp/op.log")


scala> val option1 = optionRDD.map(x=>x.split('|')).map(x=>(x(0),x(1)))

scala> val jsonStr = option1.map(x=>x._2)
// 我们将用户id加入到json字符串中
scala> val jsonStr = option1.map(x=>{val jsstr = x._2; val js2 = 
jsstr.substring(0, jsstr.length-1); js2+", \"id\":"+x._1+" }"})

scala> val jsonstrDF = jsonStr.toDF()

scala> jsonstrDF.printSchema
root
 |-- value: string (nullable = true)

scala> val jsonobj = jsonstrDF.select(get_json_object($"value","$.ap").as("ap"),
 get_json_object($"value","$.cm").as("cm"), 
 get_json_object($"value","$.et").as("et"))


scala> val jsonobj2 = jsonobj.select($"ap", get_json_object($"cm","$.ln").as("ln"),
 get_json_object($"cm","$.sv").as("sv"), 
 get_json_object($"cm","$.os").as("os"), 
 get_json_object($"cm","$.g").as("g"), 
 get_json_object($"cm","$.mid").as("mid"),
 get_json_object($"cm","$.nw").as("nw"), 
 get_json_object($"cm","$.l").as("l"), 
 get_json_object($"cm","$.vc").as("vc"), 
 get_json_object($"cm","$.hw").as("hw"), 
 get_json_object($"cm","$.ar").as("ar"), 
 get_json_object($"cm","$.uid").as("uid"), 
 get_json_object($"cm","$.t").as("t"), 
 get_json_object($"cm","$.la").as("la"), 
 get_json_object($"cm","$.md").as("md"), 
 get_json_object($"cm","$.vn").as("vn"), 
 get_json_object($"cm","$.ba").as("ba"), 
 get_json_object($"cm","$.sr").as("sr")  ,$"et")
jsonobj2: org.apache.spark.sql.DataFrame = [ap: string, ln: string ... 17 more fields]

scala> val jsonStr2DF = jsonobj2.select($"et")

{"ett":"1593105344120","en":"praise","kv":{"target_id":9,"id":7,"type":1,"add_time":"1593098545976","userid":8}}]}

scala> val schema = ArrayType(StructType(StructField("ett",StringType)::StructField("en",StringType)::StructField("kv",StringType)::Nil))
schema: org.apache.spark.sql.types.ArrayType = ArrayType(StructType(StructField(ett,StringType,true), StructField(en,StringType,true), StructField(kv,StringType,true)),true)

scala> jsonStr2DF.select(from_json($"et",schema).as("events"))

val jsonobj3=jsonStr2DF.select(from_json($"et",schema).as("events"))

scala> val jsonobj4 = jsonobj3.withColumn("events", explode(col("events")))

jsonobj4.filter($"events.en"==="loading").select($"events.ett",$"events.en",$"events.kv").show(false)


scala> jsonobj4.printSchema
root
 |-- events: struct (nullable = true)
 |    |-- ett: string (nullable = true)
 |    |-- en: string (nullable = true)
 |    |-- kv: string (nullable = true)

scala> jsonobj4.select($"events.ett", $"events.en", $"events.kv").show()

// 将cm, ap , et  全部转换成列  保留一个“kv”,
// 将生成DF,按照不同的en(loading,ad,notification...)过滤,
// 过滤后,再进行from_json/get_json_object处理
// 处理结果保存到hive。  

jsonobj2.select(
$"ap",$"ln",$"sv",$"os",
$"g",$"mid",$"nw",$"l",
$"vc",$"hw",$"ar",$"uid",
$"t",$"la",$"md",$"vn",
$"ba",$"sr",from_json($"et",schema).as("events")
).withColumn("events", explode(col("events"))).select($"ap",$"ln",$"sv",$"os",
$"g",$"mid",$"nw",$"l",
$"vc",$"hw",$"ar",$"uid",
$"t",$"la",$"md",$"vn",
$"ba",$"sr",$"events.ett", $"events.en", $"events.kv")

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-15 18:21:59  更:2021-12-15 18:22:32 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 7:46:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码