This post implements a Seq2Seq model with an attention mechanism, and makes its internal LSTM encoder & decoder multi-layer and/or bidirectional. (The decoder stays unidirectional, because it has to generate the output sequence in order.)
For background on how Seq2Seq works (including an explanation of the attention mechanism), see this post: [link]
The idea behind Seq2Seq is simple: it consists of two LSTMs, one used as the encoder and one as the decoder. Take machine translation as an example.
Training: each token of the source sentence is fed in turn into the LSTM encoder (i.e. the whole sentence is passed in as a matrix and processed step by step), and the final hidden state h and cell state c are taken as the initial h, c of the LSTM decoder. The decoder's first input is the start-of-sequence token, and the target words are then fed in one by one to update its state. Each output is mapped by a fully connected layer to a probability distribution over the vocabulary, the loss against the one-hot label is computed, and the parameters are updated by backpropagation.
Prediction: the encoder works exactly as during training, and its final h, c become the decoder's initial h, c. The start token is fed to the decoder first; the word the model predicts is then fed back as the next input, updating the state step by step, until the end token is predicted and the translated sequence is output.
The attention mechanism: attention in Seq2Seq takes all of the encoder's hidden states into account. Every time the decoder updates its state h, it computes the relevance between that state and every encoder hidden state (see the link at the top for the details), i.e. which parts of the source it should focus on. This differs from the non-attention version, which updates the decoder using only the encoder's final state; if only the final state is used, information about words near the beginning of the sentence may already have been forgotten.
Machine translation (German -> English) with an attention-based Seq2Seq:
The model code below is already organized so that, by changing just a few parameters, the seq2seq LSTM encoder & decoder become multi-layer and/or bidirectional (the decoder is not made bidirectional, because it has to generate the output sequence in order). [Note: in this implementation the encoder and decoder LSTMs share the same hidden dimension (output dimension) and the same number of layers.]
Dataset: torchtext's built-in Multi30k dataset.
Import the required libraries:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import random
from torchtext.datasets import TranslationDataset, Multi30k
import spacy
from torchtext.data import Field, BucketIterator
import time
import math
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device, 'is available')
Dataset and text preprocessing:
Run the following commands in your shell to download the English and German language models:
python -m spacy download en_core_web_sm
python -m spacy download de_core_news_sm
Load the German and English models:
spacy_de = spacy.load('de_core_news_sm')
spacy_en = spacy.load('en_core_web_sm')
Text preprocessing: tokenize with spaCy. torchtext's Field objects and the BucketIterator make the rest convenient: adding start/end tokens, lowercasing, and so on.
def tokenize_de(text):
    # Tokenize the German source and reverse the token order (reversing the source often helps seq2seq training)
    return [tok.text for tok in spacy_de.tokenizer(text)][::-1]

def tokenize_en(text):
    return [tok.text for tok in spacy_en.tokenizer(text)]
SRC = Field(tokenize=tokenize_de,
            init_token='<sos>',
            eos_token='<eos>',
            lower=True)

TRG = Field(tokenize=tokenize_en,
            init_token='<sos>',
            eos_token='<eos>',
            lower=True)

train_data, valid_data, test_data = Multi30k.splits(exts=('.de', '.en'),
                                                    fields=(SRC, TRG))
print(len(train_data),len(valid_data),len(test_data))
print(vars(train_data.examples[0]))
Build the source and target vocabularies from the training set:
SRC.build_vocab(train_data, min_freq = 2)
TRG.build_vocab(train_data, min_freq = 2)
print(f"Unique tokens in source (de) vocabulary: {len(SRC.vocab)}")
print(f"Unique tokens in target (en) vocabulary: {len(TRG.vocab)}")
Use torchtext's BucketIterator, which groups sentences of similar length into the same batch and pads each sentence to the maximum length within its batch.
BATCH_SIZE = 16
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size=BATCH_SIZE,
    device=device)
batch = next(iter(train_iterator))
print(batch)
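As an optional sanity check (the shapes below assume the default Field setting batch_first=False), the tensors in a batch are laid out as [sequence length, batch size]:
# Optional check: with batch_first=False (the Field default), tensors are [seq_len, batch_size]
print(batch.src.shape)   # e.g. torch.Size([src_len, 16])
print(batch.trg.shape)   # e.g. torch.Size([trg_len, 16])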
Building the Encoder:
Why the encoder adds a fully connected layer each for h and c in the bidirectional case: the encoder is bidirectional but the decoder is unidirectional, so the encoder's final h and c have dimension enc_hid_dim*2 and must be projected down to the decoder's dec_hid_dim; a linear layer performs that mapping. When the encoder is both bidirectional and multi-layer, the forward and backward states of each layer also have to be merged layer by layer. (For which layer and direction each index of h and c refers to, see the explanation here: [link])
h[0]: layer 1, forward direction, hidden state at the last time step
h[1]: layer 1, backward direction
h[2]: layer 2, forward direction
h[3]: layer 2, backward direction
and so on; the same layout applies to c.
class Encoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim, enc_hidden_size, dec_hidden_size, num_layers, bidirectional=False):
        super(Encoder, self).__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.enc_hidden_size = enc_hidden_size
        self.dec_hidden_size = dec_hidden_size
        self.num_layers = num_layers
        self.bidirectional = bidirectional
        self.embedding = nn.Embedding(self.vocab_size, embedding_dim)
        self.lstm = nn.LSTM(input_size=self.embedding_dim, hidden_size=self.enc_hidden_size,
                            num_layers=self.num_layers, bidirectional=self.bidirectional)
        if bidirectional:
            # project the concatenated forward/backward states (enc_hidden_size*2) down to dec_hidden_size
            self.fc1 = nn.Linear(self.enc_hidden_size * 2, self.dec_hidden_size)
            self.fc2 = nn.Linear(self.enc_hidden_size * 2, self.dec_hidden_size)

    def forward(self, x):
        # x: [src_len, batch_size]
        x = self.embedding(x)            # [src_len, batch_size, embedding_dim]
        batch_size = x.shape[1]
        num_directions = 2 if self.bidirectional else 1
        # randomly initialized h0/c0 (nn.LSTM would default to zeros if no initial state were given)
        h0 = torch.randn(self.num_layers * num_directions, batch_size, self.enc_hidden_size).to(device)
        c0 = torch.randn(self.num_layers * num_directions, batch_size, self.enc_hidden_size).to(device)
        out, (h, c) = self.lstm(x, (h0, c0))
        # out: [src_len, batch_size, enc_hidden_size * num_directions]
        # h, c: [num_layers * num_directions, batch_size, enc_hidden_size]
        if self.bidirectional:
            # Pair up the forward and backward states of each layer
            # (index layout: layer 1 fwd, layer 1 bwd, layer 2 fwd, layer 2 bwd, ...),
            # then project each layer's concatenated state to the decoder's hidden size.
            h = h.view(self.num_layers, 2, batch_size, self.enc_hidden_size)
            c = c.view(self.num_layers, 2, batch_size, self.enc_hidden_size)
            h = torch.tanh(self.fc1(torch.cat((h[:, 0], h[:, 1]), dim=2)))
            c = torch.tanh(self.fc2(torch.cat((c[:, 0], c[:, 1]), dim=2)))
        return out, h, c
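As a quick shape check (optional; the sizes below are made up purely for illustration), you can instantiate the encoder on its own and feed it a random batch of token ids:
# Made-up sizes for illustration only: vocab=100, emb=256, hidden=512, 2 layers, bidirectional
enc_test = Encoder(100, 256, 512, 512, num_layers=2, bidirectional=True).to(device)
src_test = torch.randint(0, 100, (7, 4)).to(device)   # [src_len=7, batch_size=4]
enc_out, h, c = enc_test(src_test)
print(enc_out.shape)        # torch.Size([7, 4, 1024])  -> [src_len, batch, enc_hid*2]
print(h.shape, c.shape)     # torch.Size([2, 4, 512]) each -> [num_layers, batch, dec_hid]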
Building the Attention Decoder:
The attention implemented below is the first variant described in the link at the top of the post (additive attention). The decoder's current state is concatenated with each encoder state to form a longer vector; this vector is multiplied by a weight matrix W (implemented as a linear layer), tanh is applied elementwise to squash the result into [-1, 1], and the inner product with a vector v (another linear layer) gives a scalar score. Both W and v are parameters learned from the training data. Once the m scores have been computed, a softmax turns them into the attention weights. The Attention class below computes exactly this relevance between the decoder's current state and all of the encoder's states.
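In formula form (my notation for the description above), with decoder state s_{t-1} and encoder states h_1, ..., h_m:
e_i = v^\top \tanh\big(W\,[s_{t-1};\, h_i]\big), \qquad \alpha_i = \frac{\exp(e_i)}{\sum_{j=1}^{m}\exp(e_j)}, \qquad c_t = \sum_{i=1}^{m} \alpha_i h_i
where c_t is the context vector used by the decoder below.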
class Attention(nn.Module):
    def __init__(self, enc_hid_dim, dec_hid_dim, bidirectional=False):
        super(Attention, self).__init__()
        self.bidirectional = bidirectional
        if self.bidirectional:
            # encoder outputs have dimension enc_hid_dim*2 when the encoder is bidirectional
            self.w = nn.Linear((enc_hid_dim * 2) + dec_hid_dim, dec_hid_dim, bias=False)
        else:
            self.w = nn.Linear(enc_hid_dim + dec_hid_dim, dec_hid_dim, bias=False)
        self.v = nn.Linear(dec_hid_dim, 1, bias=False)

    def forward(self, h, enc_out):
        # h: [num_layers, batch_size, dec_hid_dim] (decoder hidden state)
        # enc_out: [src_len, batch_size, enc_hid_dim * num_directions]
        h = h.squeeze(0)
        src_len = enc_out.shape[0]
        if len(h.shape) == 3:
            # multi-layer decoder: use one layer's hidden state as the attention query
            h = h[0]
        h = h.unsqueeze(1).repeat(1, src_len, 1)   # [batch_size, src_len, dec_hid_dim]
        enc_out = enc_out.transpose(0, 1)          # [batch_size, src_len, enc_hid_dim * num_directions]
        energy = torch.tanh(self.w(torch.cat((h, enc_out), dim=2)))
        attention = self.v(energy).squeeze(2)      # [batch_size, src_len]
        return F.softmax(attention, dim=1)
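A minimal sanity check for the Attention module (optional; all sizes are made up): the returned weights have shape [batch_size, src_len] and each row sums to 1.
# Made-up sizes: src_len=7, batch=4, enc_hid=dec_hid=512, bidirectional encoder
attn_test = Attention(512, 512, bidirectional=True)
h_test = torch.randn(1, 4, 512)            # decoder hidden state
enc_out_test = torch.randn(7, 4, 512 * 2)  # encoder outputs
a = attn_test(h_test, enc_out_test)
print(a.shape, a.sum(dim=1))               # torch.Size([4, 7]); every row sums to ~1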
The next class adds the attention mechanism to the decoder (the dimension of each relevant variable is noted in the comments). The c0 in the figure from the link at the top of the post is the weighted sum of all encoder states with the attention weights, called the context vector; in the code below it is att_c. In essence, the decoder input x1' and the context vector are concatenated and fed into the LSTM, which combines them with s0 and so on. At the very end, three things are concatenated and passed through the fully connected layer: pred = self.fc(torch.cat((dec_output, att_c, embedded), dim = 1)). Normally only dec_output would be passed to the fully connected layer; feeding all three follows the approach in the links at the end of the post, which in turn comes from an attention seq2seq paper.
class ATT_Decoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim, enc_hidden_size, dec_hidden_size, num_layers, attention, bidirectional=False):
        super(ATT_Decoder, self).__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.enc_hidden_size = enc_hidden_size
        self.dec_hidden_size = dec_hidden_size
        self.num_layers = num_layers
        self.attention = attention
        self.bidirectional = bidirectional   # whether the *encoder* is bidirectional
        self.embedding = nn.Embedding(self.vocab_size, embedding_dim)
        if self.bidirectional:
            # the context vector has dimension enc_hidden_size*2 when the encoder is bidirectional
            self.lstm = nn.LSTM(input_size=(self.enc_hidden_size * 2) + self.embedding_dim,
                                hidden_size=self.dec_hidden_size, num_layers=self.num_layers)
            self.fc = nn.Linear((self.enc_hidden_size * 2) + self.dec_hidden_size + self.embedding_dim, self.vocab_size)
        else:
            self.lstm = nn.LSTM(input_size=self.enc_hidden_size + self.embedding_dim,
                                hidden_size=self.dec_hidden_size, num_layers=self.num_layers)
            self.fc = nn.Linear(self.enc_hidden_size + self.dec_hidden_size + self.embedding_dim, self.vocab_size)

    def forward(self, dec_input, h, c, enc_output):
        # dec_input: [batch_size] (the previous target token)
        # h, c: [num_layers, batch_size, dec_hidden_size]
        # enc_output: [src_len, batch_size, enc_hidden_size * num_directions]
        dec_input = dec_input.unsqueeze(1)                    # [batch_size, 1]
        embedded = self.embedding(dec_input).transpose(0, 1)  # [1, batch_size, embedding_dim]
        att = self.attention(h, enc_output).unsqueeze(1)      # [batch_size, 1, src_len]
        enc_output = enc_output.transpose(0, 1)               # [batch_size, src_len, enc_hid*dirs]
        att_c = torch.bmm(att, enc_output).transpose(0, 1)    # context vector: [1, batch_size, enc_hid*dirs]
        lstm_input = torch.cat((embedded, att_c), dim=2)      # [1, batch_size, emb + enc_hid*dirs]
        dec_output, (dec_h, dec_c) = self.lstm(lstm_input, (h, c))
        embedded = embedded.squeeze(0)     # [batch_size, embedding_dim]
        dec_output = dec_output.squeeze(0) # [batch_size, dec_hidden_size]
        att_c = att_c.squeeze(0)           # [batch_size, enc_hid*dirs]
        pred = self.fc(torch.cat((dec_output, att_c, embedded), dim=1))  # [batch_size, vocab_size]
        return pred, dec_h, dec_c
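Continuing the optional shape check from the encoder section (same made-up sizes; it reuses enc_out, h and c from that snippet), one decoding step with a matching 2-layer decoder looks like this:
attn_t = Attention(512, 512, bidirectional=True)
dec_test = ATT_Decoder(100, 256, 512, 512, num_layers=2, attention=attn_t, bidirectional=True).to(device)
dec_in = torch.randint(0, 100, (4,)).to(device)   # previous target token for each of the 4 examples
pred, h, c = dec_test(dec_in, h, c, enc_out)      # reuse enc_out, h, c from the encoder check above
print(pred.shape)                                 # torch.Size([4, 100]) -> [batch, vocab_size]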
Building the Seq2Seq model:
The for loop below reads the target sentence one token at a time and uses attention to update the decoder state at every step. Training also uses the Teacher Forcing trick from the links at the end of the post: with some probability the decoder's own prediction at the current step is used as its next input, otherwise the ground-truth target token is used. This makes the model more robust. See this post for details: [link]
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        # src: [src_len, batch_size], trg: [trg_len, batch_size]
        batch_size = trg.shape[1]
        trg_max_len = trg.shape[0]
        trg_vocab_size = self.decoder.vocab_size
        enc_outputs, h, c = self.encoder(src)
        dec_input = trg[0, :]   # the first decoder input is the <sos> token
        outs = torch.zeros(trg_max_len, batch_size, trg_vocab_size).to(device)
        for t in range(1, trg_max_len):
            out, h, c = self.decoder(dec_input, h, c, enc_outputs)
            outs[t] = out
            # teacher forcing: feed the ground-truth token with probability teacher_forcing_ratio,
            # otherwise feed the model's own prediction
            teacher_force = random.random() < teacher_forcing_ratio
            pred_token_id = out.max(1)[1]
            dec_input = trg[t] if teacher_force else pred_token_id
        return outs
Set hyperparameters, instantiate the model, and initialize its parameters:
SRC_VOCAB_SIZE = len(SRC.vocab)
TRG_VOCAB_SIZE = len(TRG.vocab)
ENC_EMB_DIM = 256
DEC_EMB_DIM = 256
ENC_LAYERS = 1
DEC_LAYERS = 1
ENC_HID_DIM = 512
DEC_HID_DIM = 512
BI_DIRECTIONAL = True
enc = Encoder(SRC_VOCAB_SIZE, ENC_EMB_DIM, ENC_HID_DIM, DEC_HID_DIM, ENC_LAYERS, bidirectional=BI_DIRECTIONAL)
attn = Attention(ENC_HID_DIM, DEC_HID_DIM, bidirectional=BI_DIRECTIONAL)
dec = ATT_Decoder(TRG_VOCAB_SIZE, ENC_EMB_DIM, ENC_HID_DIM, DEC_HID_DIM, DEC_LAYERS, attn, bidirectional=BI_DIRECTIONAL)
model = Seq2Seq(enc, dec).to(device)
TRG_PAD_IDX = TRG.vocab.stoi[TRG.pad_token]
criterion = nn.CrossEntropyLoss(ignore_index = TRG_PAD_IDX).to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
def init_weights(m):
    for name, param in m.named_parameters():
        nn.init.uniform_(param.data, -0.08, 0.08)

model.apply(init_weights)
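Optionally, a small helper (my own addition, not required for training) reports how many trainable parameters the model has:
# Optional helper: count trainable parameters
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')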
The training function:
def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0
    for i, batch in enumerate(iterator):
        src = batch.src
        trg = batch.trg
        optimizer.zero_grad()
        output = model(src, trg)
        # skip position 0 (the <sos> placeholder) and flatten for the loss
        output = output[1:].view(-1, output.shape[-1])
        trg = trg[1:].view(-1)
        loss = criterion(output, trg)
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), clip)   # gradient clipping
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(iterator)
The evaluation function:
def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for i, batch in enumerate(iterator):
            src = batch.src
            trg = batch.trg
            output = model(src, trg, 0)   # turn off teacher forcing during evaluation
            output = output[1:].view(-1, output.shape[-1])
            trg = trg[1:].view(-1)
            loss = criterion(output, trg)
            epoch_loss += loss.item()
    return epoch_loss / len(iterator)
A helper to measure epoch time:
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs
Training and validation:
N_EPOCHS = 1
CLIP = 1
best_valid_loss = float('inf')
for epoch in range(N_EPOCHS):
    start_time = time.time()
    print('train step')
    train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
    print('valid step')
    valid_loss = evaluate(model, valid_iterator, criterion)
    end_time = time.time()
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    # keep the checkpoint that performs best on the validation set
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tut1-model.pt')
    print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
    print(f'\t Val. Loss: {valid_loss:.3f} | Val. PPL: {math.exp(valid_loss):7.3f}')
Finally, evaluate on the test set the checkpoint that performed best on the validation set during training.
model.load_state_dict(torch.load('tut1-model.pt'))
test_loss = evaluate(model, test_iterator, criterion)
print(f'| Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):7.3f} |')
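The prediction procedure described at the start of the post (feed <sos>, keep feeding the predicted token back in until <eos>) is not shown above, so here is a minimal greedy-decoding sketch of it. The function name and structure are my own; it assumes the attention decoder above and a batch of size 1, and max_len is just an arbitrary cap:
def translate_sentence(sentence, src_field, trg_field, model, max_len=50):
    # Greedy decoding for a single German sentence (a sketch, not part of the original training code)
    model.eval()
    tokens = [tok.text.lower() for tok in spacy_de.tokenizer(sentence)][::-1]   # same reversal as tokenize_de
    tokens = [src_field.init_token] + tokens + [src_field.eos_token]
    src_ids = [src_field.vocab.stoi[t] for t in tokens]
    src_tensor = torch.LongTensor(src_ids).unsqueeze(1).to(device)   # [src_len, 1]
    with torch.no_grad():
        enc_outputs, h, c = model.encoder(src_tensor)
        trg_ids = [trg_field.vocab.stoi[trg_field.init_token]]
        for _ in range(max_len):
            dec_input = torch.LongTensor([trg_ids[-1]]).to(device)   # last predicted token
            out, h, c = model.decoder(dec_input, h, c, enc_outputs)
            pred_id = out.argmax(1).item()
            trg_ids.append(pred_id)
            if pred_id == trg_field.vocab.stoi[trg_field.eos_token]:
                break
    return [trg_field.vocab.itos[i] for i in trg_ids[1:]]

print(translate_sentence('ein mann geht die straße entlang .', SRC, TRG, model))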
Finally, here is the seq2seq code without the attention mechanism (still supporting multiple layers and a bidirectional encoder):
Replace the following two classes to use it:
class Decoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim, enc_hidden_size, dec_hidden_size, num_layers):
        super(Decoder, self).__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.enc_hidden_size = enc_hidden_size
        self.dec_hidden_size = dec_hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(self.vocab_size, embedding_dim)
        self.lstm = nn.LSTM(input_size=self.embedding_dim, hidden_size=self.dec_hidden_size, num_layers=self.num_layers)
        self.fc = nn.Linear(self.dec_hidden_size, self.vocab_size)

    def forward(self, x, h, c):
        # x: [batch_size] -> [1, batch_size, embedding_dim]
        x = self.embedding(x).unsqueeze(0)
        out, (h, c) = self.lstm(x, (h, c))
        # use the top layer's hidden state to predict the next token
        pred = self.fc(h[-1, :, :]).squeeze(0)
        return pred, h, c
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = trg.shape[1]
        trg_max_len = trg.shape[0]
        trg_vocab_size = self.decoder.vocab_size
        enc_outputs, h, c = self.encoder(src)
        dec_input = trg[0, :]
        outs = torch.zeros(trg_max_len, batch_size, trg_vocab_size).to(device)
        for t in range(1, trg_max_len):
            out, h, c = self.decoder(dec_input, h, c)   # no enc_outputs here: no attention
            outs[t] = out
            teacher_force = random.random() < teacher_forcing_ratio
            pred_token_id = out.max(1)[1]
            dec_input = trg[t] if teacher_force else pred_token_id
        return outs
And replace the dec in the model-instantiation step with:
dec = Decoder(TRG_VOCAB_SIZE, ENC_EMB_DIM, ENC_HID_DIM, DEC_HID_DIM, DEC_LAYERS)
Summary:
Parts of the code here are based on the links below, which are themselves based on this English tutorial: [link]
However, they either use a GRU instead of an LSTM (a GRU is much easier to handle here because it only has a hidden state h, whereas an LSTM has both h and c, which must be handled separately in the code), or they do not implement the bidirectional case, or not the multi-layer case. (With a single unidirectional layer the dimensions are easy to manage; with multiple bidirectional layers it is easy to get lost, and you have to work out every dimension step by step or the shapes will not match.)
When a dimension-mismatch error occurs, my approach is to print the shapes right before the failing line and check whether they match. After a concatenation, a linear projection, or a matrix multiplication, a dimension of size 1 may appear or disappear, or the dimensions may end up in the wrong order; squeeze, unsqueeze, transpose and similar operations can then be used to put them back into the right shape.
You can also instantiate an Encoder or Decoder class on its own, feed it a randomly generated tensor, and check whether the output shapes are what you expect (as in the shape-check snippets above).
References:
[1] Seq2Seq(Attention)的PyTorch实现(超级详细)
[2] Pytorch实现Seq2Seq模型:以机器翻译为例
[3] Seq2Seq(attention)原理和pytorch实现