IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 离线数仓开发思路---从日志采集到dwd层 -> 正文阅读

[大数据]离线数仓开发思路---从日志采集到dwd层

1- 首先我们先从app端采集日志到日志服务器,一般按照日期分为不同的文件夹

使用到的技术:nginx\openresty

产生的数据:json格式的文件

2-日志服务器中flume的架构为两层,采集数据到hdfs上,压缩并且按日期分在不同的文件夹中

第一层使用failover组的sink,并要考虑峰值数据传进来的速度,第一层有六个节点,第二层有四个节点,上层的三个的sink传进下层的一个source,并且当上层的一个sink,或者下层的一个节点失效无法传输时,启动上层的failover组的另一个sink,以及下层的另一个节点

上层的flume配置文件

source为TAILDIR

channel为file?

sink为avro,并组成一个failover组

下层的flume配置文件

source为avro (配置一个channel选择器,实现将source的数据平均分到多个channel中,要实现channel的选择器,要先在idea中编写一个拦截器,在头信息中增加一个状态值,选择器根据这个状态值,来进行筛选要到那个channel中)这样可以实现多并行

channel为file?

sink为hdfs

3- 数据采集到hdfs上之后,我们要检查一下,hdfs上的数据有没有传对,数据会不会有重复的多了,或者是少了几行。但是因为日志服务器,hdfs服务器不在同一节点上,无法将结果在一起比较,因此需要开发一个web服务器,作为一个监控平台,把日志文件的行数发送到web端,并把hdfs上的文件行数也发送到web端

那么首先开发web服务器,我们使用spingboot框架开发

3.1 先查询日志文件的行数

查询行数的命令

wc -l

要先判断该日期下的文件有多少个,一个和多个查询到的数据格式不一样,要用不同的方法获取

请求web服务器的命令,把当前日志服务器节点的主机名、日志类型、时间信息、行数写在一条json中发送给web服务:

curl -H "Content-type: application/json" -X POST -d"{\"logServerName\":\"${servername}\",\"logType\":\"applog\",\"logDate\":\"${dt}\",\"lineCnt\":${cnt} }" http://${metabus_host}/api/commit

3.2 在日志服务器中连接mysql,将日志服务器的信息保存在mysql中?

3.3?在hdfs节点上判断

3.2.1 从web服务器获取到日志服务器查到的行数

命令:

metabus_host为运行web服务器节点的ip地址

