IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 人工智能 -> Pytorch构建Transformer实现英文翻译 -> 正文阅读

[人工智能]Pytorch构建Transformer实现英文翻译

# !/usr/bin/env Python3
# -*- coding: utf-8 -*-
# @version: v1.0
# @Author   : Meng Li
# @contact: 925762221@qq.com
# @FILE     : torch_transformer.py
# @Time     : 2022/6/22 15:10
# @Software : PyCharm
# @site: 
# @Description : 自己实现的基于Encoder-Decoder的Transformer模型
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
import math

torch.backends.cudnn.enabled = False
dim = 64  # Q K V 矩阵的维度
embed_size = 512
batch_size = 2
num_heads = 8
num_layers = 6
d_ff = 2048  # FeedForward dimension
dropout = 0.5

sentences = [
    # enc_input           dec_input         dec_output
    ['ich mochte ein bier P', 'S i want a beer .', 'i want a beer . E'],
    ['ich mochte ein cola P', 'S i want a coke .', 'i want a coke . E']
]
src_vocab_size = len(set(np.array([i[0].split(" ") for i in sentences]).flatten()))  # 计算sentences中enc_input中单词最长的长度
# 计算sentences中 dec_input + dec_output 中单词最长的长度
dst_vocab_size = len(
    set(np.array([i[1].split(" ") for i in sentences] + [i[2].split(" ") for i in sentences]).flatten()))


class my_dataset(Dataset):
    def __init__(self, enc_inputs, dec_inputs, dec_outputs):
        super(my_dataset, self).__init__()
        self.enc_inputs = enc_inputs
        self.dec_inputs = dec_inputs
        self.dec_outputs = dec_outputs

    def __getitem__(self, index):
        return self.enc_inputs[index], self.dec_inputs[index], self.dec_outputs[index]

    def __len__(self):
        return self.enc_inputs.size(0)  # 返回张量的第一个维度


def get_attn_pad_mask(seq_q, seq_k):
    """
    :param seq_q:  seq_q -> [Batch_size, len_q]
    :param seq_k:  seq_k -> [Batch_size, len_k]
    :return:
    """
    Batch_size, len_q = seq_q.size()
    Batch_size, len_k = seq_k.size()
    atten_mask = seq_k.eq(0).unsqueeze(1)  # atten_mask -> [Batch_size, 1, len_k]
    atten_mask = atten_mask.expand(Batch_size, len_q, len_k)  # atten_mask -> [Batch_size, len_q, len_k]
    return atten_mask


def get_attn_subsequence_mask(seq):
    """
    seq: [batch_size, tgt_len]
    """
    attn_shape = [seq.size(0), seq.size(1), seq.size(1)]
    subsequence_mask = np.triu(np.ones(attn_shape), k=1)  # Upper triangular matrix
    subsequence_mask = torch.from_numpy(subsequence_mask).byte()
    return subsequence_mask  # [batch_size, tgt_len, tgt_len]


def make_data(seq_data):
    """
    :param seq_data:
    :return: 返回的是三个张量:enc_inputs, dec_inputs, dec_outputs 张量维度分别是[Batch_size, seq_len, embed_size]
    seq_len是句子长度, embed_size是词汇表长度
    """
    src_vocab = [i[0].split(" ") for i in seq_data]
    src_vocab = set(np.array(src_vocab).flatten())
    target_vocab = [i[1].split(" ") for i in seq_data] + [i[2].split(" ") for i in seq_data]
    target_vocab = set(np.array(target_vocab).flatten())

    enc_input_all, dec_input_all, dec_output_all = [], [], []
    src_word2idx = {j: i for i, j in enumerate(src_vocab)}
    dst_word2idx = {j: i for i, j in enumerate(target_vocab)}

    for seq in seq_data:
        enc_input = [src_word2idx[n] for n in seq[0].split(" ")]
        dec_input = [dst_word2idx[i] for i in seq[1].split(" ")]
        dec_output = [dst_word2idx[i] for i in seq[2].split(" ")]

        enc_input_all.append(enc_input)
        dec_input_all.append(dec_input)
        dec_output_all.append(dec_output)  # not one-hot
    # make tensor
    return torch.LongTensor(enc_input_all), torch.LongTensor(dec_input_all), torch.LongTensor(dec_output_all)


