由于时间序列自带时间这一个维度,是一种序列数据。因此自然而然会想到用lstm,seq2seq等模型。但由于无法并行加速,等缺点,我们也可以尝试着用卷积神经网络来预测时间。
首先需要改变一下普通的卷积,将未来的数据输入了进去会发生泄漏,自然会过拟合。因此采用了因果卷积,因果卷积就是再前面加上0,让感受野只感受到历史数据,而感受不到未来的数据。
因为单变量的多步预测最为普遍,这里以此为例。我参考的代码来自这里 https://github.com/LongxingTan/Time-series-prediction
简单一点的模型: 一路忘深里走,每一步的空洞都加倍,最后的感受野就很深。 最后通过dense层来直接预测出多步的结果。来dense自己来选择权重
复杂的encoder-decoder模型: 编码的时候不断忘深里走,同时记录每一层的中间结果。 解码的时候用中间结果,同时加上解码的特征,来循环的得到每一步的预测值。
一个简化版的例子如下:
数据读取:
class SineData(DataSet):
def __init__(self,params):
'''
This is a toy data example of sine function with some noise
:param params:
'''
super(SineData, self).__init__(params)
def load_data(self, data_dir=None, *args):
n_examples = 1000
sequence_length = 30
predict_sequence_length = 10
x = []
y = []
for _ in range(n_examples):
rand = random.random()*2*np.pi
sig1 = np.sin(np.linspace(rand, 3.*np.pi+rand, sequence_length + predict_sequence_length))
sig2 = np.cos(np.linspace(rand, 3.*np.pi+rand, sequence_length + predict_sequence_length))
x1 = sig1[:sequence_length]
y1 = sig1[sequence_length:]
x2 = sig2[:sequence_length]
y2 = sig2[sequence_length:]
x_ = np.array([x1, x2])
y_ = np.array([y1, y2])
x.append(x_.T)
y.append(y_.T)
x = np.array(x)
y = np.array(y)
return x, y
def get_examples(self,data_dir=None, sample=1, plot=False):
x, y = self.load_data()
if plot:
plt.plot(x[0,:])
plt.show()
return x, y[:,:,0:1]
数据导入部分
def load_data(training):
x_train, x_test, y_train, y_test=prepare_data()
if training:
dataset = tf.data.Dataset.from_tensor_slices((x_train,y_train))
dataset = dataset.shuffle(buffer_size=2000)
dataset = dataset.repeat(20)
else:
dataset = tf.data.Dataset.from_tensor_slices((x_test,y_test))
dataset = dataset.repeat(20)
dataset = dataset.batch(32).prefetch(tf.data.experimental.AUTOTUNE)
return dataset
模型部分
def conv_block(x, kernel_size, filters, dilation):
inputs = x
layer_out = ConvTime(filters, kernel_size, causal=True, dilation_rate=dilation, activation='selu')(x)
skip_out = Conv1D(1, 1, activation=None, use_bias=False)(layer_out)
net_in = Conv1D(1, 1, activation=None, use_bias=False)(layer_out)
net_out = Add()([inputs, net_in])
return net_out, skip_out
class CNN(object):
def __init__(self, custom_model_params={}):
self.params = params
self.conv_times = []
for i, (dilation, kernel_size) in enumerate(zip(self.params['dilation_rates'], self.params['kernel_sizes'])):
self.conv_times.append(ConvTime(filters=2 * self.params['filters'],
kernel_size=kernel_size,
causal=True,
dilation_rate=dilation))
self.dense_time1 = Dense3D(units=self.params['filters'], name='encoder_dense_time_1')
self.dense_time2 = Dense3D(units=self.params['filters'] + self.params['filters'], name='encoder_dense_time_2')
self.dense_time3 = Dense3D(units=1, name='encoder_dense_time_3')
self.conv_1 = Conv1D(1, 1, activation=None, use_bias=False)
self.dense_1 = Dense(1)
def __call__(self, x):
l1a, l1b = conv_block(x, 2, 32, 1)
l2a, l2b = conv_block(l1a, 2, 32, 2)
l3a, l3b = conv_block(l2a, 2, 42, 4)
l4a, l4b = conv_block(l3a, 2, 32, 8)
l5a, l5b = conv_block(l4a, 2, 32, 16)
l6a, l6b = conv_block(l5a, 2, 32, 32)
l7a, l7b = conv_block(l6a, 2, 32, 64)
l8 = Add()([l1b, l2b, l3b, l4b, l5b, l6b, l7b])
l9 = Lambda(lambda x: tf.nn.relu(x))(l8)
l10 = Lambda (lambda x: tf.squeeze(self.conv_1(x),-1))(l9)
l10 = self.dense_1(l10)
return l10
训练和验证部分
class Model(object):
def __init__(self):
pass
def build_model(self):
x = tf.keras.layers.Input([12,19])
outputs = CNN()(x)
self.model = tf.keras.Model(x, outputs)
self.optimizer = tf.keras.optimizers.Adam(0.005)
def train(self, dataset, validation_data=None):
self.build_model()
self.model.compile(loss='mse', optimizer=self.optimizer)
callbacks = [
# ReduceLROnPlateau(verbose=1),
EarlyStopping(patience=3, verbose=1),
# ModelCheckpoint('checkpoints/ts_train_{epoch}.tf',verbose=1, save_weights_only=True),
TensorBoard(log_dir='../../outputs/logs')]
self.model.fit(dataset, steps_per_epoch=15, epochs=20, callbacks=callbacks, validation_data=validation_data, validation_steps=1
具体代码还是可以参考这里,里面wavenet似乎更复杂了: https://github.com/LongxingTan/Time-series-prediction
|