logserver_cnt=$(curl http://${metabus_host}/api/get/applog?date=${dt})

3.2.2 统计hdfs上文件的行数

hdfds_cnt=0
for f in $(hdfs dfs -ls /logdata/applog/${dt}/ | awk 'NR==1{next}{print $NF}')
do
  i=$(hdfs dfs -text $f | wc -l)
  hdfds_cnt=$((hdfds_cnt+i))
  echo "计算hdfs上的日志行数为: $hdfds_cnt "
done

3.2.3 判断两个统计的行数是否相等,如果不相等,执行hive sql命令,将数据group by 去重后,将数据写入hdfs上的去重文件夹中:? ? ? /tmp/distinct_task/applog/${dt}

4- 数据去重之后,将数据加载入库ods层

判断有没有做过去重,有的话去去重文件夹加载数据

没做过去重,则将flume的采集路径作为数据加载路径

结果:生成一个APP日志表?ods.mall_app_log partition(dt='${dt}') (字段包含用户日志的所有信息)

每日更新该表

5- 建数据库,建表,执行脚本********

5.1 各层数据库

drop database ods cascade;
drop database dwd cascade;
drop database dws cascade;
drop database dim cascade;
drop database tmp cascade;
create database ods;
create database dwd;
create database dws;
create database dim;
create database tmp;

5.2 建表 (建表语句就先不写了)

dws层:

? ? ? ? 1. 设备账号绑定权重表 dws.dev_acc_bind_w (字段包含device_id、account、weight、last_logtime),从来没有登陆过账号的设备不会在这里,但是t日某个设备没有登录过账号,以前登录过账号的,可以在这里查找到以前登陆过的账号

? ? ? ? 该表的功能:如果今天传入的某条数据没有账号信息,可以到该表查询该设备有没有登录过账户,选一个权重最大的账户关联上,就可以再和dwd.user_reg_info关联到user_id

????????没有的话,就只能到空设备临时id映射表关联到user_id

? ? ? ? 2. 空设备临时id映射表 dws.mall_app_device_tmpid (字段:device_id、user_id)

dwd层:

? ? ? ? 3.用户设备信息表 dwd.user_reg_info (字段:user_id account

? ? ? ? 4.APP日志明细表 dwd.mall_applog_detail (******@@@要获得的最终结果******)

tmp层:

? ? ? ? 5.清洗过滤临时表?tmp.mall_applog_washed

? ? ? ? 6.session分割临时表?tmp.mall_applog_session_split

? ? ? ? 7.地理位置集成临时表?tmp.mall_applog_area?

5.3 更新计算-用户注册信息表?dwd.user_reg_info

sql逻辑:

第一步:

查询更新前的用户注册信息表?dwd.user_reg_info

得到最大的user_id

第二步:

从APP日志表?ods.mall_app_log?中读取到account,

并使用row_number()生成序列,增加上查询到的最大的user_id,

第三步:(更新)

将结果插入到用户注册信息表?dwd.user_reg_info

疑问:按照每日的时间作为分区,那么如果今天的某个设备,某个用户做的操作,再之前已经附上了user_id,那么今天的分区他会再附上一个新的user_id,这不是冲突了么

解决:要关联下t-1日的未更新的account,筛选出之前没有的

5.4?更新计算-绑定权重表dim.dev_acc_bind_w

sql逻辑:

第一步:

ods.mall_app_log中按照设备id和account 分组

select到字段:设备id、account、计算相同的session_id(注:此时的session_id还没有切分)的数量作为权重weight、最后登录的时间,

与t-1日的dim.dev_acc_bind_w表进行Full join

要求设备id相同、账号相同

得到一个临时视图,以前的权重表与t日的权重表在一行中

第二步:

再增加一次查询、将之前的权重和今日所得到的权重进行汇总

更新最后登录的时间。

第三步:

将结果插入到dim.dev_acc_bind_w中即可

报错:classNotFount

解决:add jar?

5.5?更新计算-空设备临时id映射表?dws.mall_app_device_tmpid

sql逻辑:

第一步:

读取用户日志表ods.mall_app_log

筛选出account为空的 device_id?

第二步:

left join 上已经更新到t日的绑定权重表dim.dev_acc_bind_w

left join 上t-1日的空设备临时id映射表dws.mall_app_device_tmpid

条件为device_id相同,筛选出device_id 为 null的数据,说明这些设备从来没有登陆过账户

第三步:?

读取t-1日的空设备临时id映射表 dws.mall_app_device_tmpid?中最大的user_id

第四步:

关联以上数据,给每个这样的设备添上一个自增的user_id

第五步:(更新表)

将结果插入到空设备临时id映射表?dws.mall_app_device_tmpid

5.6 对数据进行加工:做数据清洗、切分sessionId、数据规范、数据集成、添加guid

?重点:添加guid

sql逻辑:

第一步:

从地理位置集成临时表?tmp.mall_applog_area 读取数据

获取到的数据进行筛选

part1:? ? ? ? 从地理位置集成临时表?tmp.mall_applog_area?筛选出account不为空的数据

? ? ? ? ????????关联用户注册信息表?dwd.user_reg_info

????????????????条件account相同

获取到的数据是有账号登陆的设备,直接拿用户注册信息表的user_id 当作guid

part2:? ? ? 从地理位置集成临时表?tmp.mall_applog_area?筛选出account为空的数据

????????????????-- 先从 "设备账号绑定表" 中,取出每个 设备对应的权重最大的账号 ?==> tmp1
????????????????-- 用tmp1 关联 "用户注册信息表" ,得到 user_id ?==> tmp2?
????????????????-- 拿着 part2 ?关联 ? tmp2 ?关联 ?"空设备id映射表"
????????????????-- 取数:优先用 tmp2的user_id,次之用 "空设备id映射表"的user_id ?,作为 ?guid

最后,把part1 ?UNION ALL ?part2

得到结果

?优化

使用sparkSQL,更灵活,而且因为逻辑中涉及到要反复对一份数据进行运算,可充分利用spark的cache机制来提高效率

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-01 00:08:39  更:2022-04-01 00:08:50 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 14:38:33-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码