项目场景:
时间序列异常检测。
完整代码:
from pandas import DataFrame
# Wrap the raw voltage samples in a frame, then put the rectified timestamps
# in front of them as the first column.
# NOTE(review): `Voltage` and `TimeStamp_rectified` are defined outside this
# snippet; the two-name column assignment below only succeeds if `Voltage`
# contributes exactly one column — confirm against the loader.
df_voltage = DataFrame(Voltage)
df_voltage.insert(loc=0, column='ts', value=TimeStamp_rectified)
df_voltage.columns = ['ts', 'voltage']
import time
import tsod
from preprocess_tool import df_norm
# --- Chunking parameters ----------------------------------------------------
num = 10                 # number of consecutive chunks the data is split into
lenth = 100000 // num    # rows per chunk (integer division: this is a row count)
w = 3                    # first row of each chunk that receives a smoothed value

# Alias of the current namespace, used further down to publish per-chunk
# frames under generated names (df1, df_1, ...). Writing through locals() only
# creates real variables when this code runs at module level.
names = locals()

# Positional x axis shared by every per-chunk plot.
x = np.arange(lenth)

# --- Global outlier band ----------------------------------------------------
# Tukey-style fences computed once over the whole series, using a 0.6 * IQR
# margin on either side of the quartiles.
# NOTE(review): `df0` is not defined in the visible part of the file —
# presumably it is derived from `df_voltage`; confirm.
q1, q3 = np.percentile(df0['voltage'], [25, 75])
iqr = q3 - q1
up_threshold = q3 + 0.6 * iqr
down_threshold = q1 - 0.6 * iqr
def _flagged_positions(mask):
    """Return the index labels at which the boolean Series *mask* is True.

    Every caller passes a Series with a default 0..n-1 index, so the labels
    are also row positions inside the chunk.
    """
    return mask[mask].index.tolist()


# Normalising factor that makes the three smoothing weights (1, 0.5, 0.25)
# sum to one. It does not depend on the chunk, so it is computed once.
base = 1 / (1 + 0.5 + 0.25)

# Walk over `num` consecutive chunks of `lenth` rows. For each chunk: build a
# weighted moving average, run four independent outlier checks, and draw the
# chunk with every flagged sample marked.
for j in range(num):
    c = j * lenth
    d = (j + 1) * lenth

    # Publish the raw slice as df<j+1> and a re-indexed (0..n-1) copy as
    # df_<j+1> through the `names` namespace alias.
    names[f'df{j + 1}'] = df0.iloc[c:d]
    chunk_raw = names[f'df{j + 1}']
    ts = pd.Series(chunk_raw['voltage'].values,
                   index=pd.to_datetime(chunk_raw['ts']))
    names[f'df_{j + 1}'] = chunk_raw.reset_index(drop=True)
    chunk_df = names[f'df_{j + 1}']
    chunk_voltage = chunk_df['voltage']

    # Weighted average of the current sample and its two predecessors.
    # Vectorised with shift() instead of a per-row .loc loop; the terms are
    # combined in the same order as the scalar formula. Rows before `w` keep
    # their raw voltage.
    weighted = (base * chunk_voltage
                + base * 0.5 * chunk_voltage.shift(1)
                + base * 0.25 * chunk_voltage.shift(2))
    chunk_df['value'] = weighted.where(chunk_df.index >= w, chunk_voltage)

    # Check 1: tsod constant-value detector.
    cgd = tsod.ConstantValueDetector()
    res1 = cgd.detect(ts)
    res1 = DataFrame(res1.reset_index(drop=True))

    # Check 2: tsod difference detector, fitted on the very chunk it scores.
    drd = tsod.DiffDetector()
    drd.fit(ts)
    res2 = drd.detect(ts)
    res2 = DataFrame(res2.reset_index(drop=True))

    outlier_index1 = _flagged_positions(res1[0].eq(True))
    outlier_index2 = _flagged_positions(res2[0].eq(True))

    # Check 3: samples outside the global IQR band.
    outlier_index3 = _flagged_positions(
        (chunk_voltage > up_threshold) | (chunk_voltage < down_threshold)
    )

    plt.figure(figsize=(40, 4))
    pd.Series(data=chunk_voltage, index=x).plot(color='b', linestyle='-')
    plt.title("Residual Plot")

    # Check 4: smoothed value exceeds the raw sample by more than 0.4.
    # NOTE(review): only positive residuals are flagged — confirm that a
    # one-sided test is intended.
    chunk_df['is_anomaly'] = chunk_df['value'] - chunk_voltage > 0.4
    y_outlier4 = chunk_df[chunk_df['is_anomaly']]

    y_outlier1 = chunk_voltage.loc[outlier_index1]
    y_outlier2 = chunk_voltage.loc[outlier_index2]
    y_outlier3 = chunk_voltage.loc[outlier_index3]

    # All four checks share the same cyan marker; only one series carries a
    # label so the legend shows a single "Predict" entry.
    plt.plot(outlier_index1, y_outlier1, "co")
    plt.plot(outlier_index2, y_outlier2, "co")
    plt.plot(outlier_index3, y_outlier3, "co", label="Predict")
    plt.plot(y_outlier4.index, y_outlier4['voltage'], "co")
    plt.legend()
    plt.grid()
# Take the current epoch time truncated to whole seconds and render it as a
# compact local-time stamp (YYYYmmddHHMMSS), then report completion.
# NOTE(review): `dt` is not used in the visible part of the file.
timeslot = int(time.time())
local_now = time.localtime(timeslot)
dt = time.strftime("%Y%m%d%H%M%S", local_now)
print('finish!')
|