注意力提示
自主性和非自主性的注意力提示解释了人类的注意力提示解释了人类的注意力方式。在注意力机制的背景下,我们将自主性提示称为”查询“,注意力机制通过注意力汇聚将选择引导至中间特征表示(值),每个值都都与一个键(key)配对,称为中间特征表示的非自主提示,设计注意力汇聚,给定的查询(自主性提示)可以和值(非自主性)进行匹配,得到最匹配的值。 注意力汇聚得到的是加权平均的总和值,其中权重是在给定的查询和不同键之间计算得出的。
注意力的可视化
import torch
from d2l import torch as d2l
#@save
# metrices的形状(要现实的行数,要现实的列数,查询的数据,键的数目)
def show_heatmaps(matrices, xlabel, ylabel, titles=None, figsize=(2.5, 2.5),
cmap='Reds'):
"""显示矩阵热图"""
d2l.use_svg_display()
num_rows, num_cols = matrices.shape[0], matrices.shape[1]
fig, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize,
sharex=True, sharey=True, squeeze=False)
for i, (row_axes, row_matrices) in enumerate(zip(axes, matrices)):
for j, (ax, matrix) in enumerate(zip(row_axes, row_matrices)):
pcm = ax.imshow(matrix.detach().numpy(), cmap=cmap)
if i == num_rows - 1:
ax.set_xlabel(xlabel)
if j == 0:
ax.set_ylabel(ylabel)
if titles:
ax.set_title(titles[j])
fig.colorbar(pcm, ax=axes, shrink=0.6);
d2l.plt.show()
# torch.eye()生成一个单位矩阵
# 使用一个简单的例子,仅当查询和键相同时,注意力权重为1,否则为0
attention_weights = torch.eye(10).reshape((1, 1, 10, 10))
print(attention_weights)
show_heatmaps(attention_weights, xlabel='Keys', ylabel='Queries')
运行结果:
tensor([[[[1., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
[0., 1., 0., 0., 0., 0., 0., 0., 0., 0.],
[0., 0., 1., 0., 0., 0., 0., 0., 0., 0.],
[0., 0., 0., 1., 0., 0., 0., 0., 0., 0.],
[0., 0., 0., 0., 1., 0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0., 1., 0., 0., 0., 0.],
[0., 0., 0., 0., 0., 0., 1., 0., 0., 0.],
[0., 0., 0., 0., 0., 0., 0., 1., 0., 0.],
[0., 0., 0., 0., 0., 0., 0., 0., 1., 0.],
[0., 0., 0., 0., 0., 0., 0., 0., 0., 1.]]]])
注意力汇聚
查询和键之间的交互形成了注意力汇聚,注意力汇聚有选择的聚合了值以生成最终的输出。1964年提出的和回归模型是一个简单但完整的例子,可以用于演示具有注意力机制的机器学习。 给定成对的数据集{(x1,y1),…,(xn,yn)},由人工数据集获得
y
i
=
2
s
i
n
(
x
i
)
+
(
x
i
)
0.8
+
ε
y_i=2sin(x_i)+(x_i)^{0.8}+\varepsilon
yi?=2sin(xi?)+(xi?)0.8+ε
ε
\varepsilon
ε服从均值为0和标准差为0.5的正态分布,生成了50个训练样本和50个测试样本,为了更好的可视化注意力模式,需要对训练样本进行排序。
import torch
from torch import nn
from d2l import torch as d2l
n_train =50 #训练样本数
x_train,_=torch.sort(torch.rand(n_train)*5) #排序后的训练样本
def f(x):
return 2*torch.sin(x)+x**0.8
y_train = f(x_train)+torch.normal(0.0,0.5,(n_train,)) # 训练样本的输出
x_test = torch.arange(0,5,0.1) #测试样本
y_truth = f(x_test)
n_test = len(x_test)
平均汇聚
f
(
x
)
=
1
n
∑
i
=
1
n
y
i
f(x) = \frac{1}{n}\sum_{i=1}^{n}y_i
f(x)=n1?i=1∑n?yi?
# 绘制所有的训练样本,不带噪声项的真实数据标记为truth,学习到的预测函数标记为pred
def plot_kernel_reg(y_hat):
d2l.plot(x_test, [y_truth, y_hat], 'x', 'y', legend=['Truth', 'Pred'],
xlim=[0, 5], ylim=[-1, 5])
d2l.plt.plot(x_train, y_train, 'o', alpha=0.5);
d2l.plt.show()
# 基于平均汇聚来计算所有训练样本输出值的平均值
# repeat_interleave(inputs,repeats)重复张量的元素,返回一个张量
y_hat = torch.repeat_interleave(y_train.mean(),n_test)
plot_kernel_reg(y_hat)
非参数注意力汇聚
f
(
x
)
=
∑
i
=
1
n
K
(
x
?
x
i
)
∑
j
=
1
n
K
(
x
?
x
j
)
y
i
f(x)=\sum_{i=1}^{n}\frac{K(x-x_i)}{\sum_{j=1}^{n}K(x-x_j)}y_i
f(x)=i=1∑n?∑j=1n?K(x?xj?)K(x?xi?)?yi? 上述为核回归,转换成一个更加通用的注意力机制
f
(
x
)
=
∑
i
1
n
α
(
x
,
x
i
)
y
i
f(x)=\sum_{i_1}^{n}\alpha(x,x_i)y_i
f(x)=i1?∑n?α(x,xi?)yi?
(
x
i
,
y
i
)
(x_i,y_i)
(xi?,yi?)是键值对,注意力汇聚是
y
i
y_i
yi?的加权平均,将查询
x
x
x和
x
i
x_i
xi?之间的关系建模为注意力权重
α
(
x
,
x
i
)
\alpha(x,x_i)
α(x,xi?),这个权重将被重新分配每一个对应值
y
i
y_i
yi?,对于任何查询,模型在所有键值对注意力权重都是一个有效的概率分布:它们是非负的,并且总和为1. 如果一个键
x
i
x_i
xi?越是接近给定的查询
x
x
x,分配给这个键对应值的
y
i
y_i
yi?的注意力权重也就越大,也就获得了更多的注意力。
# x_repeat的形状(n_test,n_train)
# 每一行都包含着相同的测试输出
X_repeat = x_test.repeat_interleave(n_train).reshape((-1,n_train))
# x_train包含着键,attention_weight的形状为(n_test,n_train)
# 每一行都包含着要在给定的每个查询的值y_train之间分配的注意力权重
attention_weights = nn.functional.softmax(-(X_repeat-x_train)**2/2,dim=1)
# y_hat的每个元素都是值的加权平均值,其中权重是注意力权重
y_hat = torch.matmul(attention_weights,y_train)
plot_kernel_reg(y_hat)
d2l.show_heatmaps(attention_weights.unsqueeze(0).unsqueeze(0),
xlabel='Sorted training inputs',
ylabel='Sorted testing inputs')
d2l.plt.show()
带参数注意力汇聚
假设第一个小批量数据包含n个矩阵
X
1
,
X
2
,
.
.
.
.
,
X
n
X_1,X_2,....,X_n
X1?,X2?,....,Xn?,形状为
a
?
b
a*b
a?b,第二个小批量包含n个矩阵
Y
1
,
Y
2
,
.
.
.
.
.
.
Y
n
Y_1,Y_2,......Y_n
Y1?,Y2?,......Yn?,形状为
b
?
c
b*c
b?c.他的批量矩阵乘法得到n个矩阵
X
1
Y
1
,
X
2
Y
2
,
.
.
.
.
.
.
X
n
Y
n
X_1Y_1,X_2Y_2,......X_nY_n
X1?Y1?,X2?Y2?,......Xn?Yn?,形状为
a
?
c
a*c
a?c。假定两个张量的形状分别是
(
n
,
a
,
b
)
(n,a,b)
(n,a,b)和
(
n
,
b
,
c
)
(n,b,c)
(n,b,c),输出的形状为
(
n
,
a
,
c
)
(n,a,c)
(n,a,c)。
# 使用小批量矩阵乘法来计算小批量数据中的加权平均值
weights = torch.ones((2,10))*0.1
print(weights)
values = torch.arange(20.0).reshape((2,10))
print(values)
print(weights.unsqueeze(1))
print(weights.unsqueeze(1).size())
print(values.unsqueeze(-1))
print(values.unsqueeze(-1).size())
# unsqueeze是给tensor增加维度的 -1添加到最后一个维度
# bmm计算两个tensor的矩阵乘法
print(torch.bmm(weights.unsqueeze(1),values.unsqueeze(-1)))
运行结果:
tensor([[0.1000, 0.1000, 0.1000, 0.1000, 0.1000, 0.1000, 0.1000, 0.1000, 0.1000,
0.1000],
[0.1000, 0.1000, 0.1000, 0.1000, 0.1000, 0.1000, 0.1000, 0.1000, 0.1000,
0.1000]])
tensor([[ 0., 1., 2., 3., 4., 5., 6., 7., 8., 9.],
[10., 11., 12., 13., 14., 15., 16., 17., 18., 19.]])
torch.Size([2, 1, 10])
torch.Size([2, 10, 1])
tensor([[[ 4.5000]],
[[14.5000]]])
import torch
from torch import nn
from d2l import torch as d2l
n_train =50 #训练样本数
x_train,_=torch.sort(torch.rand(n_train)*5) #排序后的训练样本
print("x_train:",x_train )
def f(x):
return 2*torch.sin(x)+x**0.8
y_train = f(x_train)+torch.normal(0.0,0.5,(n_train,)) # 训练样本的输出
print("y_train:",y_train)
x_test = torch.arange(0,5,0.1) #测试样本
print("x_test:",x_test)
y_truth = f(x_test)
print("y_truth:",y_truth)
n_test = len(x_test)
# 绘制所有的训练样本,不带噪声项的真实数据标记为truth,学习到的预测函数标记为pred
def plot_kernel_reg(y_hat):
d2l.plot(x_test, [y_truth, y_hat], 'x', 'y', legend=['Truth', 'Pred'],
xlim=[0, 5], ylim=[-1, 5])
d2l.plt.plot(x_train, y_train, 'o', alpha=0.5);
d2l.plt.show()
# x_repeat的形状(n_test,n_train)
# 每一行都包含着相同的测试输入(例:同样的查询)
X_repeat = x_test.repeat_interleave(n_train).reshape((-1,n_train))
print("x_repeat:",X_repeat)
# 定义模型
class NWKernalRegression(nn.Module):
def __init__(self,**kwargs):
super().__init__(**kwargs)
# 参数初始化
self.w = nn.Parameter(torch.rand(1,),requires_grad=True)
def forward(self,queries,keys,values):
# queriers和attention_weights的形状为(查询个数,”键值对个数“)
queries = queries.repeat_interleave(keys.shape[1]).reshape((-1,keys.shape[1]))
print("quires:",queries,queries.size())
self.attention_weights = nn.functional.softmax(-((queries-keys)*self.w)**2/2,dim=1)
print("attention_weight:",self.attention_weights,self.attention_weights.size())
# values的形状为(查询个数,”键-值“对个数)
return torch.bmm(self.attention_weights.unsqueeze(1),
values.unsqueeze(-1)).reshape(-1)
# 将训练数据集变换为键-值用于训练注意力模型,在带参数的注意力汇聚模型中,
# 任何一个训练样本的输入都会和除自己以外的所有训练样本的键-值对进行计算,从而得到预测的输出
# x_title的形状为(n_train,n_test)每一行都包含着相同的训练输入
X_title = x_train.repeat((n_train,1))
print("x_title:",X_title)
# Y_title的形状为(n_train,n_train)每一行都包含着相同的训练输出
Y_title = y_train.repeat((n_train,1))
print("y_title:",Y_title)
# keys的形状为(n_train,n_train-1)
keys = X_title[(1-torch.eye(n_train)).type(torch.bool)].reshape((n_train,-1))
print("keys:",keys,keys.size())
# values的形状为(查询的个数和键-值对的个数)
values = Y_title[(1-torch.eye(n_train)).type(torch.bool)].reshape((n_train,-1))
print("values:",values,keys.size())
# 训练注意力汇聚模型
net = NWKernalRegression()
loss = nn.MSELoss()
trainer = torch.optim.SGD(net.parameters(),lr=0.5)
animator = d2l.Animator(xlabel='epoch',ylabel='loss',xlim=[1,5])
for epoch in range(5):
trainer.zero_grad()
y_hat=net(x_train,keys,values)
print("y_hat:",y_hat)
l = loss(y_hat,y_train)
l.sum().backward()
trainer.step()
print(f'epoch{epoch+1},loss{float(l.sum()):.6f}')
animator.add(epoch+1,float(l.sum()))
# keys的形状:(n_test,n_train),每一行包含着相同的训练输入(例如,相同的键)
keys = x_train.repeat((n_test, 1))
# value的形状:(n_test,n_train)
values = y_train.repeat((n_test, 1))
y_hat = net(x_test, keys, values).unsqueeze(1).detach()
plot_kernel_reg(y_hat)
d2l.plt.show()
d2l.show_heatmaps(net.attention_weights.unsqueeze(0).unsqueeze(0),
xlabel='Sorted training inputs',
ylabel='Sorted testing inputs')
d2l.plt.show()
运行结果:
ssh:
x_train: tensor([0.1983, 0.3003, 0.3372, 0.5042, 0.5287, 0.6151, 0.6587, 0.7610, 0.7934,
0.8153, 0.8815, 0.9094, 0.9100, 1.0540, 1.0883, 1.2154, 1.2212, 1.3882,
1.4241, 1.6184, 1.8055, 1.8171, 1.8389, 1.8682, 1.9756, 1.9760, 2.0217,
2.0919, 2.3083, 2.4956, 2.6010, 2.7502, 2.7820, 2.9685, 2.9838, 3.0017,
3.0213, 3.0642, 3.4218, 3.5900, 3.7580, 3.9802, 4.0082, 4.1155, 4.2845,
4.3037, 4.4370, 4.4597, 4.6968, 4.7213])
y_train: tensor([0.9865, 0.2737, 1.2464, 1.6648, 1.7726, 1.3423, 2.2108, 2.1374, 2.4340,
2.2951, 1.8302, 2.7708, 2.3294, 2.4843, 2.9969, 2.0623, 2.7401, 4.3678,
2.8606, 3.4338, 3.0798, 3.4558, 3.3164, 3.1875, 3.0468, 4.3134, 4.2747,
3.3277, 3.2211, 2.8200, 3.1799, 2.9567, 3.6772, 3.2616, 1.7372, 2.2806,
2.3665, 3.2968, 1.5537, 1.4779, 1.0978, 1.6817, 2.4996, 2.2633, 1.1986,
1.0452, 1.5530, 0.8664, 1.0779, 1.6598])
x_test: tensor([0.0000, 0.1000, 0.2000, 0.3000, 0.4000, 0.5000, 0.6000, 0.7000, 0.8000,
0.9000, 1.0000, 1.1000, 1.2000, 1.3000, 1.4000, 1.5000, 1.6000, 1.7000,
1.8000, 1.9000, 2.0000, 2.1000, 2.2000, 2.3000, 2.4000, 2.5000, 2.6000,
2.7000, 2.8000, 2.9000, 3.0000, 3.1000, 3.2000, 3.3000, 3.4000, 3.5000,
3.6000, 3.7000, 3.8000, 3.9000, 4.0000, 4.1000, 4.2000, 4.3000, 4.4000,
4.5000, 4.6000, 4.7000, 4.8000, 4.9000])
y_truth: tensor([0.0000, 0.3582, 0.6733, 0.9727, 1.2593, 1.5332, 1.7938, 2.0402, 2.2712,
2.4858, 2.6829, 2.8616, 3.0211, 3.1607, 3.2798, 3.3782, 3.4556, 3.5122,
3.5481, 3.5637, 3.5597, 3.5368, 3.4960, 3.4385, 3.3654, 3.2783, 3.1787,
3.0683, 2.9489, 2.8223, 2.6905, 2.5554, 2.4191, 2.2835, 2.1508, 2.0227,
1.9013, 1.7885, 1.6858, 1.5951, 1.5178, 1.4554, 1.4089, 1.3797, 1.3684,
1.3759, 1.4027, 1.4490, 1.5151, 1.6009])
x_repeat: tensor([[0.0000, 0.0000, 0.0000, ..., 0.0000, 0.0000, 0.0000],
[0.1000, 0.1000, 0.1000, ..., 0.1000, 0.1000, 0.1000],
[0.2000, 0.2000, 0.2000, ..., 0.2000, 0.2000, 0.2000],
...,
[4.7000, 4.7000, 4.7000, ..., 4.7000, 4.7000, 4.7000],
[4.8000, 4.8000, 4.8000, ..., 4.8000, 4.8000, 4.8000],
[4.9000, 4.9000, 4.9000, ..., 4.9000, 4.9000, 4.9000]])
x_title: tensor([[0.1983, 0.3003, 0.3372, ..., 4.4597, 4.6968, 4.7213],
[0.1983, 0.3003, 0.3372, ..., 4.4597, 4.6968, 4.7213],
[0.1983, 0.3003, 0.3372, ..., 4.4597, 4.6968, 4.7213],
...,
[0.1983, 0.3003, 0.3372, ..., 4.4597, 4.6968, 4.7213],
[0.1983, 0.3003, 0.3372, ..., 4.4597, 4.6968, 4.7213],
[0.1983, 0.3003, 0.3372, ..., 4.4597, 4.6968, 4.7213]])
y_title: tensor([[0.9865, 0.2737, 1.2464, ..., 0.8664, 1.0779, 1.6598],
[0.9865, 0.2737, 1.2464, ..., 0.8664, 1.0779, 1.6598],
[0.9865, 0.2737, 1.2464, ..., 0.8664, 1.0779, 1.6598],
...,
[0.9865, 0.2737, 1.2464, ..., 0.8664, 1.0779, 1.6598],
[0.9865, 0.2737, 1.2464, ..., 0.8664, 1.0779, 1.6598],
[0.9865, 0.2737, 1.2464, ..., 0.8664, 1.0779, 1.6598]])
keys: tensor([[0.3003, 0.3372, 0.5042, ..., 4.4597, 4.6968, 4.7213],
[0.1983, 0.3372, 0.5042, ..., 4.4597, 4.6968, 4.7213],
[0.1983, 0.3003, 0.5042, ..., 4.4597, 4.6968, 4.7213],
...,
[0.1983, 0.3003, 0.3372, ..., 4.4370, 4.6968, 4.7213],
[0.1983, 0.3003, 0.3372, ..., 4.4370, 4.4597, 4.7213],
[0.1983, 0.3003, 0.3372, ..., 4.4370, 4.4597, 4.6968]]) torch.Size([50, 49])
values: tensor([[0.2737, 1.2464, 1.6648, ..., 0.8664, 1.0779, 1.6598],
[0.9865, 1.2464, 1.6648, ..., 0.8664, 1.0779, 1.6598],
[0.9865, 0.2737, 1.6648, ..., 0.8664, 1.0779, 1.6598],
...,
[0.9865, 0.2737, 1.2464, ..., 1.5530, 1.0779, 1.6598],
[0.9865, 0.2737, 1.2464, ..., 1.5530, 0.8664, 1.6598],
[0.9865, 0.2737, 1.2464, ..., 1.5530, 0.8664, 1.0779]]) torch.Size([50, 49])
quires: tensor([[0.1983, 0.1983, 0.1983, ..., 0.1983, 0.1983, 0.1983],
[0.3003, 0.3003, 0.3003, ..., 0.3003, 0.3003, 0.3003],
[0.3372, 0.3372, 0.3372, ..., 0.3372, 0.3372, 0.3372],
...,
[4.4597, 4.4597, 4.4597, ..., 4.4597, 4.4597, 4.4597],
[4.6968, 4.6968, 4.6968, ..., 4.6968, 4.6968, 4.6968],
[4.7213, 4.7213, 4.7213, ..., 4.7213, 4.7213, 4.7213]]) torch.Size([50, 49])
attention_weight: tensor([[4.8726e-02, 4.8590e-02, 4.7473e-02, ..., 1.6540e-04, 8.6291e-05,
8.0517e-05],
[4.5988e-02, 4.6119e-02, 4.5541e-02, ..., 2.0429e-04, 1.0821e-04,
1.0113e-04],
[4.4965e-02, 4.5218e-02, 4.4844e-02, ..., 2.2042e-04, 1.1739e-04,
1.0977e-04],
...,
[2.1213e-04, 2.7761e-04, 3.0549e-04, ..., 6.2686e-02, 6.1602e-02,
6.1366e-02],
[1.2760e-04, 1.6954e-04, 1.8759e-04, ..., 7.0776e-02, 7.1027e-02,
7.2275e-02],
[1.2098e-04, 1.6099e-04, 1.7823e-04, ..., 7.1613e-02, 7.1892e-02,
7.3437e-02]], grad_fn=<SoftmaxBackward0>) torch.Size([50, 49])
y_hat: tensor([2.4165, 2.4758, 2.4408, 2.4680, 2.4703, 2.5112, 2.4889, 2.5197, 2.5174,
2.5285, 2.5629, 2.5362, 2.5523, 2.5832, 2.5739, 2.6349, 2.6135, 2.5948,
2.6507, 2.6635, 2.6957, 2.6847, 2.6908, 2.6970, 2.7067, 2.6663, 2.6685,
2.6990, 2.6921, 2.6828, 2.6523, 2.6271, 2.5939, 2.5547, 2.6055, 2.5799,
2.5705, 2.5216, 2.4544, 2.3884, 2.3345, 2.2121, 2.1589, 2.1238, 2.1121,
2.1134, 2.0317, 2.0661, 1.9708, 1.9202],
grad_fn=<ReshapeAliasBackward0>)
epoch1,loss0.703564
注意力评分函数
掩蔽softmax操作
softmax操作用于输出一个概率分布作为注意力权重。 在某些情况下,并非所有的值都应该被纳入到注意力汇聚中。为了仅将有意义的词元作为值来获取注意力汇聚, 我们可以指定一个有效序列长度(即词元的个数), 以便在计算softmax时过滤掉超出指定范围的位置,其中任何超出有效长度的位置都被掩蔽并置为0
import math
import torch
from torch import nn
from d2l import torch as d2l
'''
softmax操作用于输出一个概率分布作为注意力权重
并非所有的值都应该被纳入注意力汇聚中
指定一个有效序列长度(词元的个数),在计算softmax时候过滤掉超出范围的位置
'''
#@save
def masked_softmax(X, valid_lens):
"""通过在最后一个轴上掩蔽元素来执行softmax操作"""
# X:3D张量,valid_lens:1D或2D张量
if valid_lens is None: #不设置时,取全部值的softmax
return nn.functional.softmax(X, dim=-1)
else:
shape = X.shape #将shape保存下来,以便取用其中的行列的维度数,以及最终恢复原样
if valid_lens.dim() == 1: #当valid_lens为一维
#若x的维度为(2, 2, 4) 得到第二个维度的数值2,并将valid_lens复制2次,得到一个
valid_lens = torch.repeat_interleave(valid_lens, shape[1]) #经过这一步[2, 3]会变为[2, 2, 3, 3]
else:
valid_lens = valid_lens.reshape(-1) #直接将其变为一维
# 最后一轴上被掩蔽的元素使用一个非常大的负值替换,从而其softmax输出为0
#X.reshape(-1, shape[-1])将X展开为n行4列,n在这里为2*2,形状为(4, 4) 再对每一行进行2, 2, 3, 3的掩码操作
X = d2l.sequence_mask(X.reshape(-1, shape[-1]), valid_lens, value=-1e6) #得到的X是一个展开的二维张量
return nn.functional.softmax(X.reshape(shape), dim=-1)
a = masked_softmax(torch.rand(2, 2, 4), torch.tensor([2, 3]))
#输入:batch_size为2,每个batch为(2, 4) 遮蔽:第一个batch取前两个,第二个batch取前三个,其余值为0 再进行softmax
print(a)
b = masked_softmax(torch.rand(2, 2, 4), torch.tensor([[1, 3], [2, 4]]))
#遮蔽: [1, 3]表示第一个batch的第一个元素取第一列,第二个元素取前三列,[2, 4]表示第二个batch中第一个元素取前两列第二个元素取前四列,进行softmax
print(b)
运行结果:
tensor([[[0.4487, 0.5513, 0.0000, 0.0000],
[0.5038, 0.4962, 0.0000, 0.0000]],
[[0.4978, 0.2558, 0.2464, 0.0000],
[0.3831, 0.3293, 0.2875, 0.0000]]])
tensor([[[1.0000, 0.0000, 0.0000, 0.0000],
[0.2776, 0.4385, 0.2838, 0.0000]],
[[0.5788, 0.4212, 0.0000, 0.0000],
[0.2212, 0.3062, 0.2362, 0.2363]]])
加性注意力
class AdditiveAttention(nn.Module):
"""加性注意力"""
def __init__(self, key_size, query_size, num_hiddens, dropout, **kwargs):
super(AdditiveAttention, self).__init__(**kwargs)
# 输入k维输出h维
self.W_k = nn.Linear(key_size, num_hiddens, bias=False)
# 输入q维输出h维度
self.W_q = nn.Linear(query_size, num_hiddens, bias=False)
# 输入h维输出1维
self.w_v = nn.Linear(num_hiddens, 1, bias=False)
# 以P的概率进行正则化
self.dropout = nn.Dropout(dropout)
def forward(self, queries, keys, values, valid_lens):
# queries维度(bathc_size, q_num, h) keys维度(bathc_size, k_num, h)
queries, keys = self.W_q(queries), self.W_k(keys)
print("queries,keys:",queries.size(),keys.size())
# 在维度扩展后,方便求和
# queries的形状:(batch_size,查询的个数,1,num_hiddens)
# key的形状:(batch_size,1,“键-值”对的个数,num_hiddens)
# 使用广播方式进行求和
features = queries.unsqueeze(2) + keys.unsqueeze(1)
print("features:",features.size())
features = torch.tanh(features)
# self.w_v仅有一个输出,因此从形状中移除最后那个维度。
# scores的形状:(batch_size,查询的个数,“键-值”对的个数)
scores = self.w_v(features).squeeze(-1)
print("score:",scores.size())
self.attention_weights = masked_softmax(scores, valid_lens)
# values的形状:(batch_size,“键-值”对的个数,值的维度)
return torch.bmm(self.dropout(self.attention_weights), values)
#query的batch_size为2,1个query.query长度时20 key的batch_size为2,有10个key, key的长度是2
queries, keys = torch.normal(0, 1, (2, 1, 20)), torch.ones((2, 10, 2))
# values的小批量,两个值矩阵是相同的
# 有10个value,value的长度为2 进行一次复制变为(2, 10, 4)
values = torch.arange(40, dtype=torch.float32).reshape(1, 10, 4).repeat(
2, 1, 1)
valid_lens = torch.tensor([2, 6])
attention = AdditiveAttention(key_size=2, query_size=20, num_hiddens=8,
dropout=0.1)
attention.eval() # 开启评估模式
print(attention(queries, keys, values, valid_lens))
运行结果:
queries,keys: torch.Size([2, 1, 8]) torch.Size([2, 10, 8])
features: torch.Size([2, 1, 10, 8])
score: torch.Size([2, 1, 10])
tensor([[[ 2.0000, 3.0000, 4.0000, 5.0000]],
[[10.0000, 11.0000, 12.0000, 13.0000]]], grad_fn=<BmmBackward0>)
缩放点积注意力
# 缩放点积注意机制 要求查询和键具有相同的长度
#@save
class DotProductAttention(nn.Module):
"""缩放点积注意力"""
def __init__(self, dropout, **kwargs):
super(DotProductAttention, self).__init__(**kwargs)
self.dropout = nn.Dropout(dropout)
# queries的形状:(batch_size,查询的个数,d)
# keys的形状:(batch_size,“键-值”对的个数,d)
# values的形状:(batch_size,“键-值”对的个数,值的维度)
# valid_lens的形状:(batch_size,)或者(batch_size,查询的个数)
def forward(self, queries, keys, values, valid_lens=None):
d = queries.shape[-1]
# 设置transpose_b=True为了交换keys的最后两个维度
scores = torch.bmm(queries, keys.transpose(1,2)) / math.sqrt(d)
print(scores.size())
self.attention_weights = masked_softmax(scores, valid_lens)
return torch.bmm(self.dropout(self.attention_weights), values)
# 令查询的特征的维度和键的特征的维度相同
queries = torch.normal(0, 1, (2, 1, 2))
attention = DotProductAttention(dropout=0.5)
attention.eval()
print(attention(queries, keys, values, valid_lens))
运行结果:
torch.Size([2, 1, 10])
tensor([[[ 2.0000, 3.0000, 4.0000, 5.0000]],
[[10.0000, 11.0000, 12.0000, 13.0000]]])
将注意力汇聚的输出计算可以作为值的加权平均,选择不同的注意力评分函数会带来不同的注意力汇聚操作。
Bahdanau注意力
在预测词元时,如果不是所有输入的词元都相关,模型将仅对齐输入序列中宇当前预测相关的部分,通过将上下文变量视为注意力集中的输出来实现的。 上下文的变量
c
c
c在任何解码的时间步
t
′
t'
t′都会被
c
t
′
c_{t'}
ct′?替换。假设输入序列中有
T
T
T个词元,解码时间步
t
′
t'
t′的上下文变量是注意力中的输出。
c
t
′
=
∑
t
=
1
T
α
(
s
t
′
?
1
,
h
t
)
h
t
c_{t'}=\sum_{t=1}^{T}\alpha(s_{t'-1},h_t)h_t
ct′?=t=1∑T?α(st′?1?,ht?)ht? 时间步
t
’
?
1
t’-1
t’?1时解码器的隐状态
s
t
′
?
1
s_{t'-1}
st′?1?是查询,编码器隐状态
h
t
h_t
ht?即是键也是值。 机器翻译中,每个生成的词可能相关于源句子中不同的词。 key,value是rnn中对每个词的输出 query是解码器对上一个词的输出
import torch
from torch import nn
from d2l import torch as d2l
# 实现循环神经网络编码器-解码器,只需要重新定义解码器即可
#@save
class AttentionDecoder(d2l.Decoder):
"""带有注意力机制解码器的基本接口"""
def __init__(self, **kwargs):
super(AttentionDecoder, self).__init__(**kwargs)
@property
def attention_weights(self):
raise NotImplementedError
'''
初始化解码器的状态,需要的输入:
1.编码器在所有时间步的最终层隐状态,将作为注意力的键和值
2.上一时间步的编码器全层隐状态,将作为初始化解码器的隐状态
3.编码器有效长度
4.注意力的输出和下一词的词嵌入合并进入RNN
'''
# 在每个解码时间步中,解码器上一个时间步的最终层隐状态将用作查询
# 将注意力输出和输入嵌入都连接为循环神经网络的输入
class Seq2SeqAttentionDecoder(AttentionDecoder):
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,dropout=0, **kwargs):
super(Seq2SeqAttentionDecoder, self).__init__(**kwargs)
# 加性注意力机制
self.attention = d2l.AdditiveAttention(num_hiddens, num_hiddens, num_hiddens, dropout)
self.embedding = nn.Embedding(vocab_size, embed_size)
self.rnn = nn.GRU(embed_size + num_hiddens, num_hiddens, num_layers,dropout=dropout)
self.dense = nn.Linear(num_hiddens, vocab_size)
def init_state(self, enc_outputs, enc_valid_lens, *args):
# outputs的形状为(batch_size,num_steps,num_hiddens).
# hidden_state的形状为(num_layers,batch_size,num_hiddens)
outputs, hidden_state = enc_outputs
return (outputs.permute(1, 0, 2), hidden_state, enc_valid_lens)
def forward(self, X, state):
# enc_outputs的形状为(batch_size,num_steps,num_hiddens).
# hidden_state的形状为(num_layers,batch_size,
# num_hiddens)
enc_outputs, hidden_state, enc_valid_lens = state
# 输出X的形状为(num_steps,batch_size,embed_size)
X = self.embedding(X).permute(1, 0, 2)
outputs, self._attention_weights = [], []
for x in X:
# query的形状为(batch_size,1,num_hiddens)
# hidden_state[-1]是最后一层的输出
query = torch.unsqueeze(hidden_state[-1], dim=1)
# context的形状为(batch_size,1,num_hiddens)
context = self.attention(query, enc_outputs, enc_outputs, enc_valid_lens)
# 在特征维度上连结
x = torch.cat((context, torch.unsqueeze(x, dim=1)), dim=-1)
# 将x变形为(1,batch_size,embed_size+num_hiddens)
out, hidden_state = self.rnn(x.permute(1, 0, 2), hidden_state)
outputs.append(out)
self._attention_weights.append(self.attention.attention_weights)
# 全连接层变换后,outputs的形状为
# (num_steps,batch_size,vocab_size)
outputs = self.dense(torch.cat(outputs, dim=0))
return outputs.permute(1, 0, 2), [enc_outputs, hidden_state,enc_valid_lens]
@property
def attention_weights(self):
return self._attention_weights
# 使用包含7个时间步的4个序列输入的小批测试解码器
encoder = d2l.Seq2SeqEncoder(vocab_size=10,embed_size=8,num_hiddens=16,num_layers=2)
encoder.eval()
decoder = Seq2SeqAttentionDecoder(vocab_size=10,embed_size=8,num_hiddens=16,num_layers=2)
decoder.eval()
X = torch.zeros((4,7),dtype=torch.long)
state = decoder.init_state(encoder(X),None)
output,state = decoder(X,state)
print(output.shape,len(state),state[0].shape,len(state[1]),state[1][0].shape)
embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1
batch_size, num_steps = 64, 10
lr, num_epochs, device = 0.005, 250, d2l.try_gpu()
train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps)
encoder = d2l.Seq2SeqEncoder(len(src_vocab), embed_size, num_hiddens, num_layers, dropout)
decoder = Seq2SeqAttentionDecoder(len(tgt_vocab), embed_size, num_hiddens, num_layers, dropout)
net = d2l.EncoderDecoder(encoder, decoder)
d2l.train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
for eng, fra in zip(engs, fras):
translation, dec_attention_weight_seq = d2l.predict_seq2seq(
net, eng, src_vocab, tgt_vocab, num_steps, device, True)
print(f'{eng} => {translation}, ',
f'bleu {d2l.bleu(translation, fra, k=2):.3f}')
attention_weights = torch.cat([step[0][0][0] for step in dec_attention_weight_seq], 0).reshape((1, 1, -1, num_steps))
# 加上一个包含序列结束词元
d2l.show_heatmaps(
attention_weights[:, :, :, :len(engs[-1].split()) + 1].cpu(),
xlabel='Key positions', ylabel='Query positions')
d2l.plt.show()
loss 0.019, 5176.1 tokens/sec on cuda:0 go . => va !, bleu 1.000 i lost . => j’ai perdu ., bleu 1.000 he’s calm . => il est paresseux ., bleu 0.658 i’m home . => je suis chez moi ., bleu 1.000
多头注意力机制
将h个注意力汇聚的输出拼接在一起,并且通过另一个可以学习的线性投影进行变换,以产生最后的输出,称为多头注意力,对于h个注意力汇聚输出,每一个注意力汇聚都被称为一个头。
import math
import torch
from torch import nn
from d2l import torch as d2l
'''
使用缩放点积注意力作为每一个注意力头
设定pq=pk=pv=po/h
将查询、键和值的线性变换的输出数量设置为pqh=pkh=pvh=po
并行计算h个头
'''
#@save
class MultiHeadAttention(nn.Module):
"""多头注意力"""
def __init__(self, key_size, query_size, value_size, num_hiddens,
num_heads, dropout, bias=False, **kwargs):
super(MultiHeadAttention, self).__init__(**kwargs)
self.num_heads = num_heads
# 不用学习w的权重
self.attention = d2l.DotProductAttention(dropout)
self.W_q = nn.Linear(query_size, num_hiddens, bias=bias)
self.W_k = nn.Linear(key_size, num_hiddens, bias=bias)
self.W_v = nn.Linear(value_size, num_hiddens, bias=bias)
self.W_o = nn.Linear(num_hiddens, num_hiddens, bias=bias)
def forward(self, queries, keys, values, valid_lens):
# queries,keys,values的形状:(batch_size,查询或者“键-值”对的个数,num_hiddens)
# valid_lens 的形状:(batch_size,)或(batch_size,查询的个数)
# 经过变换后,输出的queries,keys,values 的形状:(batch_size*num_heads,查询或者“键-值”对的个数,num_hiddens/num_heads)
queries = transpose_qkv(self.W_q(queries), self.num_heads)
keys = transpose_qkv(self.W_k(keys), self.num_heads)
values = transpose_qkv(self.W_v(values), self.num_heads)
if valid_lens is not None:
# 在轴0,将第一项(标量或者矢量)复制num_heads次,
# 然后如此复制第二项,然后诸如此类。
valid_lens = torch.repeat_interleave(
valid_lens, repeats=self.num_heads, dim=0)
# output的形状:(batch_size*num_heads,查询的个数,num_hiddens/num_heads)
output = self.attention(queries, keys, values, valid_lens)
# output_concat的形状:(batch_size,查询的个数,num_hiddens)
output_concat = transpose_output(output, self.num_heads)
return self.W_o(output_concat)
#@save
def transpose_qkv(X, num_heads):
"""为了多注意力头的并行计算而变换形状"""
# 输入X的形状:(batch_size,查询或者“键-值”对的个数,num_hiddens)
# 输出X的形状:(batch_size,查询或者“键-值”对的个数,num_heads,num_hiddens/num_heads)
X = X.reshape(X.shape[0], X.shape[1], num_heads, -1)
# 输出X的形状:(batch_size,num_heads,查询或者“键-值”对的个数,num_hiddens/num_heads)
X = X.permute(0, 2, 1, 3)
# 最终输出的形状:(batch_size*num_heads,查询或者“键-值”对的个数,num_hiddens/num_heads)
return X.reshape(-1, X.shape[2], X.shape[3])
#@save
def transpose_output(X, num_heads):
"""逆转transpose_qkv函数的操作"""
X = X.reshape(-1, num_heads, X.shape[1], X.shape[2])
X = X.permute(0, 2, 1, 3)
return X.reshape(X.shape[0], X.shape[1], -1)
'''
使用键和值相同的例子进行测试
多头注意力输出的形状是(batch_size,num_queries,num_hiddens)
'''
num_hiddens, num_heads = 100, 5
attention = MultiHeadAttention(num_hiddens, num_hiddens, num_hiddens,
num_hiddens, num_heads, 0.5)
print(attention.eval())
batch_size, num_queries = 2, 4
num_kvpairs, valid_lens = 6, torch.tensor([3, 2])
X = torch.ones((batch_size, num_queries, num_hiddens))
Y = torch.ones((batch_size, num_kvpairs, num_hiddens))
print(attention(X, Y, Y, valid_lens).size())
运行结果:
MultiHeadAttention(
(attention): DotProductAttention(
(dropout): Dropout(p=0.5, inplace=False)
)
(W_q): Linear(in_features=100, out_features=100, bias=False)
(W_k): Linear(in_features=100, out_features=100, bias=False)
(W_v): Linear(in_features=100, out_features=100, bias=False)
(W_o): Linear(in_features=100, out_features=100, bias=False)
)
torch.Size([2, 4, 100])
自注意力和位置编码
查询,键和值来自同一组输入,被称为自注意力
# 自注意机制
num_hiddens, num_heads = 100, 5
attention = d2l.MultiHeadAttention(num_hiddens, num_hiddens, num_hiddens,
num_hiddens, num_heads, 0.5)
print(attention.eval())
batch_size, num_queries, valid_lens = 2, 4, torch.tensor([3, 2])
X = torch.ones((batch_size, num_queries, num_hiddens))
print(attention(X, X, X, valid_lens).size())
运行结果:
MultiHeadAttention(
(attention): DotProductAttention(
(dropout): Dropout(p=0.5, inplace=False)
)
(W_q): Linear(in_features=100, out_features=100, bias=False)
(W_k): Linear(in_features=100, out_features=100, bias=False)
(W_v): Linear(in_features=100, out_features=100, bias=False)
(W_o): Linear(in_features=100, out_features=100, bias=False)
)
torch.Size([2, 4, 100])
总而言之,卷积神经网络和自注意力都拥有并行计算的优势, 而且自注意力的最大路径长度最短。 但是因为其计算复杂度是关于序列长度的二次方,所以在很长的序列中计算会非常慢。 自注意力因为并行计算放弃了顺序操作,为了使用序列的顺序信息,在输入表示中添加位置编码来注入绝对或相对的位置信息。
#@save
class PositionalEncoding(nn.Module):
"""位置编码"""
def __init__(self, num_hiddens, dropout, max_len=1000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(dropout)
# 创建一个足够长的P
self.P = torch.zeros((1, max_len, num_hiddens))
X = torch.arange(max_len, dtype=torch.float32).reshape(
-1, 1) / torch.pow(10000, torch.arange(
0, num_hiddens, 2, dtype=torch.float32) / num_hiddens)
self.P[:, :, 0::2] = torch.sin(X)
self.P[:, :, 1::2] = torch.cos(X)
def forward(self, X):
X = X + self.P[:, :X.shape[1], :].to(X.device)
return self.dropout(X)
encoding_dim, num_steps = 32, 60
pos_encoding = PositionalEncoding(encoding_dim, 0)
pos_encoding.eval()
X = pos_encoding(torch.zeros((1, num_steps, encoding_dim)))
P = pos_encoding.P[:, :X.shape[1], :]
d2l.plot(torch.arange(num_steps), P[0, :, 6:10].T, xlabel='Row (position)',
figsize=(6, 2.5), legend=["Col %d" % d for d in torch.arange(6, 10)])
较高比特位的交替频率低于较低的比特位,只是位置编码使用三角函数在编码维度上降低频率。
transformer
transformer模型完全基于注意力机制,没有任何卷积层或循环神经网络层 基于编码器-解码器的结构来处理序列对,训练时解码器使用目标句子作为输入。源序列和目标序列的嵌入表示加上位置编码,在分别输入到编码器和解码器中。transformer的编码器是由对过相同的层叠加而成的,每个层都有两个子层。第一个子层是多头子注意力汇聚;第二个子层是基于位置的前馈网络。· 在计算编码器的自注意力时,查询、键和值都来自前一个编码器层的输出。收到残差网络的启发,每个子层都采用了残差连接。在残差连接的加法计算之后,紧接着应用层规范化,输入序列对应的每个位置,transformer编码器都将输出一个d维的表示向量。 transformer解码器也是由多个相同层叠加而成,并且层中使用了残差连接和层规范化,除了编码器中描述的两个子层之外,解码器还在这两个子层之间插入了第三个子层,称为encoder-decoder attention。查询来自前一个解码器层的输出,而键和值来自整个编码器的输出。在解码器自注意力中,查询、键和值都来自上一个解码器层的输出。但是,解码器中的每个位置都只能考虑位置之前所有位置。
'''
基于位置的前馈网络对序列中的所有位置表示进行变换时使用的是同一个多层感知机MLP
输入X的形状(批量大小,时间步数或序列长度,隐单元数或特征维度)
将被一个两层感知机转换成形状为(批量大小,时间步数,ffn_num_outputs)的输出张量
'''
class PositionWiseFFN(nn.Module):
"""基于位置的前馈网络
但隐藏层的MLP但是输入是三维
"""
def __init__(self,ffn_num_input,ffn_num_hiddens,ffn_num_outputs,**kwargs):
super(PositionWiseFFN, self).__init__(**kwargs)
# pytorch当输入不是二维的时候,将前面的输入都当成样本维度
self.dense1 = nn.Linear(ffn_num_input,ffn_num_hiddens)
self.relu = nn.ReLU()
self.dense2 = nn.Linear(ffn_num_hiddens,ffn_num_outputs)
def forward(self,X):
return self.dense2(self.relu(self.dense1(X)))
'''
改变张量的最里层维度的尺寸,会改变成基于位置的前馈网络的输出尺寸,因为用同一个多层感知机对所有位置上的输入进行变换
当所有这些位置的输入相同时,它们的输出也是相同的
'''
ffn = PositionWiseFFN(4,4,8)
ffn.eval()
print(ffn(torch.ones((2,3,4)))[0])
运行结果:
tensor([[-0.1585, 0.7526, -0.2140, -0.1525, 0.2685, 0.2211, -0.4938, 0.2770],
[-0.1585, 0.7526, -0.2140, -0.1525, 0.2685, 0.2211, -0.4938, 0.2770],
[-0.1585, 0.7526, -0.2140, -0.1525, 0.2685, 0.2211, -0.4938, 0.2770]],
grad_fn=<SelectBackward0>)
'''
小批量样本内基于批量规范化对数据进行重新中心化和重新缩放的调整
层规范化和批量规范化的目标相同,但是层规范化是基于特征维度进行规范化
尽管批量规范化在计算机视觉中被广泛应用,但在NLP中批量规范化通常不如层规范化的效果好
'''
ln = nn.LayerNorm(2) #针对的是每个样本
bn = nn.BatchNorm1d(2) #针对的是每一列
X = torch.tensor([[1,2],[2,3]],dtype=torch.float32)
# 在训练模式下计算X的均值和方差
print('layer norm:',ln(X),bn(X))
运行结果:
layer norm: tensor([[-1.0000, 1.0000],
[-1.0000, 1.0000]], grad_fn=<NativeLayerNormBackward0>)
batch norm :tensor([[-1.0000, -1.0000],
[ 1.0000, 1.0000]], grad_fn=<NativeBatchNormBackward0>)
class AddNorm(nn.Module):
"""残差连后进行层规范化"""
def __init__(self,normalized_shape,dropout,**kwargs):
super(AddNorm, self).__init__(**kwargs)
self.dropout = nn.Dropout(dropout)
self.ln = nn.LayerNorm(normalized_shape)
def forward(self,X,Y):
return self.ln(self.dropout(Y)+X)
# 残差连接要求两个输入形状相同,以便加法输出张量的形状相同
add_norm = AddNorm([3,4],0.5)
add_norm.eval()
print(add_norm(torch.ones((2,3,4)),torch.ones((2,3,4))).shape)
运行结果:
torch.Size([2, 3, 4])
经过编码器块和解码器块,输入输出的维度是不会发生改变的,方便后续的拼接
#有了组成编码器的基本组件,可以实现编码器中的一个层
# 包含两个子层:多头注意力和基于位置的前馈网络,这两个子层都使用了残差网络连接和紧随层的规范化
class EncoderBlock(nn.Module):
"""transformer编码器块"""
def __init__(self,key_size,query_size,value_size,num_hiddens,norm_shape,ffn_num_input,ffn_num_hiddens,num_heads,
dropout,use_bias=False,**kwargs):
super(EncoderBlock, self).__init__(**kwargs)
self.attention = d2l.MultiHeadAttention(
key_size,query_size,value_size,num_hiddens,num_heads,dropout,use_bias
)
self.addnorm1= AddNorm(norm_shape,dropout)
self.ffn = PositionWiseFFN(ffn_num_input,ffn_num_hiddens,num_hiddens)
self.addnorm2 = AddNorm(norm_shape,dropout)
def forward(self,X,valid_lens):
Y = self.addnorm1(X,self.attention(X,X,X,valid_lens))
return self.addnorm2(Y,self.ffn(Y))
# transformer编码器中的任何层都不会改变输入的形状
X = torch.ones((2,100,24))
valid_lens = torch.tensor([3,2])
encoder_blk = EncoderBlock(24,24,24,24,[100,24],24,48,8,0.5)
encoder_blk.eval()
print(encoder_blk(X,valid_lens).shape)
运行结果:
torch.Size([2, 100, 24])
'''
解码器也是由多个相同的层组成的,每个层中包含三个子层:解码器自注意力,解码器-编码器注意力和基于为位置的前馈神网络
这些子层也被残差连接和紧随的层规范化围绕
在掩蔽多头解码器子注意力层中,查询、键和值都来自上一个解码器层的输出,在训练截断,其输出序列的所有位置的词元都是已知的
在预测阶段,其输出序列的词元是诸葛生成的,在解码器时间步中,只有生成词元才能用于解码器的自注意力计算
为了在解码器中保留自回归的属性,掩蔽子注意力设定了参数,使得任何个查询都只会与解码器中所有生成词元的位置进行注意力计算
'''
class DecoderBlock(nn.Module):
"""解码器中的第i个块"""
def __init__(self,key_size,query_size,value_size,num_hiddens,norm_shape,ffn_num_input,ffn_num_hiddens,num_heads,dropout,i,**kwargs):
super(DecoderBlock, self).__init__(**kwargs)
self.i = i
self.attention1 = d2l.MultiHeadAttention(
key_size,query_size,value_size,num_hiddens,num_heads,dropout
)
self.addnorm1 = AddNorm(norm_shape,dropout)
self.attention2 = d2l.MultiHeadAttention(
key_size,query_size,value_size,num_hiddens,num_heads,dropout
)
self.addnorm2 = AddNorm(norm_shape,dropout)
self.ffn = PositionWiseFFN(ffn_num_input,ffn_num_hiddens,num_hiddens)
self.addnorm3 = AddNorm(norm_shape,dropout)
def forward(self,X,state):
enc_outputs,enc_valid_lens = state[0],state[1]
# 训练阶段,输出序列的所有词元都在同一时间处理,state[2][self.i]初始化为none
# 预测阶段,输出序列是通过词元一个接一个解码的,state[2][self.i]包含着知道当前时间步第i个块解码的输出表示
if state[2][self.i] is None:
key_values = X
else:
key_values = torch.cat((state[2][self.i],X),axis=1)
state[2][self.i] = key_values
if self.training:
batch_size,num_steps,_ = X.shape
# dec_valid_lens的开头:(batch_size,num_steps)
# 其中每一行是[1,2,....,num_steps]
dec_valid_lens = torch.arange(
1,num_steps+1,device=X.device
).repeat(batch_size,1)
else:
dec_valid_lens = None
# 自注意力
X2 = self.attention1(X,key_values,key_values,dec_valid_lens)
Y = self.addnorm1(X,X2)
# 编码器-解码器注意力 enc_outputs的开头:(batch_size,num_steps,num_hiddens)
Y2 = self.attention2(Y,enc_outputs,enc_outputs,enc_valid_lens)
Z = self.addnorm2(Y,Y2)
return self.addnorm3(Z,self.ffn(Z)),state
# 为了方便在解码器和编码器注意力中进行缩放点积运算和残差连接中进行加法计算,编码器和解码器的特征维度嗾使num_hiddens
decoder_blk = DecoderBlock(24,24,24,24,[100,24],24,48,8,0.5,0)
decoder_blk.eval()
X =torch.ones((2,100,24))
state = [encoder_blk(X,valid_lens),valid_lens,[None]]
print(decoder_blk(X,state)[0].shape)
运行结果:
torch.Size([2, 100, 24])
构建transformer块
class TransformerEncoder(d2l.Encoder):
"""transformer编码器"""
def __init__(self, vocab_size, key_size, query_size, value_size,
num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens,
num_heads, num_layers, dropout, use_bias=False, **kwargs):
super(TransformerEncoder, self).__init__(**kwargs)
self.num_hiddens = num_hiddens
self.embedding = nn.Embedding(vocab_size, num_hiddens)
self.pos_encoding = d2l.PositionalEncoding(num_hiddens, dropout)
self.blks = nn.Sequential()
for i in range(num_layers):
self.blks.add_module("block"+str(i),
EncoderBlock(key_size, query_size, value_size, num_hiddens,
norm_shape, ffn_num_input, ffn_num_hiddens,
num_heads, dropout, use_bias))
def forward(self, X, valid_lens, *args):
# 因为位置编码值在-1和1之间,
# 因此嵌入值乘以嵌入维度的平方根进行缩放,
# 然后再与位置编码相加。
X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
self.attention_weights = [None] * len(self.blks)
for i, blk in enumerate(self.blks):
X = blk(X, valid_lens)
self.attention_weights[
i] = blk.attention.attention.attention_weights
return X
'''
构建了由num_layers个DecoderBlock实例组成的transformer解码器
最后通过一个全连接层计算所有的vocab_size个词元可能的输出次元的预测值
解码器的子注意力权重和编码器注意力权重都被存储下来,方便可视化
'''
class TransformerDecoder(d2l.AttentionDecoder):
def __init__(self,vocab_size,key_size,query_size,value_size,
num_hiddens,norm_shape,ffn_num_input,ffn_num_hiddens,num_heads,num_layers,dropout,**kwargs):
super(TransformerDecoder, self).__init__(**kwargs)
self.num_hiddens = num_hiddens
self.num_layers = num_layers
self.embedding = nn.Embedding(vocab_size,num_hiddens)
self.pos_encoding = d2l.PositionalEncoding(num_hiddens,dropout)
self.blks = nn.Sequential()
for i in range(num_layers):
self.blks.add_module("block"+str(i),
DecoderBlock(key_size,query_size,value_size,num_hiddens,norm_shape,ffn_num_input,ffn_num_hiddens,num_heads,dropout,i))
self.dense = nn.Linear(num_hiddens,vocab_size)
def init_state(self,enc_outputs,enc_valid_lens,*args):
return [enc_outputs,enc_valid_lens,[None]*self.num_layers]
def forward(self,X,state):
X = self.pos_encoding(self.embedding(X)*math.sqrt(self.num_hiddens))
self._attention_weights = [[None]*len(self.blks) for _ in range(2)]
for i,blk in enumerate(self.blks):
X,state = blk(X,state)
#解码器自注意权重
self._attention_weights[0][i] = blk.attention1.attention.attention_weights
# 编码器-解码器自注意力权重
self._attention_weights[1][i] = blk.attention2.attention.attention_weights
return self.dense(X),state
@property
def attention_weights(self):
return self._attention_weights
# 依照transformer架构来实例化编码器-解码器模型。在这里,指定transformer的编码器和解码器都是2层,都使用4头注意力
# 为了进行序列到序列的学习,在‘英语-法语’机器翻译数据集上训练transformer模型
num_hiddens,num_layers,dropout,batch_size,num_steps = 32,3,0.1,64,10
lr,num_epochs,device = 0.005,200,d2l.try_gpu()
ffn_num_input,ffn_num_hiddens,num_heads = 32,64,4
key_size,query_size,value_size = 32,32,32
norm_shape =[32]
train_iter,src_vocab,tgt_vocab = d2l.load_data_nmt(batch_size,num_steps)
encoder = TransformerEncoder(
len(src_vocab), key_size, query_size, value_size, num_hiddens,
norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
num_layers, dropout)
decoder = TransformerDecoder(
len(tgt_vocab), key_size, query_size, value_size, num_hiddens,
norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
num_layers, dropout)
net = d2l.EncoderDecoder(encoder,decoder)
d2l.train_seq2seq(net,train_iter,lr,num_epochs,tgt_vocab,device)
#训练结束后,使用transformer模型将一些句子翻成法语,并计算它们的bleu分数
engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
for eng,fra in zip(engs,fras):
translation,dec_attention_weight_seq = d2l.predict_seq2seq(
net,eng,src_vocab,tgt_vocab,num_steps,device,True
)
print(f'{eng} =>{translation},',f'bleu{d2l.bleu(translation,fra,k=2):.3f}')
运行结果:
go . =>va le chercher !, bleu0.000
i lost . =>j'ai perdu ., bleu1.000
he's calm . =>il est parti ., bleu0.658
i'm home . =>je suis chez moi ., bleu1.000
# 可视化transformer的注意力权重,编码器自注意的形状为(编码器层数,注意力头数,num_steps或查询的数目,num_steps或键值对的数目)
enc_attention_weights = torch.cat(net.encoder.attention_weights,0).reshape((num_layers,num_heads,-1,num_steps))
print(enc_attention_weights.shape)
运行结果:
torch.Size([3, 4, 10, 10])
# 在编码器的自注意力中,查询和键都来自相同的输入序列,因为填充词元是不携带信息的,
# 通过指输入序列的有效长度可以避免查询与使用填充词元的位置计算注意力
# 将逐行呈现两层多头注意力的权重,每个注意力头都根据查询、键和值的不同子的表示子空间来表示不同的注意力
d2l.show_heatmaps(
enc_attention_weights.cpu(),xlabel='key position',ylabel='query position',
titles=['head %d'% i for i in range(1,5)],
figsize=(7,3.5))
dec_attention_weights_2d = [head[0].tolist()
for step in dec_attention_weight_seq
for attn in step for blk in attn for head in blk]
dec_attention_weights_filled = torch.tensor(
pd.DataFrame(dec_attention_weights_2d).fillna(0.0).values)
dec_attention_weights = dec_attention_weights_filled.reshape((-1, 2, num_layers, num_heads, num_steps))
dec_self_attention_weights, dec_inter_attention_weights = dec_attention_weights.permute(1, 2, 3, 0, 4)
print(dec_self_attention_weights.shape, dec_inter_attention_weights.shape)
运行结果:
torch.Size([3, 4, 6, 10]) torch.Size([3, 4, 6, 10])
# 可视化编码器自注意力
d2l.show_heatmaps(
dec_self_attention_weights[:, :, :, :len(translation.split()) + 1],
xlabel='Key positions', ylabel='Query positions',
titles=['Head %d' % i for i in range(1, 5)], figsize=(7, 3.5))
d2l.plt.show()
# 可视化解码器自注意力
d2l.show_heatmaps(
dec_inter_attention_weights, xlabel='Key positions',
ylabel='Query positions', titles=['Head %d' % i for i in range(1, 5)],
figsize=(7, 3.5))
d2l.plt.show()
|