enc_inputs, dec_inputs, dec_outputs = make_data(sentences)
train_data = my_dataset(enc_inputs, dec_inputs, dec_outputs)
train_data_iter = DataLoader(train_data, batch_size, shuffle=False)


class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        x: [seq_len, batch_size, d_model]
        """
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)


class Multi_Head_Attention(nn.Module):
    def __init__(self):
        super().__init__()
        self.W_Q = nn.Linear(embed_size, dim * num_heads, bias=False)  # 将输入矩阵映射为低维度
        self.W_K = nn.Linear(embed_size, dim * num_heads, bias=False)  # 将输入矩阵映射为低维度
        self.W_V = nn.Linear(embed_size, dim * num_heads, bias=False)  # 将输入矩阵映射为低维度
        self.projection = torch.nn.Linear(num_heads * dim, embed_size)  # 将atten的维度转换为与输入的维度一致

    def forward(self, input_Q, input_K, input_V, atten_mask):
        """
        :param input_Q: -> [Batch_size, len_q, embedding_size]
        :param input_K: -> [Batch_size, len_k, embedding_size]
        :param input_V: -> [Batch_size, len_v(=len_k), embedding_size]
        :param atten_mask:  -> [Batch_size, atten_len_k, atten_len_v]
        :return: 这里的dim是QKV矩阵的维度
        # 对输入求得Q、K、V三个矩阵,然后根据Q和K矩阵求得注意力矩阵,最后根据注意力矩阵求得经过Masked后的注意力矩阵
        # 返回的enc_inputs 和 atten 张量维度的一样的
        """
        torch.backends.cudnn.enabled = False
        residual = input_Q  # [Batch_size, len_q, embedding_size]  这里是残差项,多层注意力的输出与此项相加
        _, len_q, embedding_size = input_Q.size()
        Batch_size, atten_len_k, atten_len_v = atten_mask.size()
        # 输入乘以矩阵得到Q、K、V矩阵
        Q = self.W_Q(input_Q)  # Q -> [Batch_size, len_q, dim*num_heads]
        K = self.W_K(input_K)  # K -> [Batch_size, len_k, dim*num_heads]
        V = self.W_V(input_V)  # V -> [Batch_size, len_v, dim*num_heads]
        Q_Kt = torch.matmul(Q, K.transpose(-1, -2))  # QK_t -> [Batch_size, len_q, len_k]
        Q_Kt_div = Q_Kt / math.sqrt(dim)
        atten = torch.matmul(Q_Kt_div, V)  # atten -> [Batch_size, len_q, dim*num_heads]
        atten = atten.unsqueeze(1)  # atten -> [Batch_size, 1, len_q, dim*num_heads]
        atten = atten.view(Batch_size, -1, len_q, dim)  # atten -> [Batch_size, num_heads, len_q, dim]
        atten = nn.Softmax(dim=-1)(atten)  # [batch_size, n_heads, len_q]
        atten_mask = atten_mask.unsqueeze(1)  # atten_mask -> [Batch_size, 1, atten_len_k, atten_len_v]
        # atten_mask -> [Batch_size, num_heads, atten_len_k, atten_len_v]  这里的 atten_len_v == len_q
        atten_mask = atten_mask.repeat(1, num_heads, 1, 1)
        atten = torch.matmul(atten_mask.float(), atten.float())  # atten -> [Batch_size, num_heads, atten_len_k, dim]
        atten = atten.transpose(1, 2)  # atten -> [Batch_size, atten_len_k, num_heads, dim]

        atten = atten.reshape(Batch_size, atten_len_k, -1)  # atten -> [Batch_size, atten_len_k, num_heads * dim]
        atten = self.projection(atten)  # atten -> [Batch_size, atten_len_k, embed_size] atten_len_k == len_q
        # softmax  不改变矩阵的维度,这里对行方向对行向量进行归一化  这里对输出和残差 进行Add && Norm 操作
        atten_ret = (residual + torch.softmax(atten, dim=1))
        atten_ret = nn.LayerNorm(embed_size).to(device)(atten_ret)
        return atten_ret


class Feed_forward(nn.Module):
    """
    对应于原论文中的Feed-Forward流程
    查看某个数据是否存储于cuda上,可键入命令: x.is_cuda 其中x为变量
    """

    def __init__(self):
        super().__init__()
        self.W1 = nn.Linear(embed_size, d_ff).to(device)
        self.W2 = nn.Linear(d_ff, embed_size).to(device)
        self.b1 = torch.rand(d_ff).to(device)
        self.b2 = torch.rand(embed_size).to(device)
        self.relu = nn.ReLU().to(device)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, enc_inputs):
        """
        :param enc_inputs: # enc_inputs -> [Batch_size, seq_len, embedding_size]
        # atten -> [Batch_size, seq_len, embedding_size]
        :return:
        """
        fc1 = self.W1(enc_inputs) + self.b1
        fc1 = self.relu(fc1)
        fc2 = self.W2(fc1) + self.b2  # fc2 -> [Batch_size, seq_len, embedding_size]
        output = fc2  # output -> [Batch_size, seq_len, embedding_size]
        residual = enc_inputs
        Add_And_Norm = nn.LayerNorm(embed_size).cuda()(output + residual)
        return Add_And_Norm


class Encoder_layer(nn.Module):
    def __init__(self):
        super().__init__()
        self.multi_head_attention = Multi_Head_Attention()
        self.feed_forward = Feed_forward()

    def forward(self, enc_inputs, enc_atten_mask):
        """
        :param enc_inputs:  # enc_inputs -> [Batch_size, seq_len, embedding_size]
        :param enc_atten_mask:   # enc_atten_mask -> [Batch_size, seq_len, seq_len]
        :return:
        """
        # 传入多层注意力机制的输入Q、K、V 都假定为一样的
        atten_output = self.multi_head_attention(enc_inputs, enc_inputs, enc_inputs, enc_atten_mask)  # 这里得到的是注意力矩阵
        output = self.feed_forward(atten_output).to(device)  # output -> [Batch_size, seq_len, embeded_size]
        return output, atten_output


class Decoder_layer(nn.Module):
    def __init__(self):
        super().__init__()
        self.masked_multi_head_attention = Multi_Head_Attention()
        self.multi_head_attention = Multi_Head_Attention()
        self.feed_forward = Feed_forward()
        self.embed = torch.nn.Embedding(dst_vocab_size, embed_size)

    def forward(self, dec_input, enc_output, dec_atten_mask, dec_mask_atten_mask):
        """
        :param dec_input: [Batch_size, dst_len]
        :param enc_output: [Batch_size, src_len]
        :param dec_atten_mask: [Batch_size, dst_len, dst_len]
        :param dec_mask_atten_mask: [Batch_size, dst_len, src_len]
        :return: [Batch_size,dst_len,embedding_size]
        查看变量类型   -> 采用 data.dtype
        """
        # 得到Decoder Layer的第一个多层注意力输出,输入为解码层的输入
        masked_atten_outputs = self.masked_multi_head_attention(dec_input, dec_input, dec_input, dec_atten_mask)
        # masked_atten_outputs -> [Batch_size, dst_len, embedding_size]
        # enc_outputs -> [Batch_size, src_len, embedding_size]
        # Decoder层第二个多层注意力机制, K和V采用Encoder编码信息矩阵进行计算,Q采用上一个Decoder block进行计算
        dec_atten_outputs = self.multi_head_attention(masked_atten_outputs, enc_output, enc_output,
                                                      dec_mask_atten_mask)
        output = self.feed_forward(dec_atten_outputs)  # output -> [Batch_size, dst_len, embeded_size]
        return output, dec_atten_outputs


class Encoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.ModuleList(Encoder_layer() for _ in range(num_layers))
        self.embed = torch.nn.Embedding(src_vocab_size, embed_size)
        self.pos = PositionalEncoding(embed_size)

    def forward(self, enc_input_encoder):
        """
        :param enc_input_encoder: ->  [Batch_size, seq_len]
        :param enc_atten_mask: -> [Batch_size, seq_len, seq_len]
        :return:
        """
        enc_atten_mask = get_attn_pad_mask(enc_input_encoder, enc_input_encoder)  # [Batch_size, seq_len, seq_len]
        enc_input_embed = self.embed(enc_input_encoder)  # output -> [Batch_size, seq_len, embed_size]
        output = self.pos(enc_input_embed.transpose(0, 1)).transpose(0, 1)
        for layer in self.layers:
            output, atten = layer(output, enc_atten_mask)  # output -> [Batch_size, seq_len, embedding_size]
        return output


class Decoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.ModuleList(Decoder_layer() for _ in range(num_layers))
        self.embed = torch.nn.Embedding(dst_vocab_size, embed_size)  # 这里的维度一定要正确,否则会由于索引超过范围报错
        self.pos = PositionalEncoding(embed_size)

    def forward(self, decoder_enc_input, decoder_dec_input, decoder_enc_output):
        """
        :param decoder_enc_input: [Batch_size, dst_len]
        :param decoder_dec_input: [Batch_size, dst_len]
        :param decoder_enc_output:[Batch_size, src_len, embedding_size]
        :return:
        """
        dec_inputs_embed = self.embed(decoder_dec_input)  # [Batch_size, dst_len, embedding_size]
        # output = self.pos(dec_inputs_embed)
        output = self.pos(dec_inputs_embed.transpose(0, 1)).transpose(0, 1).to(device)
        dec_mask_atten_mask = get_attn_pad_mask(dec_inputs, dec_inputs).to(device)  # [Batch_size, dst_len, dst_len]
        dec_atten_mask = get_attn_pad_mask(decoder_dec_input, decoder_dec_input).to(device)  # [Batch_size, dst_len, src_len]
        for layer in self.layers:
            output, atten = layer(output, decoder_enc_output, dec_mask_atten_mask, dec_atten_mask)
            # output, atten = layer(output, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask)
        # output -> [Batch_size, seq_len, embedding_size]
        # return nn.Softmax(dim=0)(output)
        return output


class Transformer(nn.Module):
    def __init__(self):
        super().__init__()
        self.encoder = Encoder().to(device)
        self.decoder = Decoder().to(device)
        self.lr_rate = 1e-3
        self.optim = torch.optim.Adam(self.parameters(), lr=self.lr_rate)
        self.crition = nn.CrossEntropyLoss(ignore_index=0)
        self.projection = nn.Linear(embed_size, dst_vocab_size, bias=False).to(device)

    def forward(self, enc_input, dec_input, dec_output):
        """
        :param enc_input:  -> [Batch_size, src_len]
        :param dec_input:  -> [Batch_size, dst_len]
        :param dec_output: -> [Batch_size, dst_len]
        :return:
        """
        encode_outputs = self.encoder(enc_input)  # enc_outputs -> [Batch_size, src_len, embedding_size]
        # decode_output -> [Batch_size, dst_len, embedding_size]
        decode_output = self.decoder(enc_input, dec_input, encode_outputs)
        decode_output = self.projection(decode_output)  # decode_output -> [Batch_size, dst_len, dst_vocab_size]
        out_put = torch.argmax(decode_output, 2)  # 沿维度为2的维度求取最大值的索引
        loss = 0
        # decode_output = decode_output.view(-1, out_put.size(-1))
        # out_put = out_put.view(
        # for i in range(len(decode_output)):  # 对seq的每一个输出进行二分类损失计算
        #     loss += self.crition(decode_output[i], dec_output[i])
        dec_output = dec_output.view(-1)  # decode_output -> [Batch_size * dst_len]
        decode_output = decode_output.view(-1, decode_output.size(-1))  # [Batch_size * dst_len, dst_vocab_size]
        # print("dec_output.size() ", dec_output.size())
        # print("decode_output.size()", decode_output.size())
        loss = self.crition(decode_output, dec_output)
        return out_put, loss


device = "cuda" if torch.cuda.is_available() else "cpu"
model = Transformer().to(device)
lr_rate = 1e-3
# optimizer = torch.optim.Adam(model.parameters(), lr=lr_rate)
optimizer = torch.optim.SGD(model.parameters(), lr=lr_rate, momentum=0.99)

for i in range(1000):
    for enc_inputs_i, dec_inputs_i, dec_output_i in train_data_iter:
        enc_inputs_i, dec_inputs_i, dec_output_i = enc_inputs_i.to(device), dec_inputs_i.to(device), dec_output_i.to(
            device)
        predict, loss_i = model(enc_inputs_i, dec_inputs_i, dec_output_i)
        optimizer.zero_grad()
        loss_i.backward()
        optimizer.step()
        if i % 100 == 0:
            print("step {0}  loss {1}".format(i, loss_i))
            # print("predict ", predict)

先上代码。

根据Transformer的论文,采用Pytorch框架基于Encoder_Decoder+Multi-Head Attention多层注意力模型构建的Transformer框架,用来进行机器翻译

为了方便,语料库采用自己构造的语料库

    # enc_input           dec_input         dec_output
    ['ich mochte ein bier P', 'S i want a beer .', 'i want a beer . E'],
    ['ich mochte ein cola P', 'S i want a coke .', 'i want a coke . E']

数据集的构建采用DataSet和DataLoader两个原生类,将语料库采用键值对的形式进行保存

enc_input: [[1, 2, 3, 4, 0], [1, 2, 3, 5, 0]]
dec_input: [[6, 1, 2, 3, 4, 8], [6, 1, 2, 3, 5, 8]]
dec_output:[[1, 2, 3, 4, 8, 7], [1, 2, 3, 5, 8, 7]]

这三个输入分别为编码器的输入,解码器的输入以及解码器的输出。这里解码器的输入为真实的解码器的输出是为了在编解码器模型中采用Teacher-Forcing模型进行训练。(与不采用Teacher-Forcing方法相比,在小规模的样本中模型的准确率区别不大,大规模样本中没有测试过)

根据Transforer原论文,每个词向量维度为512维,这里采用Torch.nn.Embedding将词索引转化为向量

self.embed = torch.nn.Embedding(src_vocab_size, embed_size)
src_vocab_size 为语料库的大小,embed_size为向量的维度。由于Transformer没有向RNN/LSTM/GRU等循环神经网络一样对具有时序性的序列进行建模,故这里有一个基于位置的Encoding:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        x: [seq_len, batch_size, d_model]
        """
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

