Introduction
In the spirit of "What I cannot create, I do not understand", this series builds its own deep learning framework from scratch in pure Python and NumPy. Like PyTorch, the framework supports automatic differentiation.
To understand deep learning deeply, the experience of building things from scratch is invaluable. Starting from what we can understand ourselves, and avoiding fully fledged external frameworks as much as possible, we implement the models we want. The goal of this series is that, through this process, you gain a real grasp of how deep learning is implemented under the hood, rather than just calling library APIs.
When working with a dataset we need to load the data, and once the dataset is large, loading everything at once is unrealistic; it has to be processed in batches. The cleanest way to do this is a data loading class built on the iterator pattern, which loads only part of the data at a time.
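The idea, in isolation, is only a few lines: an iterator that hands out one batch worth of indices at a time, so the whole dataset never needs to be materialized. This is just an illustrative sketch (the name iter_batch_indices is mine, not part of the framework):

import numpy as np

def iter_batch_indices(n, batch_size):
    # a generator: nothing is computed until the caller asks for the next batch
    for start in range(0, n, batch_size):
        yield np.arange(start, min(start + batch_size, n))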
The problem
The batch-splitting code from earlier has a problem:
def make_batches(X, y, batch_size=32, shuffle=True):
    '''
    Split the dataset into batches of size batch_size
    :param X: dataset [num_samples, sample_dim]
    :param y: corresponding labels
    :param batch_size: batch size
    :param shuffle: whether to shuffle the data
    :return:
    '''
    n = X.shape[0]
    if shuffle:
        indexes = np.random.permutation(n)
    else:
        indexes = np.arange(n)

    X_batches = [
        Tensor(X[indexes, :][k:k + batch_size, :]) for k in range(0, n, batch_size)
    ]
    y_batches = [
        Tensor(y[indexes][k:k + batch_size]) for k in range(0, n, batch_size)
    ]

    return X_batches, y_batches
With a large dataset this easily fails with a MemoryError:
numpy.core._exceptions.MemoryError: Unable to allocate xx. MiB for an array with shape (xxx,xx) and data type uint8
This out-of-memory error comes from X[indexes, :]: it materializes a copy of the entire (shuffled) dataset before any splitting happens, and because it sits inside the list comprehensions, that full copy is even rebuilt for every batch. The correct approach is to split the indices into batches and index the data with only one batch of indices at a time.
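Roughly, the fix looks like this, reusing the names from make_batches above (only a sketch; the Dataset and DataLoader classes below package the same idea up properly):

X_batches, y_batches = [], []
for k in range(0, n, batch_size):
    batch_idx = indexes[k:k + batch_size]   # just batch_size integers
    X_batches.append(Tensor(X[batch_idx]))  # copies only one batch of rows
    y_batches.append(Tensor(y[batch_idx]))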
Dataset
First we create a dataset class:
class Dataset:
    def __getitem__(self, index):
        raise NotImplementedError


class TensorDataset(Dataset):
    def __init__(self, *tensors: Tensor) -> None:
        self.tensors = tensors

    def __getitem__(self, index):
        # indexing the dataset indexes every tensor it holds with the same index
        return tuple(tensor[index] for tensor in self.tensors)

    def __len__(self):
        return len(self.tensors[0])
For now we only implement TensorDataset, which wraps our own data and the corresponding labels; a quick check of how it behaves follows.
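This assumes Tensor supports len() and NumPy-style fancy indexing, which we implemented in an earlier post:

import numpy as np
from metagrad.tensor import Tensor
from metagrad.dataset import TensorDataset

ds = TensorDataset(Tensor(np.arange(12).reshape(6, 2)), Tensor(np.arange(6)))
print(len(ds))                      # 6 samples
X_b, y_b = ds[np.array([0, 3, 5])]  # one index array selects the same rows from every tensor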
DataLoader
import math

import numpy as np

from metagrad.dataset import Dataset


class DataLoader:
    def __init__(self, dataset: Dataset, batch_size: int = 1,
                 shuffle: bool = False):
        self.dataset = dataset
        self.shuffle = shuffle
        self.batch_size = batch_size

        self.data_size = len(dataset)
        self.max_its = math.ceil(self.data_size / batch_size)  # number of batches per epoch
        self.it = 0  # index of the current batch
        self.indices = None

        self.reset()

    def reset(self):
        self.it = 0
        if self.shuffle:
            self.indices = np.random.permutation(self.data_size)
        else:
            self.indices = np.arange(self.data_size)

    def __next__(self):
        if self.it >= self.max_its:
            self.reset()  # get ready for the next epoch
            raise StopIteration

        i, batch_size = self.it, self.batch_size
        # only one batch of indices is used to index the dataset
        batch_indices = self.indices[i * batch_size:(i + 1) * batch_size]
        batch = self.dataset[batch_indices]

        self.it += 1

        X_batch, y_batch = batch
        return X_batch, y_batch

    def next(self):
        return self.__next__()

    def __iter__(self):
        return self
Then we implement the data loading class. It is given a dataset, and every call to __next__ materializes only one batch of data; thanks to the slicing operation we implemented for Tensor, there is no need for an explicit loop over samples.
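Used on its own, the loader behaves like this (again a small sketch, assuming Tensor exposes shape and the fancy indexing from the earlier post):

import numpy as np
from metagrad.tensor import Tensor
from metagrad.dataset import TensorDataset
from metagrad.dataloader import DataLoader

X = Tensor(np.arange(20).reshape(10, 2))  # 10 samples, 2 features each
y = Tensor(np.arange(10))                 # 10 labels

dl = DataLoader(TensorDataset(X, y), batch_size=4, shuffle=True)
for epoch in range(2):
    for X_batch, y_batch in dl:
        print(X_batch.shape, y_batch.shape)  # (4, 2) (4,), (4, 2) (4,), then (2, 2) (2,)

Note that __next__ calls reset() right before raising StopIteration, which is why the second epoch in this snippet works without any extra bookkeeping.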
In practice
Code for this section → here
Next, let's see how to use our data loader in a real training script.
import numpy as np
from metagrad.dataloader import DataLoader
from metagrad.dataset import TensorDataset
from metagrad.functions import sigmoid
from metagrad.tensor import Tensor
import metagrad.module as nn
from keras.datasets import imdb
from metagrad.loss import BCELoss
from metagrad.optim import SGD
from metagrad.utils import make_batches, loss_batch, accuracy
from metagrad.tensor import no_grad

import matplotlib.pyplot as plt


class Feedforward(nn.Module):
    '''
    Simple feedforward network with one hidden layer, for classification
    '''

    def __init__(self, input_size, hidden_size, output_size):
        '''
        :param input_size: input dimension
        :param hidden_size: hidden layer size
        :param output_size: number of classes
        '''
        self.net = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size)
        )

    def forward(self, x: Tensor) -> Tensor:
        return self.net(x)


def load_dataset():
    (X_train, y_train), (X_test, y_test) = imdb.load_data(num_words=10000)
    # turn the label vectors into column vectors
    y_train, y_test = y_train[:, np.newaxis], y_test[:, np.newaxis]

    X_train = vectorize_sequences(X_train)
    X_test = vectorize_sequences(X_test)

    # hold out the first 10000 training samples as a validation set
    X_val = X_train[:10000]
    X_train = X_train[10000:]

    y_val = y_train[:10000]
    y_train = y_train[10000:]

    return Tensor(X_train), Tensor(X_test), Tensor(y_train), Tensor(y_test), Tensor(X_val), Tensor(y_val)


def indices_to_sentence(indices: Tensor):
    # map word indices back to words (the Keras IMDB indices are offset by 3)
    word_index = imdb.get_word_index()
    reverse_word_index = dict(
        [(value, key) for (key, value) in word_index.items()])
    decoded_review = ' '.join(
        [reverse_word_index.get(i - 3, '?') for i in indices.data])
    return decoded_review


def vectorize_sequences(sequences, dimension=10000):
    # multi-hot encoding: each review becomes a 10000-dimensional 0/1 vector
    results = np.zeros((len(sequences), dimension), dtype='uint8')
    for i, sequence in enumerate(sequences):
        results[i, sequence] = 1
    return results


def compute_loss_and_accury(data_loader: DataLoader, model, loss_func, total_nums, opt=None):
    losses = []
    correct = 0
    for X_batch, y_batch in data_loader:
        y_pred = model(X_batch)
        l = loss_func(y_pred, y_batch)

        if opt is not None:
            l.backward()
            opt.step()
            opt.zero_grad()

        losses.append(l.item())
        correct += np.sum(sigmoid(y_pred).numpy().round() == y_batch.numpy())

    loss = sum(losses) / total_nums
    accuracy = 100 * correct / total_nums
    return loss, accuracy


if __name__ == '__main__':
    X_train, X_test, y_train, y_test, X_val, y_val = load_dataset()

    model = Feedforward(10000, 128, 1)

    optimizer = SGD(model.parameters(), lr=0.001)
    loss = BCELoss(reduction="sum")

    epochs = 20
    batch_size = 512

    train_losses, val_losses = [], []
    train_accuracies, val_accuracies = [], []

    train_ds = TensorDataset(X_train, y_train)
    train_dl = DataLoader(train_ds, batch_size=batch_size)

    val_ds = TensorDataset(X_val, y_val)
    val_dl = DataLoader(val_ds, batch_size=batch_size)

    for epoch in range(epochs):
        train_loss, train_accuracy = compute_loss_and_accury(train_dl, model, loss, len(X_train), optimizer)
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)

        with no_grad():
            val_loss, val_accuracy = compute_loss_and_accury(val_dl, model, loss, len(X_val))
            val_losses.append(val_loss)
            val_accuracies.append(val_accuracy)

        print(f"Epoch:{epoch + 1}, Training Loss: {train_loss:.4f}, Accuracy: {train_accuracy:.2f}% | "
              f" Validation Loss:{val_loss:.4f} , Accuracy:{val_accuracy:.2f}%")

    # plot training/validation loss curves
    epoch_list = range(1, epochs + 1)
    plt.plot(epoch_list, train_losses, 'r', label='Training loss')
    plt.plot(epoch_list, val_losses, 'b', label='Validation loss')
    plt.title('Training and validation loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

    # plot training/validation accuracy curves
    plt.clf()
    plt.plot(epoch_list, train_accuracies, 'r', label='Training acc')
    plt.plot(epoch_list, val_accuracies, 'b', label='Validation acc')
    plt.title('Training and validation accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()

    # finally evaluate on the test set
    with no_grad():
        X_test, y_test = Tensor(X_test), Tensor(y_test)
        outputs = model(X_test)
        correct = np.sum(sigmoid(outputs).numpy().round() == y_test.numpy())
        accuracy = 100 * correct / len(y_test)
        print(f"Test Accuracy:{accuracy}")
Training output:
Epoch:1, Training Loss: 0.6283, Accuracy: 62.46% | Validation Loss:0.5137 , Accuracy:80.53%
Epoch:2, Training Loss: 0.6466, Accuracy: 68.52% | Validation Loss:0.5245 , Accuracy:81.06%
Epoch:3, Training Loss: 0.5974, Accuracy: 68.98% | Validation Loss:0.4521 , Accuracy:82.33%
Epoch:4, Training Loss: 0.5042, Accuracy: 75.94% | Validation Loss:0.3807 , Accuracy:83.38%
Epoch:5, Training Loss: 0.4556, Accuracy: 79.24% | Validation Loss:0.3690 , Accuracy:85.02%
Epoch:6, Training Loss: 0.3801, Accuracy: 83.11% | Validation Loss:0.3610 , Accuracy:84.43%
Epoch:7, Training Loss: 0.3606, Accuracy: 83.74% | Validation Loss:0.3267 , Accuracy:86.23%
Epoch:8, Training Loss: 0.3090, Accuracy: 86.72% | Validation Loss:0.3120 , Accuracy:86.53%
Epoch:9, Training Loss: 0.3088, Accuracy: 86.45% | Validation Loss:0.3056 , Accuracy:86.94%
Epoch:10, Training Loss: 0.3781, Accuracy: 83.32% | Validation Loss:0.3326 , Accuracy:86.67%
Epoch:11, Training Loss: 0.3872, Accuracy: 82.99% | Validation Loss:0.3053 , Accuracy:87.00%
Epoch:12, Training Loss: 0.2890, Accuracy: 88.03% | Validation Loss:0.2994 , Accuracy:87.33%
Epoch:13, Training Loss: 0.2775, Accuracy: 87.62% | Validation Loss:0.2955 , Accuracy:87.32%
Epoch:14, Training Loss: 0.3027, Accuracy: 86.41% | Validation Loss:0.3102 , Accuracy:87.04%
Epoch:15, Training Loss: 0.2360, Accuracy: 90.01% | Validation Loss:0.3045 , Accuracy:87.47%
Epoch:16, Training Loss: 0.2313, Accuracy: 90.17% | Validation Loss:0.3071 , Accuracy:87.43%
Epoch:17, Training Loss: 0.1887, Accuracy: 92.70% | Validation Loss:0.2868 , Accuracy:88.18%
Epoch:18, Training Loss: 0.5147, Accuracy: 77.11% | Validation Loss:0.3775 , Accuracy:86.02%
Epoch:19, Training Loss: 0.4108, Accuracy: 80.92% | Validation Loss:0.3340 , Accuracy:85.39%
Epoch:20, Training Loss: 0.2783, Accuracy: 87.69% | Validation Loss:0.3466 , Accuracy:86.15%