目录
0 问题背景
1 算法分析
2 问题描述
3 数据准备
4 问题分析
5 小 结
0 问题背景
???? 在设备健康监测管理时,我们往往需要对设备运行时的电路参数进行监控,以反映设备的运行状况,通过对设备的运行参数长期统计观察建立设备的状态评估模型 。其中一个典型的场景就是我们需要对设备参数指标,进行趋势分析,比如判断某个指标在某一段时间内呈现递增或是递减的趋势,换句话说设备在一时期某一个参数朝着恶化的趋势发展,我们需要对这种趋势进行预警。解决这种长周期的趋势预警问题,我们往往需要构建设备数仓,在批处理中进行分析。本文以转辙机动作电流为例进行分析,针对转辙机动作电流文中给出了一种设备趋势预警的方法,对一定窗口内,同一设备的同一阶段的动作电流平均值指标变化趋势进行趋势分析,当1个时间窗口内,设备在某一阶段的动作电流水平保持整体性增长,大数据系统进行报警,文中通过批处理的形式对该方法进行了实现。
1 算法分析
趋势预警判断规则如下:
对于n个状态平均点形成动作电流平均值窗口集合:P{?? Pt1,? Pt2, Pt3…… Ptn} | 求得 | | | | | {d1,d2, ……..dn} | | | | | 其中di? = (Pt(i+1) - Pt(i))?? /? Pt(i) | | | | 当 | | | | | 1)Σdi /(n-1)?? ≥? 0.1 (平均变化率,可配置); di ≠dn(最末1日,不计算增长率) | 2)max(Pi)/Pbase(基线变化率)? ≥ 1.5 (可配置) | | 3)|Pn-P1|/P1(最后一天动作电流与第一天动作电流差值的绝对值/第一天动作电流)反应相对于初始值增长或降低情况,称为首尾变化率? ≥ 0.5(可配置) | 4)时间窗口内,最多有容忍30%个点没有增长。反向变化率或正向变化率。反向变化率:未增长的点个数/窗口点个数 -1.正向变化率大于0.7或反向变化率小于0.3时预警。反向变化率=1-正向变化率 | | | 5)1)and 2) and 3)and4) 同时成立时,预警条件符合。 | | | 注意:这里的Pbase基线暂取为窗口内数据的中位值 | | | | |
2 问题描述
现已知动作电流按天分区的天表,其表结构如下(为了便于分析本文以7天窗口为例):
gw_id:表示网关id,sensor_id:表示设备id,switch_dir:转换方向 0表示定到反,1表示反到定。
avg_act_ic:表示计算的指标,动作电流按天的平均值。compute_day:计算日期
gw_id | sensor_id | switch_dir | avg_act_ic | compute_day | 1001 | 123456 | 0 | 5.4 | 2017-07-01 | 1001 | 123456 | 0 | 3.5 | 2017-07-02 | 1001 | 123456 | 0 | 5.8 | 2017-07-03 | 1001 | 123456 | 0 | 8.1 | 2017-07-04 | 1001 | 123456 | 0 | 5.2 | 2017-07-05 | 1001 | 123456 | 0 | 8.5 | 2017-07-06 | 1001 | 123456 | 0 | 13.5 | 2017-07-07 | 1001 | 123456 | 1 | 3.1 | 2017-07-01 | 1001 | 123456 | 1 | 2.8 | 2017-07-02 | 1001 | 123456 | 1 | 1.1 | 2017-07-03 | 1001 | 123456 | 1 | 5.5 | 2017-07-04 | 1001 | 123456 | 1 | 5.3 | 2017-07-05 | 1001 | 123456 | 1 | 5.6 | 2017-07-06 | 1001 | 123456 | 1 | 2.8 | 2017-07-07 | 1001 | 234567 | 1 | 8.1 | 2017-07-01 | 1001 | 234567 | 1 | 7.2 | 2017-07-02 | 1001 | 234567 | 1 | 6.2 | 2017-07-03 | 1001 | 234567 | 1 | 5.1 | 2017-07-04 | 1001 | 234567 | 1 | 9.3 | 2017-07-05 | 1001 | 234567 | 1 | 4.8 | 2017-07-06 | 1001 | 234567 | 1 | 2.3 | 2017-07-07 |
根据以上趋势预警规则,通过SQL形式分析出结果。预警结果保持增长的趋势定义warnning_code为act_ic_growths,递减的趋势定义为act_ic_decline,最终的结果只需要在原有表的基础上给出warning_code即可。
3 数据准备
?(1) 数据准备
?? vim act_ic.txt
1001 123456 0 5.4 2017-07-01
1001 123456 0 3.5 2017-07-02
1001 123456 0 5.8 2017-07-03
1001 123456 0 8.1 2017-07-04
1001 123456 0 5.2 2017-07-05
1001 123456 0 8.5 2017-07-06
1001 123456 0 13.5 2017-07-07
1001 123456 1 3.1 2017-07-01
1001 123456 1 2.8 2017-07-02
1001 123456 1 1.1 2017-07-03
1001 123456 1 5.5 2017-07-04
1001 123456 1 5.3 2017-07-05
1001 123456 1 5.6 2017-07-06
1001 123456 1 2.8 2017-07-07
1001 234567 1 8.1 2017-07-01
1001 234567 1 7.2 2017-07-02
1001 234567 1 6.2 2017-07-03
1001 234567 1 5.1 2017-07-04
1001 234567 1 9.3 2017-07-05
1001 234567 1 4.8 2017-07-06
1001 234567 1 2.3 2017-07-07
? (2) 建表.在hive数仓中建表
drop table if exists act_ic;
create table if not exists act_ic(
gw_id string,
sensor_id string,
switch_dir int,
avg_act_ic float,
compute_day string
)
row format delimited fields terminated by '\t'
(3)加载数据
load data local inpath "/home/centos/dan_test/act_ic.txt" into table act_ic;
(4)查询数据
hive> select * from act_ic;
OK
1001 123456 0 5.4 2017-07-01
1001 123456 0 3.5 2017-07-02
1001 123456 0 5.8 2017-07-03
1001 123456 0 8.1 2017-07-04
1001 123456 0 5.2 2017-07-05
1001 123456 0 8.5 2017-07-06
1001 123456 0 13.5 2017-07-07
1001 123456 1 3.1 2017-07-01
1001 123456 1 2.8 2017-07-02
1001 123456 1 1.1 2017-07-03
1001 123456 1 5.5 2017-07-04
1001 123456 1 5.3 2017-07-05
1001 123456 1 5.6 2017-07-06
1001 123456 1 2.8 2017-07-07
1001 234567 1 8.1 2017-07-01
1001 234567 1 7.2 2017-07-02
1001 234567 1 6.2 2017-07-03
1001 234567 1 5.1 2017-07-04
1001 234567 1 9.3 2017-07-05
1001 234567 1 4.8 2017-07-06
1001 234567 1 2.3 2017-07-07
Time taken: 0.592 seconds, Fetched: 21 row(s)
4 问题分析
注:问题求解钱需要先获取按天统计按天分区的动作电流均值指标表,这里的统计在本文中不再具体阐述。
(1) 先做底表。先利用lag()函数求出相邻上一条记录的avg_act_ic指标值作为辅助列,求出max(avg_act_ic),avg_act_ic的中位值,窗口内按时间排序后的第一个值和最后一个值。
SQL代码如下:为了保持结果按天排序我们按照row_number()进行排序
select *
,row_number() over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as rn
from(
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_lag
,cast(avg_act_ic - lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as decimal(6,1)) as avg_act_ic_diff
,max(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir)
,(percentile(cast(cast(avg_act_ic as decimal(6,1))*10 as int), 0.5) over(PARTITION by gw_id,sensor_id,switch_dir))/10 as avg_act_ic_m
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day desc) as avg_act_ic_last
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_first
from act_ic
) t
结果如下:
--------------------------------------------------------------------------------
OK
1001 123456 0 5.4 2017-07-01 5.4 0 13.5 5.8 13.5 5.4 1
1001 123456 0 3.5 2017-07-02 5.4 -1.9 13.5 5.8 13.5 5.4 2
1001 123456 0 5.8 2017-07-03 3.5 2.3 13.5 5.8 13.5 5.4 3
1001 123456 0 8.1 2017-07-04 5.8 2.3 13.5 5.8 13.5 5.4 4
1001 123456 0 5.2 2017-07-05 8.1 -2.9 13.5 5.8 13.5 5.4 5
1001 123456 0 8.5 2017-07-06 5.2 3.3 13.5 5.8 13.5 5.4 6
1001 123456 0 13.5 2017-07-07 8.5 5 13.5 5.8 13.5 5.4 7
1001 123456 1 3.1 2017-07-01 3.1 0 5.6 3.1 2.8 3.1 1
1001 123456 1 2.8 2017-07-02 3.1 -0.3 5.6 3.1 2.8 3.1 2
1001 123456 1 1.1 2017-07-03 2.8 -1.7 5.6 3.1 2.8 3.1 3
1001 123456 1 5.5 2017-07-04 1.1 4.4 5.6 3.1 2.8 3.1 4
1001 123456 1 5.3 2017-07-05 5.5 -0.2 5.6 3.1 2.8 3.1 5
1001 123456 1 5.6 2017-07-06 5.3 0.3 5.6 3.1 2.8 3.1 6
1001 123456 1 2.8 2017-07-07 5.6 -2.8 5.6 3.1 2.8 3.1 7
1001 234567 1 8.1 2017-07-01 8.1 0 9.3 6.2 2.3 8.1 1
1001 234567 1 7.2 2017-07-02 8.1 -0.9 9.3 6.2 2.3 8.1 2
1001 234567 1 6.2 2017-07-03 7.2 -1 9.3 6.2 2.3 8.1 3
1001 234567 1 5.1 2017-07-04 6.2 -1.1 9.3 6.2 2.3 8.1 4
1001 234567 1 9.3 2017-07-05 5.1 4.2 9.3 6.2 2.3 8.1 5
1001 234567 1 4.8 2017-07-06 9.3 -4.5 9.3 6.2 2.3 8.1 6
1001 234567 1 2.3 2017-07-07 4.8 -2.5 9.3 6.2 2.3 8.1 7
Time taken: 21.78 seconds, Fetched: 21 row(s)
此处也可以统计处差值大于0的个数,以及正向变化率,为了优化逻辑,我们将部分指标计算进行下沉处理,SQL代码如下:
select *
,(sum(case when avg_act_ic_diff <= 0 then 1 else 0 end ) over(PARTITION by gw_id,sensor_id,switch_dir)) -1 as anti_var_cnt
,(sum(abs(avg_act_ic_diff) / avg_act_ic_lag) over(PARTITION by gw_id,sensor_id,switch_dir)) / (cnt -1) as avgvarrate
,row_number() over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as rn
from(
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_lag
,cast(avg_act_ic - lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as decimal(6,1)) as avg_act_ic_diff
,max(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as max_avg_act_ic
,(percentile(cast(cast(avg_act_ic as decimal(6,1))*10 as int), 0.5) over(PARTITION by gw_id,sensor_id,switch_dir))/10 as avg_act_ic_m
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day desc) as avg_act_ic_last
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_first
,count(1) over(PARTITION by gw_id,sensor_id,switch_dir) as cnt
from act_ic
) t
--------------------------------------------------------------------------------
OK
1001 123456 0 5.4 2017-07-01 5.4 0 13.5 5.8 13.5 5.4 7 2 0.49773696506423515 1
1001 123456 0 3.5 2017-07-02 5.4 -1.9 13.5 5.8 13.5 5.4 7 2 0.49773696506423515 2
1001 123456 0 5.8 2017-07-03 3.5 2.3 13.5 5.8 13.5 5.4 7 2 0.49773696506423515 3
1001 123456 0 8.1 2017-07-04 5.8 2.3 13.5 5.8 13.5 5.4 7 2 0.49773696506423515 4
1001 123456 0 5.2 2017-07-05 8.1 -2.9 13.5 5.8 13.5 5.4 7 2 0.49773696506423515 5
1001 123456 0 8.5 2017-07-06 5.2 3.3 13.5 5.8 13.5 5.4 7 2 0.49773696506423515 6
1001 123456 0 13.5 2017-07-07 8.5 5 13.5 5.8 13.5 5.4 7 2 0.49773696506423515 7
1001 123456 1 3.1 2017-07-01 3.1 0 5.6 3.1 2.8 3.1 7 4 0.8828140656227909 1
1001 123456 1 2.8 2017-07-02 3.1 -0.3 5.6 3.1 2.8 3.1 7 4 0.8828140656227909 2
1001 123456 1 1.1 2017-07-03 2.8 -1.7 5.6 3.1 2.8 3.1 7 4 0.8828140656227909 3
1001 123456 1 5.5 2017-07-04 1.1 4.4 5.6 3.1 2.8 3.1 7 4 0.8828140656227909 4
1001 123456 1 5.3 2017-07-05 5.5 -0.2 5.6 3.1 2.8 3.1 7 4 0.8828140656227909 5
1001 123456 1 5.6 2017-07-06 5.3 0.3 5.6 3.1 2.8 3.1 7 4 0.8828140656227909 6
1001 123456 1 2.8 2017-07-07 5.6 -2.8 5.6 3.1 2.8 3.1 7 4 0.8828140656227909 7
1001 234567 1 8.1 2017-07-01 8.1 0 9.3 6.2 2.3 8.1 7 5 0.3759421760605059 1
1001 234567 1 7.2 2017-07-02 8.1 -0.9 9.3 6.2 2.3 8.1 7 5 0.3759421760605059 2
1001 234567 1 6.2 2017-07-03 7.2 -1 9.3 6.2 2.3 8.1 7 5 0.3759421760605059 3
1001 234567 1 5.1 2017-07-04 6.2 -1.1 9.3 6.2 2.3 8.1 7 5 0.3759421760605059 4
1001 234567 1 9.3 2017-07-05 5.1 4.2 9.3 6.2 2.3 8.1 7 5 0.3759421760605059 5
1001 234567 1 4.8 2017-07-06 9.3 -4.5 9.3 6.2 2.3 8.1 7 5 0.3759421760605059 6
1001 234567 1 2.3 2017-07-07 4.8 -2.5 9.3 6.2 2.3 8.1 7 5 0.3759421760605059 7
Time taken: 16.283 seconds, Fetched: 21 row(s)
(2)根据步骤1计算的宽表依据具体的算法求各项指标。指标包括:平均变化率、基线变化率、正向变化率、反向变化率、首尾变化率
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,cast(avgvarrate as decimal(6,1)) as avgvarrate --平均变化率
,cast(max_avg_act_ic/avg_act_ic_m as decimal(6,1)) as basedvarrate --基线变化率
,cast(abs(avg_act_ic_last - avg_act_ic_first)/abs(avg_act_ic_first) as decimal(6,1)) as end1stvarrate --首尾变化率(定基发展速度)
,cast(anti_var_cnt / (cnt -1) as decimal(6,1)) as antivarrate --反向变化率
,1 - cast(anti_var_cnt / cnt as decimal(6,1)) as posvarrate --正向变化率
FROM(
select *
,(sum(case when avg_act_ic_diff <= 0 then 1 else 0 end ) over(PARTITION by gw_id,sensor_id,switch_dir)) -1 as anti_var_cnt
,(sum(abs(avg_act_ic_diff) / avg_act_ic_lag) over(PARTITION by gw_id,sensor_id,switch_dir)) / (cnt -1) as avgvarrate
,row_number() over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as rn
from(
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_lag
,cast(avg_act_ic - lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as decimal(6,1)) as avg_act_ic_diff
,max(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as max_avg_act_ic
,(percentile(cast(cast(avg_act_ic as decimal(6,1))*10 as int), 0.5) over(PARTITION by gw_id,sensor_id,switch_dir))/10 as avg_act_ic_m
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day desc) as avg_act_ic_last
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_first
,count(1) over(PARTITION by gw_id,sensor_id,switch_dir) as cnt
from act_ic
) t
) m
注意点:
- (1)求反向个数的时候进行了减1操作原因是计算的是差值小于等于0的个数,由于在窗口内第一个值取上一个值时默认为他本身,此时差值为0,所以应该将该边界减掉
- (2)求反向变化率的时候,cnt减1是因为进行当前的值减去上一个值的操作个数是cnt -1,也需要减去边界点。
- (3)正向变化率就是1减去反向变化率
--------------------------------------------------------------------------------
OK
1001 123456 0 5.4 2017-07-01 0.5 2.3 1.5 0.3 0.7
1001 123456 0 3.5 2017-07-02 0.5 2.3 1.5 0.3 0.7
1001 123456 0 5.8 2017-07-03 0.5 2.3 1.5 0.3 0.7
1001 123456 0 8.1 2017-07-04 0.5 2.3 1.5 0.3 0.7
1001 123456 0 5.2 2017-07-05 0.5 2.3 1.5 0.3 0.7
1001 123456 0 8.5 2017-07-06 0.5 2.3 1.5 0.3 0.7
1001 123456 0 13.5 2017-07-07 0.5 2.3 1.5 0.3 0.7
1001 123456 1 3.1 2017-07-01 0.9 1.8 0.1 0.7 0.4
1001 123456 1 2.8 2017-07-02 0.9 1.8 0.1 0.7 0.4
1001 123456 1 1.1 2017-07-03 0.9 1.8 0.1 0.7 0.4
1001 123456 1 5.5 2017-07-04 0.9 1.8 0.1 0.7 0.4
1001 123456 1 5.3 2017-07-05 0.9 1.8 0.1 0.7 0.4
1001 123456 1 5.6 2017-07-06 0.9 1.8 0.1 0.7 0.4
1001 123456 1 2.8 2017-07-07 0.9 1.8 0.1 0.7 0.4
1001 234567 1 8.1 2017-07-01 0.4 1.5 0.7 0.8 0.3
1001 234567 1 7.2 2017-07-02 0.4 1.5 0.7 0.8 0.3
1001 234567 1 6.2 2017-07-03 0.4 1.5 0.7 0.8 0.3
1001 234567 1 5.1 2017-07-04 0.4 1.5 0.7 0.8 0.3
1001 234567 1 9.3 2017-07-05 0.4 1.5 0.7 0.8 0.3
1001 234567 1 4.8 2017-07-06 0.4 1.5 0.7 0.8 0.3
1001 234567 1 2.3 2017-07-07 0.4 1.5 0.7 0.8 0.3
Time taken: 24.558 seconds, Fetched: 21 row(s)
(3) 通过第二步计算的指标,进行阈值比较判断得出趋势预警code
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,case when posvarrate>=0.7
then case when avgvarrate>=0.1 and basedvarrate>=1.5 and end1stvarrate>=0.5
then "act_ic_growths"
else null end
else case when avgvarrate>=0.1 and basedvarrate>=1.5 and end1stvarrate>=0.5
then "act_ic_decline"
else null end
end as warning_code
from(
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,cast(avgvarrate as decimal(6,1)) as avgvarrate --平均变化率
,cast(max_avg_act_ic/avg_act_ic_m as decimal(6,1)) as basedvarrate --基线变化率
,cast(abs(avg_act_ic_last - avg_act_ic_first)/abs(avg_act_ic_first) as decimal(6,1)) as end1stvarrate --首尾变化率(定基变化速度)
,cast(anti_var_cnt / (cnt -1) as decimal(6,1)) as antivarrate --反向变化率
,1 - cast(anti_var_cnt / cnt as decimal(6,1)) as posvarrate --正向变化率
FROM(
select *
,(sum(case when avg_act_ic_diff <= 0 then 1 else 0 end ) over(PARTITION by gw_id,sensor_id,switch_dir)) -1 as anti_var_cnt
,(sum(abs(avg_act_ic_diff) / avg_act_ic_lag) over(PARTITION by gw_id,sensor_id,switch_dir)) / (cnt -1) as avgvarrate
,row_number() over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as rn
from(
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_lag
,cast(avg_act_ic - lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as decimal(6,1)) as avg_act_ic_diff
,max(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as max_avg_act_ic
,(percentile(cast(cast(avg_act_ic as decimal(6,1))*10 as int), 0.5) over(PARTITION by gw_id,sensor_id,switch_dir))/10 as avg_act_ic_m
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day desc) as avg_act_ic_last
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_first
,count(1) over(PARTITION by gw_id,sensor_id,switch_dir) as cnt
from act_ic
) t
) m
) n
计算结果如下:
VERTICES: 06/06 [==========================>>] 100% ELAPSED TIME: 15.98 s
--------------------------------------------------------------------------------
OK
1001 123456 0 5.4 2017-07-01 act_ic_growths
1001 123456 0 3.5 2017-07-02 act_ic_growths
1001 123456 0 5.8 2017-07-03 act_ic_growths
1001 123456 0 8.1 2017-07-04 act_ic_growths
1001 123456 0 5.2 2017-07-05 act_ic_growths
1001 123456 0 8.5 2017-07-06 act_ic_growths
1001 123456 0 13.5 2017-07-07 act_ic_growths
1001 123456 1 3.1 2017-07-01 NULL
1001 123456 1 2.8 2017-07-02 NULL
1001 123456 1 1.1 2017-07-03 NULL
1001 123456 1 5.5 2017-07-04 NULL
1001 123456 1 5.3 2017-07-05 NULL
1001 123456 1 5.6 2017-07-06 NULL
1001 123456 1 2.8 2017-07-07 NULL
1001 234567 1 8.1 2017-07-01 act_ic_decline
1001 234567 1 7.2 2017-07-02 act_ic_decline
1001 234567 1 6.2 2017-07-03 act_ic_decline
1001 234567 1 5.1 2017-07-04 act_ic_decline
1001 234567 1 9.3 2017-07-05 act_ic_decline
1001 234567 1 4.8 2017-07-06 act_ic_decline
1001 234567 1 2.3 2017-07-07 act_ic_decline
Time taken: 18.042 seconds, Fetched: 21 row(s)
(4)过滤掉warning_code为NULL的值 求出最终结果
select *
from(
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,case when posvarrate>=0.7
then case when avgvarrate>=0.1 and basedvarrate>=1.5 and end1stvarrate>=0.5
then "act_ic_growths"
else null end
else case when avgvarrate>=0.1 and basedvarrate>=1.5 and end1stvarrate>=0.5
then "act_ic_decline"
else null end
end as warning_code
from(
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,cast(avgvarrate as decimal(6,1)) as avgvarrate --平均变化率
,cast(max_avg_act_ic/avg_act_ic_m as decimal(6,1)) as basedvarrate --基线变化率
,cast(abs(avg_act_ic_last - avg_act_ic_first)/abs(avg_act_ic_first) as decimal(6,1)) as end1stvarrate --首尾变化率(定基变化速度)
,cast(anti_var_cnt / (cnt -1) as decimal(6,1)) as antivarrate --反向变化率
,1 - cast(anti_var_cnt / cnt as decimal(6,1)) as posvarrate --正向变化率
FROM(
select *
,(sum(case when avg_act_ic_diff <= 0 then 1 else 0 end ) over(PARTITION by gw_id,sensor_id,switch_dir)) -1 as anti_var_cnt
,(sum(abs(avg_act_ic_diff) / avg_act_ic_lag) over(PARTITION by gw_id,sensor_id,switch_dir)) / (cnt -1) as avgvarrate
,row_number() over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as rn
from(
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_lag
,cast(avg_act_ic - lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as decimal(6,1)) as avg_act_ic_diff
,max(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as max_avg_act_ic
,(percentile(cast(cast(avg_act_ic as decimal(6,1))*10 as int), 0.5) over(PARTITION by gw_id,sensor_id,switch_dir))/10 as avg_act_ic_m
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day desc) as avg_act_ic_last
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_first
,count(1) over(PARTITION by gw_id,sensor_id,switch_dir) as cnt
from act_ic
) t
) m
) n
) p
where warning_code is not null
?结果如下:
--------------------------------------------------------------------------------
OK
1001 123456 0 5.4 2017-07-01 act_ic_growths
1001 123456 0 3.5 2017-07-02 act_ic_growths
1001 123456 0 5.8 2017-07-03 act_ic_growths
1001 123456 0 8.1 2017-07-04 act_ic_growths
1001 123456 0 5.2 2017-07-05 act_ic_growths
1001 123456 0 8.5 2017-07-06 act_ic_growths
1001 123456 0 13.5 2017-07-07 act_ic_growths
1001 234567 1 8.1 2017-07-01 act_ic_decline
1001 234567 1 7.2 2017-07-02 act_ic_decline
1001 234567 1 6.2 2017-07-03 act_ic_decline
1001 234567 1 5.1 2017-07-04 act_ic_decline
1001 234567 1 9.3 2017-07-05 act_ic_decline
1001 234567 1 4.8 2017-07-06 act_ic_decline
1001 234567 1 2.3 2017-07-07 act_ic_decline
Time taken: 24.369 seconds, Fetched: 14 row(s)
趋势图如下:
act_ic:7天增的趋势如下

?act_ic:7天减的趋势如下

5 小 结
本文针对物联网应用场景,以道岔转辙机动作电流为例,详细给出了一种曲线趋势预测方法。该方法通过对具有趋势性的曲线的特性进行抓取,通过统计的特征的形式进行描述,从4个维度分别来反应具有递增或递减趋势的曲线,分别为,基线变化率、首尾变化率、平均变化率、反向或正向变化率,通过该四个维度的特征与阈值比较最终判断是都复合某种发展趋势。其中本文给出的阈值读者可根据业务进行灵活配置,不断调试,以达到复合自己业务要求的结果。文中通过hivesql批处理的形式进行实现,本文给出的算法在实际中得到了较好的验证。
|