NNDL 实验五 前馈神经网络(3)鸢尾花分类
一、深入研究鸢尾花数据集
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from sklearn.linear_model import Perceptron
"""自定义感知机模型"""
class Model:
def __init__(self):
self.w = np.ones(len(data[0]) - 1, dtype=np.float32)
self.b = 0
self.l_rate = 0.1
def sign(self, x, w, b):
y = np.dot(x, w) + b
return y
def fit(self, X_train, y_train):
is_wrong = False
while not is_wrong:
wrong_count = 0
for d in range(len(X_train)):
X = X_train[d]
y = y_train[d]
if y * self.sign(X, self.w, self.b) <= 0:
self.w = self.w + self.l_rate * np.dot(y, X)
self.b = self.b + self.l_rate * y
wrong_count += 1
if wrong_count == 0:
is_wrong = True
return 'Perceptron Model!'
def score(self):
pass
df = pd.read_csv('Iris.csv', usecols=[1, 2, 3, 4, 5])
"""绘制训练集基本散点图,便于人工分析,观察数据集的线性可分性"""
plt.figure(figsize=(8, 5))
plt.scatter(df[:50]['SepalLengthCm'], df[:50]['SepalWidthCm'], label='Iris-setosa')
plt.scatter(df[50:100]['SepalLengthCm'], df[50:100]['SepalWidthCm'], label='Iris-versicolor')
plt.scatter(df[100:150]['SepalLengthCm'], df[100:150]['SepalWidthCm'], label='Iris-virginica')
plt.xlabel('SepalLengthCm')
plt.ylabel('SepalWidthCm')
plt.title('Scattered distribution of length and width of iris sepals.')
plt.legend()
plt.show()
data = np.array(df.iloc[:100, [0, 1, -1]])
X, y = data[:, :-1], data[:, -1]
y = np.array([1 if i == 'Iris-setosa' else -1 for i in y])
"""自定义感知机模型,开始训练"""
perceptron = Model()
perceptron.fit(X, y)
print(perceptron.w, perceptron.b)
x_points = np.linspace(4, 7, 10)
y_ = -(perceptron.w[0] * x_points + perceptron.b) / perceptron.w[1]
plt.plot(x_points, y_)
plt.scatter(df[:50]['SepalLengthCm'], df[:50]['SepalWidthCm'], label='Iris-setosa')
plt.scatter(df[50:100]['SepalLengthCm'], df[50:100]['SepalWidthCm'], label='Iris-versicolor')
plt.xlabel('SepalLengthCm')
plt.ylabel('SepalWidthCm')
plt.title('Training results of Custom perceptron model.')
plt.legend()
plt.show()
"""sklearn感知机模型,开始训练"""
clf = Perceptron()
clf.fit(X, y)
x_points = np.linspace(4, 7, 10)
y_ = -(clf.coef_[0][0] * x_points + clf.intercept_[0]) / clf.coef_[0][1]
plt.plot(x_points, y_)
plt.scatter(df[:50]['SepalLengthCm'], df[:50]['SepalWidthCm'], label='Iris-setosa')
plt.scatter(df[50:100]['SepalLengthCm'], df[50:100]['SepalWidthCm'], label='Iris-versicolor')
plt.xlabel('SepalLengthCm')
plt.ylabel('SepalWidthCm')
plt.title('Training results of sklearn perceptron model.')
plt.legend()
plt.show()
画出数据集中150个数据的前两个特征的散点分布图:
二、4.5 实践:基于前馈神经网络完成鸢尾花分类
继续使用第三章中的鸢尾花分类任务,将Softmax分类器替换为前馈神经网络。
- 损失函数:交叉熵损失;
- 优化器:随机梯度下降法;
- 评价指标:准确率。
4.5.1 小批量梯度下降法
在梯度下降法中,目标函数是整个训练集上的风险函数,这种方式称为批量梯度下降法(Batch Gradient Descent,BGD)。 批量梯度下降法在每次迭代时需要计算每个样本上损失函数的梯度并求和。当训练集中的样本数量𝑁很大时,空间复杂度比较高,每次迭代的计算开销也很大。 为了减少每次迭代的计算复杂度,我们可以在每次迭代时只采集一小部分样本,计算在这组样本上损失函数的梯度并更新参数,这种优化方式称为 小批量梯度下降法(Mini-Batch Gradient Descent,Mini-Batch GD)。 第𝑡次迭代时,随机选取一个包含𝐾个样本的子集𝑡,计算这个子集上每个样本损失函数的梯度并进行平均,然后再进行参数更新。
其中𝐾为批量大小(Batch Size)。𝐾通常不会设置很大,一般在1~100之间。在实际应用中为了提高计算效率,通常设置为2的幂2的n次方。 在实际应用中,小批量随机梯度下降法有收敛快、计算开销小的优点,因此逐渐成为大规模的机器学习中的主要优化算法。 此外,随机梯度下降相当于在批量梯度下降的梯度上引入了随机噪声。在非凸优化问题中,随机梯度下降更容易逃离局部最优点。 小批量随机梯度下降法的训练过程如下:
4.5.1.1 数据分组
为了小批量梯度下降法,我们需要对数据进行随机分组。目前,机器学习中通常做法是构建一个数据迭代器,每个迭代过程中从全部数据集中获取一批指定数量的数据。 数据迭代器的实现原理如下图所示: 首先,将数据集封装为Dataset类,传入一组索引值,根据索引从数据集合中获取数据; 其次,构建DataLoader类,需要指定数据批量的大小和是否需要对数据进行乱序,通过该类即可批量获取数据。 在实践过程中,通常使用进行参数优化。在飞桨中,使用paddle.io.DataLoader加载minibatch的数据, paddle.io.DataLoader API可以生成一个迭代器,其中通过设置batch_size参数来指定minibatch的长度,通过设置shuffle参数为True,可以在生成minibatch的索引列表时将索引顺序打乱。
4.5.2 数据处理
构造IrisDataset类进行数据读取,继承自paddle.io.Dataset类。paddle.io.Dataset是用来封装 Dataset的方法和行为的抽象类,通过一个索引获取指定的样本,同时对该样本进行数据处理。当继承paddle.io.Dataset来定义数据读取类时,实现如下方法: getitem:根据给定索引获取数据集中指定样本,并对样本进行数据处理; len:返回数据集样本个数。 代码实现如下:
import numpy as np
import torch
from torch.utils.data import Dataset,DataLoader
from sklearn import datasets
import copy
import matplotlib
import matplotlib.pyplot as plt
def load_data(shuffle=True):
"""
加载鸢尾花数据
输入:
- shuffle:是否打乱数据,数据类型为bool
输出:
- X:特征数据,shape=[150,4]
- y:标签数据, shape=[150,3]
"""
X = np.array(datasets.load_iris()['data'], dtype=np.float32)
y = np.array(datasets.load_iris()['target'], dtype=np.int64)
X = torch.tensor(X)
y = torch.tensor(y)
X_min = torch.min(X, axis=0)
X_max = torch.max(X, axis=0)
X = (X-X_min.values) / (X_max.values-X_min.values)
if shuffle:
idx = torch.randperm(X.shape[0])
X_new = copy.deepcopy(X)
y_new = copy.deepcopy(y)
for i in range(X.shape[0]):
X_new[i] = X[idx[i]]
y_new[i] = y[idx[i]]
X = X_new
y = y_new
return X, y
class IrisDataset(Dataset):
def __init__(self, mode='train', num_train=120, num_dev=15):
super(IrisDataset, self).__init__()
X, y = load_data(shuffle=True)
if mode == 'train':
self.X, self.y = X[:num_train], y[:num_train]
elif mode == 'dev':
self.X, self.y = X[num_train:num_train + num_dev], y[num_train:num_train + num_dev]
else:
self.X, self.y = X[num_train + num_dev:], y[num_train + num_dev:]
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
def __len__(self):
return len(self.y)
def get_(self):
return self.X,self.y
torch.manual_seed(12)
train_dataset = IrisDataset(mode='train')
dev_dataset = IrisDataset(mode='dev')
test_dataset = IrisDataset(mode='test')
X00,y00 = train_dataset.get_()
x0=X00[y00==0]
x1=X00[y00==1]
x2=X00[y00==2]
for i in [0]:
plt.scatter(x0[:,i],x0[:,i+1],c='r',marker='o',label='setosa')
plt.scatter(x1[:,i],x1[:,i+1],c='g',marker='o',label='virgincia')
plt.scatter(x2[:,i],x2[:,i+1],c='blue',marker='o',label='versicolor')
plt.legend(loc=2)
plt.show()
print ("length of train set: ", len(train_dataset))
4.5.2.2 用DataLoader进行封装
batch_size = 16
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
dev_loader = DataLoader(dev_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)
4.5.3 模型构建
输入层神经元个数为4,输出层神经元个数为3,隐含层神经元个数为6。
import torchmetrics as Metric
class Accuracy():
def __init__(self, is_logist=True):
"""
输入:
- is_logist: outputs是logist还是激活后的值
"""
self.num_correct = 0
self.num_count = 0
self.is_logist = is_logist
def update(self, outputs, labels):
"""
输入:
- outputs: 预测值, shape=[N,class_num]
- labels: 标签值, shape=[N,1]
"""
if outputs.shape[1] == 1:
outputs = torch.squeeze(outputs, axis=-1)
if self.is_logist:
preds = troch.can_cast((outputs>=0), dtype=torch.float32)
else:
preds = torch.can_cast((outputs>=0.5), dtype=torch.float32)
else:
preds = torch.argmax(outputs, dim=1).int()
labels = torch.squeeze(labels, dim=-1)
batch_correct = torch.sum(torch.tensor(preds == labels, dtype=torch.float32)).numpy()
batch_count = len(labels)
self.num_correct += batch_correct
self.num_count += batch_count
def accumulate(self):
if self.num_count == 0:
return 0
return self.num_correct / self.num_count
def reset(self):
self.num_correct = 0
self.num_count = 0
def name(self):
return "Accuracy"
4.5.4 完善Runner类
基于RunnerV2类进行完善实现了RunnerV3类。其中训练过程使用自动梯度计算,使用DataLoader 加载批量数据,使用随机梯度下降法进行参数优化;模型保存时,使用state_dict 方法获取模型参数;模型加载时,使用set_state_dict 方法加载模型参数.
由于这里使用随机梯度下降法对参数优化,所以数据以批次的形式输入到模型中进行训练,那么评价指标计算也是分别在每个批次进行的,要想获得每个epoch整体的评价结果,需要对历史评价结果进行累积。这里定义Accuracy 类实现该功能。
import torchmetrics as Metric
class Accuracy():
def __init__(self, is_logist=True):
"""
输入:
- is_logist: outputs是logist还是激活后的值
"""
self.num_correct = 0
self.num_count = 0
self.is_logist = is_logist
def update(self, outputs, labels):
"""
输入:
- outputs: 预测值, shape=[N,class_num]
- labels: 标签值, shape=[N,1]
"""
if outputs.shape[1] == 1:
outputs = torch.squeeze(outputs, axis=-1)
if self.is_logist:
preds = troch.can_cast((outputs>=0), dtype=torch.float32)
else:
preds = torch.can_cast((outputs>=0.5), dtype=torch.float32)
else:
preds = torch.argmax(outputs, dim=1).int()
labels = torch.squeeze(labels, dim=-1)
batch_correct = torch.sum(torch.tensor(preds == labels, dtype=torch.float32)).numpy()
batch_count = len(labels)
self.num_correct += batch_correct
self.num_count += batch_count
def accumulate(self):
if self.num_count == 0:
return 0
return self.num_correct / self.num_count
def reset(self):
self.num_correct = 0
self.num_count = 0
def name(self):
return "Accuracy"
RunnerV3类的代码实现如下:
import torch.nn.functional as F
class RunnerV3(object):
def __init__(self, model, optimizer, loss_fn, metric, **kwargs):
self.model = model
self.optimizer = optimizer
self.loss_fn = loss_fn
self.metric = metric
self.dev_scores = []
self.train_epoch_losses = []
self.train_step_losses = []
self.dev_losses = []
self.best_score = 0
def train(self, train_loader, dev_loader=None, **kwargs):
self.model.train()
num_epochs = kwargs.get("num_epochs", 0)
log_steps = kwargs.get("log_steps", 100)
eval_steps = kwargs.get("eval_steps", 0)
save_path = kwargs.get("save_path", "best_model.pdparams")
custom_print_log = kwargs.get("custom_print_log", None)
num_training_steps = num_epochs * len(train_loader)
if eval_steps:
if self.metric is None:
raise RuntimeError('Error: Metric can not be None!')
if dev_loader is None:
raise RuntimeError('Error: dev_loader can not be None!')
global_step = 0
for epoch in range(num_epochs):
total_loss = 0
for step, data in enumerate(train_loader):
X, y = data
logits = self.model(X)
loss = self.loss_fn(logits, y)
total_loss += loss
self.train_step_losses.append((global_step,loss.item()))
if log_steps and global_step%log_steps==0:
print(f"[Train] epoch: {epoch}/{num_epochs}, step: {global_step}/{num_training_steps}, loss: {loss.item():.5f}")
loss.backward()
if custom_print_log:
custom_print_log(self)
self.optimizer.step()
self.optimizer.zero_grad()
if eval_steps>0 and global_step>0 and \
(global_step%eval_steps == 0 or global_step==(num_training_steps-1)):
dev_score, dev_loss = self.evaluate(dev_loader, global_step=global_step)
print(f"[Evaluate] dev score: {dev_score:.5f}, dev loss: {dev_loss:.5f}")
self.model.train()
if dev_score > self.best_score:
self.save_model(save_path)
print(f"[Evaluate] best accuracy performence has been updated: {self.best_score:.5f} --> {dev_score:.5f}")
self.best_score = dev_score
global_step += 1
trn_loss = (total_loss / len(train_loader)).item()
self.train_epoch_losses.append(trn_loss)
print("[Train] Training done!")
@torch.no_grad()
def evaluate(self, dev_loader, **kwargs):
assert self.metric is not None
self.model.eval()
global_step = kwargs.get("global_step", -1)
total_loss = 0
self.metric.reset()
for batch_id, data in enumerate(dev_loader):
X, y = data
logits = self.model(X)
loss = self.loss_fn(logits, y).item()
total_loss += loss
self.metric.update(logits, y)
dev_loss = (total_loss/len(dev_loader))
dev_score = self.metric.accumulate()
if global_step!=-1:
self.dev_losses.append((global_step, dev_loss))
self.dev_scores.append(dev_score)
return dev_score, dev_loss
@torch.no_grad()
def predict(self, x, **kwargs):
self.model.eval()
logits = self.model(x)
return logits
def save_model(self, save_path):
torch.save(self.model.state_dict(), save_path)
def load_model(self, model_path):
model_state_dict = torch.load(model_path)
self.model.state_dict(model_state_dict)
4.5.5 模型训练
import torch.optim as opt
lr = 0.2
model = fnn_model
optimizer = opt.SGD(model.parameters(),lr)
loss_fn = F.cross_entropy
metric = Accuracy(is_logist=True)
runner = RunnerV3(model, optimizer, loss_fn, metric)
使用训练集和验证集进行模型训练,共训练150个epoch。在实验中,保存准确率最高的模型作为最佳模型。代码实现如下:
log_steps = 100
eval_steps = 50
runner.train(train_loader, dev_loader,
num_epochs=150, log_steps=log_steps, eval_steps = eval_steps,
save_path="best_model.pdparams")
可视化观察训练集损失和训练集loss变化情况:
import matplotlib.pyplot as plt
def plot_training_loss_acc(runner, fig_name,
fig_size=(16, 6),
sample_step=20,
loss_legend_loc="upper right",
acc_legend_loc="lower right",
train_color="#e4007f",
dev_color='#f19ec2',
fontsize='large',
train_linestyle="-",
dev_linestyle='--'):
plt.figure(figsize=fig_size)
plt.subplot(1,2,1)
train_items = runner.train_step_losses[::sample_step]
train_steps=[x[0] for x in train_items]
train_losses = [x[1] for x in train_items]
plt.plot(train_steps, train_losses, color=train_color, linestyle=train_linestyle, label="Train loss")
if len(runner.dev_losses)>0:
dev_steps=[x[0] for x in runner.dev_losses]
dev_losses = [x[1] for x in runner.dev_losses]
plt.plot(dev_steps, dev_losses, color=dev_color, linestyle=dev_linestyle, label="Dev loss")
plt.ylabel("loss", fontsize=fontsize)
plt.xlabel("step", fontsize=fontsize)
plt.legend(loc=loss_legend_loc, fontsize='x-large')
if len(runner.dev_scores)>0:
plt.subplot(1,2,2)
plt.plot(dev_steps, runner.dev_scores,
color=dev_color, linestyle=dev_linestyle, label="Dev accuracy")
plt.ylabel("score", fontsize=fontsize)
plt.xlabel("step", fontsize=fontsize)
plt.legend(loc=acc_legend_loc, fontsize='x-large')
plt.savefig(fig_name)
plt.show()
plot_training_loss_acc(runner, 'fw-loss.pdf')
从输出结果可以看出准确率随着迭代次数增加逐渐上升,损失函数下降。
4.5.6 模型评价
使用测试数据对在训练过程中保存的最佳模型进行评价,观察模型在测试集上的准确率以及Loss情况。代码实现如下:
runner.load_model('best_model.pdparams')
score, loss = runner.evaluate(test_loader)
print("[Test] accuracy/loss: {:.4f}/{:.4f}".format(score, loss))
4.5.7 模型预测
同样地,也可以使用保存好的模型,对测试集中的某一个数据进行模型预测,观察模型效果。代码实现如下:
X, label = test_dataset.get_()
logits = runner.predict(X)
pred_class = torch.argmax(logits[0]).numpy()
label = label[0].numpy()
print("The true category is {} and the predicted category is {}".format(label, pred_class))
思考题
1. 对比Softmax分类和前馈神经网络分类。
Softmax分类器是逻辑回归分类器(LR)面对多分类任务的一般化变形,softmax计算简单,既可以进行二分类和多分类,和逻辑回归不同的是将激活函数变为softmax. 前馈神经网络是一种单向多层的网络结构,信息从输入层开始,逐层向一个方向传递,即单向传递,一直到输出层结束。前馈的意思就是指传播方向指的是前向。 C=0.1
C=4时:
C=8:
C=12:
C=100:
C=1000:
分类效果图可参考:
2.自定义隐藏层层数和每个隐藏层中的神经元个数,尝试找到最优超参数完成多分类。
隐藏层个数改为3:
将一个神经元的个数改为4:
再将隐藏层个数改为4:
再将隐藏层个数改为5:
根据结果可以看出:当隐层个数为6,神经元个数为均4是效果较好一点。
3. 对比SVM与FNN分类效果,谈谈自己看法。
SVM: 优点: 1.非线性映射理论基础,利用核函数代替了高维空间的映射,最大化间隔是核心,支持向量是训练的结果,最终结果是少量的向量决定的,可以提出较大的样本,所以有较小的鲁棒性。 2.使用核函数可以解决非线性的分类 3.分类思想很简单,就是将样本与决策面的间隔最大化 缺点: 1、SVM算法对大规模训练样本难以实施。SVM的空间消耗主要是存储训练样本和核矩阵,由于SVM是借助二次规划来求解支持向量,而求解二次规划将涉及m阶矩阵的计算(m为样本的个数),当m数目很大时该矩阵的存储和计算将耗费大量的机器内存和运算时间。 2、用SVM解决多分类问题存在困难,但是可以通过间接的方法去做。经典的支持向量机算法只给出了二类分类的算法,而在数据挖掘的实际应用中,一般要解决多类的分类问题。 3、对缺失数据敏感,对参数和核函数的选择敏感 FNN: 优点:神经网络有很强的非线性拟合能力,可映射任意复杂的非线性关系,而且学习规则简单,便于计算机实现。具有很强的鲁棒性、记忆能力、非线性映射能力以及强大的自学习能力,因此有很大的应用市场。可实现非线性映射,有自学能力,有推广概括能力。 缺点: 1、两阶段的训练模型,应用过程不太方便,且模型能力受限于FM表征能力的上限 2、只关注于高阶组合特征的交叉 3、两阶段训练方式也有问题。FM中特征组合,使用的隐向量的点积运算。将FM得到的隐向量移植到DNN中作为DNN的输入, 全连接层这时候会将输入向量的所有元素加权求和,且不会对Field进行区分。 这个其实又回到了之前说Deep Crossing时的问题,全连接隐层把所有特征进行了统一的交叉学习, 这在很多场景下其实是不太合理的。 4.采用梯度下降法,速度慢,有可能进入局部最小值而训练失败,新加入的样本有影响,可能会出现欠学习或过学习。
4. 尝试基于MNIST手写数字识别数据集,设计合适的前馈神经网络进行实验,并取得95%以上的准确率。
import torch
from torch import nn
from torch.autograd import Variable
from torch.utils.data import DataLoader
import torchvision.datasets as dataset
import torchvision.transforms as transforms
batch_size = 100
train_dataset = dataset.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True)
test_dataset = dataset.MNIST(root='./data', train=False, transform=transforms.ToTensor(), download=True)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=True)
input_size = 784
hidden_size = 500
num_classes = 10
class module(nn.Module):
def __init__(self, input_size, hidden_size, output_num):
super(module, self).__init__()
self.fc1 = nn.Linear(input_size, hidden_size)
self.fc2 = nn.Linear(hidden_size, output_num)
def forward(self, x):
out = self.fc1(x)
out = torch.relu(out)
out = self.fc2(out)
return out
module = module(input_size, hidden_size, num_classes)
lr = 1e-1
epochs = 5
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(module.parameters(), lr=lr)
for epoch in range(epochs + 1):
print("==============第 {} 轮 训练开始==============".format(epoch + 1))
for i, (images, labels) in enumerate(train_loader):
images = Variable(images.view(-1, 28 * 28))
labels = Variable(labels)
outputs = module(images)
loss = criterion(outputs, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if i % 100 == 0:
print("交叉熵损失为: %.5f" % loss.item())
T = 0
CC = 0
for images, labels in test_loader:
images = Variable(images.view(-1, 28 * 28))
labels = Variable(labels)
outputs = module(images)
_, predicts = torch.max(outputs.data, 1)
T += labels.size(0)
CC += (predicts == labels).sum()
print("识别准确率为:%.2f %%" % (100 * CC / T))
总结:
本次实验我了解了Softmax分类和前馈神经网络分类的区别,SVM与FNN分类效果的区别以及他们的优缺点,并且感觉每次网络搭建的步骤都大差不离,总的框架结构差不多。 前馈神经网络思维导图:
|