import jieba import torch import math import numpy as np
from collections import Counter from torch.utils.data import DataLoader from random import randrange, shuffle, random, randint from torch import nn from torch import optim
# Hyper-parameters for the toy pre-training run.
SAMPLE_COUNT = 40   # total sentence-pair samples to generate (half positive, half negative)
PRED_MAX = 5        # upper bound on masked positions per sample
PAD_LEN = 60        # every token sequence is padded or truncated to this length
BATCH_SIZE = 4      # samples per DataLoader batch
HIDDEN_DIM = 100    # width of the embeddings and of the encoder output
HEAD_COUNT = 8      # number of attention heads
HEAD_DIM = 8        # width of each attention head
BLOCK_COUNT = 6     # how many times the attention block is applied in forward()
EPOCHS = 100        # passes over the dataset during training

# Tiny dialogue corpus; consecutive entries are treated as "next sentence"
# pairs when the dataset is built.
text = [
    'Hello, how are you? I am Romeo.',
    'Hello, Romeo My name is Juliet. Nice to meet you.',
    'Nice meet you too. How are you today?',
    'Great. My baseball team won the competition.',
    'Oh Congratulations, Juliet',
    'Thank you Romeo.',
    'Where are you going today?',
    'I am going shopping. What about you?',
    'I am going to visit my grandmother. she is not very well.'
]
def space(array):
    """Return a list of the items in ``array`` that are not a single space.

    Only the exact string ``' '`` is dropped; every other item (including
    empty strings or longer runs of whitespace) is kept, in order.
    """
    kept = []
    for item in array:
        if item != ' ':
            kept.append(item)
    return kept
""" 构建词典 """
# Tokenise the whole corpus at once and rank the tokens by frequency.
paragraph = ' '.join(text)
counter = Counter(space(jieba.lcut(paragraph))).most_common()

# Ids 0-3 are reserved for the special tokens; corpus tokens follow in
# descending frequency order starting at id 4.
word2id = {'_cls_': 0, '_sep_': 1, '_mask_': 2, '_pad_': 3}
word2id.update((token, rank) for rank, (token, _) in enumerate(counter, start=4))

# Reverse lookup: id -> token.
id2word = {index: token for token, index in word2id.items()}
""" 构建数据集 """
# One list of token ids per corpus sentence, in corpus order.
ids = [[word2id[w] for w in space(jieba.lcut(sentence))] for sentence in text]
# Build the pre-training samples: an equal number of "next sentence" positives
# (label 1, the second sentence directly follows the first in `text`) and
# negatives (label 0). Each entry of `dataset` is
# [insert, idx_masked, ids_masked, label] where `insert` holds the padded
# 'word' / 'segment' / 'position' id lists.
dataset = list()
positive = negative = 0
# Integer quota per class; comparing against the float SAMPLE_COUNT / 2 would
# never terminate for an odd SAMPLE_COUNT.
quota = SAMPLE_COUNT // 2
while positive < quota or negative < quota:
    idx_up, idx_down = randrange(len(text)), randrange(len(text))

    # Classify the pair first so no work is spent on a sample whose class
    # quota is already full.
    label = 1 if idx_up + 1 == idx_down else 0
    if (positive if label else negative) >= quota:
        continue

    ids_up, ids_down = ids[idx_up], ids[idx_down]
    insert = {
        # [CLS] sentence A [SEP] sentence B [SEP]
        'word': [word2id['_cls_']] + ids_up + [word2id['_sep_']] + ids_down + [word2id['_sep_']],
        # Segment 0 covers [CLS] + A + [SEP]; segment 1 covers B + [SEP].
        'segment': [0] * (1 + len(ids_up) + 1) + [1] * (len(ids_down) + 1),
        'position': list(range(PAD_LEN)),
    }

    # Masking: pick about 15% of the tokens (at least 1, at most PRED_MAX).
    pred_count = min(PRED_MAX, max(1, int(len(insert['word']) * 0.15)))
    # Only ordinary tokens that survive truncation to PAD_LEN may be masked,
    # so every recorded index is valid for the padded sequence.
    idx_maskable = [idx for idx, token_id in enumerate(insert['word'][:PAD_LEN])
                    if token_id != word2id['_cls_'] and token_id != word2id['_sep_']]
    shuffle(idx_maskable)
    ids_masked, idx_masked = list(), list()
    for idx in idx_maskable[:pred_count]:
        idx_masked.append(idx)
        ids_masked.append(insert['word'][idx])
        # A single draw decides the replacement: 80% [MASK], 10% random
        # corpus token, 10% left unchanged.
        roll = random()
        if roll < 0.8:
            insert['word'][idx] = word2id['_mask_']
        elif roll < 0.9:
            # Ids below 4 are special tokens and are never used as noise.
            insert['word'][idx] = randint(4, len(word2id) - 1)

    # Pad the prediction lists to a fixed PRED_MAX length with 0. Position 0
    # is always [CLS] and id 0 is '_cls_', neither of which can be a real
    # prediction, so 0 is an unambiguous "empty slot" marker.
    pad_count = PRED_MAX - len(idx_masked)
    if pad_count > 0:
        idx_masked.extend([0] * pad_count)
        ids_masked.extend([0] * pad_count)

    # Pad or truncate the token sequence to exactly PAD_LEN.
    pad_count = PAD_LEN - len(insert['word'])
    if pad_count > 0:
        insert['word'].extend([word2id['_pad_']] * pad_count)
        insert['segment'].extend([0] * pad_count)
    else:
        insert['word'] = insert['word'][:PAD_LEN]
        insert['segment'] = insert['segment'][:PAD_LEN]

    assert len(insert['word']) == PAD_LEN and len(insert['segment']) == PAD_LEN and len(insert['position']) == PAD_LEN

    dataset.append([insert, idx_masked, ids_masked, label])
    if label:
        positive += 1
    else:
        negative += 1