其实就是将语料根据相对位置采用正余弦函数进行编码。将编码后的值与self.embed进行累加,作为网络的输入,此时向量中既包含了不同分词之间的位置信息又包含了语义信息

enc_input_embed = self.embed(enc_input_encoder)  
output = self.pos(enc_input_embed.transpose(0, 1)).transpose(0, 1)

在编码器中,

Encoder由num_heads个同样大小的编码器串联。为什么要串联,我个人觉得跟CV里面的加深网络深度差不多的意思。这里采用 nn.ModuleList 将num_heads个编码器连接起来

self.layers = nn.ModuleList(Encoder_layer() for _ in range(num_layers))

解码器的构造跟编码器类似,但是解码器有两个多层注意力层

第一个多层注意力层是将解码器的输入进行注意力转换得到与输入同样大小的输出

比如,解码器的输入在经过位置编码和embedding后的输入:dec_inputs -> [Batch_size, dst_len, embedding_size]

解码器的输出也是同样大小,dec_outputs -> [Batch_size, dst_len, embedding_size]

第二层注意力层的input_Q为第一层的输出,input_K和input_Q为Encoder的输出,这样做的目的是根据解码器的输入找到和编码器的输出之前的相关性。

  人工智能 最新文章
2022吴恩达机器学习课程——第二课(神经网
第十五章 规则学习
FixMatch: Simplifying Semi-Supervised Le
数据挖掘Java——Kmeans算法的实现
大脑皮层的分割方法
【翻译】GPT-3是如何工作的
论文笔记:TEACHTEXT: CrossModal Generaliz
python从零学(六)
详解Python 3.x 导入(import)
【答读者问27】backtrader不支持最新版本的
上一篇文章      下一篇文章      查看所有文章
加:2022-07-03 10:48:33  更:2022-07-03 10:51:51 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 3:34:12-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码