一、数据介绍
本次实验采用的数据集是SNLI数据集,是 500,000 标记为英语的句子对。包括蕴含、矛盾,中立三种。 蕴含: 可以通过前提推断出假设。 矛盾: 可以推断出与假设相反。 中立: 所有其他情况。 下载数据集:
import collections
from d2l import mxnet as d2l
from mxnet import gluon, np, npx
npx.set_np()
d2l.DATA_HUB['SNLI'] = ('https://nlp.stanford.edu/projects/snli/snli_1.0.zip',
'9fcde07509c7e87ec61c640c1b2753d9041758e4')
data_dir = d2l.download_extract('SNLI')
二、词嵌入
使用自然语言(或由离散的单个单元组成的任何序列)的规范方法是将每个单词转换为一个单热编码的向量,并将其用于网络的下一阶段。这种方法的缺点是,随着词汇表中单词数目的增加,输入层的大小也会增加。选择的嵌入算法是GloVe,GloVe是一个基于计数的模型,在其中我们制作了一张巨大的表格,该表格显示了每个单词对应于其他单词的频数。显然,如果词汇量很高,并且使用的是诸如Wikipedia之类的大型文本数据集,那么其将形成一张巨大的表格。因此,我们对该表进行降维,以获得大小合理的词嵌入矩阵。
inputs = data.Field(lower=True)
answers = data.Field(sequential=False)
train, dev, test = datasets.SNLI.splits(inputs, answers)
inputs.build_vocab(train, dev, test)
vector = os.path.join(USERHOME, '.vector_cache', 'glove.6B.300d.txt.pt')
三、RNN单元
一个RNN单元能够逐一处理句子中的所有单词。最初,将句子中的第一个单词传递给RNN单元,RNN单元生成输出和中间状态。该状态是序列的连续含义,由于在完成对整个序列的处理之前不会输出此状态,所以将其称为隐藏状态。在处理第一个单词之后,我们有了由RNN单元生成的输出和隐藏状态。输出和隐藏状态都有各自的用途。输出可以被训练以预测句子中的下一个字符或单词。这是大多数语言模型任务的工作方式。
我们将为每个单词使用相同的RNN单元,并将由上一个单词处理生成的隐藏状态作为当前单词的输入进行处理。因此,RNN单元在每个单词处理阶段都有两个输入:单词本身和上一次执行得到的隐藏状态。流程图如下: 实现代码如下:
class RNNCell(nn.Module):
def __init__(self, embed_dim, hidden_size, vocab_dim):
super().__init__()
self.hidden_size = hidden_size
self.input2hidden = nn.Linear(embed_dim + hidden_size, hidden_size)
def forward(self, inputs, hidden):
combined = torch.cat((inputs, hidden), 2)
hidden = torch.relu(self.input2hidden(combined))
return hidden
def init_hidden(self, batch_size):
return torch.zeros(1, batch_size, self.hidden_size)
下面是RNN单元流程图,我们有两个全连接层,它们负责创建输出和输入的隐藏状态。RNNCell的 forward函数接受当前的输入和前一个状态的隐藏状态,然后将它们连接在一起。 一个Linear 层以连接后的张量为输入,并为下一个单元生成隐藏状态。而另一个Linear层则为当前单元生成输出。在返回训练迭代之前,输出将通过softmax进行传递。RNNCell有一个称为init_hidden 的类方法,它可以方便地用于生成第一个隐藏状态,该状态具有我们在从RNNCell初始化对象时传递的隐藏状态大小。在开始遍历序列以获取第一个隐藏状态之前,我们将调用init_hidden,该状态的初始值为零。 四、编码器
class Encoder(nn.Module):
def __init__(self, embed_dim, vocab_dim, hidden_size):
super(Encoder, self).__init__()
self.rnn = RNNCell(embed_dim, hidden_size, vocab_dim)
def forward(self, inputs):
ht = self.rnn.init_hidden(inputs.size(1))
for word in inputs.split(1, dim=0):
ht = self.rnn(word, ht)
return ht
在forward函数中,首先将RNNCell的隐藏状态初始化为零,这可以通过调用之前创建的 init_hidden方法来完成。然后将输入序列在第一维上按大小为1进行拆分,然后进行遍历。这是在假设输入为batch_first之后进行的,因此第一维将是序列长度。为了遍历每个单词,必须遍历第一维。
对于每个单词,使用当前单词(输入)和前一个状态的隐藏状态来调用self.rnn的forward 函数。self.rnn返回下一个单元的输出和隐藏状态,继续循环直到序列结束。对于问题场景,不必担心输出,也不对从输出中获得的损失进行反向传播。相反,假设最后一个隐藏状态拥有句子的含义。
五、分类器
我们的网络的最后一个组件是分类器。因此,现在我们通过编码器处理了两个句子,并得到了它们的最终隐藏状态。现在是时候定义损失函数了。一种方法是计算两个句子中的高维隐藏状态之间的距离。损失可以按以下方式定义: 1)如果句子对是蕴含关系,则将损失最大化为一个较大的正值。 2)如果句子对是矛盾关系,则将损失最小化为一个较大的负值。 3)如果句子对是中性的,则将损失保持在零附近(在两个或三个边界中即可另一种方法是将两个句子的隐藏状态拼接起来,然后将它们传递到另一层集,并定义最终的分类器层,该层可以将拼接的值分类为我们想要的三个类之一。实际的SPINN实现使用该方法,但是其使用的融合机制比简单的拼接更为复杂。 代码如下:
class Merger(nn.Module):
def __init__(self, size, dropout=0.5):
super().__init__()
self.bn = nn.BatchNorm1d(size)
self.dropout = nn.Dropout(p=dropout)
def forward(self, data):
prem = data[0]
hypo = data[1]
diff = prem - hypo
prod = prem * hypo
cated_data = torch.cat([prem, hypo, diff, prod], 2)
cated_data = cated_data.squeeze()
return self.dropout(self.bn(cated_data))
在这里,Merger节点的构建是为了模拟SPINN的实际实现。Merger的forward函数入参为两个序列: prem 和hypoo。我们是首先通过减法确定两个句子间的差异,然后将实际句子与其间的差异和计算得到的乘积连接起来,并将其传递给批次归一化层和 dropout层。 下面将所有组件封装起来
class RNNClassifier(nn.Module):
def __init__(self, config):
super().__init__()
self.embed = nn.Embedding(config.vocab_dim, config.embed_dim)
self.encoder = Encoder(
config.embed_dim, config.vocab_dim, config.hidden_size)
self.classifier = nn.Sequential(
Merger(4 * config.hidden_size, config.dropout),
nn.Linear(4 * config.hidden_size, config.fc1_dim),
nn.ReLU(),
nn.BatchNorm1d(config.fc1_dim),
nn.Dropout(p=config.dropout),
nn.Linear(config.fc1_dim, config.fc2_dim)
)
def forward(self, batch):
prem_embed = self.embed(batch.premise)
hypo_embed = self.embed(batch.hypothesis)
premise = self.encoder(prem_embed)
hypothesis = self.encoder(hypo_embed)
scores = self.classifier((premise, hypothesis))
return scores
最终序列层从 Merger节点开始。合并后的输出的序列长度维数将增加四倍,因为我们将两个句子、两个句子的差和乘积都追加到了Merger的输出中。之后该输出经过一个全连接层,然后在ReLU非线性变换后使用batchnorm1d对其进行归一化。之后的dropout降低了过拟合的概率,随后经过另一个全连接层,该层为我们的输入数据创建了得分。输入数据将决定数据所属的类别(蕴含、矛盾或中性)
六、dropout
dropout它消除了对通常的正则化技术的需求,而正则化技术在dropout之前一直很普遍。借助于dropout,我们随机丢弃了网络中神经元之间的连接(如下图所示),因此网络不得不泛化并且不能偏向任何类型的外部因素。要删除神经元,只需将其输出设置为零即可。丢弃随机神经元可防止网络共适,因此在很大程度上降低了过拟合。 完整代码如下: model.py
import torch.nn as nn
import torch
class RNNCell(nn.Module):
def __init__(self, embed_dim, hidden_size, vocab_dim):
super().__init__()
self.hidden_size = hidden_size
self.input2hidden = nn.Linear(embed_dim + hidden_size, hidden_size)
def forward(self, inputs, hidden):
combined = torch.cat((inputs, hidden), 2)
hidden = torch.relu(self.input2hidden(combined))
return hidden
def init_hidden(self, batch_size):
return torch.zeros(1, batch_size, self.hidden_size)
class Encoder(nn.Module):
def __init__(self, embed_dim, vocab_dim, hidden_size):
super(Encoder, self).__init__()
self.rnn = RNNCell(embed_dim, hidden_size, vocab_dim)
def forward(self, inputs):
ht = self.rnn.init_hidden(inputs.size(1))
for word in inputs.split(1, dim=0):
ht = self.rnn(word, ht)
return ht
class Merger(nn.Module):
def __init__(self, size, dropout=0.5):
super().__init__()
self.bn = nn.BatchNorm1d(size)
self.dropout = nn.Dropout(p=dropout)
def forward(self, data):
prem = data[0]
hypo = data[1]
diff = prem - hypo
prod = prem * hypo
cated_data = torch.cat([prem, hypo, diff, prod], 2)
cated_data = cated_data.squeeze()
return self.dropout(self.bn(cated_data))
class RNNClassifier(nn.Module):
def __init__(self, config):
super().__init__()
self.embed = nn.Embedding(config.vocab_dim, config.embed_dim)
self.encoder = Encoder(
config.embed_dim, config.vocab_dim, config.hidden_size)
self.classifier = nn.Sequential(
Merger(4 * config.hidden_size, config.dropout),
nn.Linear(4 * config.hidden_size, config.fc1_dim),
nn.ReLU(),
nn.BatchNorm1d(config.fc1_dim),
nn.Dropout(p=config.dropout),
nn.Linear(config.fc1_dim, config.fc2_dim)
)
def forward(self, batch):
prem_embed = self.embed(batch.premise)
hypo_embed = self.embed(batch.hypothesis)
premise = self.encoder(prem_embed)
hypothesis = self.encoder(hypo_embed)
scores = self.classifier((premise, hypothesis))
return scores
train.py
import os
import time
from pathlib import Path
from collections import namedtuple
import torch
from torch import optim
import torch.nn as nn
from torchtext import data, datasets
from model import RNNClassifier
ConfigGen = namedtuple(
'ConfigGen',
'vocab_dim out_dim cells birnn dropout fc1_dim fc2_dim embed_dim hidden_size')
ConfigGen.__new__.__defaults__ = (None,) * len(ConfigGen._fields)
USERHOME = str(Path.home())
batch_size = 64
inputs = data.Field(lower=True)
answers = data.Field(sequential=False)
train, dev, test = datasets.SNLI.splits(inputs, answers)
inputs.build_vocab(train, dev, test)
vector = os.path.join(USERHOME, '.vector_cache', 'glove.6B.300d.txt.pt')
if os.path.isfile(vector):
inputs.vocab.vectors = torch.load(vector)
else:
inputs.vocab.load_vectors('glove.6B.300d')
answers.build_vocab(train)
train_iter, dev_iter, test_iter = data.BucketIterator.splits(
(train, dev, test), batch_size=batch_size)
train_iter.init_epoch()
vocab_dim = len(inputs.vocab)
out_dim = len(answers.vocab)
cells = 2
birnn = True
lr = 0.01
epochs = 10
if birnn:
cells *= 2
dropout = 0.5
fc1_dim = 50
fc2_dim = 3
hidden_size = 1000
embed_dim = 300
config = ConfigGen(
vocab_dim, out_dim, cells, birnn,
dropout, fc1_dim, fc2_dim, embed_dim, hidden_size)
model = RNNClassifier(config)
model.embed.weight.data = inputs.vocab.vectors
criterion = nn.CrossEntropyLoss()
def init_weights(m):
if type(m) == nn.Linear:
torch.nn.init.xavier_uniform_(m.weight)
m.bias.data.fill_(0.01)
model.apply(init_weights)
opt = optim.Adam(model.parameters(), lr=lr)
iterations = 0
start = time.time()
best_dev_acc = -1
train_iter.repeat = False
model.train()
for epoch in range(epochs):
train_iter.init_epoch()
n_correct, n_total = 0, 0
for batch_idx, batch in enumerate(train_iter):
opt.zero_grad()
iterations += 1
answer = model(batch)
if torch.isnan(answer).any():
raise RuntimeWarning((
"Found NaN!. Vanishing Gradient kicked in. "
"Fixing that is not in the scope of this illustration. "
"You may raise an issue in github"))
n_correct += (torch.max(answer, 1)
[1].view(batch.label.size()) == batch.label).sum()
n_total += batch.batch_size
train_acc = 100. * n_correct / n_total
loss = criterion(answer, batch.label - 1)
loss.backward()
print(f"Loss: {loss.item()}")
opt.step()
if iterations % 5 == 0:
model.eval()
dev_iter.init_epoch()
n_dev_correct, dev_loss = 0, 0
for dev_batch_idx, dev_batch in enumerate(dev_iter):
answer = model(dev_batch)
n_dev_correct += (torch.max(
answer, 1)[1].view(
dev_batch.label.size()) == dev_batch.label - 1).sum()
dev_loss = criterion(answer, dev_batch.label - 1)
dev_acc = 100. * n_dev_correct / len(dev)
print(dev_acc.item())
model.train()
|