To pick up where the last post left off: next we look at how wav2vec2 builds its Transformer encoder.
In the wav2vec2_model function, once the feature-extraction model has been built, construction of the Transformer encoder begins.
Let's jump into the components._get_encoder function and have a look.
encoder
Straight to the code:
def _get_encoder(
    in_features: int,
    embed_dim: int,
    dropout_input: float,
    pos_conv_kernel: int,
    pos_conv_groups: int,
    num_layers: int,
    num_heads: int,
    attention_dropout: float,
    ff_interm_features: int,
    ff_interm_dropout: float,
    dropout: float,
    layer_norm_first: bool,
    layer_drop: float,
) -> Encoder:
    feature_projection = FeatureProjection(in_features, embed_dim, dropout_input)
    pos_conv = ConvolutionalPositionalEmbedding(embed_dim, pos_conv_kernel, pos_conv_groups)

    encoder_layers = nn.ModuleList()
    for _ in range(num_layers):
        attention = SelfAttention(
            embed_dim=embed_dim,
            num_heads=num_heads,
            dropout=attention_dropout,
        )
        feed_forward = FeedForward(
            io_features=embed_dim,
            intermediate_features=ff_interm_features,
            intermediate_dropout=ff_interm_dropout,
            output_dropout=dropout,
        )
        encoder_layers.append(
            EncoderLayer(
                attention=attention,
                dropout=dropout,
                layer_norm_first=layer_norm_first,
                feed_forward=feed_forward,
            )
        )
    transformer = Transformer(
        pos_conv_embed=pos_conv,
        dropout=dropout,
        layers=encoder_layers,
        layer_norm_first=not layer_norm_first,
        layer_drop=layer_drop,
    )
    return Encoder(feature_projection, transformer)
Main work:
- Feature projection (feature_projection)
- Convolutional positional embedding (pos_conv), which injects position information
- Building the stack of Transformer encoder layers
- Returning the Encoder model
Now let's go through each piece of the code in turn.
Feature projection
feature_projection = FeatureProjection(in_features, embed_dim, dropout_input)
in_features is the number of output channels of the last layer of the feature extractor.
in_features=512
out_features=embed_dim=768
dropout_input=0.1
The projection uses the FeatureProjection module; let's click into it and see how it is structured.
Its code looks like this:
class FeatureProjection(Module):
    def __init__(
        self,
        in_features: int,
        out_features: int,
        dropout: float,
    ):
        super().__init__()
        self.layer_norm = nn.LayerNorm(in_features)
        self.projection = nn.Linear(
            in_features,
            out_features,
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """
        Args:
            x (Tensor):
                Feature Tensor. shape: ``[batch, frame, in_feature]``
        Returns:
            Tensor: Projected features. ``[batch, frame, out_feature]``.
        """
        x = self.layer_norm(x)
        x = self.projection(x)
        x = self.dropout(x)
        return x
There isn't much to say here... on to the next one.
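Before moving on, here is a minimal sketch of the shape change this projection produces, using the values quoted above (the batch and frame sizes are made up; these are the equivalent ops, not the torchaudio class itself):

import torch
import torch.nn as nn

# Sketch of the projection step with in_features=512, embed_dim=768, dropout_input=0.1.
x = torch.randn(2, 49, 512)        # [batch, frame, in_feature] coming out of the CNN extractor
x = nn.LayerNorm(512)(x)           # normalize over the feature dimension
x = nn.Linear(512, 768)(x)         # project 512 -> 768
x = nn.Dropout(0.1)(x)
print(x.shape)                     # torch.Size([2, 49, 768])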
Convolutional positional embedding
pos_conv = ConvolutionalPositionalEmbedding(embed_dim, pos_conv_kernel, pos_conv_groups)
embed_dim=768
pos_conv_kernel=128
pos_conv_groups=16
The code is as follows:
class ConvolutionalPositionalEmbedding(Module):
    """Positional embedding which is placed at the beginning of Transformer.
    Args:
        embed_dim (int): Feature dimension of the input Tensor.
        kernel_size (int): The number of frames to be use.
        groups (int): The number of groups in feature dimensions.
    """

    def __init__(
        self,
        embed_dim: int,
        kernel_size: int,
        groups: int,
    ):
        super().__init__()
        self.embed_dim = embed_dim
        self.conv = nn.Conv1d(
            in_channels=embed_dim,
            out_channels=embed_dim,
            kernel_size=kernel_size,
            padding=kernel_size // 2,
            groups=groups,
        )
        self.conv = nn.utils.weight_norm(self.conv, name="weight", dim=2)
        self.num_remove: int = 1 if kernel_size % 2 == 0 else 0

    def __prepare_scriptable__(self):
        for hook in self.conv._forward_pre_hooks.values():
            if hook.__module__ == "torch.nn.utils.weight_norm" and hook.__class__.__name__ == "WeightNorm":
                _LG.warning("Removing weight_norm from %s", self.__class__.__name__)
                torch.nn.utils.remove_weight_norm(self.conv)
        return self

    def forward(self, x):
        """
        Args:
            x (Tensor): shape ``[batch, frame, feature]``.
        Returns:
            Tensor: The resulting feature. Shape ``[batch, frame, feature]``.
        """
        x = x.transpose(-2, -1)
        x = self.conv(x)
        if self.num_remove > 0:
            x = x[..., : -self.num_remove]
        x = torch.nn.functional.gelu(x)
        x = x.transpose(-2, -1)
        return x
From the constructor we can see that the positional embedding is also produced with a Conv1d (a grouped convolution over the frame axis). The weight_norm call then re-parameterizes the convolution weight into a direction and a magnitude; its main purpose is to stabilize and speed up training rather than to act as a regularizer (admittedly I had to look this up).
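To see why num_remove exists, here is a minimal sketch of the positional convolution with the values above (made-up batch and frame sizes, weight_norm omitted): because the kernel size is even, the symmetric padding produces one extra output frame, which has to be trimmed off.

import torch
import torch.nn as nn

# Sketch with embed_dim=768, kernel_size=128, groups=16. With an even kernel,
# padding=64 on both sides yields frame+1 outputs, hence num_remove=1.
x = torch.randn(2, 49, 768)                              # [batch, frame, feature]
conv = nn.Conv1d(768, 768, kernel_size=128, padding=64, groups=16)
y = conv(x.transpose(-2, -1))                            # [batch, feature, frame + 1]
y = y[..., :-1]                                          # drop the extra frame
y = torch.nn.functional.gelu(y).transpose(-2, -1)        # back to [batch, frame, feature]
print(y.shape)                                           # torch.Size([2, 49, 768])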
SelfAttention
Next, let's look at the SelfAttention source code.
First, the arguments passed in:
attention = SelfAttention(
    embed_dim=embed_dim,
    num_heads=num_heads,
    dropout=attention_dropout,
)
embed_dim=768, the feature dimension of the input and output
num_heads=12, i.e. 12 attention heads
attention_dropout=0.1
Now step into the SelfAttention class itself (a shape walk-through follows after the code):
class SelfAttention(Module):
    def __init__(
        self,
        embed_dim: int,
        num_heads: int,
        dropout: float = 0.0,
    ):
        super().__init__()
        head_dim = embed_dim // num_heads
        if head_dim * num_heads != embed_dim:
            raise ValueError(f"`embed_dim ({embed_dim})` is not divisible by `num_heads ({num_heads})`")

        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.dropout = torch.nn.Dropout(dropout)
        self.head_dim = head_dim

        self.scaling = self.head_dim ** -0.5

        self.k_proj = nn.Linear(embed_dim, embed_dim, bias=True)
        self.v_proj = nn.Linear(embed_dim, embed_dim, bias=True)
        self.q_proj = nn.Linear(embed_dim, embed_dim, bias=True)
        self.out_proj = nn.Linear(embed_dim, embed_dim, bias=True)

    def forward(
        self,
        x: Tensor,
        attention_mask: Optional[Tensor] = None,
    ) -> Tensor:
        if x.ndim != 3 or x.shape[2] != self.embed_dim:
            raise ValueError(
                f"The expected input shape is (batch, sequence, embed_dim=={self.embed_dim}). " f"Found {x.shape}."
            )
        batch_size, length, embed_dim = x.size()
        if attention_mask is not None:
            shape_ = (batch_size, 1, length, length)
            if attention_mask.size() != shape_:
                raise ValueError(f"The expected attention mask shape is {shape_}. " f"Found {attention_mask.size()}.")

        shape = (batch_size, length, self.num_heads, self.head_dim)
        q = self.q_proj(x).view(*shape).transpose(2, 1)
        k = self.k_proj(x).view(*shape).permute(0, 2, 3, 1)
        v = self.v_proj(x).view(*shape).transpose(2, 1)

        weights = self.scaling * (q @ k)
        if attention_mask is not None:
            weights += attention_mask

        weights = torch.nn.functional.softmax(weights, dim=-1)
        weights = self.dropout(weights)

        output = weights @ v
        output = output.transpose(2, 1).reshape(batch_size, length, embed_dim)
        output = self.out_proj(output)
        return output
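The forward pass is just scaled dot-product attention plus some reshaping. Here is a sketch of the shape bookkeeping with embed_dim=768 and num_heads=12 (so head_dim=64); the batch and sequence sizes are made up, and a single shared Linear stands in for the q/k/v projections to keep things short:

import torch
import torch.nn as nn

batch, length, embed_dim, num_heads = 2, 49, 768, 12
head_dim = embed_dim // num_heads
x = torch.randn(batch, length, embed_dim)
proj = nn.Linear(embed_dim, embed_dim)

shape = (batch, length, num_heads, head_dim)
q = proj(x).view(*shape).transpose(2, 1)          # [batch, heads, length, head_dim]
k = proj(x).view(*shape).permute(0, 2, 3, 1)      # [batch, heads, head_dim, length]
v = proj(x).view(*shape).transpose(2, 1)          # [batch, heads, length, head_dim]

weights = torch.softmax((head_dim ** -0.5) * (q @ k), dim=-1)   # [batch, heads, length, length]
output = (weights @ v).transpose(2, 1).reshape(batch, length, embed_dim)
print(output.shape)                               # torch.Size([2, 49, 768])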
FeedForward
Next we look at how the feed_forward module is built.
feed_forward = FeedForward(
    io_features=embed_dim,
    intermediate_features=ff_interm_features,
    intermediate_dropout=ff_interm_dropout,
    output_dropout=dropout,
)
The FeedForward object takes the following arguments (fancy names, but in the end they are still just feature counts):
io_features=embed_dim=768
intermediate_features=ff_interm_features=3072
intermediate_dropout=ff_interm_dropout=0.0
output_dropout=dropout=0.1
Let's see what the FeedForward code looks like:
class FeedForward(Module):
    """Layer that follows attention layer in encoder layer."""

    def __init__(
        self,
        io_features: int,
        intermediate_features: int,
        intermediate_dropout: float,
        output_dropout: float,
    ):
        super().__init__()
        self.intermediate_dense = nn.Linear(io_features, intermediate_features)
        self.intermediate_dropout = nn.Dropout(intermediate_dropout)
        self.output_dense = nn.Linear(intermediate_features, io_features)
        self.output_dropout = nn.Dropout(output_dropout)

    def forward(self, x):
        """
        Args:
            x (Tensor): shape: `(batch, sequence_length, io_features)`
        Returns:
            x (Tensor): shape: `(batch, sequence_length, io_features)`
        """
        x = self.intermediate_dense(x)
        x = torch.nn.functional.gelu(x)
        x = self.intermediate_dropout(x)
        x = self.output_dense(x)
        x = self.output_dropout(x)
        return x
Wow, much simpler: just two linear layers with a GELU sandwiched in between.
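As a quick sketch, this is the whole shape flow of the block with the values quoted above (batch and frame sizes made up):

import torch
import torch.nn as nn

x = torch.randn(2, 49, 768)
x = nn.Linear(768, 3072)(x)        # expand io_features -> intermediate_features
x = torch.nn.functional.gelu(x)
x = nn.Dropout(0.0)(x)             # intermediate_dropout
x = nn.Linear(3072, 768)(x)        # project back to io_features
x = nn.Dropout(0.1)(x)             # output_dropout
print(x.shape)                     # torch.Size([2, 49, 768])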
EncoderLayer
Now for the third part: building encoder_layers.
encoder_layers.append(
    EncoderLayer(
        attention=attention,
        dropout=dropout,
        layer_norm_first=layer_norm_first,
        feed_forward=feed_forward,
    )
)
encoder_layers is an nn.ModuleList(), so what gets appended to it are the individual encoder layers, one EncoderLayer per iteration of the loop.
Let's see what the EncoderLayer object is.
First, the arguments that are passed in:
attention=attention, the multi-head self-attention module built above
dropout=dropout=0.1
layer_norm_first=layer_norm_first=False
feed_forward=feed_forward, the feed-forward network built above
Now the source code:
class EncoderLayer(Module):
    """A layer unit in encoder. Combines multihead self attention and feed forward."""

    def __init__(
        self,
        attention: Module,
        dropout: float,
        layer_norm_first: bool,
        feed_forward: Module,
    ):
        super().__init__()
        self.attention = attention
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(attention.embed_dim)
        self.layer_norm_first = layer_norm_first
        self.feed_forward = feed_forward
        self.final_layer_norm = nn.LayerNorm(attention.embed_dim)

    def forward(
        self,
        x: Tensor,
        attention_mask: Optional[Tensor] = None,
    ):
        """
        Args:
            x (Tensor): shape: `(batch, sequence_length, embed_dim)`
            attention_mask (Tensor or None, optional):
                shape: `(batch, 1, sequence_length, sequence_length)`
        """
        residual = x

        if self.layer_norm_first:
            x = self.layer_norm(x)

        x = self.attention(x, attention_mask)
        x = self.dropout(x)
        x = residual + x

        if self.layer_norm_first:
            x = x + self.feed_forward(self.final_layer_norm(x))
        else:
            x = self.layer_norm(x)
            x = self.final_layer_norm(x + self.feed_forward(x))
        return x
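Since layer_norm_first is False here, the layer runs in the post-norm configuration: LayerNorm is applied after each residual addition rather than before the attention and feed-forward blocks. A quick smoke test of a single layer, assuming the classes from the snippets above (and their imports) are in scope, with the parameter values quoted in this post:

import torch

layer = EncoderLayer(
    attention=SelfAttention(embed_dim=768, num_heads=12, dropout=0.1),
    dropout=0.1,
    layer_norm_first=False,      # post-norm: LayerNorm after the residual additions
    feed_forward=FeedForward(768, 3072, 0.0, 0.1),
)
x = torch.randn(2, 49, 768)
print(layer(x).shape)            # torch.Size([2, 49, 768])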
Transformer
With the encoder layers done, let's look at how the Transformer object is structured.
First the arguments:
transformer = Transformer(
    pos_conv_embed=pos_conv,
    dropout=dropout,
    layers=encoder_layers,
    layer_norm_first=not layer_norm_first,
    layer_drop=layer_drop,
)
pos_conv_embed=pos_conv, the convolutional positional embedding built above
dropout=dropout=0.1
layers=encoder_layers, the ModuleList of encoder layers
layer_norm_first=not layer_norm_first=True
layer_drop=layer_drop=0.05
Now let's see what the Transformer code looks like:
class Transformer(Module):
    def __init__(
        self,
        pos_conv_embed: Module,
        dropout: float,
        layers: Module,
        layer_norm_first: bool,
        layer_drop: float,
    ):
        super().__init__()
        self.pos_conv_embed = pos_conv_embed
        self.layer_norm = nn.LayerNorm(pos_conv_embed.embed_dim)
        self.layer_norm_first = layer_norm_first
        self.layer_drop = layer_drop
        self.dropout = nn.Dropout(dropout)
        self.layers = layers

    def _preprocess(self, x: Tensor):
        x = x + self.pos_conv_embed(x)

        if self.layer_norm_first:
            x = self.layer_norm(x)

        x = self.dropout(x)
        return x

    def forward(
        self,
        x: Tensor,
        attention_mask: Optional[Tensor] = None,
    ):
        x = self._preprocess(x)
        for layer in self.layers:
            if not (self.training and torch.rand(1).item() <= self.layer_drop):
                x = layer(x, attention_mask)

        if not self.layer_norm_first:
            x = self.layer_norm(x)
        return x

    def get_intermediate_outputs(
        self,
        x: Tensor,
        attention_mask: Optional[Tensor] = None,
        num_layers: Optional[int] = None,
    ) -> List[Tensor]:
        if num_layers is not None:
            if not 0 < num_layers <= len(self.layers):
                raise ValueError(f"`num_layers` must be between [1, {len(self.layers)}]")

        ret: List[Tensor] = []
        x = self._preprocess(x)
        for layer in self.layers:
            x = layer(x, attention_mask)
            ret.append(x)
            if num_layers is not None and len(ret) >= num_layers:
                return ret
        return ret
Main work: in short, this class glues the positional embedding onto the multi-layer encoder stack; the loop in forward also implements LayerDrop, randomly skipping layers during training.
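The only non-obvious bit is the LayerDrop condition inside the loop; here is a minimal sketch of what that check does, using the layer_drop=0.05 value above:

import torch

# With layer_drop=0.05, each encoder layer is skipped with roughly 5% probability,
# but only while self.training is True.
layer_drop, training = 0.05, True
skipped = sum(
    1 for _ in range(10_000) if training and torch.rand(1).item() <= layer_drop
)
print(skipped / 10_000)      # roughly 0.05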
Encoder
Finally, let's look at the Encoder class and the arguments it takes.
Encoder(feature_projection, transformer)
feature_projection and transformer are the two modules built above.
The Encoder code:
class Encoder(Module):
    def __init__(
        self,
        feature_projection: Module,
        transformer: Module,
    ):
        super().__init__()
        self.feature_projection = feature_projection
        self.transformer = transformer

    def _preprocess(
        self,
        features: Tensor,
        lengths: Optional[Tensor] = None,
    ) -> Tuple[Tensor, Optional[Tensor]]:
        x = self.feature_projection(features)

        mask: Optional[Tensor] = None
        if lengths is not None:
            batch_size, max_len, _ = x.shape
            mask = torch.arange(max_len, device=lengths.device).expand(batch_size, max_len) >= lengths[:, None]
            x[mask] = 0.0
            mask = -10000.0 * mask[:, None, None, :].to(dtype=features.dtype)
            mask = mask.expand(batch_size, 1, max_len, max_len)
        return x, mask

    def forward(
        self,
        features: Tensor,
        lengths: Optional[Tensor] = None,
    ) -> Tensor:
        x, mask = self._preprocess(features, lengths)
        x = self.transformer(x, attention_mask=mask)
        return x

    def extract_features(
        self,
        features: Tensor,
        lengths: Optional[Tensor] = None,
        num_layers: Optional[int] = None,
    ) -> List[Tensor]:
        x, masks = self._preprocess(features, lengths)
        return self.transformer.get_intermediate_outputs(x, attention_mask=masks, num_layers=num_layers)
The mask setup confused me at first, but it comes down to this: frames beyond each utterance's real length are zeroed out in x, and the corresponding attention logits get -10000 added, so after the softmax the padded frames receive essentially no attention weight. Everything else is straightforward.
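A minimal sketch of just the mask construction, with made-up lengths:

import torch

batch_size, max_len = 2, 5
lengths = torch.tensor([5, 3])      # the second utterance has 2 padded frames
mask = torch.arange(max_len).expand(batch_size, max_len) >= lengths[:, None]
# mask: [[False, False, False, False, False],
#        [False, False, False,  True,  True]]
attention_mask = -10000.0 * mask[:, None, None, :].float()
attention_mask = attention_mask.expand(batch_size, 1, max_len, max_len)
print(attention_mask.shape)         # torch.Size([2, 1, 5, 5])
print(attention_mask[1, 0, 0])      # tensor([-0., -0., -0., -10000., -10000.])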
Still, overall, we now have a reasonable picture of how the whole model is put together.
Summary
So the overall wav2vec2_model structure is as follows:
(In the diagram, the blue text marks step explanations rather than code.) All of these objects exist solely to build up one complete model.
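To tie the section together, here are the values quoted throughout this post collected back into one call. This is only a sketch: the 12-layer count is my assumption for the base configuration, and _get_encoder is assumed to be importable from torchaudio's wav2vec2 components module.

# Sketch of how the encoder of the "base" fine-tuning configuration gets built.
encoder = _get_encoder(
    in_features=512,           # output channels of the last CNN feature-extractor layer
    embed_dim=768,
    dropout_input=0.1,
    pos_conv_kernel=128,
    pos_conv_groups=16,
    num_layers=12,             # assumption: 12 encoder layers in the base model
    num_heads=12,
    attention_dropout=0.1,
    ff_interm_features=3072,
    ff_interm_dropout=0.0,
    dropout=0.1,
    layer_norm_first=False,
    layer_drop=0.05,
)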
A recent discovery
This model is intended to be used directly for ASR fine-tuning, which means no pre-training step is involved. The pre-trained weights for the whole model also get downloaded in the step below:
As a consequence, we never get to see the pre-training loss setup, the Gumbel-softmax, or the quantization process.
We can only take this model as-is and use it for fine-tuning.
That leaves me a bit unsatisfied: I learned a lot while reading the code, but being forced to stop short still feels frustrating. fairseq works the same way, directly loading a .pt file to obtain the model weights.
That's it for the wav2vec 2.0 fine-tuning model. What remains is how the model is invoked and the CTC decoding step; I'll keep digging into those when I have time.
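For reference, this is roughly how such a fine-tuned checkpoint can be pulled in through torchaudio's pipeline bundles; the specific bundle name below is my assumption rather than something shown in this post.

import torchaudio

# Assumption: the base model fine-tuned on LibriSpeech 960h; pick whichever bundle
# matches the checkpoint you actually need.
bundle = torchaudio.pipelines.WAV2VEC2_ASR_BASE_960H
model = bundle.get_model()          # downloads and loads the fine-tuned weights
print(bundle.sample_rate)           # expected audio sample rate
print(bundle.get_labels())          # the CTC label set used for decoding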