BERT
Program steps
- Set the basic hyperparameter values and preprocess the data
- Build the input samples
  - Randomly pick two sentences a and b from the sample set
  - Concatenate a and b into one model input sequence: prepend the classification token [CLS], and insert the separator [SEP] between a and b and at the end
  - Randomly select 15% of the tokens of the input sequence as mask candidates, then mask the selected tokens following the paper's 80%/10%/10% strategy (see the sketch right after this list)
  - Pad every token-holding variable to the maximum length (this makes uniform processing easier)
  - Record the inter-sentence relationship (whether a and b are adjacent)
- Build the BERT model
  - Build the embedding fed into the encoder as in Figure 2 of the paper, Embedding: input = word + position + segment
  - Following the Transformer, apply a padding mask to the input sequence so that padding tokens are screened out in the attention mechanism
  - Feed the embedding into the Transformer encoder and take its output
  - Task 1: using the recorded mask positions, gather the predicted token probabilities at those positions from the encoder output; after the activation, fully connected layer and normalization this gives the language-model logits (essentially a projection onto the vocabulary embedding matrix)
  - Task 2: take the first element of the output (the one corresponding to the [CLS] input token); after the activation, fully connected layer and normalization this gives the classification output
- Training: sum Task 1's loss loss_lm and Task 2's loss loss_clsf, then run gradient descent until the loss stays below the threshold 0.01 for 30 consecutive epochs
- Testing: iterate over every sentence-pair sample, feed it into the model, and take the row-wise argmax of the returned logits_lm and logits_clsf as the prediction.
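A minimal, self-contained sketch of the sample-construction steps above (the toy vocabulary and the two sentences are made up for illustration; the full script below builds them from `text`, and `make_batch` implements the same 80%/10%/10% policy with two random draws instead of one):

    import random

    # hypothetical toy vocabulary; the real word_dict is built in pre_process below
    word_dict = {'[PAD]': 0, '[CLS]': 1, '[SEP]': 2, '[MASK]': 3,
                 'i': 4, 'love': 5, 'you': 6, 'too': 7}
    sentence_a, sentence_b = [4, 5, 6], [4, 5, 6, 7]   # "i love you" / "i love you too"

    # [CLS] a [SEP] b [SEP], plus segment ids: 0 for the first sentence, 1 for the second
    input_ids = [word_dict['[CLS]']] + sentence_a + [word_dict['[SEP]']] + sentence_b + [word_dict['[SEP]']]
    segment_ids = [0] * (1 + len(sentence_a) + 1) + [1] * (len(sentence_b) + 1)

    # choose 15% of the positions (at least one), never [CLS] or [SEP]
    candidates = [i for i, t in enumerate(input_ids)
                  if t not in (word_dict['[CLS]'], word_dict['[SEP]'])]
    random.shuffle(candidates)
    n_pred = max(1, int(0.15 * len(input_ids)))
    for pos in candidates[:n_pred]:
        p = random.random()
        if p < 0.8:                                    # 80%: replace with [MASK]
            input_ids[pos] = word_dict['[MASK]']
        elif p < 0.9:                                  # 10%: replace with a random vocabulary token
            input_ids[pos] = random.randrange(len(word_dict))
        # remaining 10%: keep the original token

    print(input_ids, segment_ids)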
"""
Task: Cloze-style masked word prediction and next-sentence prediction with BERT
Author: ChengJunkai @github.com/Cheng0829
Email: chengjunkai829@gmail.com
Date: 2022/09/21
Reference: Tae Hwan Jung(Jeff Jung) @graykode
"""
import math, re, random, torch, time, os, sys
import numpy as np
import torch.nn as nn
import torch.optim as optim
def get_attn_pad_mask(seq_q, seq_k):
    '''The mask has size (len_q, len_k) so that it matches the shape of
    torch.matmul(Q, K^T) in scaled dot-product attention.'''
batch_size, len_q = seq_q.size()
batch_size, len_k = seq_k.size()
"""Tensor.data.eq(element)
eq即equal,对Tensor中所有元素进行判断,和element相等即为True,否则为False,返回二值矩阵
Examples:
>>> tensor = torch.FloatTensor([[1, 2, 3], [4, 5, 6]])
>>> tensor.data.eq(1)
tensor([[ True, False, False],
[False, False, False]])
"""
pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)
return pad_attn_mask.expand(batch_size, len_q, len_k)
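# A minimal usage sketch of get_attn_pad_mask (toy ids chosen for illustration; 0 is [PAD]).
# The (batch_size, 1, len_k) mask is expanded to (batch_size, len_q, len_k):
#     >>> seq = torch.LongTensor([[1, 5, 6, 0, 0]])   # batch_size=1, seq_len=5, two [PAD] tokens
#     >>> get_attn_pad_mask(seq, seq).shape
#     torch.Size([1, 5, 5])
#     >>> get_attn_pad_mask(seq, seq)[0, 0]
#     tensor([False, False, False,  True,  True])
# Every query row carries the same key mask, i.e. the columns of padded key positions are True.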
'''Attention(Q, K, V) = Softmax(Q * K^T / sqrt(d_k)) * V'''
def Scaled_Dot_Product_Attention(Q, K, V, attn_mask):
"""torch.matmul(Q, K)
torch.matmul是tensor的乘法,输入可以是高维的.
当输入是都是二维时,就是普通的矩阵乘法.
当输入有多维时,把多出的一维作为batch提出来,其他部分做矩阵乘法.
Exeamples:
>>> a = torch.ones(3,4)
>>> b = torch.ones(4,2)
>>> torch.matmul(a,b).shape
torch.Size([3,2])
>>> a = torch.ones(5,3,4)
>>> b = torch.ones(4,2)
>>> torch.matmul(a,b).shape
torch.Size([5,3,2])
>>> a = torch.ones(2,5,3)
>>> b = torch.ones(1,3,4)
>>> torch.matmul(a,b).shape
torch.Size([2,5,4])
"""
scores = torch.matmul(Q, K.transpose(2,3)) / np.sqrt(d_k)
"""scores.masked_fill_(attn_mask, -1e9)
由于scores和attn_mask维度相同,根据attn_mask中的元素值,把和attn_mask中值为True的元素的
位置相同的scores元素的值赋为-1e9
"""
scores.masked_fill_(attn_mask, -1e9)
softmax = nn.Softmax(dim=-1)
attn = softmax(scores)
context = torch.matmul(attn, V)
return context, attn
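# A small numeric sketch (values chosen arbitrarily) of why masked positions are filled
# with -1e9 before the softmax:
#     >>> scores = torch.tensor([[2.0, 1.0, -1e9]])   # pretend the last key position is padding
#     >>> nn.Softmax(dim=-1)(scores)
#     tensor([[0.7311, 0.2689, 0.0000]])
# exp(-1e9) underflows to 0, so padded keys get zero attention weight while the
# remaining weights still sum to 1.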
class MultiHeadAttention(nn.Module):
def __init__(self):
super().__init__()
self.W_Q = nn.Linear(d_model, d_k*n_heads)
self.W_K = nn.Linear(d_model, d_k*n_heads)
self.W_V = nn.Linear(d_model, d_v*n_heads)
self.linear = nn.Linear(n_heads*d_v, d_model)
self.layer_norm = nn.LayerNorm(d_model)
def forward(self, Q, K, V, attn_mask):
residual, batch_size = Q, len(Q)
        '''Split d_model into n_heads independent heads of size d_k each (here 768 = 12 * 64).
        Without changing the overall computation cost, the heads attend independently,
        which helps the model learn different kinds of features.'''
Q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1,2)
K_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1,2)
V_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1,2)
attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)
context, attn = Scaled_Dot_Product_Attention(Q_s, K_s, V_s, attn_mask)
"""contiguous() 连续的
contiguous: view只能用在连续(contiguous)的变量上.
如果在view之前用了transpose, permute等,
需要用contiguous()来返回一个contiguous copy
"""
context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v)
output = self.linear(context)
"""nn.LayerNorm(output) 样本归一化
和对所有样本的某一特征进行归一化的BatchNorm不同,
LayerNorm是对每个样本进行归一化,而不是一个特征
Tips:
归一化Normalization和Standardization标准化区别:
Normalization(X[i]) = (X[i] - np.min(X)) / (np.max(X) - np.min(X))
Standardization(X[i]) = (X[i] - np.mean(X)) / np.var(X)
"""
output = self.layer_norm(output + residual)
return output, attn
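# Shape walkthrough for one MultiHeadAttention forward pass, assuming the settings used
# further below (batch_size=10, max_len=30, d_model=768, n_heads=12, d_k=d_v=64):
#     Q, K, V            : (10, 30, 768)
#     Q_s, K_s, V_s      : (10, 12, 30, 64)   after view + transpose(1, 2)
#     attn_mask          : (10, 12, 30, 30)   after unsqueeze(1).repeat(1, n_heads, 1, 1)
#     context (per head) : (10, 12, 30, 64)
#     context (merged)   : (10, 30, 768)      after transpose + contiguous().view
#     output             : (10, 30, 768)      after the final nn.Linear and LayerNorm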
class Position_wise_Feed_Forward_Networks(nn.Module):
def __init__(self):
super().__init__()
        '''The position-wise layers are 1x1 convolutions, i.e. fully connected layers applied to each position.'''
        """nn.Conv1d
        in_channels should be understood as the embedding dimension of each position;
        out_channels is the number of convolution kernels, i.e. the output feature dimension.
        """
self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)
self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)
self.layer_norm = nn.LayerNorm(d_model)
def forward(self, inputs):
residual = inputs
relu = nn.ReLU()
output = relu(self.conv1(inputs.transpose(1, 2)))
output = self.conv2(output).transpose(1, 2)
return self.layer_norm(output + residual)
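# A quick check of the 1x1-convolution comment above: nn.Conv1d with kernel_size=1 applies
# the same linear map independently at every position, i.e. it is a position-wise fully
# connected layer (toy sizes here, not the d_model/d_ff used below):
#     >>> conv = nn.Conv1d(in_channels=4, out_channels=6, kernel_size=1)
#     >>> fc = nn.Linear(4, 6)
#     >>> fc.weight.data = conv.weight.data.squeeze(-1); fc.bias.data = conv.bias.data
#     >>> x = torch.randn(2, 5, 4)                    # (batch_size, seq_len, d_model)
#     >>> torch.allclose(conv(x.transpose(1, 2)).transpose(1, 2), fc(x), atol=1e-6)
#     True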
class EncoderLayer(nn.Module):
def __init__(self):
super(EncoderLayer, self).__init__()
self.enc_attn = MultiHeadAttention()
self.pos_ffn = Position_wise_Feed_Forward_Networks()
def forward(self, enc_outputs, enc_attn_mask):
enc_outputs, attn = self.enc_attn(enc_outputs, \
enc_outputs, enc_outputs, enc_attn_mask)
enc_outputs = self.pos_ffn(enc_outputs)
return enc_outputs, attn
"""*********以上为Transformer架构代码*******************"""
'''1. Data preprocessing'''
def pre_process(text):
sentences = re.sub("[.,!?\\-]", '', text.lower()).split('\n')
word_sequence = " ".join(sentences).split()
word_list = []
    '''
    Deduplicating with list(set(word_sequence)) would give a list in arbitrary order
    (set is unordered), so the resulting dictionary would differ between runs, and a
    model saved from a previous run would very likely be unusable in this run
    (e.g. last run the model learned to output you:2 after seeing i:0, love:1, but this
    run "you" sits at index 3 of the dictionary, so the correct result can no longer be produced).
    '''
for word in word_sequence:
if word not in word_list:
word_list.append(word)
word_dict = {'[PAD]': 0, '[CLS]': 1, '[SEP]': 2, '[MASK]': 3}
for i, w in enumerate(word_list):
word_dict[w] = i + 4
number_dict = {i:w for i,w in enumerate(word_dict)}
vocab_size = len(word_dict)
sentences_list = []
for sentence in sentences:
arr = [word_dict[s] for s in sentence.split()]
sentences_list.append(arr)
print(word_dict)
return sentences, word_list, word_dict, number_dict, vocab_size, sentences_list
'''Activation function used in the paper; it performs better than ReLU in BERT.'''
def gelu(x):
"Implementation of the gelu activate_ation function by Hugging Face"
return x * 0.5 * (1.0 + torch.erf(x / math.sqrt(2.0)))
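# gelu(x) above is the exact GELU, x * Phi(x) with Phi the standard normal CDF
# (0.5 * (1 + erf(x / sqrt(2))) = Phi(x)), and matches torch.nn.functional.gelu:
#     >>> x = torch.tensor([-1.0, 0.0, 1.0])
#     >>> gelu(x)
#     tensor([-0.1587,  0.0000,  0.8413])
#     >>> torch.nn.functional.gelu(x)
#     tensor([-0.1587,  0.0000,  0.8413])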
'''Build the token-level input samples from the sentence data.'''
def make_batch():
batch = []
positive = negative = 0
a_b = []
    '''Sentence pairs are drawn at random, so even a model already trained and loaded
    from the .pt file may still show a fairly large error on a newly sampled batch.'''
while positive != batch_size/2 or negative != batch_size/2:
"""random.randrange(start, stop=None, step=1)
在(start, stop)范围内随即返回一个数
Args:
start: 指定范围内的开始值,包含在范围内
stop: 指定范围内的结束值,不包含在范围内,若无参数stop则默认范围为0~start
step: 指定递增基数,默认为1
Examples:
>>> random.randrange(1, 100, 2)
67 # 以2递增,只能返回奇数
"""
sentence_a_index, sentence_b_index = random.randrange(len(sentences)), random.randrange(len(sentences))
sentence_a, sentence_b = sentences_list[sentence_a_index], sentences_list[sentence_b_index]
if (sentence_a_index, sentence_b_index) not in a_b:
a_b.append((sentence_a_index, sentence_b_index))
elif len(a_b) < batch_size:
continue
else:
break
input_ids = [word_dict['[CLS]']] + sentence_a + [word_dict['[SEP]']] + sentence_b + [word_dict['[SEP]']]
segment_ids = [0] * (1 + len(sentence_a) + 1) + [1] * (len(sentence_b) + 1)
        n_pred = int(0.15 * len(input_ids))           # mask 15% of the tokens in this input sequence
        n_pred = min(max_words_pred, max(1, n_pred))  # but at least 1 and at most max_words_pred
candidate_mask_tokens = [i for i, token in enumerate(input_ids) if token != word_dict['[CLS]']
and token != word_dict['[SEP]']]
random.shuffle(candidate_mask_tokens)
masked_tokens, masked_pos = [], []
for pos in candidate_mask_tokens[:n_pred]:
masked_pos.append(pos)
masked_tokens.append(input_ids[pos])
            if random.random() < 0.8:    # 80% of the time: replace with [MASK]
                input_ids[pos] = word_dict['[MASK]']
            elif random.random() < 0.5:  # half of the remaining 20%, i.e. 10%: replace with a random vocabulary token
                random_index = random.randint(0, vocab_size - 1)
                input_ids[pos] = word_dict[number_dict[random_index]]
            # otherwise (the remaining 10%): keep the original token unchanged
        '''Pad every token-holding variable to the maximum length, which makes uniform processing easier.'''
n_pad = max_len - len(input_ids)
input_ids.extend([0] * n_pad)
segment_ids.extend([0] * n_pad)
if max_words_pred > n_pred:
n_pad = max_words_pred - n_pred
masked_tokens.extend([0] * n_pad)
masked_pos.extend([0] * n_pad)
if sentence_a_index + 1 == sentence_b_index and positive < batch_size/2:
batch.append([input_ids, segment_ids, masked_tokens, masked_pos, True])
positive += 1
elif sentence_a_index + 1 != sentence_b_index and negative < batch_size/2:
batch.append([input_ids, segment_ids, masked_tokens, masked_pos, False])
negative += 1
return batch
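# Each element of the batch returned above is a 5-item list. A hypothetical example
# (the actual token ids depend on word_dict and change from run to run):
#     input_ids     : [1, 9, 10, 11, 3, 2, 13, 14, 15, 2, 0, 0, ..., 0]   # padded to max_len = 30
#     segment_ids   : [0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, ..., 0]
#     masked_tokens : [12, 0, 0, 0, 0]        # original ids of the masked positions, padded to max_words_pred = 5
#     masked_pos    : [4, 0, 0, 0, 0]
#     isNext        : True or False           # whether sentence b directly follows sentence a
# The loop aims to collect batch_size/2 positive (isNext=True) and batch_size/2 negative pairs.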
'''The embedding fed into the encoder, as designed in Figure 2 of the paper.'''
class Embedding(nn.Module):
def __init__(self):
super().__init__()
self.tok_embed = nn.Embedding(vocab_size, d_model)
self.pos_embed = nn.Embedding(max_len, d_model)
self.seg_embed = nn.Embedding(n_segments, d_model)
self.norm = nn.LayerNorm(d_model)
def forward(self, x, seg):
seq_len = x.size(1)
pos = torch.arange(seq_len, dtype=torch.long).to(device)
        '''
        After expand/expand_as, every (1, seq_len) row is a view of the same data and
        changes together with the others; an individual row cannot be modified alone.
        To modify rows independently, use repeat(number of copies) instead.
        '''
pos = pos.unsqueeze(0).expand_as(x)
        '''
        tok_embed has shape (vocab_size, d_model). The first argument of nn.Embedding is
        the number of token categories: although the padded sequence length (max_len = 30)
        is larger than vocab_size here, the token ids in x are always smaller than
        vocab_size, so tok_embed is a valid embedding matrix for x. The same holds for seg_embed.
        '''
        '''Figure 2 of the paper, Embedding: input = word + position + segment'''
embedding = self.tok_embed(x) + self.pos_embed(pos) + self.seg_embed(seg)
return self.norm(embedding)
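# Shape sketch for the Embedding module, assuming the settings used below
# (batch_size=10, max_len=30, d_model=768):
#     x (token ids) : (10, 30)
#     seg           : (10, 30)
#     pos           : (10, 30)      torch.arange(seq_len) broadcast over the batch via expand_as
#     return value  : (10, 30, 768) = LayerNorm(tok_embed(x) + pos_embed(pos) + seg_embed(seg))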
'''2. Build the model'''
class BERT(nn.Module):
def __init__(self):
super().__init__()
self.embedding = Embedding()
self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])
self.fc = nn.Linear(d_model, d_model)
self.norm = nn.LayerNorm(d_model)
self.classifier = nn.Linear(d_model, 2)
self.activate_1 = gelu
self.activate_2 = nn.Tanh()
token_embed_weight = self.embedding.tok_embed.weight
vocab_size, n_dim = token_embed_weight.size()
self.decoder = nn.Linear(n_dim, vocab_size, bias=False)
self.decoder.weight = token_embed_weight
self.decoder_bias = nn.Parameter(torch.zeros(vocab_size))
def forward(self, input_ids, segment_ids, masked_pos):
output = self.embedding(input_ids, segment_ids).to(device)
enc_self_attn_mask = get_attn_pad_mask(input_ids, input_ids)
for layer in self.layers:
output, enc_self_attn = layer(output, enc_self_attn_mask)
'''Task1'''
masked_pos = masked_pos[:, :, None].expand(-1, -1, output.size(-1))
"""torch.gather() 收集输入的特定维度指定位置的数值
Args:
input(tensor): 待操作数.不妨设其维度为(x1, x2, ···, xn)
dim(int): 待操作的维度
index(LongTensor): 如何对input进行操作
"""
predict_masked = torch.gather(output, 1, masked_pos)
predict_masked = self.norm(self.activate_1(self.fc(predict_masked)))
        '''The language-model head is just a projection back onto the vocab_size word
        embedding matrix (its weights are tied with tok_embed above).'''
logits_lm = self.decoder(predict_masked) + self.decoder_bias
'''Task2'''
        '''Task 2 uses only the first position of the encoder output, i.e. the hidden state
        corresponding to the [CLS] input token, as the pooled representation of the sentence pair.'''
h_pooled = self.activate_2(self.fc(output[:, 0]))
logits_clsf = self.classifier(h_pooled)
return logits_lm, logits_clsf
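# Output shapes of BERT.forward, assuming the settings used below
# (batch_size=10, max_len=30, max_words_pred=5, d_model=768):
#     logits_lm   : (10, 5, vocab_size)   one distribution over the vocabulary per masked position
#     logits_clsf : (10, 2)               isNext / notNext scores for each sentence pair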
if __name__ == '__main__':
    device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
max_len = 30
batch_size = 10
max_words_pred = 5
n_layers = 12
n_heads = 12
d_model = 768
d_ff = 4*d_model
d_k = d_v = 64
n_segments = 2
text = (
'Hello, how are you? I am Romeo.\n'
'Hello, Romeo My name is Juliet. Nice to meet you.\n'
'Nice meet you too. How are you today?\n'
'Great. My baseball team won the competition.\n'
'Oh Congratulations, Juliet\n'
        'Thank you Romeo'
)
    '''1. Data preprocessing'''
sentences, word_list, word_dict, number_dict, vocab_size, sentences_list = pre_process(text)
batch = make_batch()
    '''2. Build the model'''
model = BERT()
model.to(device)
criterion = nn.CrossEntropyLoss()
    '''With lr = 0.001 the training quickly gets stuck in a poor local minimum!'''
optimizer = optim.AdamW(model.parameters(), lr=0.0001)
    if os.path.exists('model_param.pt'):
model.load_state_dict(torch.load('model_param.pt', map_location=device))
"""map(function, iterable, ...)
map()会根据提供的函数对指定序列做映射。第一个参数function以参数序列中的每一个元素调用function函数,
返回包含每次function函数返回值的新列表。
Args:
function: 函数
iterable: 一个或多个序列
"""
"""zip([iterable, ...])
用于将可迭代的对象作为参数,将对象中对应的元素打包成一个个元组,然后返回由这些元组组成的列表.
Examples
>>> a = [1,2,3]
>>> b = [4,5,6]
>>> list(zip([a,b]))
[(1, 4), (2, 5), (3, 6)]
>>> c = [a,b] # [[1,2,3],[4,5,6]]
>>> list(zip(*c))
[(1, 4), (2, 5), (3, 6)]
>>> list(zip(c))
[([1, 2, 3],), ([4, 5, 6],)]
"""
input_ids, segment_ids, masked_tokens, masked_pos, isNext = map(torch.LongTensor, zip(*batch))
input_ids, segment_ids, masked_tokens, masked_pos, isNext = \
input_ids.to(device), segment_ids.to(device), masked_tokens.to(device), masked_pos.to(device), isNext.to(device)
    '''3. Training'''
print('{}\nTrain\n{}'.format('*'*30, '*'*30))
loss_record = []
for epoch in range(1000):
optimizer.zero_grad()
logits_lm, logits_clsf = model(input_ids, segment_ids, masked_pos)
"""nn.CrossEntropyLoss()(input,target)
如果target是一维,则只要求input.size(0)=target.size
如果是多维,那么要求input.size(0)=target.size(0)且input.size(-1)=target.size(-1)
"""
loss_lm = criterion(logits_lm.transpose(1, 2), masked_tokens)
loss_clsf = criterion(logits_clsf, isNext)
loss = loss_lm + loss_clsf
loss.backward()
optimizer.step()
        if loss >= 0.01:
            loss_record = []  # reset the streak whenever the loss rises back above the threshold
        else:
            loss_record.append(loss.item())
            if len(loss_record) == 30:  # stop once the loss stays below 0.01 for 30 consecutive epochs
                torch.save(model.state_dict(), 'model_param.pt')
                break
if (epoch + 1) % 10 == 0:
print('Epoch:', '%04d' % (epoch + 1), 'Loss = {:.6f}'.format(loss))
        if (epoch + 1) % 100 == 0:
torch.save(model.state_dict(), 'model_param.pt')
    '''4. Testing'''
print('{}\nTest\n{}'.format('*'*30, '*'*30))
print('text:\n%s'%text)
for i in range(len(batch)):
print('*'*30)
        # zip(batch[i]) wraps each of the 5 fields in a 1-tuple, so torch.LongTensor adds a batch dimension of size 1
        input_ids, segment_ids, masked_tokens, masked_pos, isNext = map(torch.LongTensor, zip(batch[i]))
input_ids, segment_ids, masked_tokens, masked_pos, isNext = \
input_ids.to(device), segment_ids.to(device), masked_tokens.to(device), masked_pos.to(device), isNext.to(device)
print('input_ids :', [number_dict[w.item()] for w in input_ids[0] if w.item() != 0])
logits_lm, logits_clsf = model(input_ids, segment_ids, masked_pos)
logits_lm, logits_clsf = logits_lm.to('cpu'), logits_clsf.to('cpu')
logits_lm = logits_lm.data.max(2)[1][0].data.numpy()
print('real masked tokens list :', \
[(pos.item(), number_dict[pos.item()]) for pos in masked_tokens[0] if pos.item() != 0])
print('predict masked tokens list :', \
[(pos, number_dict[pos]) for pos in logits_lm if pos != 0])
logits_clsf = logits_clsf.data.max(1)[1].data.numpy()[0]
print('Real isNext :', bool(isNext))
print('predict isNext :', bool(logits_clsf))