def pack(batch):
    """Collate a list of dataset samples into six batched tensors.

    Each sample is ``[insert, idx_masked, ids_masked, label]`` where
    ``insert`` is a dict with 'word', 'segment' and 'position' lists.

    Returns:
        A tuple ``(word, segment, position, idx_mask, ids_mask, label)`` of
        tensors, each stacking the corresponding field across the batch.
    """
    columns = (
        [sample[0]['word'] for sample in batch],
        [sample[0]['segment'] for sample in batch],
        [sample[0]['position'] for sample in batch],
        [sample[1] for sample in batch],
        [sample[2] for sample in batch],
        [sample[3] for sample in batch],
    )
    return tuple(torch.tensor(column) for column in columns)
# Iterates over `dataset` in shuffled order, BATCH_SIZE samples at a time;
# `pack` turns each list of samples into the tuple of tensors the model uses.
loader = DataLoader(dataset, shuffle=True, batch_size=BATCH_SIZE, collate_fn=pack)
""" 构建模型 """
def gelu(x):
    """Apply the exact (erf-based) GELU activation element-wise to ``x``.

    Computes ``x * 0.5 * (1 + erf(x / sqrt(2)))``.
    """
    half_x = x * 0.5
    gate = 1.0 + torch.erf(x / math.sqrt(2.0))
    return half_x * gate
