深入研究鸢尾花数据集
画出数据集中150个数据的前两个特征的散点分布图(顺便复习一下上学期的SVM):
import numpy as np
import matplotlib.pyplot as plt
from sklearn import svm, datasets
import torch
from dataset import load_data
import numpy as np
data_x,data_y = load_data()
iris_first = []
iris_second = []
iris_third = []
for i in range(0,len(data_y)):
if(data_y[i]==0):
iris_first.append(data_x[i,:].numpy())
elif(data_y[i]==2):
iris_second.append(data_x[i,:].numpy())
else:
iris_third.append(data_x[i,:].numpy())
iris_first = torch.tensor(iris_first)
iris_second = torch.tensor(iris_second)
iris_third = torch.tensor(iris_third)
plt.scatter(iris_first[:,0],iris_first[:,1],c='b')
plt.scatter(iris_second[:,0],iris_second[:,1],c='y')
plt.scatter(iris_third[:,0],iris_third[:,1],c='g')
plt.legend(['iris versicolor','iris setosa','iris vlrglnica'])
4.5 实践:基于前馈神经网络完成鸢尾花分类
继续使用第三章中的鸢尾花分类任务,将Softmax分类器替换为前馈神经网络。
损失函数:交叉熵损失; 优化器:随机梯度下降法; 评价指标:准确率。
4.5.1 小批量梯度下降法
在梯度下降法中,目标函数是整个训练集上的风险函数,这种方式称为批量梯度下降法(Batch Gradient Descent,BGD)。 批量梯度下降法在每次迭代时需要计算每个样本上损失函数的梯度并求和。当训练集中的样本数量(N)很大时,空间复杂度比较高,每次迭代的计算开销也很大。
为了减少每次迭代的计算复杂度,我们可以在每次迭代时只采集一小部分样本,计算在这组样本上损失函数的梯度并更新参数,这种优化方式称为 小批量梯度下降法(Mini-Batch Gradient Descent,Mini-Batch GD)。
第(t)次迭代时,随机选取一个包含(K)个样本的子集(\mathcal{B}_t),计算这个子集上每个样本损失函数的梯度并进行平均,然后再进行参数更新。
[\theta_{t+1} \leftarrow \theta_t - \alpha \frac{1}{K} \sum_{(\boldsymbol{x},y)\in \mathcal{S}_t} \frac{\partial \mathcal{L}\Big(y,f(\boldsymbol{x};\theta)\Big)}{\partial \theta}, ] 其中(K)为批量大小(Batch Size)。(K)通常不会设置很大,一般在(1\sim100)之间。在实际应用中为了提高计算效率,通常设置为2的幂(2^n)。
在实际应用中,小批量随机梯度下降法有收敛快、计算开销小的优点,因此逐渐成为大规模的机器学习中的主要优化算法。 此外,随机梯度下降相当于在批量梯度下降的梯度上引入了随机噪声。在非凸优化问题中,随机梯度下降更容易逃离局部最优点。
小批量随机梯度下降法的训练过程如下:
4.5.1.1 数据分组
为了小批量梯度下降法,我们需要对数据进行随机分组。目前,机器学习中通常做法是构建一个数据迭代器,每个迭代过程中从全部数据集中获取一批指定数量的数据。
import numpy as np
import torch
from dataset import load_data
class IrisDataset(torch.utils.data.Dataset):
def __init__(self, mode='train', num_train=120, num_dev=15):
super(IrisDataset, self).__init__()
X, y = load_data(shuffle=True)
if mode == 'train':
self.X, self.y = X[:num_train], y[:num_train]
elif mode == 'dev':
self.X, self.y = X[num_train:num_train + num_dev], y[num_train:num_train + num_dev]
else:
self.X, self.y = X[num_train + num_dev:], y[num_train + num_dev:]
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
def __len__(self):
return len(self.y)
train_dataset = IrisDataset(mode='train')
dev_dataset = IrisDataset(mode='dev')
test_dataset = IrisDataset(mode='test')
4.5.2.2 用DataLoader进行封装
batch_size = 16
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
dev_loader = torch.utils.data.DataLoader(dev_dataset, batch_size=batch_size)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)
4.5.3 模型构建
from torch import nn
class Model_MLP_L2_V3(torch.nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super(Model_MLP_L2_V3, self).__init__()
self.fc1 = torch.nn.Linear(input_size, hidden_size)
w_ = torch.normal(0, 0.01, size=(hidden_size, input_size), requires_grad=True)
self.fc1.weight = torch.nn.Parameter(w_)
self.fc1.bias = torch.nn.init.constant_(self.fc1.bias, val=1.0)
self.fc2 = torch.nn.Linear(hidden_size, output_size )
w2 = torch.normal(0, 0.01, size=(output_size, hidden_size), requires_grad=True)
self.fc2.weight = nn.Parameter(w2)
self.fc2.bias = torch.nn.init.constant_(self.fc2.bias, val=1.0)
self.act = torch.sigmoid
def forward(self, inputs):
outputs = self.fc1(inputs)
outputs = self.act(outputs)
outputs = self.fc2(outputs)
return outputs
fnn_model =Model_MLP_L2_V3(input_size=4, hidden_size=6,output_size=3)
4.5.4 完善Runner类
class RunnerV3(object):
def __init__(self, model, optimizer, loss_fn, metric, **kwargs):
self.model = model
self.optimizer = optimizer
self.loss_fn = loss_fn
self.metric = metric
self.dev_scores = []
self.train_epoch_losses = []
self.train_step_losses = []
self.dev_losses = []
self.best_score = 0
def train(self, train_loader, dev_loader=None, **kwargs):
self.model.train()
num_epochs = kwargs.get("num_epochs", 0)
log_steps = kwargs.get("log_steps", 100)
eval_steps = kwargs.get("eval_steps", 0)
save_path = kwargs.get("save_path", "best_model.pdparams")
custom_print_log = kwargs.get("custom_print_log", None)
num_training_steps = num_epochs * len(train_loader)
if eval_steps:
if self.metric is None:
raise RuntimeError('Error: Metric can not be None!')
if dev_loader is None:
raise RuntimeError('Error: dev_loader can not be None!')
global_step = 0
for epoch in range(num_epochs):
total_loss = 0
for step, data in enumerate(train_loader):
X, y = data
logits = self.model(X)
loss = self.loss_fn(logits, y)
total_loss += loss
self.train_step_losses.append((global_step, loss.item()))
if log_steps and global_step % log_steps == 0:
print(
f"[Train] epoch: {epoch}/{num_epochs}, step: {global_step}/{num_training_steps}, loss: {loss.item():.5f}")
loss.backward()
if custom_print_log:
custom_print_log(self)
self.optimizer.step()
self.optimizer.zero_grad()
if eval_steps > 0 and global_step > 0 and \
(global_step % eval_steps == 0 or global_step == (num_training_steps - 1)):
dev_score, dev_loss = self.evaluate(dev_loader, global_step=global_step)
print(f"[Evaluate] dev score: {dev_score:.5f}, dev loss: {dev_loss:.5f}")
self.model.train()
if dev_score > self.best_score:
self.save_model(save_path)
print(
f"[Evaluate] best accuracy performence has been updated: {self.best_score:.5f} --> {dev_score:.5f}")
self.best_score = dev_score
global_step += 1
trn_loss = (total_loss / len(train_loader)).item()
self.train_epoch_losses.append(trn_loss)
print("[Train] Training done!")
@torch.no_grad()
def evaluate(self, dev_loader, **kwargs):
assert self.metric is not None
self.model.eval()
global_step = kwargs.get("global_step", -1)
total_loss = 0
self.metric.reset()
for batch_id, data in enumerate(dev_loader):
X, y = data
logits = self.model(X)
loss = self.loss_fn(logits, y).item()
total_loss += loss
self.metric.update(logits, y)
dev_loss = (total_loss / len(dev_loader))
dev_score = self.metric.accumulate()
if global_step != -1:
self.dev_losses.append((global_step, dev_loss))
self.dev_scores.append(dev_score)
return dev_score, dev_loss
@torch.no_grad()
def predict(self, x, **kwargs):
self.model.eval()
logits = self.model(x)
return logits
def save_model(self, save_path):
torch.save(self.model.state_dict(), save_path)
def load_model(self, model_path):
model_state_dict = torch.load(model_path)
self.model.load_state_dict(model_state_dict)
class Accuracy(object):
def __init__(self, is_logist=True):
self.num_correct = 0
self.num_count = 0
self.is_logist = is_logist
def update(self, outputs, labels):
if outputs.shape[1] == 1:
outputs = torch.squeeze(outputs, axis=-1)
if self.is_logist:
preds = (outputs>=0).to(torch.float32)
else:
preds = (outputs>=0.5).to(torch.float32)
else:
preds = torch.argmax(outputs, dim=1).int()
labels = torch.squeeze(labels, axis=-1)
batch_correct = torch.sum(torch.tensor(preds==labels, dtype=torch.float32)).numpy()
batch_count = len(labels)
self.num_correct += batch_correct
self.num_count += batch_count
def accumulate(self):
if self.num_count == 0:
return 0
return self.num_correct / self.num_count
def reset(self):
self.num_correct = 0
self.num_count = 0
def name(self):
return "Accuracy"
4.5.5 模型训练
使用训练集和验证集进行模型训练,共训练150个epoch。在实验中,保存准确率最高的模型作为最佳模型。代码实现如下:
import torch.optim as opt
lr = 0.2
model = fnn_model
optimizer = opt.SGD(model.parameters(),lr=lr)
loss_fn = F.cross_entropy
metric = Accuracy(is_logist=True)
runner = RunnerV3(model, optimizer, loss_fn, metric)
log_steps = 100
eval_steps = 50
runner.train(train_loader, dev_loader,
num_epochs=150, log_steps=log_steps, eval_steps = eval_steps,
save_path="best_model.pdparams")
可视化观察训练集损失和训练集loss变化情况。
import matplotlib.pyplot as plt
def plot_training_loss_acc(runner, fig_name,
fig_size=(16, 6),
sample_step=20,
loss_legend_loc="upper right",
acc_legend_loc="lower right",
train_color="#e4007f",
dev_color='#f19ec2',
fontsize='large',
train_linestyle="-",
dev_linestyle='--'):
plt.figure(figsize=fig_size)
plt.subplot(1, 2, 1)
train_items = runner.train_step_losses[::sample_step]
train_steps = [x[0] for x in train_items]
train_losses = [x[1] for x in train_items]
plt.plot(train_steps, train_losses, color=train_color, linestyle=train_linestyle, label="Train loss")
if len(runner.dev_losses) > 0:
dev_steps = [x[0] for x in runner.dev_losses]
dev_losses = [x[1] for x in runner.dev_losses]
plt.plot(dev_steps, dev_losses, color=dev_color, linestyle=dev_linestyle, label="Dev loss")
plt.ylabel("loss", fontsize=fontsize)
plt.xlabel("step", fontsize=fontsize)
plt.legend(loc=loss_legend_loc, fontsize='x-large')
if len(runner.dev_scores) > 0:
plt.subplot(1, 2, 2)
plt.plot(dev_steps, runner.dev_scores,
color=dev_color, linestyle=dev_linestyle, label="Dev accuracy")
plt.ylabel("score", fontsize=fontsize)
plt.xlabel("step", fontsize=fontsize)
plt.legend(loc=acc_legend_loc, fontsize='x-large')
plt.savefig(fig_name)
plt.show()
plot_training_loss_acc(runner, 'fw-loss.pdf')
从输出结果可以看出准确率随着迭代次数增加逐渐上升直至收敛,损失函数则不断下降。
4.5.6 模型评价
runner.load_model('best_model.pdparams')
score, loss = runner.evaluate(test_loader)
print("[Test] accuracy/loss: {:.4f}/{:.4f}".format(score, loss))
4.5.7 模型预测
X, label = train_dataset[0]
logits = runner.predict(X)
pred_class = torch.argmax(logits[0]).numpy()
label = label.numpy()
print("The true category is {} and the predicted category is {}".format(label, pred_class))
思考题
1. 对比Softmax分类和前馈神经网络分类。(必做)
softmax
import numpy as np
import matplotlib.pyplot as plt
from sklearn import datasets
from sklearn.linear_model import LogisticRegression
from matplotlib.colors import ListedColormap
iris = datasets.load_iris()
list(iris.keys())
X = iris["data"][:, 3:]
y = (iris["target"] == 2).astype(np.int32)
log_reg = LogisticRegression(solver="lbfgs", random_state=42)
log_reg.fit(X, y)
x0, x1 = np.meshgrid(
np.linspace(0, 8, 500).reshape(-1, 1),
np.linspace(0, 4.5, 200).reshape(-1, 1),
)
X_new = np.c_[x0.ravel(), x1.ravel()]
X = iris["data"][:, (2, 3)]
y = iris["target"]
softmax_reg = LogisticRegression(multi_class="multinomial", solver="lbfgs", C=1, random_state=42)
softmax_reg.fit(X, y)
softmax_reg.predict([[5, 2]])
softmax_reg.predict_proba([[5, 2]])
x0, x1 = np.meshgrid(
np.linspace(0, 8, 500).reshape(-1, 1),
np.linspace(0, 3.5, 200).reshape(-1, 1),
)
X_new = np.c_[x0.ravel(), x1.ravel()]
y_proba = softmax_reg.predict_proba(X_new)
y_predict = softmax_reg.predict(X_new)
zz1 = y_proba[:, 1].reshape(x0.shape)
zz = y_predict.reshape(x0.shape)
plt.figure(figsize=(8, 3))
plt.plot(X[y == 2, 0], X[y == 2, 1], "g^", label="Iris virginica")
plt.plot(X[y == 1, 0], X[y == 1, 1], "bs", label="Iris versicolor")
plt.plot(X[y == 0, 0], X[y == 0, 1], "yo", label="Iris setosa")
custom_cmap = ListedColormap(['#fafab0', '#9898ff', '#a0faa0'])
plt.contourf(x0, x1, zz, cmap=custom_cmap)
plt.xlabel("Petal length", fontsize=13)
plt.ylabel("Petal width", fontsize=13)
plt.legend(loc="center left", fontsize=13)
plt.axis([0, 7, 0, 3.5])
plt.title('C=1')
plt.show()
神经网络 这两种分类方式都有很好的拟合效果,Softmax函数可以将上一层的原始数据进行归一化,转化为一个(0,1)之间的数值,这些数值可以被当做概率分布,用来作为多分类的目标预测值。Softmax函数一般作为神经网络的最后一层,接受来自上一层网络的输入值,然后将其转化为概率。
神经网络的优点是分类的准确度高;并行分布处理能力强,分布存储及学习能力强,对噪声神经有较强的鲁棒性和容错能力,能充分逼近复杂的非线性关系;具备联想记忆的功能。
而它的缺点是神经网络需要大量的参数,如网络拓扑结构、权值和阈值的初始值;不能观察之间的学习过程,输出结果难以解释,会影响到结果的可信度和可接受程度;学习时间过长,甚至可能达不到学习的目的。
总结
通过这次实验我基本掌握前馈神经网络框架,以及参数优化问题,对比了不同分类器的效果,这几次前馈神经网络的实验课,我对BP算法又有了跟深刻的理解。 最后是思维导图:
|