数据介绍
传送门:原始数据中一列是 value,一列是时间。index_array 处理后的数据如下图,可以理解为联合 index(device_id 与 mete_id),具有唯一性;value_array 处理后的数据如下图,可以理解为每一个小列表对应上图的一个 index,每个小列表是 31 天的数据,一共 24 个 index。
Python 一次、二次指数平滑法
误差采用的是均方误差
import pandas as pd
import numpy as np
import copy
def data_access(path='./xxxxx机房耗电量.csv'):
    """
    Load the power-consumption CSV and split it into one series per meter.

    Only the ``device_id``, ``mete_id`` and ``value`` columns are used. Rows
    are grouped by the ``(device_id, mete_id)`` pair; within a group the
    values keep the row order of the file.

    :param path: location of the CSV file; defaults to the path that was
        previously hard-coded so existing callers keep working.
    :return: ``(index_array, value_array)`` where ``index_array`` is the list
        of ``(device_id, mete_id)`` group keys and ``value_array[k]`` is the
        list of ``value`` entries belonging to ``index_array[k]``.
    """
    source_data = pd.read_csv(path)
    data_value = pd.DataFrame(source_data, columns=['device_id', 'mete_id', 'value'])
    # Treat both identifiers as opaque labels, not numbers, before grouping.
    data_value[['device_id', 'mete_id']] = data_value[['device_id', 'mete_id']].astype(object)

    index_array = []
    value_array = []
    for key, group in data_value.groupby(by=['device_id', 'mete_id']):
        index_array.append(key)
        value_array.append(group['value'].tolist())
    return index_array, value_array
def single_exponential_smoothing(index_array, value_array):
    """
    Pick the smoothing factor for single exponential smoothing by grid search.

    For every candidate alpha in ``np.arange(0.05, 1, 0.05)`` each series is
    smoothed with ``s[t+1] = alpha * x[t] + (1 - alpha) * s[t]``, starting
    from the mean of the series' first three values (or of all values when
    the series has fewer than three). The mean squared error between
    ``s[t]`` and ``x[t]`` is computed per series, the per-series errors are
    summed, and the first alpha with the smallest sum wins.

    :param index_array: group keys matching ``value_array``; not needed for
        the computation and kept only for interface compatibility.
    :param value_array: list of series; each item must be convertible with
        ``float()``. Empty series are skipped.
    :return: the best alpha formatted as a string with three decimals,
        e.g. ``'0.950'``.
    """
    # Convert once up front instead of calling float() inside every loop.
    series_list = [[float(v) for v in series] for series in value_array]
    series_list = [series for series in series_list if series]

    # The initial level does not depend on alpha, so compute it only once.
    initial_levels = []
    for series in series_list:
        head = series[:3]
        initial_levels.append(sum(head) / len(head))

    alpha_fit = 0
    mse_min = float("inf")
    for alpha in np.arange(0.05, 1, 0.05):
        alpha = float(alpha)
        mse_sum = 0
        for series, level in zip(series_list, initial_levels):
            squared_error = 0
            forecast = level
            for actual in series:
                # Score the forecast made *before* seeing this observation.
                squared_error += (forecast - actual) ** 2
                forecast = alpha * actual + (1 - alpha) * forecast
            mse_sum += squared_error / len(series)
        # Strict comparison: on ties the smallest alpha is kept.
        if mse_min > mse_sum:
            mse_min = mse_sum
            alpha_fit = alpha
    return format(alpha_fit, '.3f')
def second_exponential_smoothing(index_array, value_array, day):
    """
    Forecast each series ``day`` steps ahead with double exponential smoothing.

    Every alpha in ``np.arange(0.05, 1, 0.05)`` is tried. For each series the
    first smoothed value ``s1`` and the second smoothed value ``s2`` both
    start from the mean of the first three observations (or of all of them
    when fewer than three exist) and are updated with::

        s1 = alpha * x  + (1 - alpha) * s1
        s2 = alpha * s1 + (1 - alpha) * s2

    The alpha with the smallest sum of per-series mean squared errors is
    selected (the smallest alpha wins ties), and the forecast is
    ``At + Bt * day`` with ``At = 2 * s1 - s2`` and
    ``Bt = alpha / (1 - alpha) * (s1 - s2)`` taken at the last observation.
    One result line per series is printed.

    :param index_array: group keys matching ``value_array``; not needed for
        the computation and kept only for interface compatibility.
    :param value_array: list of non-empty series; each item must be
        convertible with ``float()``.
    :param day: number of steps ahead to forecast; converted with ``int()``.
    :return: list with one forecast per series, in ``value_array`` order.
    :raises ValueError: if a series contains no observations.
    """
    series_list = [[float(v) for v in series] for series in value_array]

    # The starting level is independent of alpha, so compute it only once.
    initial_levels = []
    for position, series in enumerate(series_list):
        if not series:
            raise ValueError(
                'series %d is empty; at least one observation is required' % (position + 1)
            )
        head = series[:3]
        initial_levels.append(sum(head) / len(head))

    # best = (total error, alpha, last s1 per series, last s2 per series, mse per series)
    best = None
    for alpha in np.arange(0.05, 1, 0.05):
        alpha = float(alpha)
        s1_last = []
        s2_last = []
        mse_per_series = []
        for series, level in zip(series_list, initial_levels):
            s1 = s2 = level
            squared_error = 0.0
            for actual in series:
                s1 = alpha * actual + (1 - alpha) * s1
                s2 = alpha * s1 + (1 - alpha) * s2
                # NOTE(review): the error is measured against the second
                # smoothed value, not against the At + Bt forecast; kept
                # as the original selection criterion.
                squared_error += (s2 - actual) ** 2
            s1_last.append(s1)
            s2_last.append(s2)
            mse_per_series.append(squared_error / len(series))
        total = sum(mse_per_series)
        # Every candidate is evaluated; strict comparison keeps the smallest
        # alpha on ties.
        if best is None or best[0] > total:
            best = (total, alpha, s1_last, s2_last, mse_per_series)

    _, alpha_fit, s1_last, s2_last, mse_fit = best
    Xt = []
    for i, (s1, s2) in enumerate(zip(s1_last, s2_last)):
        At = s1 * 2 - s2
        Bt = alpha_fit / (1 - alpha_fit) * (s1 - s2)
        Xt.append(At + Bt * int(day))
        print('第' + str(i + 1) + '组的二次平滑预估值为:' + str(Xt[i]) + ';均方误差为:' + str(mse_fit[i]))
    return Xt
if __name__ == '__main__':
    # Script entry point: load the grouped series and dump the group keys
    # followed by the per-group value lists, one per line.
    index_array, value_array = data_access()
    print(index_array, value_array, sep='\n')
二次指数平滑最终结果输出图。三次指数平滑待更新:二次相比一次引入了趋势的概念,三次则进一步具有季节性特征。
|