Basic Concepts
The Self-Attention Mechanism
When humans observe a scene, we cannot examine everything in front of us at once; we can only focus on one local region at a time. After taking in the scene roughly, the brain quickly directs attention to the most informative part and inspects it closely in order to make an effective judgment. It is probably this intuition that inspired the use of attention mechanisms in algorithms.
The first step in computing self-attention is to create three vectors from each of the encoder's input vectors (in this case, the embedding of each word): for every word we create a Query vector, a Key vector, and a Value vector. These vectors are produced by multiplying the embedding by three matrices that are learned during training. The second step is to compute a score. Suppose we are computing the self-attention for the word "Thinking". We need to score that word against every word in the input sentence; the score determines how much weight is placed on the other parts of the sentence when encoding the word at this position.
The third and fourth steps are to divide the scores by a constant (the paper uses the square root of the Key vector dimension, i.e. sqrt(k_dim); this leads to more stable gradients. Other values are possible, but this is the default) and then pass the result through a softmax operation. The softmax normalizes the scores so that they are all positive and sum to one. This softmax score determines how strongly each word is expressed at this position. Naturally, the word at the current position gets the highest softmax score, but it is often useful to also attend to another word that is relevant to the current one.
The fifth step is to multiply each Value vector by its softmax score (in preparation for summing them up). The intuition is to keep the values of the words we want to focus on intact and to drown out the irrelevant words (for example, by multiplying them by a tiny number such as 0.001). Sometimes we do not want the model to see the entire sequence when translating: we only want it to see the tokens on its left, and to mask out the tokens on its right. For example, the decoder of the Transformer uses masked attention. This can be understood as preventing the decoder from "cheating" by peeking at the rest of the answer while decoding the encoder output; the model is forced to attend only to the part of the sequence to the left of the current position. Implementation code:
# Scaled dot-product attention layer
# Formula:
#   attention(q, k, v) = softmax(q*kT/sqrt(dk) + mask) * v  (masked positions are pushed to -1e9)
# Input shapes:
#   q: (..., seq_len_q, k_dim)
#   k: (..., seq_len_k, k_dim)
#   v: (..., seq_len_v, v_dim)
#   mask: (..., seq_len_q, seq_len_k)
# Output shapes:
#   attention: (..., seq_len_q, v_dim)
#   attention_weights: (..., seq_len_q, seq_len_k)
def scaled_dot_product_attention(q, k, v, mask):
    # q * kT
    matmul_qk = tf.matmul(q, k, transpose_b=True)
    # scale by dk, where dk = k_dim
    dk = tf.shape(k)[-1]
    scaled_attention = matmul_qk / tf.sqrt(tf.cast(dk, tf.float32))
    # apply the mask
    if mask is not None:
        scaled_attention += (mask * -1e9)
    # attention weights matrix
    attention_weights = tf.nn.softmax(scaled_attention, axis=-1)
    # attention output
    attention = tf.matmul(attention_weights, v)
    return attention, attention_weights
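As a quick sanity check, the function above can be exercised with random tensors. The shapes below are illustrative assumptions (batch of 2, 5 query positions, 6 key/value positions, 64-dimensional keys, 32-dimensional values), not values used anywhere else in this article:

import tensorflow as tf

q = tf.random.uniform((2, 5, 64))   # (batch_size, seq_len_q, k_dim)
k = tf.random.uniform((2, 6, 64))   # (batch_size, seq_len_k, k_dim)
v = tf.random.uniform((2, 6, 32))   # (batch_size, seq_len_v, v_dim)
attention, attention_weights = scaled_dot_product_attention(q, k, v, mask=None)
print(attention.shape)          # (2, 5, 32)
print(attention_weights.shape)  # (2, 5, 6)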
The Multi-Head Attention Mechanism
Multi-head attention lets each attention head specialize in different feature subspaces of each word. This balances out the bias that a single attention mechanism might develop and gives word meanings a richer representation.
As a concrete example, "bank" can mean a financial institution. With only one attention head, the model would most likely learn to focus on words such as "money" or "loan". With multiple heads, different heads can attend to different senses: "bank" can also mean a river bank, so one head might learn to focus on words such as "river". This is where the value of multi-head attention shows.
Implementation code:
# Multi-head attention layer
class MultiHeadAttention(tf.keras.layers.Layer):
    # d_model is the dimensionality of the word vectors
    # num_heads is the number of heads, i.e. num_heads sets of q, k, v matrices
    def __init__(self, num_heads, d_model):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        # k_dim = dk
        self.k_dim = d_model // num_heads
        # fully connected layers
        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)
        # fully connected output layer
        self.dense = tf.keras.layers.Dense(d_model)

    # split into multiple heads
    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.k_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    # Input shapes:
    #   q: (batch_size, seq_len_q, d_model)
    #   k: (batch_size, seq_len_k, d_model)
    #   v: (batch_size, seq_len_v, d_model)
    #   mask: broadcastable to (batch_size, num_heads, seq_len_q, seq_len_k)
    # Output shapes:
    #   attention: (batch_size, seq_len_q, d_model)
    #   attention_weights: (batch_size, num_heads, seq_len_q, seq_len_k)
    # Conceptually: x * wq => q, x * wk => k, x * wv => v
    # In the code:  q => x_q => q, k => x_k => k, v => x_v => v
    def __call__(self, q, k, v, mask):
        batch_size = tf.shape(q)[0]
        # (batch_size, seq_len, d_model) => (batch_size, seq_len, d_model)
        x_q = self.wq(q)
        x_k = self.wk(k)
        x_v = self.wv(v)
        # (batch_size, seq_len, d_model) => (batch_size, num_heads, seq_len, k_dim)
        q = self.split_heads(x_q, batch_size)
        k = self.split_heads(x_k, batch_size)
        v = self.split_heads(x_v, batch_size)
        # attention: (batch_size, num_heads, seq_len, k_dim)
        # attention_weights: (batch_size, num_heads, seq_len, seq_len)
        attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
        # (batch_size, num_heads, seq_len, k_dim) => (batch_size, seq_len, num_heads, k_dim)
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])
        # (batch_size, seq_len, num_heads, k_dim) => (batch_size, seq_len, d_model)
        attention = tf.reshape(attention, (batch_size, -1, self.d_model))
        # (batch_size, seq_len, d_model) => (batch_size, seq_len, d_model)
        attention = self.dense(attention)
        return attention, attention_weights
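A minimal usage sketch (the shapes are assumptions chosen for illustration): with d_model=512 and num_heads=8, each head works with k_dim = 512 / 8 = 64, and the per-head results are concatenated back into a 512-dimensional vector per position.

mha = MultiHeadAttention(num_heads=8, d_model=512)
x = tf.random.uniform((1, 60, 512))  # (batch_size, seq_len, d_model)
attention, attention_weights = mha(x, x, x, mask=None)
print(attention.shape)          # (1, 60, 512)
print(attention_weights.shape)  # (1, 8, 60, 60)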
Positional Encoding
For any neural network architecture, being able to identify the position of each word and the order between words is crucial. A traditional recurrent neural network (RNN) accounts for word order inherently through its autoregressive structure. The Transformer is different: it relies purely on self-attention to capture the relationships between words, and pure self-attention is permutation invariant. In other words, the self-attention mechanism in the Transformer cannot capture the order of the input sequence. We therefore need a way to inject word order into the architecture, and this is where positional encoding comes in.
It is worth noting that the positional encoding vectors are not learned; they are generated by a fixed rule, with each position mapped to a specific encoding. The positional encoding is computed as
PE(pos, 2i) = sin(pos / 10000^(2i / d_model))
PE(pos, 2i + 1) = cos(pos / 10000^(2i / d_model))
where pos is the position of the word in the sentence and i is the index along the embedding dimension (dim).
Implementation code:
# Positional encoding
# Formula:
#   PE(pos, 2i)   = sin(pos / 10000^(2i / d_model))
#   PE(pos, 2i+1) = cos(pos / 10000^(2i / d_model))
# pos is the position of the word in the sentence, i indexes the elements of the
# word's feature vector, and d_model is the dimensionality of each word vector
# Even dimensions are encoded with sin, odd dimensions with cos
def get_positional_encoding(sentence_length, d_model):
    pos = np.expand_dims(np.arange(sentence_length), axis=-1)
    i = np.expand_dims(np.arange(d_model), axis=0)
    # angle = pos / 10000^(2i / d_model)
    result = pos / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    # sin on the even columns
    result_sin = np.sin(result[:, 0::2])
    # cos on the odd columns
    result_cos = np.cos(result[:, 1::2])
    # concatenate the sin and cos parts
    position_embedding = np.concatenate((result_sin, result_cos), axis=-1)
    # add a batch dimension; final shape is (1, sentence_length, d_model)
    position_embedding = np.expand_dims(position_embedding, axis=0)
    # convert the numpy array into a TensorFlow tensor
    return tf.cast(position_embedding, tf.float32)
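As a quick shape check (assuming numpy and TensorFlow are imported as np and tf, as in the full listing later), the encoding for a 50-position sequence with d_model=512 comes back as a single tensor that can be broadcast over a whole batch of embeddings:

pos_encoding = get_positional_encoding(50, 512)
print(pos_encoding.shape)  # (1, 50, 512)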
Transformer
The internal structure of the Transformer is relatively complex: it consists of an encoding component, a decoding component, and the connections between them. The encoding component is a stack of encoders, and the decoding component is a stack of the same number of decoders. All the encoders share the same structure, but they do not share parameters.
Each encoder can be decomposed into two sub-layers: the input sentence first passes through a self-attention layer, which helps the encoder look at the other words in the sentence while encoding each word; the output of the self-attention layer is then fed into a feed-forward neural network. The decoder contains the same self-attention and feed-forward sub-layers, except that its self-attention is masked. In addition, between these two layers there is an encoder-decoder attention layer that attends to the relevant parts of the input sentence. Implementation code:
# Feed-forward network layer
class FeedForwardNetwork(tf.keras.layers.Layer):
    def __init__(self, d_model, d_ff):
        super(FeedForwardNetwork, self).__init__()
        self.dense1 = tf.keras.layers.Dense(d_ff, activation="relu")
        self.dense2 = tf.keras.layers.Dense(d_model)

    def __call__(self, x):
        output = self.dense1(x)
        output = self.dense2(output)
        return output
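# # A quick shape check for the feed-forward block (illustrative values,
# # mirroring the commented-out tests in the full listing later in this article):
# sample_ffn = FeedForwardNetwork(512, 2048)
# print(sample_ffn(tf.random.uniform((64, 50, 512))).shape)  # (64, 50, 512)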
# Encoder layer
class Encoder(tf.keras.layers.Layer):
    def __init__(self, num_heads, d_model, d_ff, rate=0.1):
        super(Encoder, self).__init__()
        # multi-head attention layer
        self.mha = MultiHeadAttention(num_heads, d_model)
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.ln1 = tf.keras.layers.LayerNormalization()
        # feed-forward network layer
        self.ffn = FeedForwardNetwork(d_model, d_ff)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.ln2 = tf.keras.layers.LayerNormalization()

    def __call__(self, x, padding_mask, training):
        # (batch_size, seq_len, d_model) => (batch_size, seq_len, d_model)
        mha_output, _ = self.mha(x, x, x, padding_mask)
        dropout_output1 = self.dropout1(mha_output, training=training)
        ln_output1 = self.ln1(x + dropout_output1)
        # (batch_size, seq_len, d_model) => (batch_size, seq_len, d_model)
        ffn_output = self.ffn(ln_output1)
        dropout_output2 = self.dropout2(ffn_output, training=training)
        ln_output2 = self.ln2(ln_output1 + dropout_output2)
        return ln_output2
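# # Shape check for a single encoder layer (illustrative hyper-parameters;
# # the padding mask is omitted here):
# sample_encoder = Encoder(num_heads=8, d_model=512, d_ff=2048)
# sample_encoder_output = sample_encoder(tf.random.uniform((64, 50, 512)), None, False)
# print(sample_encoder_output.shape)  # (64, 50, 512)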
# Decoder layer
class Decoder(tf.keras.layers.Layer):
    def __init__(self, num_heads, d_model, d_ff, rate=0.1):
        super(Decoder, self).__init__()
        self.mha1 = MultiHeadAttention(num_heads, d_model)
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.ln1 = tf.keras.layers.LayerNormalization()
        self.mha2 = MultiHeadAttention(num_heads, d_model)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.ln2 = tf.keras.layers.LayerNormalization()
        self.ffn = FeedForwardNetwork(d_model, d_ff)
        self.dropout3 = tf.keras.layers.Dropout(rate)
        self.ln3 = tf.keras.layers.LayerNormalization()

    # encoder_output is the output of the encoder, with shape (batch_size, seq_len, d_model)
    def __call__(self, x, encoder_output, look_ahead_mask, padding_mask, training):
        # masked self-attention over the decoder input
        mha_output1, attention_weights1 = self.mha1(x, x, x, look_ahead_mask)
        dropout_output1 = self.dropout1(mha_output1, training=training)
        ln_output1 = self.ln1(x + dropout_output1)
        # encoder-decoder attention over the encoder output
        mha_output2, attention_weights2 = self.mha2(ln_output1, encoder_output, encoder_output, padding_mask)
        dropout_output2 = self.dropout2(mha_output2, training=training)
        ln_output2 = self.ln2(ln_output1 + dropout_output2)
        ffn_output = self.ffn(ln_output2)
        dropout_output3 = self.dropout3(ffn_output, training=training)
        ln_output3 = self.ln3(ln_output2 + dropout_output3)
        return ln_output3, attention_weights1, attention_weights2
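# # Shape check for a single decoder layer, fed with the encoder output from the
# # sketch above (masks omitted; the values are illustrative):
# sample_decoder = Decoder(num_heads=8, d_model=512, d_ff=2048)
# sample_decoder_output, _, att2 = sample_decoder(
#     tf.random.uniform((64, 60, 512)), sample_encoder_output, None, None, False)
# print(sample_decoder_output.shape)  # (64, 60, 512)
# print(att2.shape)                   # (64, 8, 60, 50) - the decoder attends over the 50 encoder positions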
class Transformer(tf.keras.Model):
    # input_vocab_size: vocabulary size of the source language
    # target_vocab_size: vocabulary size of the target language
    # num_heads: number of heads, i.e. num_heads sets of q, k, v matrices
    # d_model: dimensionality of each word vector
    # d_ff: dimensionality of the first dense layer of the feed-forward network
    # max_seq_len: maximum sentence length
    # rate: dropout rate
    # num_layers: number of encoder and decoder layers
    def __init__(self, input_vocab_size, target_vocab_size, num_heads,
                 max_seq_len, d_model, d_ff, num_layers, rate=0.1):
        super(Transformer, self).__init__()
        self.input_vocab_size = input_vocab_size
        self.num_heads = num_heads
        self.num_layers = num_layers
        # embedding layer
        self.embedding1 = tf.keras.layers.Embedding(input_vocab_size, d_model)
        # positional encoding
        self.position_embedding1 = get_positional_encoding(max_seq_len, d_model)
        # dropout layer
        self.dropout1 = tf.keras.layers.Dropout(rate)
        # # single encoder layer
        # self.encoder = Encoder(num_heads, d_model, d_ff, rate)
        # stacked encoder layers
        self.encoders = [Encoder(num_heads, d_model, d_ff, rate) for _ in range(num_layers)]
        # embedding layer
        self.embedding2 = tf.keras.layers.Embedding(target_vocab_size, d_model)
        # positional encoding
        self.position_embedding2 = get_positional_encoding(max_seq_len, d_model)
        # dropout layer
        self.dropout2 = tf.keras.layers.Dropout(rate)
        # # single decoder layer
        # self.decoder = Decoder(num_heads, d_model, d_ff, rate)
        # stacked decoder layers
        self.decoders = [Decoder(num_heads, d_model, d_ff, rate) for _ in range(num_layers)]
        # fully connected output layer
        self.dense = tf.keras.layers.Dense(target_vocab_size)

    # Data flow:
    # encoder_input => (embedding, dropout, encoders) => encoder_output
    # decoder_input => (embedding, dropout, decoders(encoder_output)) => decoder_output
    # decoder_output => (dense) => output
    def __call__(self, encoder_input, decoder_input, encoder_padding_mask,
                 decoder_look_ahead_mask, decoder_padding_mask, training):
        # (batch_size, seq_len) => (batch_size, seq_len, d_model)
        encoder_output = self.embedding1(encoder_input)
        # tf.shape(encoder_input)[1] is the length of the encoder input sentence
        # adding (batch_size, seq_len, d_model) and (1, seq_len, d_model) broadcasts
        # the positional encoding over every (seq_len, d_model) matrix in the batch
        encoder_output += self.position_embedding1[:, :tf.shape(encoder_input)[1], :]
        encoder_output = self.dropout1(encoder_output, training)
        # stacked encoder layers
        for i in range(self.num_layers):
            encoder_output = self.encoders[i](encoder_output, encoder_padding_mask, training)
        # (batch_size, seq_len) => (batch_size, seq_len, d_model)
        decoder_output = self.embedding2(decoder_input)
        # tf.shape(decoder_input)[1] is the length of the decoder input sentence
        decoder_output += self.position_embedding2[:, :tf.shape(decoder_input)[1], :]
        decoder_output = self.dropout2(decoder_output, training)
        # stacked decoder layers
        attention_weights = {}
        for i in range(self.num_layers):
            decoder_output, att1, att2 = self.decoders[i](decoder_output, encoder_output, decoder_look_ahead_mask,
                                                          decoder_padding_mask, training)
            attention_weights['decoder_layer{}_att1'.format(i + 1)] = att1
            attention_weights['decoder_layer{}_att2'.format(i + 1)] = att2
        # (batch_size, seq_len, d_model) => (batch_size, seq_len, target_vocab_size)
        output = self.dense(decoder_output)
        return output, attention_weights
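To verify the whole stack end to end, a small Transformer can be run on random token ids. The hyper-parameters and shapes below are illustrative assumptions only, not the values used for training later:

sample_transformer = Transformer(input_vocab_size=8500, target_vocab_size=8000,
                                 num_heads=8, max_seq_len=120,
                                 d_model=512, d_ff=2048, num_layers=2)
temp_input = tf.random.uniform((64, 38), maxval=8500, dtype=tf.int32)
temp_target = tf.random.uniform((64, 36), maxval=8000, dtype=tf.int32)
out, attention_weights = sample_transformer(temp_input, temp_target, None, None, None, False)
print(out.shape)  # (64, 36, 8000)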
Test Code
Below is a complete example that uses the Transformer above to translate from Portuguese to English:
import time
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds

# Build the dataset
# Load the dataset dict (train / validation / test splits) along with its metadata
examples, metadata = tfds.load('ted_hrlr_translate/pt_to_en', with_info=True, as_supervised=True)
# Get the train, validation and test splits
train_examples, val_examples, test_examples = examples["train"], examples["validation"], examples["test"]
# Build the subword tokenizers (key-value vocabularies)
tokenizer_en = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus((en.numpy() for pt, en in train_examples),
                                                                         target_vocab_size=2 ** 13)
tokenizer_pt = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus((pt.numpy() for pt, en in train_examples),
                                                                         target_vocab_size=2 ** 13)

# # Test code
# sample_str = 'hello world, tensorflow 2'
# tokenized_str = tokenizer_en.encode(sample_str)
# print(tokenized_str)
# original_str = tokenizer_en.decode(tokenized_str)
# print(original_str)

# Dataset preprocessing
# Add start and end tokens, represented by vocab_size and vocab_size + 1
# The inputs pt and en are sentences; the outputs are token-id sequences with
# start/end markers, both returned as tensors
def encode(pt, en):
    pt = [tokenizer_pt.vocab_size] + tokenizer_pt.encode(pt.numpy()) + [tokenizer_pt.vocab_size + 1]
    en = [tokenizer_en.vocab_size] + tokenizer_en.encode(en.numpy()) + [tokenizer_en.vocab_size + 1]
    return pt, en

# Wrap encode with tf.py_function so that the results come back as tensors
def tf_encode(pt, en):
    return tf.py_function(encode, [pt, en], [tf.int32, tf.int32])

# Filter out sentences longer than 40 tokens
def filter_sentence(x, y, max_length=40):
    return tf.logical_and(tf.size(x) <= max_length, tf.size(y) <= max_length)

# Training set
train_dataset = train_examples.map(tf_encode)
train_dataset = train_dataset.filter(filter_sentence)
# Cache to speed up reading
train_dataset = train_dataset.cache()
# Pad sequences shorter than 40 tokens; output shape = (64, 40)
train_dataset = train_dataset.padded_batch(64, padded_shapes=([40], [40]))
train_dataset = train_dataset.prefetch(tf.data.experimental.AUTOTUNE)
# Validation set
validate_dataset = val_examples.map(tf_encode)
validate_dataset = validate_dataset.filter(filter_sentence)
validate_dataset = validate_dataset.padded_batch(64, padded_shapes=([40], [40]))
# Build the model
# Positional encoding
# Formula:
#   PE(pos, 2i)   = sin(pos / 10000^(2i / d_model))
#   PE(pos, 2i+1) = cos(pos / 10000^(2i / d_model))
# pos is the position of the word in the sentence, i indexes the elements of the
# word's feature vector, and d_model is the dimensionality of each word vector
# Even dimensions are encoded with sin, odd dimensions with cos
def get_positional_encoding(sentence_length, d_model):
    pos = np.expand_dims(np.arange(sentence_length), axis=-1)
    i = np.expand_dims(np.arange(d_model), axis=0)
    # angle = pos / 10000^(2i / d_model)
    result = pos / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    # sin on the even columns
    result_sin = np.sin(result[:, 0::2])
    # cos on the odd columns
    result_cos = np.cos(result[:, 1::2])
    # concatenate the sin and cos parts
    position_embedding = np.concatenate((result_sin, result_cos), axis=-1)
    # add a batch dimension; final shape is (1, sentence_length, d_model)
    position_embedding = np.expand_dims(position_embedding, axis=0)
    # convert the numpy array into a TensorFlow tensor
    return tf.cast(position_embedding, tf.float32)
# Scaled dot-product attention layer
# Formula:
#   attention(q, k, v) = softmax(q*kT/sqrt(dk) + mask) * v  (masked positions are pushed to -1e9)
# Input shapes:
#   q: (..., seq_len_q, k_dim)
#   k: (..., seq_len_k, k_dim)
#   v: (..., seq_len_v, v_dim)
#   mask: (..., seq_len_q, seq_len_k)
# Output shapes:
#   attention: (..., seq_len_q, v_dim)
#   attention_weights: (..., seq_len_q, seq_len_k)
def scaled_dot_product_attention(q, k, v, mask):
    # q * kT
    matmul_qk = tf.matmul(q, k, transpose_b=True)
    # scale by dk, where dk = k_dim
    dk = tf.shape(k)[-1]
    scaled_attention = matmul_qk / tf.sqrt(tf.cast(dk, tf.float32))
    # apply the mask
    if mask is not None:
        scaled_attention += (mask * -1e9)
    # attention weights matrix
    attention_weights = tf.nn.softmax(scaled_attention, axis=-1)
    # attention output
    attention = tf.matmul(attention_weights, v)
    return attention, attention_weights
# # Test code
# def print_out(q, k, v):
#     temp_out, temp_att = scaled_dot_product_attention(
#         q, k, v, None)
#     print('attention weight:')
#     print(temp_att)
#     print('output:')
#     print(temp_out)
#
#
# # print as plain numpy values
# np.set_printoptions(suppress=True)
# temp_k = tf.constant([[10, 0, 0],
#                       [0, 10, 0],
#                       [0, 0, 10],
#                       [0, 0, 10]], dtype=tf.float32)  # (4, 3)
# temp_v = tf.constant([[1, 0],
#                       [10, 0],
#                       [100, 5],
#                       [1000, 6]], dtype=tf.float32)  # (4, 2)
# # attend to the second key and return the corresponding value
# temp_q = tf.constant([[0, 10, 0]], dtype=tf.float32)
# print_out(temp_q, temp_k, temp_v)
# Multi-head attention layer
class MultiHeadAttention(tf.keras.layers.Layer):
    # d_model is the dimensionality of the word vectors
    # num_heads is the number of heads, i.e. num_heads sets of q, k, v matrices
    def __init__(self, num_heads, d_model):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        # k_dim = dk
        self.k_dim = d_model // num_heads
        # fully connected layers
        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)
        # fully connected output layer
        self.dense = tf.keras.layers.Dense(d_model)

    # split into multiple heads
    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.k_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    # Input shapes:
    #   q: (batch_size, seq_len_q, d_model)
    #   k: (batch_size, seq_len_k, d_model)
    #   v: (batch_size, seq_len_v, d_model)
    #   mask: broadcastable to (batch_size, num_heads, seq_len_q, seq_len_k)
    # Output shapes:
    #   attention: (batch_size, seq_len_q, d_model)
    #   attention_weights: (batch_size, num_heads, seq_len_q, seq_len_k)
    # Conceptually: x * wq => q, x * wk => k, x * wv => v
    # In the code:  q => x_q => q, k => x_k => k, v => x_v => v
    def __call__(self, q, k, v, mask):
        batch_size = tf.shape(q)[0]
        # (batch_size, seq_len, d_model) => (batch_size, seq_len, d_model)
        x_q = self.wq(q)
        x_k = self.wk(k)
        x_v = self.wv(v)
        # (batch_size, seq_len, d_model) => (batch_size, num_heads, seq_len, k_dim)
        q = self.split_heads(x_q, batch_size)
        k = self.split_heads(x_k, batch_size)
        v = self.split_heads(x_v, batch_size)
        # attention: (batch_size, num_heads, seq_len, k_dim)
        # attention_weights: (batch_size, num_heads, seq_len, seq_len)
        attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
        # (batch_size, num_heads, seq_len, k_dim) => (batch_size, seq_len, num_heads, k_dim)
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])
        # (batch_size, seq_len, num_heads, k_dim) => (batch_size, seq_len, d_model)
        attention = tf.reshape(attention, (batch_size, -1, self.d_model))
        # (batch_size, seq_len, d_model) => (batch_size, seq_len, d_model)
        attention = self.dense(attention)
        return attention, attention_weights
# # Test code
# temp_mha = MultiHeadAttention(num_heads=8, d_model=512)
# y = tf.random.uniform((1, 60, 512))
# mask = tf.random.uniform((1, 8, 60, 60))
# output, att = temp_mha(y, y, y, mask=mask)
# print(output.shape, att.shape)
# Feed-forward network layer
class FeedForwardNetwork(tf.keras.layers.Layer):
    def __init__(self, d_model, d_ff):
        super(FeedForwardNetwork, self).__init__()
        self.dense1 = tf.keras.layers.Dense(d_ff, activation="relu")
        self.dense2 = tf.keras.layers.Dense(d_model)

    def __call__(self, x):
        output = self.dense1(x)
        output = self.dense2(output)
        return output

# # Test code
# sample_fnn = FeedForwardNetwork(512, 2048)
# print(sample_fnn(tf.random.uniform((64, 50, 512))).shape)
# Encoder layer
class Encoder(tf.keras.layers.Layer):
    def __init__(self, num_heads, d_model, d_ff, rate=0.1):
        super(Encoder, self).__init__()
        # multi-head attention layer
        self.mha = MultiHeadAttention(num_heads, d_model)
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.ln1 = tf.keras.layers.LayerNormalization()
        # feed-forward network layer
        self.ffn = FeedForwardNetwork(d_model, d_ff)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.ln2 = tf.keras.layers.LayerNormalization()

    def __call__(self, x, padding_mask, training):
        # (batch_size, seq_len, d_model) => (batch_size, seq_len, d_model)
        mha_output, _ = self.mha(x, x, x, padding_mask)
        dropout_output1 = self.dropout1(mha_output, training=training)
        ln_output1 = self.ln1(x + dropout_output1)
        # (batch_size, seq_len, d_model) => (batch_size, seq_len, d_model)
        ffn_output = self.ffn(ln_output1)
        dropout_output2 = self.dropout2(ffn_output, training=training)
        ln_output2 = self.ln2(ln_output1 + dropout_output2)
        return ln_output2

# # Test code
# sample_encoder_layer = Encoder(8, 512, 2048)
# sample_encoder_layer_output = sample_encoder_layer(tf.random.uniform((64, 50, 512)), None, False)
# # print(sample_encoder_layer_output.shape)
# Decoder layer
class Decoder(tf.keras.layers.Layer):
    def __init__(self, num_heads, d_model, d_ff, rate=0.1):
        super(Decoder, self).__init__()
        self.mha1 = MultiHeadAttention(num_heads, d_model)
        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.ln1 = tf.keras.layers.LayerNormalization()
        self.mha2 = MultiHeadAttention(num_heads, d_model)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.ln2 = tf.keras.layers.LayerNormalization()
        self.ffn = FeedForwardNetwork(d_model, d_ff)
        self.dropout3 = tf.keras.layers.Dropout(rate)
        self.ln3 = tf.keras.layers.LayerNormalization()

    # encoder_output is the output of the encoder, with shape (batch_size, seq_len, d_model)
    def __call__(self, x, encoder_output, look_ahead_mask, padding_mask, training):
        # masked self-attention over the decoder input
        mha_output1, attention_weights1 = self.mha1(x, x, x, look_ahead_mask)
        dropout_output1 = self.dropout1(mha_output1, training=training)
        ln_output1 = self.ln1(x + dropout_output1)
        # encoder-decoder attention over the encoder output
        mha_output2, attention_weights2 = self.mha2(ln_output1, encoder_output, encoder_output, padding_mask)
        dropout_output2 = self.dropout2(mha_output2, training=training)
        ln_output2 = self.ln2(ln_output1 + dropout_output2)
        ffn_output = self.ffn(ln_output2)
        dropout_output3 = self.dropout3(ffn_output, training=training)
        ln_output3 = self.ln3(ln_output2 + dropout_output3)
        return ln_output3, attention_weights1, attention_weights2

# # Test code
# sample_decoder_layer = Decoder(8, 512, 2048)
# sample_decoder_layer_output, _, _ = sample_decoder_layer(
#     tf.random.uniform((64, 60, 512)), sample_encoder_layer_output, None, None, False)
# print(sample_decoder_layer_output.shape)
class Transformer(tf.keras.Model):
    # input_vocab_size: vocabulary size of the source language
    # target_vocab_size: vocabulary size of the target language
    # num_heads: number of heads, i.e. num_heads sets of q, k, v matrices
    # d_model: dimensionality of each word vector
    # d_ff: dimensionality of the first dense layer of the feed-forward network
    # max_seq_len: maximum sentence length
    # rate: dropout rate
    # num_layers: number of encoder and decoder layers
    def __init__(self, input_vocab_size, target_vocab_size, num_heads,
                 max_seq_len, d_model, d_ff, num_layers, rate=0.1):
        super(Transformer, self).__init__()
        self.input_vocab_size = input_vocab_size
        self.num_heads = num_heads
        self.num_layers = num_layers
        # embedding layer
        self.embedding1 = tf.keras.layers.Embedding(input_vocab_size, d_model)
        # positional encoding
        self.position_embedding1 = get_positional_encoding(max_seq_len, d_model)
        # dropout layer
        self.dropout1 = tf.keras.layers.Dropout(rate)
        # # single encoder layer
        # self.encoder = Encoder(num_heads, d_model, d_ff, rate)
        # stacked encoder layers
        self.encoders = [Encoder(num_heads, d_model, d_ff, rate) for _ in range(num_layers)]
        # embedding layer
        self.embedding2 = tf.keras.layers.Embedding(target_vocab_size, d_model)
        # positional encoding
        self.position_embedding2 = get_positional_encoding(max_seq_len, d_model)
        # dropout layer
        self.dropout2 = tf.keras.layers.Dropout(rate)
        # # single decoder layer
        # self.decoder = Decoder(num_heads, d_model, d_ff, rate)
        # stacked decoder layers
        self.decoders = [Decoder(num_heads, d_model, d_ff, rate) for _ in range(num_layers)]
        # fully connected output layer
        self.dense = tf.keras.layers.Dense(target_vocab_size)

    # Data flow:
    # encoder_input => (embedding, dropout, encoders) => encoder_output
    # decoder_input => (embedding, dropout, decoders(encoder_output)) => decoder_output
    # decoder_output => (dense) => output
    def __call__(self, encoder_input, decoder_input, encoder_padding_mask,
                 decoder_look_ahead_mask, decoder_padding_mask, training):
        # (batch_size, seq_len) => (batch_size, seq_len, d_model)
        encoder_output = self.embedding1(encoder_input)
        # tf.shape(encoder_input)[1] is the length of the encoder input sentence
        # adding (batch_size, seq_len, d_model) and (1, seq_len, d_model) broadcasts
        # the positional encoding over every (seq_len, d_model) matrix in the batch
        encoder_output += self.position_embedding1[:, :tf.shape(encoder_input)[1], :]
        encoder_output = self.dropout1(encoder_output, training)
        # stacked encoder layers
        for i in range(self.num_layers):
            encoder_output = self.encoders[i](encoder_output, encoder_padding_mask, training)
        # (batch_size, seq_len) => (batch_size, seq_len, d_model)
        decoder_output = self.embedding2(decoder_input)
        # tf.shape(decoder_input)[1] is the length of the decoder input sentence
        decoder_output += self.position_embedding2[:, :tf.shape(decoder_input)[1], :]
        decoder_output = self.dropout2(decoder_output, training)
        # stacked decoder layers
        attention_weights = {}
        for i in range(self.num_layers):
            decoder_output, att1, att2 = self.decoders[i](decoder_output, encoder_output, decoder_look_ahead_mask,
                                                          decoder_padding_mask, training)
            attention_weights['decoder_layer{}_att1'.format(i + 1)] = att1
            attention_weights['decoder_layer{}_att2'.format(i + 1)] = att2
        # (batch_size, seq_len, d_model) => (batch_size, seq_len, target_vocab_size)
        output = self.dense(decoder_output)
        return output, attention_weights
# # Test code
# sample_transformer = Transformer(
#     num_layers=2, d_model=512, num_heads=8, d_ff=1024,
#     input_vocab_size=8500, target_vocab_size=8000, max_seq_len=120
# )
# temp_input = tf.random.uniform((64, 62))
# temp_target = tf.random.uniform((64, 26))
# fn_out, _ = sample_transformer(temp_input, temp_target, training=False,
#                                encoder_padding_mask=None,
#                                decoder_look_ahead_mask=None,
#                                decoder_padding_mask=None,
#                                )
# print(fn_out.shape)
# Build and compile the model
transformer_model = Transformer(input_vocab_size=tokenizer_pt.vocab_size + 2,
                                target_vocab_size=tokenizer_en.vocab_size + 2,
                                num_heads=4,
                                max_seq_len=40,
                                d_model=128,
                                d_ff=512,
                                num_layers=4)
# Custom learning-rate schedule
class CustomizedSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, d_model, warm_up_steps=4000):
        super(CustomizedSchedule, self).__init__()
        self.d_model = tf.cast(d_model, tf.float32)
        self.warm_up_steps = warm_up_steps

    def __call__(self, step):
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warm_up_steps ** (-1.5))
        arg3 = tf.math.rsqrt(self.d_model)
        return arg3 * tf.math.minimum(arg1, arg2)
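# The schedule implements the learning-rate formula from the original Transformer paper:
#   lrate = d_model^(-0.5) * min(step^(-0.5), step * warm_up_steps^(-1.5))
# i.e. the rate grows linearly for the first warm_up_steps steps and then decays
# proportionally to the inverse square root of the step number.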
learning_rate = CustomizedSchedule(128)  # the larger the model, the smaller the learning rate; avoid an overly high lr
optimizer = tf.optimizers.Adam(learning_rate=learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)
# Loss function
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True, reduction="none")

def loss_func(y_true, y_pre):
    loss_ = loss(y_true, y_pre)
    # ignore the padded positions (token id 0)
    mask = tf.math.logical_not(tf.math.equal(y_true, 0))
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask
    return tf.reduce_mean(loss_)

train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
# Train the model
# Create the padding mask
def create_padding_mark(seq):
    # mark the positions padded with 0
    seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
    # add extra dimensions so the mask can be applied to the attention matrix
    return seq[:, np.newaxis, np.newaxis, :]  # (batch_size, 1, 1, seq_len)

# Create the look-ahead mask
def create_look_ahead_mark(size):
    # 1 - lower triangle (band_part with num_lower=-1 keeps all sub-diagonals and the diagonal)
    # this builds a mask over the tokens that have not been predicted yet
    mark = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return mark  # (seq_len, seq_len)
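# # Quick check (commented out): for size 4 the mask is 1 at the future positions,
# # which scaled_dot_product_attention then pushes to -1e9 before the softmax:
# # create_look_ahead_mark(4) ->
# # [[0. 1. 1. 1.]
# #  [0. 0. 1. 1.]
# #  [0. 0. 0. 1.]
# #  [0. 0. 0. 0.]]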
# Build all masks
def create_mask(inputs, targets):
    encoder_padding_mask = create_padding_mark(inputs)
    # this mask hides the padded encoder output fed into the decoder's second attention layer
    decoder_padding_mask = create_padding_mark(inputs)
    # look-ahead mask: hide the tokens that have not been predicted yet
    look_ahead_mask = create_look_ahead_mark(tf.shape(targets)[1])
    # padding mask for the decoder's first attention layer
    decode_targets_padding_mask = create_padding_mark(targets)
    # combine the masks for the decoder's first attention layer
    combine_mask = tf.maximum(decode_targets_padding_mask, look_ahead_mask)
    return encoder_padding_mask, combine_mask, decoder_padding_mask
@tf.function
def train_step(inputs, targets):
    tar_inp = targets[:, :-1]
    tar_real = targets[:, 1:]
    # build the masks
    encoder_padding_mask, combined_mask, decoder_padding_mask = create_mask(inputs, tar_inp)
    with tf.GradientTape() as tape:
        predictions, _ = transformer_model(inputs,
                                           tar_inp,
                                           encoder_padding_mask,
                                           combined_mask,
                                           decoder_padding_mask,
                                           True)
        loss_ = loss_func(tar_real, predictions)
    # compute the gradients
    gradients = tape.gradient(loss_, transformer_model.trainable_variables)
    # back-propagate
    optimizer.apply_gradients(zip(gradients, transformer_model.trainable_variables))
    # record the loss and accuracy
    train_loss(loss_)
    train_accuracy(tar_real, predictions)
def train():
    for epoch in range(20):
        start = time.time()
        # reset the metrics
        train_loss.reset_states()
        train_accuracy.reset_states()
        # inputs: Portuguese, targets: English
        for batch, (inputs, targets) in enumerate(train_dataset):
            # one training step
            train_step(inputs, targets)
            if batch % 500 == 0:
                print('epoch {}, batch {}, loss:{:.4f}, acc:{:.4f}'.format(
                    epoch + 1, batch, train_loss.result(), train_accuracy.result()
                ))
        print('epoch {}, loss:{:.4f}, acc:{:.4f}'.format(
            epoch + 1, train_loss.result(), train_accuracy.result()
        ))
        print('time in 1 epoch:{} secs\n'.format(time.time() - start))
# Model inference
def evaluate(inputs):
    inputs_id = [tokenizer_pt.vocab_size] + tokenizer_pt.encode(inputs) + [tokenizer_pt.vocab_size + 1]
    # encoder input: the sentence to translate
    encoder_inputs = tf.expand_dims(inputs_id, axis=0)  # (1, input_length)
    # decoder input: initially only the start token
    decoder_inputs = tf.expand_dims([tokenizer_en.vocab_size], axis=0)  # (1, 1)
    # the maximum sentence length is 40; each iteration produces one token
    attention_weights = {}
    for i in range(40):
        encoder_padding_mask, decoder_look_ahead_mask, decoder_padding_mask = create_mask(encoder_inputs,
                                                                                          decoder_inputs)
        predictions, attention_weights = transformer_model(encoder_inputs,
                                                           decoder_inputs,
                                                           encoder_padding_mask,
                                                           decoder_look_ahead_mask,
                                                           decoder_padding_mask,
                                                           False)
        # take the prediction for the last position
        predictions = predictions[:, -1, :]
        # get the id of the predicted token
        predictions_id = tf.cast(tf.argmax(predictions, axis=-1), dtype=tf.int32)
        # stop once the end token is produced
        if tf.equal(predictions_id, tokenizer_en.vocab_size + 1):
            decoder_inputs = tf.concat([decoder_inputs, [predictions_id]], axis=-1)
            return tf.squeeze(decoder_inputs, axis=0), attention_weights
        # otherwise append the token to form the next decoder input
        decoder_inputs = tf.concat([decoder_inputs, [predictions_id]], axis=-1)
    # return the translated sentence
    return tf.squeeze(decoder_inputs, axis=0), attention_weights
# Test code
train()
input_ = "tinham comido peixe com batatas fritas ?"  # did they eat fish and chips ?
output_, _ = evaluate(input_)
print(input_)
print(output_)
print(tokenizer_en.decode(output_[1: -1]))