class BERT(nn.Module):
    """Small BERT-style encoder with next-sentence and masked-token heads.

    One multi-head self-attention layer (``Q``/``K``/``V``/``mix``) followed
    by a residual connection and layer norm is applied ``block_count`` times
    with shared weights. The vector at position 0 feeds the two-way
    next-sentence classifier; the vectors at the masked positions feed the
    vocabulary-sized masked-token classifier.

    Args:
        vocab_size: Number of token ids. Defaults to ``len(word2id)``.
        hidden_dim: Embedding / encoder width. Defaults to ``HIDDEN_DIM``.
        head_count: Number of attention heads. Defaults to ``HEAD_COUNT``.
        head_dim: Width of each head. Defaults to ``HEAD_DIM``.
        block_count: Number of attention passes. Defaults to ``BLOCK_COUNT``.
        max_len: Number of position embeddings. Defaults to ``PAD_LEN``.
        pad_id: Token id ignored as an attention key. Defaults to
            ``word2id['_pad_']``.

    Every argument is optional, so ``BERT()`` keeps using the module-level
    configuration.
    """

    def __init__(self, vocab_size=None, hidden_dim=None, head_count=None,
                 head_dim=None, block_count=None, max_len=None, pad_id=None):
        super(BERT, self).__init__()
        # Resolve defaults at construction time from the module-level settings.
        self.vocab_size = len(word2id) if vocab_size is None else vocab_size
        self.hidden_dim = HIDDEN_DIM if hidden_dim is None else hidden_dim
        self.head_count = HEAD_COUNT if head_count is None else head_count
        self.head_dim = HEAD_DIM if head_dim is None else head_dim
        self.block_count = BLOCK_COUNT if block_count is None else block_count
        self.pad_id = word2id['_pad_'] if pad_id is None else pad_id
        max_len = PAD_LEN if max_len is None else max_len
        proj_dim = self.head_count * self.head_dim

        # Input embeddings (token + segment + position) and the shared norm.
        self.emb_word = nn.Embedding(self.vocab_size, self.hidden_dim)
        self.emb_segment = nn.Embedding(2, self.hidden_dim)
        self.emb_position = nn.Embedding(max_len, self.hidden_dim)
        self.norm = nn.LayerNorm(self.hidden_dim)

        # Multi-head attention projections and the head-merging projection.
        self.Q = nn.Linear(self.hidden_dim, proj_dim)
        self.K = nn.Linear(self.hidden_dim, proj_dim)
        self.V = nn.Linear(self.hidden_dim, proj_dim)
        self.mix = nn.Linear(proj_dim, self.hidden_dim)

        # Next-sentence head, applied to the position-0 vector.
        self.layer = nn.Sequential(
            nn.Linear(self.hidden_dim, self.hidden_dim),
            nn.Dropout(0.4),
            nn.Tanh()
        )
        self.classifier = nn.Linear(self.hidden_dim, 2)

        # Masked-token head.
        self.fc1 = nn.Linear(self.hidden_dim, self.hidden_dim)
        self.fc2 = nn.Linear(self.hidden_dim, self.vocab_size)

    def _split_heads(self, projected):
        """Reshape (batch, seq, heads * head_dim) to (batch, heads, seq, head_dim)."""
        batch_size, seq_len, _ = projected.shape
        # Split the feature axis into heads first, then move the head axis in
        # front of the sequence axis; reshaping straight to
        # (batch, heads, seq, head_dim) would mix up positions and heads.
        return projected.reshape(batch_size, seq_len, self.head_count, self.head_dim).transpose(1, 2)

    def forward(self, _word, _segment, _position, idx_mask):
        """Encode a batch and return both pre-training predictions.

        Args:
            _word: (batch, seq_len) token ids.
            _segment: (batch, seq_len) segment ids (0 or 1).
            _position: (batch, seq_len) position ids.
            idx_mask: (batch, pred_max) positions of the masked tokens, with
                0 marking an unused slot.

        Returns:
            ``(prob, vec_mask)``: ``prob`` is (batch, 2) next-sentence logits;
            ``vec_mask`` is (n_masked, vocab_size) logits, one row per
            non-zero entry of ``idx_mask`` in row-major order.
        """
        batch_size, seq_len = _word.shape

        # Input embedding.
        emb = self.norm(self.emb_word(_word) + self.emb_segment(_segment) + self.emb_position(_position))

        # Key padding mask, built once from this call's own input and
        # broadcast over heads and query positions.
        door_masked = _word.eq(self.pad_id).reshape(batch_size, 1, 1, seq_len)
        # Scaled dot-product attention divides by sqrt of the per-head width.
        scale = math.sqrt(self.head_dim)

        for _ in range(self.block_count):
            q = self._split_heads(self.Q(emb))
            k = self._split_heads(self.K(emb))
            v = self._split_heads(self.V(emb))

            # Attention weights; padded keys get a large negative score so
            # their softmax weight is effectively zero.
            door = torch.matmul(q, k.transpose(-1, -2)) / scale
            door = door.masked_fill(door_masked, -1e5)
            door = torch.softmax(door, dim=-1)

            # Weighted values, then merge the heads back together.
            res = torch.matmul(door, v)
            res = res.transpose(1, 2).reshape(batch_size, seq_len, self.head_count * self.head_dim)
            res = self.mix(res)

            # Residual connection + layer norm.
            emb = self.norm(emb + res)

        prob = self.classifier(self.layer(emb[:, 0]))

        # Gather the encoder vectors at the masked positions.
        form = idx_mask.unsqueeze(-1).expand(-1, -1, self.hidden_dim)
        vec_mask1 = torch.gather(emb, 1, form)

        # Drop the unused (index 0) slots with tensor indexing so the
        # masked-token loss can still back-propagate into the encoder.
        vec_mask2 = vec_mask1[idx_mask != 0]

        # nn.functional.gelu defaults to the exact erf-based GELU.
        vec_mask = self.fc2(nn.functional.gelu(self.fc1(vec_mask2)))

        return prob, vec_mask
# Model, loss and optimiser. The same cross-entropy criterion is used for the
# next-sentence objective and for the masked-token objective.
model = BERT()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

for i in range(EPOCHS):
    for j, (word, segment, position, idx_masked, ids_masked, label) in enumerate(loader):
        probability, vec_masked = model(word, segment, position, idx_masked)

        # Keep only the targets of real prediction slots. The model selects
        # its output rows with the same `idx_masked != 0` test, in the same
        # row-major order, so rows and targets stay aligned.
        ids_masked = ids_masked[idx_masked != 0]

        loss1 = criterion(probability, label)      # next-sentence loss
        loss2 = criterion(vec_masked, ids_masked)  # masked-token loss
        loss = loss1 + loss2

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # `{:.4f}` prints four decimals; the previous `{:4f}` only set a
        # minimum field width.
        print("Epoch: {:d}, Batch: {:d}, Loss: {:.4f}".format(i, j, loss.item()))
    print("Epoch {:d} finished".format(i))
|