MASR语音识别算法简介
1. 简介
MASR是一款基于Pytorch实现的自动语音识别框架,MASR全称是神奇的自动语音识别框架(Magical Automatic Speech Recognition),MASR致力于简单,实用的语音识别项目。可部署在服务器,Nvidia Jetson设备,未来还计划支持Android等移动设备。
MASR 使用的是门控卷积神经网络(Gated Convolutional Network),网络结构类似于 Facebook 在 2016 年提出的 Wav2letter,只使用卷积神经网络(CNN)实现的语音识别。但是使用的激活函数不是 ReLU 或者是 HardTanh ,而是 GLU (门控线性单元)。因此称作门控卷积网络。根据我的实验,使用 GLU 的收敛速度比 HardTanh 要快。
- 以下用字错误率 CER 来衡量模型的表现,CER = 编辑距离 / 句子长度,越低越好,大致可以理解为 1 - CER 就是识别准确率。
2. 声音预处理
2.1 声音的本质与模数化
音乐是人类的通用语言,不分国界不分种族。
抖音短视频爆火的关键因素之一,就是普通人也能便捷地使用BGM表达自我。
从感性角度看,音乐可以有很多种解释,如:
- 音乐是有逻辑的声音。
- 音乐是以声音和时间为材料的艺术。
- 音乐是思想情感的表达,是精神的延续。
- ……
而从数学角度看,音乐就是时间和频率的关系。
声音的本质是波,人类听觉的原理就是波引起了耳朵鼓膜的振动。
人们用不同乐器、不同力度,在一段连续时间里敲击,就组合出了时间和频率的关系。
一切物体都有自己的频率,所以整个世界也可以理解为一篇乐章。
当音乐被计算机数字化后,我们就可以用文件的形式保存它。但现实世界的声音是连续的,而计算机世界是离散(由0和1组成)的。想要用计算机捕捉声音,就得把连续信息转为离散的数据,这个过程就是信号的“模数转化”。
处理过程中最关键的参数就是“采样率”,即每秒钟用多少份数据表达声音信号。此外每份数据大小以及声道数,与采样率一起,决定了保存后声音和原声间的差距。
和图像一样,音乐也有很多种压缩算法。所谓“无损音乐”,就是确保源文件信息不丢失情况下压缩数据,常见格式如flac 、ape 、wav ;更常见的音乐格式是mp3 ,是一种有损压缩格式,虽然老旧,但依旧流行。
Python标准模块wave 支持wav 文件读写,但涉及到压缩算法时,都需要借助外部模块。其中功能最全也最流行的就是ffmpeg ,它是开源视频处理软件,支持绝大多数的音视频格式编码,被广泛引用于各大视频网站和商业软件。
2.2 使用soundfile读取音频文件
本实例是使用soundfile去读取音频文件,下面是读取WAV文件的代码。
samples, sample_rate = soundfile.read(file, dtype='float32')
可以看到读取完成以后,soundfile返回两个值,一个是采样的结果,如果是单通道,采样结果的形状应该为(N, ),如这里读取了test.wav后,返回的samples是(134240,)。如果是立体声,结果是(N, number channels)。这里读取完成返回的sample_rate=16000。
2.3 音频数据处理
音频振幅的归一化
将得到的音频数据进行归一化的处理,这个函数需要传入一个目标的RMS值,默认为20。
def normalize(self, target_db=-20, max_gain_db=300.0):
"""将音频归一化,使其具有所需的有效值(以分贝为单位)
:param target_db: Target RMS value in decibels. This value should be
less than 0.0 as 0.0 is full-scale audio.
:type target_db: float
:param max_gain_db: Max amount of gain in dB that can be applied for
normalization. This is to prevent nans when
attempting to normalize a signal consisting of
all zeros.
:type max_gain_db: float
:raises ValueError: If the required gain to normalize the segment to
the target_db value exceeds max_gain_db.
"""
gain = target_db - self.rms_db
if gain > max_gain_db:
raise ValueError(
"无法将段规范化到 %f dB,因为可能的增益已经超过max_gain_db (%f dB)" % (target_db, max_gain_db))
self.gain_db(min(max_gain_db, target_db - self.rms_db))
傅里叶变换把时域特征转化成频域特征
预处理的第二步是把提取的时域特征进行快速傅里叶变换得到频域特征,关于傅里叶变换的具体原理,这里就不讲述了,有兴趣可以去网上了解下,傅里叶变换的相关代码如下,其中用到了numpy模块下面的API:
@staticmethod
def _compute_linear(samples, sample_rate, stride_ms=10.0, window_ms=20.0, eps=1e-14):
stride_size = int(0.001 * sample_rate * stride_ms)
window_size = int(0.001 * sample_rate * window_ms)
truncate_size = (len(samples) - window_size) % stride_size
samples = samples[:len(samples) - truncate_size]
nshape = (window_size, (len(samples) - window_size) // stride_size + 1)
nstrides = (samples.strides[0], samples.strides[0] * stride_size)
windows = np.lib.stride_tricks.as_strided(samples, shape=nshape, strides=nstrides)
assert np.all(windows[:, 1] == samples[stride_size:(stride_size + window_size)])
weighting = np.hanning(window_size)[:, None]
fft = np.fft.rfft(windows * weighting, n=None, axis=0)
fft = np.absolute(fft)
fft = fft ** 2
scale = np.sum(weighting ** 2) * sample_rate
fft[1:-1, :] *= (2.0 / scale)
fft[(0, -1), :] /= scale
freqs = float(sample_rate) / window_size * np.arange(fft.shape[0])
ind = np.where(freqs <= (sample_rate / 2))[0][-1] + 1
linear_feat = np.log(fft[:ind, :] + eps)
return linear_feat
该函数返回经过傅里叶变换后的值,这里的linear_feat形状为(161, 838),其中161为特征的大小,838是特征的长度,后续会在linear_feat前面增加一维,表示batch_size。
3. 模型结构
算法的模型文件是位于masr/model_utils/deepspeech2/model.py 下面,利用pytorch打印模型的结构,输出如下所示:
DeepSpeech2Model(
(conv): ConvStack(
(conv1): ConvBn(
(conv): Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2))
(act): GELU()
)
(conv2): ConvBn(
(conv): Conv2d(32, 32, kernel_size=(3, 3), stride=(2, 2))
(act): GELU()
)
)
(rnn): RNNStack(
(rnns): ModuleList(
(0): RNNForward(
(rnn): GRU(1248, 1024, batch_first=True)
(norm): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
)
(1): RNNForward(
(rnn): GRU(1024, 1024, batch_first=True)
(norm): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
)
(2): RNNForward(
(rnn): GRU(1024, 1024, batch_first=True)
(norm): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
)
(3): RNNForward(
(rnn): GRU(1024, 1024, batch_first=True)
(norm): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
)
(4): RNNForward(
(rnn): GRU(1024, 1024, batch_first=True)
(norm): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
)
)
)
(output): Linear(in_features=1024, out_features=3894, bias=True)
)
从pytorch打印的summary可以看到,DeepSpeech2Model模型是由一个CNN(卷积神经网络)层组,一个RNN(循环神经网络)层组已经一个Linear(线性)层所组成,下面我们分别介绍这几个层。
3.1 CNN网络
CNN网络的输入数据的形状应该为(B, D, T)其中,B是batch_size, D是audio_feature,T是audio_length。音频的文件是由soundfile 库读取的,在我们的测试音频test.wav 中经过读取,输出的numpy数组的形状为(1, 161, 838),即batch=1一个音频,audio_feature=161,audio_length=838。这个最大的卷积层,作者把他命名成ConvStack,即堆叠的卷积组,这里截取了部分的代码。
class ConvStack(nn.Module):
"""具有堆叠卷积层的卷积组
:param feat_size: 输入音频的特征大小
:type feat_size: int
:param conv_out_channels: 卷积层输出大小
:type conv_out_channels: int
"""
def __init__(self, feat_size, conv_out_channels):
super().__init__()
self.conv1 = ConvBn(in_channels=1,
out_channels=conv_out_channels,
kernel_size=3,
stride=2,
input_dim=feat_size)
self.conv2 = ConvBn(in_channels=conv_out_channels,
out_channels=conv_out_channels,
kernel_size=3,
stride=2,
input_dim=self.conv1.output_dim)
self.output_dim = self.conv2.output_dim * conv_out_channels
def forward(self, x, x_len):
"""
x: shape [B, D, T]
x_len : shape [B]
"""
x = x.permute(0, 2, 1)
x = x.unsqueeze(1)
x, x_len = self.conv1(x, x_len)
x, x_len = self.conv2(x, x_len)
x = x.permute(0, 2, 1, 3)
x = x.reshape([x.shape[0], x.shape[1], -1])
return x, x_len
上面的代码可以看到,堆叠的卷积层里面嵌套了两个自定义的ConvBn层。可以看到这两个ConvBn层的卷积核大小都为3,步长是2,输出的通道数为32。在进入卷积层时候,除了需要传音频的数据(B, D, T),还需要一个名为audio_length 的值。这里测试样例中audio_length=838。
由于普通卷积层的数据应该为(Batch_size, channels, h, w),因为demo的声音文件是非立体声,所以这里的channel=1。这需要我们对输入的数据做一个转换,由(B, D, T)转换成(B, C=1, T, D)。这样就可以进行卷积了。需要注意的是,卷积当中,作者使用了gelu(gaussian error linear units) 就是我们常说的高斯误差线性单元 ,它是一种高性能的神经网络激活函数,因为gelu的非线性变化是一种符合预期的随机正则变换方式,总体来说,经过一层卷积与二层卷积以后,音频数据的形状变化如下面所示:
torch.Size([1, 1, 838, 161]) tensor([838.])
torch.Size([1, 32, 418, 80]) tensor([418], dtype=torch.int32)
torch.Size([1, 32, 208, 39]) tensor([208], dtype=torch.int32)
torch.Size([1, 208, 1248]) tensor([208], dtype=torch.int32)
可以看到音频的T,D值在降低,试试因为卷积层提取特征的缘故,每一层,作者会顺道返回音频数据output_dim。output_dim的计算公式为。
self.output_dim = (input_dim - self.kernel_size) // self.stride + 1
上面的公式实质上就是输出dim的计算公式了。细心的读者可能已经发现了,output_dim即为T(audio_length)值,也就是data中第2维的值,所以这里也可以直接返回data第二维的值。卷积层后,作者通过交换通道顺序,继续把D值(audio feature)与通道值相乘后并成一维输出。即上面的32*39=1248,得到了一个三维的tensor。最终卷积stack的输出结果为(1, 208, 1248)分别为(B,T,D)。
3.2 RNN网络
和conv_stack卷积层堆叠的结构类似,RNN网络也是由5个循环神经网络堆砌而成的,其最小的神经单元为一个单向的GRU。RNN堆叠层可以详细看下面的代码:
class RNNStack(nn.Module):
"""堆叠单向GRU层
:param i_size: GRU层的输入大小
:type i_size: int
:param h_size: GRU层的隐层大小
:type h_size: int
:param num_rnn_layers: rnn层数
:type num_rnn_layers: int
:return: RNN组的输出层
:rtype: nn.Layer
"""
def __init__(self, i_size: int, h_size: int, num_rnn_layers: int):
super().__init__()
self.rnns = nn.ModuleList()
self.output_dim = h_size
self.num_rnn_layers = num_rnn_layers
self.rnns.append(RNNForward(rnn_input_size=i_size, h_size=h_size))
for i in range(0, self.num_rnn_layers - 1):
self.rnns.append(RNNForward(rnn_input_size=h_size, h_size=h_size))
def forward(self, x, x_lens, init_state_h_box=None):
if init_state_h_box is not None:
init_state_list = torch.split(init_state_h_box, 1, dim=0)
else:
init_state_list = [None] * self.num_rnn_layers
final_chunk_state_list = []
for rnn, init_state in zip(self.rnns, init_state_list):
x, final_state = rnn(x, x_lens, init_state)
final_chunk_state_list.append(final_state)
final_chunk_state_h_box = torch.stack(final_chunk_state_list, dim=0)
return x, final_chunk_state_h_box
代码中,输入x是来自于上一层的输出形状是(1, 208, 1248),作者这里定义了一个RNNForward的类,RNNForward的部分代码如下所示:
class RNNForward(nn.Module):
def __init__(self, rnn_input_size, h_size):
super().__init__()
self.rnn = nn.GRU(input_size=rnn_input_size,
hidden_size=h_size,
bidirectional=False,
batch_first=True)
self.norm = nn.LayerNorm(h_size)
def forward(self, x, x_lens, init_state):
x = nn.utils.rnn.pack_padded_sequence(x, x_lens.cpu(), batch_first=True)
x, final_state = self.rnn(x, init_state)
x, _ = nn.utils.rnn.pad_packed_sequence(x, batch_first=True)
x = self.norm(x)
return x, final_state
RNNForward里面本质上是GRU,5层的RNNForward堆叠而成RNN Stack,RNNForward的input_size来自于上一层的输出,这里是1248,hidden_size=1024。RNNStack最后通过运输将会返回一个(1, 208, 1024)的向量作为输出,以及(5, 1, 1, 1024),final_chunk_state_h_box,即5层的隐状态输出。
3.3 线性回归层
模型最后经过线性回归层,线性回归层的隐藏层数设置为vocab_size的小,data会由原来(1, 208, 1024)变成(1, 208, vocab_size),这里的单词总数是3894,所以最后的输出logits为(1, 208, 3894)。
3.3 返回数据以及后处理
在test.wav文件中,模型输出的形状为(1, 208, 3894),分别为batch_size,sequence_length,以及vocab_size。接着就到模型的后处理环节。后处理环节的代码如下所示:
def greedy_decoder(probs_seq, vocabulary, blank_index=0):
"""CTC贪婪(最佳路径)解码器
由最可能的令牌组成的路径将被进一步后处理到去掉连续重复和所有空白
:param probs_seq: 每一条都是2D的概率表。每个元素都是浮点数概率的列表一个字符
:type probs_seq: numpy.ndarray
:param vocabulary: 词汇列表
:type vocabulary: list
:param blank_index 需要移除的空白索引
:type blank_index int
:return: 解码后得到的字符串
:rtype: baseline
"""
max_index_list = list(np.array(probs_seq).argmax(axis=1))
max_prob_list = [probs_seq[i][max_index_list[i]] for i in range(len(max_index_list)) if max_index_list[i] != blank_index]
index_list = [index_group[0] for index_group in groupby(max_index_list)]
index_list = [index for index in index_list if index != blank_index]
text = ''.join([vocabulary[index] for index in index_list])
score = 0
if len(max_prob_list) > 0:
score = float(sum(max_prob_list) / len(max_prob_list)) * 100.0
return score, text.replace('<space>', ' ')
按照作者的说法,后处理部分即需要对返回的结果进行解码。解码器是分为2种,有ctc_beam_search , ctc_greedy 。
ctc_greedy
ctc_greedy 分成以下几步:
- 在第二维度即sequence_length维度上面取出最大值的索引,取索引的同时,去掉空格(空格的索引为0,可参考vocabulary.txt文件)。
- 使用贪心算法,即删除相邻的索引相同的值,只保留一个,把重复的值去掉。
其中去除相邻的同索引,作者使用了itertools里面的groupby工具,部分代码如下所示:
index_list = [index_group[0] for index_group in groupby(max_index_list)]
这里如果输入的max_index_list如下所示:
max_index_list = [1, 2, 2, 3, 3, 1, 1, 2]
那么输出就会变成:
[1, 2, 3, 1, 2]
- 对照词汇表,把索引值转换成字符,即为需要输出的结果。
- 算出得分,得分的计算公式是,所有sequence_length维度上面的最大概率和/sequence_length。
样例中的输出结果为:
90.73734008348904 近几年不但我用书给女儿压岁也劝说清朋友不要给女儿压岁钱而改送压岁书
ctc_beam_search
贪心搜索的性能非常受限, 这种方法忽略了一个输出可能对应多个对齐结果。很多时候,如果我们能拿到nearbest的路径,后续可以利用其他信息来进一步优化搜索的结果。束搜索能近似找出 top 最优的若干条路径。
原理:
基本原理是通过ti-1中beamsize个序列,每个序列分别连接ti中beamsize个节点,得到 beamsize个新序列及对应的score,然后按照score从大到小的顺序选出前beamSize个序列,依次推进。
图示:
假设 beamsize 为2 t=1时:
这个时候只会将两个概率最大的节点放进路径集合中,即有两条路径。
t=2时:
上面的两个路径每个路径都会和下一个时间点的每一项组成新的路径,因此一共有2*3=6条路径。
然后我们还是只保留概率最大的两条路径(次大的两个路径相等,这里舍弃掉一个)。
总而言之,每次时刻只保留beam_size条路径即可,到最后就可以生成beam_size条候选的路径。
|