1. Sequence-to-Sequence 简介
大多数常见的 sequence-to-sequence (seq2seq) model 为 encoder-decoder model,主要由两个部分组成,分别是 Encoder 和 Decoder,而这两个部分大多数是由 recurrent neural network (RNN) 实现。
Encoder 是将一连串的输入,如文字、影片、声音讯号等,编码为单个向量,这个向量可以想像为整个输入的抽象表示,包含了整个输入的资讯。 Decoder 是將 Encoder 输出的向量进行逐步解码,一次输出一个结果,直到将最终的目标全部输出为止,每次输出会影响下一个输出,一般会在开始输入 < BOS > 来表示开始解码,会在结尾出输出 < EOS > 来表示解码结束。
2. 任务介绍
- 英文翻译为中文
- 输入: 一句英文 (e.g. tom is a student .)
- 输出: 中文翻译 (e.g. 汤姆 是 个 学生 。)
3. 实现过程
首先要做的是下载资料,主要是用来下载本次任务需要的数据集
!gdown --id '1r4px0i-NcrnXy1-tkBsIwvYwbWnxAhcg' --output data.tar.gz
!tar -zxvf data.tar.gz
!mkdir ckpt
!ls
之后导入需要用到的包(如果nltk 包没有下载的话,可使用第一段代码进行下载)
!pip3 install --user nltk
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.utils.data as data
import torch.utils.data.sampler as sampler
import torchvision
from torchvision import datasets, transforms
import numpy as np
import sys
import os
import random
import json
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
需要注意的是,不同的句子往往有着不同的长度,这无疑给训练带来了不小的麻烦(因为 RNN 的输入维度要进行相应的改变)。为了解决这个麻烦,我们使用 <pad> 长度较短的句子进行填充。因此这里定义一个长度转换的类
import numpy as np
class LabelTransform(object):
def __init__(self, size, pad):
self.size = size
self.pad = pad
def __call__(self, label):
label = np.pad(label, (0, (self.size - label.shape[0])), mode='constant', constant_values=self.pad)
return label
下一步就是数据的准备了,我们定义一个Dataset。
import re
import json
class EN2CNDataset(data.Dataset):
def __init__(self, root, max_output_len, set_name):
self.root = root
self.word2int_cn, self.int2word_cn = self.get_dictionary('cn')
self.word2int_en, self.int2word_en = self.get_dictionary('en')
self.data = []
with open(os.path.join(self.root, f'{set_name}.txt'), "r") as f:
for line in f:
self.data.append(line)
print (f'{set_name} dataset size: {len(self.data)}')
self.cn_vocab_size = len(self.word2int_cn)
self.en_vocab_size = len(self.word2int_en)
self.transform = LabelTransform(max_output_len, self.word2int_en['<PAD>'])
def get_dictionary(self, language):
with open(os.path.join(self.root, f'word2int_{language}.json'), "r") as f:
word2int = json.load(f)
with open(os.path.join(self.root, f'int2word_{language}.json'), "r") as f:
int2word = json.load(f)
return word2int, int2word
def __len__(self):
return len(self.data)
def __getitem__(self, Index):
sentences = self.data[Index]
sentences = re.split('[\t\n]', sentences)
sentences = list(filter(None, sentences))
assert len(sentences) == 2
BOS = self.word2int_en['<BOS>']
EOS = self.word2int_en['<EOS>']
UNK = self.word2int_en['<UNK>']
en, cn = [BOS], [BOS]
sentence = re.split(' ', sentences[0])
sentence = list(filter(None, sentence))
for word in sentence:
en.append(self.word2int_en.get(word, UNK))
en.append(EOS)
sentence = re.split(' ', sentences[1])
sentence = list(filter(None, sentence))
for word in sentence:
cn.append(self.word2int_cn.get(word, UNK))
cn.append(EOS)
en, cn = np.asarray(en), np.asarray(cn)
en, cn = self.transform(en), self.transform(cn)
en, cn = torch.LongTensor(en), torch.LongTensor(cn)
return en, cn
接下来就是构建自己的模型
Encoder
- seq2seq模型的编码器为RNN。对于每个输入,Encoder 会输出一个向量和一个隐藏层状态(hidden state),并将隐藏层状态用于下一个输入,换句话说,Encoder 会逐步读入输入序列。
- 参数:
- en_vocab_size 是英文字典的大小,也就是英文的 subword 的个数
- emb_dim 是 embedding 的维度,主要将 one-hot vector 的单词向量压缩到指定的维度,可以使用预先训练好的 word embedding,如 Glove 和 word2vector
- hid_dim 是 RNN 输出和隐藏状态的维度
- n_layers 是 RNN 要叠多少层
- dropout 是决定有多少的机率将某某个节点变为 0,主要是为了防止 overfitting ,一般来说是在训练集使用,测试集不使用
- Encoder 的输入和输出:
- 輸入:
- 英文的整数序列 e.g. 1, 28, 29, 205, 2
- 輸出:
- outputs: 最上层 RNN 全部的输出,可以用 Attention 再进行处理
- hidden: 每层最后的隐藏状态,将传输到后面的 Decoder 进行解码
class Encoder(nn.Module):
def __init__(self, en_vocab_size, emb_dim, hid_dim, n_layers, dropout):
super().__init__()
self.embedding = nn.Embedding(en_vocab_size, emb_dim)
self.hid_dim = hid_dim
self.n_layers = n_layers
self.rnn = nn.GRU(emb_dim, hid_dim, n_layers, dropout=dropout, batch_first=True, bidirectional=True)
self.dropout = nn.Dropout(dropout)
def forward(self, input):
embedding = self.embedding(input)
outputs, hidden = self.rnn(self.dropout(embedding))
return outputs, hidden
Decoder
class Decoder(nn.Module):
def __init__(self, cn_vocab_size, emb_dim, hid_dim, n_layers, dropout, isatt):
super().__init__()
self.cn_vocab_size = cn_vocab_size
self.hid_dim = hid_dim * 2
self.n_layers = n_layers
self.embedding = nn.Embedding(cn_vocab_size, config.emb_dim)
self.isatt = isatt
self.attention = Attention(hid_dim)
self.input_dim = emb_dim
self.rnn = nn.GRU(self.input_dim, self.hid_dim, self.n_layers, dropout = dropout, batch_first=True)
self.embedding2vocab1 = nn.Linear(self.hid_dim, self.hid_dim * 2)
self.embedding2vocab2 = nn.Linear(self.hid_dim * 2, self.hid_dim * 4)
self.embedding2vocab3 = nn.Linear(self.hid_dim * 4, self.cn_vocab_size)
self.dropout = nn.Dropout(dropout)
def forward(self, input, hidden, encoder_outputs):
input = input.unsqueeze(1)
embedded = self.dropout(self.embedding(input))
if self.isatt:
attn = self.attention(encoder_outputs, hidden)
output, hidden = self.rnn(embedded, hidden)
output = self.embedding2vocab1(output.squeeze(1))
output = self.embedding2vocab2(output)
prediction = self.embedding2vocab3(output)
return prediction, hidden
Attention
-
当输入过长时,或是单独靠 “content vector” 无法获取整个输入的意思时,用 Attention Mechanism 来提供 Decoder 更多的资讯 -
主要是根据现在 Decoder hidden state ,去计算在 Encoder outputs 中,那些与其有较高的关系,根据关系的数值来决定传给 Decoder 哪些额外的资讯 -
常见 Attention 的操作是用 Neural Network / Dot Product 来计算 Decoder hidden state 和 Encoder outputs 之间的关系,再对所有算出來的数值做 softmax ,最后根据过完 softmax 的值对 Encoder outputs 做 weight sum -
李宏毅老师的课程在此处并没有给出具体的代码,需要大家自己补充。大家可以参考这篇文章 Seq2Seq (Attention) 的 PyTorch 实现 或者B站的视频 PyTorch35——基于注意力机制的Seq2Seq的PyTorch实现示例。
class Attention(nn.Module):
def __init__(self, hid_dim):
super(Attention, self).__init__()
self.hid_dim = hid_dim
def forward(self, encoder_outputs, decoder_hidden):
attention=None
return attention
Seq2seq模型
- 由 Encoder 和 Decoder 组成
- 接收输入并传给 Encoder
- 将 Encoder 的输出传给 Decoder
- 不断地将 Decoder 的输出传回 Decoder ,进行解码
- 当解码完成,将 Decoder 的输出传回
class Seq2Seq(nn.Module):
def __init__(self, encoder, decoder, device):
super().__init__()
self.encoder = encoder
self.decoder = decoder
self.device = device
assert encoder.n_layers == decoder.n_layers, \
"Encoder and decoder must have equal number of layers!"
def forward(self, input, target, teacher_forcing_ratio):
batch_size = target.shape[0]
target_len = target.shape[1]
vocab_size = self.decoder.cn_vocab_size
outputs = torch.zeros(batch_size, target_len, vocab_size).to(self.device)
encoder_outputs, hidden = self.encoder(input)
hidden = hidden.view(self.encoder.n_layers, 2, batch_size, -1)
hidden = torch.cat((hidden[:, -2, :, :], hidden[:, -1, :, :]), dim=2)
input = target[:, 0]
preds = []
for t in range(1, target_len):
output, hidden = self.decoder(input, hidden, encoder_outputs)
outputs[:, t] = output
teacher_force = random.random() <= teacher_forcing_ratio
top1 = output.argmax(1)
input = target[:, t] if teacher_force and t < target_len else top1
preds.append(top1.unsqueeze(1))
preds = torch.cat(preds, 1)
return outputs, preds
def inference(self, input, target):
batch_size = input.shape[0]
input_len = input.shape[1]
vocab_size = self.decoder.cn_vocab_size
outputs = torch.zeros(batch_size, input_len, vocab_size).to(self.device)
encoder_outputs, hidden = self.encoder(input)
hidden = hidden.view(self.encoder.n_layers, 2, batch_size, -1)
hidden = torch.cat((hidden[:, -2, :, :], hidden[:, -1, :, :]), dim=2)
input = target[:, 0]
preds = []
for t in range(1, input_len):
output, hidden = self.decoder(input, hidden, encoder_outputs)
outputs[:, t] = output
top1 = output.argmax(1)
input = top1
preds.append(top1.unsqueeze(1))
preds = torch.cat(preds, 1)
return outputs, preds
|