#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @version: v1.0
# @Author : Meng Li
# @contact: 925762221@qq.com
# @FILE : torch_seq2seq.py
# @Time : 2022/6/8 11:11
# @Software : PyCharm
# @site:
# @Description :
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import os

class Seq2seq(nn.Module):
    def __init__(self, in_features, hidden_size):
        super().__init__()
        self.in_features = in_features
        self.hidden_size = hidden_size
        self.encoder = nn.RNN(input_size=in_features, hidden_size=hidden_size, dropout=0.5)  # encoder
        self.decoder = nn.RNN(input_size=in_features, hidden_size=hidden_size, dropout=0.5)  # decoder for translation
        self.crition = nn.CrossEntropyLoss()
        self.fc = nn.Linear(hidden_size, in_features)

    def forward(self, enc_input, dec_input, dec_output):
        # enc_input.size(): [Batch_size, seq_len, embedding_size] -> [seq_len, Batch_size, embedding_size]
        enc_input = enc_input.permute(1, 0, 2)  # [seq_len, Batch_size, embedding_size]
        dec_input = dec_input.permute(1, 0, 2)  # [seq_len, Batch_size, embedding_size]
        seq_len, batch_size, embedding_size = enc_input.size()
        h_0 = torch.rand(1, batch_size, self.hidden_size)
        _, ht = self.encoder(enc_input, h_0)  # ht: [num_layers * num_directions, Batch_size, hidden_size]
        de_output, _ = self.decoder(dec_input, ht)  # de_output: [seq_len, Batch_size, hidden_size]
        output = self.fc(de_output)  # [seq_len, Batch_size, in_features]
        output = output.permute(1, 0, 2)  # [Batch_size, seq_len, in_features]
        loss = 0
        for i in range(len(output)):  # accumulate cross-entropy loss for each sequence in the batch
            loss += self.crition(output[i], dec_output[i])
        return output, loss

class Seq2seq_lstm(nn.Module):
    def __init__(self, in_features, hidden_size):
        super().__init__()
        self.in_features = in_features
        self.hidden_size = hidden_size
        self.encoder = nn.LSTM(input_size=in_features, hidden_size=hidden_size, dropout=0.5, num_layers=1)  # encoder
        self.decoder = nn.LSTM(input_size=in_features, hidden_size=hidden_size, dropout=0.5, num_layers=1)  # decoder for translation
        self.crition = nn.CrossEntropyLoss()
        self.fc = nn.Linear(hidden_size, in_features)

    def forward(self, enc_input, dec_input, dec_output):
        # enc_input.size(): [Batch_size, seq_len, embedding_size] -> [seq_len, Batch_size, embedding_size]
        enc_input = enc_input.permute(1, 0, 2)  # [seq_len, Batch_size, embedding_size]
        dec_input = dec_input.permute(1, 0, 2)  # [seq_len, Batch_size, embedding_size]
        seq_len, batch_size, embedding_size = enc_input.size()
        h_0 = torch.rand(1, batch_size, self.hidden_size)
        c_0 = torch.rand(1, batch_size, self.hidden_size)
        _, (ht, ct) = self.encoder(enc_input, (h_0, c_0))  # ht: [num_layers * num_directions, Batch_size, hidden_size]
        de_output, (_, _) = self.decoder(dec_input, (ht, ct))  # de_output: [seq_len, Batch_size, hidden_size]
        output = self.fc(de_output)  # [seq_len, Batch_size, in_features]
        output = output.permute(1, 0, 2)  # [Batch_size, seq_len, in_features]
        loss = 0
        for i in range(len(output)):  # accumulate cross-entropy loss for each sequence in the batch
            loss += self.crition(output[i], dec_output[i])
        return output, loss

class my_dataset(Dataset):
    def __init__(self, enc_input, dec_input, dec_output):
        super().__init__()
        self.enc_input = enc_input
        self.dec_input = dec_input
        self.dec_output = dec_output

    def __getitem__(self, index):
        return self.enc_input[index], self.dec_input[index], self.dec_output[index]

    def __len__(self):
        return self.enc_input.size(0)

def make_data1(seq_data):
    vocab = [i for i in "SE?abcdefghijklmnopqrstuvwxyz"]
    word2idx = {j: i for i, j in enumerate(vocab)}
    V = np.max([len(j) for i in seq_data for j in i])  # length of the longest word
    enc_input = []
    dec_input = []
    dec_output = []
    for seq in seq_data:
        enc_input.append(np.eye(len(word2idx))[[word2idx[i] for i in seq[0] + (V - len(seq[0])) * "?" + 'E']])
        dec_input.append(np.eye(len(word2idx))[[word2idx[i] for i in 'S' + seq[1] + (V - len(seq[1])) * "?"]])
        dec_output.append([word2idx[i] for i in seq[1] + (V - len(seq[1])) * "?" + 'E'])
    # targets for CrossEntropyLoss must stay LongTensor (class indices), so dec_output is not cast to double
    return torch.tensor(np.array(enc_input)).double(), torch.tensor(np.array(dec_input)).double(), torch.LongTensor(dec_output)

def train():
    vocab = [i for i in "SE?abcdefghijklmnopqrstuvwxyz上下人低国女孩王男白色高黑"]
    word2idx = {j: i for i, j in enumerate(vocab)}
    idx2word = {i: j for i, j in enumerate(vocab)}
    seq_data = [['man', '男人'], ['black', '黑色'], ['king', '国王'], ['girl', '女孩'], ['up', '上'],
                ['high', '高'], ['women', '女人'], ['white', '白色'], ['boy', '男孩'], ['down', '下'],
                ['low', '低'], ['queen', '女王']]
    enc_input, dec_input, dec_output = make_data(seq_data)
    batch_size = 3
    in_features = len(vocab)
    hidden_size = 128
    train_data = my_dataset(enc_input, dec_input, dec_output)
    train_iter = DataLoader(train_data, batch_size, shuffle=True)
    net = Seq2seq_lstm(in_features, hidden_size)
    net.train()
    learning_rate = 0.001
    optimizer = optim.Adam(net.parameters(), lr=learning_rate)
    loss = 0
    for i in range(1000):
        for en_input, de_input, de_output in train_iter:
            output, loss = net(en_input, de_input, de_output)
            pre = torch.argmax(output, 2)
            # pre_ques = [[idx2word[j] for j in i] for i in en_input.numpy()]
            pre_ret = [[idx2word[j] for j in i] for i in pre.detach().numpy()]
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        if i % 100 == 0:
            print("step {0} loss {1}".format(i, loss))
    torch.save(net, "translate.pt")

def make_data(seq_data):
    enc_input_all, dec_input_all, dec_output_all = [], [], []
    vocab = [i for i in "SE?abcdefghijklmnopqrstuvwxyz上下人低国女孩王男白色高黑"]
    word2idx = {j: i for i, j in enumerate(vocab)}
    V = np.max([len(j) for i in seq_data for j in i])  # length of the longest word
    for seq in seq_data:
        for i in range(2):
            seq[i] = seq[i] + '?' * (V - len(seq[i]))  # 'man??', 'women'
        enc_input = [word2idx[n] for n in (seq[0] + 'E')]
        dec_input = [word2idx[n] for n in '?' * len(enc_input)]  # decoder input: a '?'-filled sequence of the same length
        dec_output = [word2idx[n] for n in (seq[1] + 'E')]
        enc_input_all.append(np.eye(len(vocab))[enc_input])  # one-hot
        dec_input_all.append(np.eye(len(vocab))[dec_input])  # one-hot
        dec_output_all.append(dec_output)  # not one-hot (class indices)
    # make tensors
    return torch.Tensor(np.array(enc_input_all)), torch.Tensor(np.array(dec_input_all)), torch.LongTensor(dec_output_all)

def translate(word):
    vocab = [i for i in "SE?abcdefghijklmnopqrstuvwxyz上下人低国女孩王男白色高黑"]
    idx2word = {i: j for i, j in enumerate(vocab)}
    V = 5
    x, y, z = make_data([[word, "?" * V]])
    if not os.path.exists("translate.pt"):
        train()
    net = torch.load("translate.pt")
    pre, loss = net(x, y, z)
    pre = torch.argmax(pre, 2)[0]
    pre_word = [idx2word[i] for i in pre.numpy()]
    pre_word = "".join([i.replace("?", "") for i in pre_word])
    print(word, "-> ", pre_word[:pre_word.index('E')])

if __name__ == '__main__':
    before_test = ['man', 'black', 'king', 'girl', 'up', 'high', 'women', 'white', 'boy', 'down', 'low', 'queen', 'mman', 'woman']
    [translate(i) for i in before_test]
    # train()

Without further ado, that is the complete code.
As the output (the red box in the original screenshot) shows, the two non-standard English words in the test list ('mman' and 'woman') are still translated into the closest-matching Chinese words.
This post implements English-to-Chinese translation; the whole algorithm is a Seq2seq model based on an LSTM.
word2idx maps each letter or Chinese character to an integer. This turns the corpus into numbers the computer can work with; each index is then converted into a one-hot vector that serves as the encoder's input.
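For instance, here is a minimal sketch (not part of the original script) of how a padded word becomes the one-hot matrix the encoder consumes:

import numpy as np

vocab = [c for c in "SE?abcdefghijklmnopqrstuvwxyz上下人低国女孩王男白色高黑"]
word2idx = {ch: i for i, ch in enumerate(vocab)}

word = "man??" + "E"                 # source word padded to length V=5, plus the end token
ids = [word2idx[ch] for ch in word]  # integer indices into the vocabulary
one_hot = np.eye(len(vocab))[ids]    # one row per character, one column per vocab entry
print(one_hot.shape)                 # (6, 42): seq_len x vocab_size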
Both the encoder and the decoder are LSTM recurrent networks. The encoder's input is the vector sequence of the source-language sentence to be translated, the encoder's output (its final hidden state) is fed in as the decoder's initial hidden state, and the decoder's input is a '?'-filled sequence of the same padded length.
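As a quick sanity check, here is a hypothetical usage sketch (the shapes and dummy values are assumptions, not from the original post) that runs random tensors through Seq2seq_lstm and shows how the encoder's final (h, c) state initialises the decoder:

import torch

vocab_size, hidden_size, seq_len, batch_size = 42, 128, 6, 3
net = Seq2seq_lstm(in_features=vocab_size, hidden_size=hidden_size)

enc_in = torch.rand(batch_size, seq_len, vocab_size)           # stand-in for the one-hot encoder input
dec_in = torch.rand(batch_size, seq_len, vocab_size)           # stand-in for the '?'-filled decoder input
dec_out = torch.randint(0, vocab_size, (batch_size, seq_len))  # target class indices

output, loss = net(enc_in, dec_in, dec_out)
print(output.shape)  # torch.Size([3, 6, 42]) -> [Batch_size, seq_len, vocab_size]
print(loss.item())   # scalar cross-entropy loss summed over the batch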