NLP-Task4: Sequence Labeling with LSTM+CRF
1. Task Description
Sequence labeling of English sentences: named entity recognition (NER), i.e., assigning a named entity tag to every token. An example sentence from the dataset:
EU NNP B-NP B-ORG
rejects VBZ B-VP O
German JJ B-NP B-MISC
call NN I-NP O
to TO B-VP O
boycott VB I-VP O
British JJ B-NP B-MISC
lamb NN I-NP O
. . O O
Data description
Each line contains four fields: the word, its part-of-speech (POS) tag, its syntactic chunk tag, and its named entity tag. For a detailed description of the dataset, see CoNLL 2003 Data.
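As a minimal illustration (a hypothetical one-liner, not part of the assignment code), a single annotation line splits cleanly into these four fields:

word, pos, chunk, ner = "EU NNP B-NP B-ORG".split()
# word = 'EU', pos = 'NNP', chunk = 'B-NP', ner = 'B-ORG'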
2. Feature Extraction: Word Embedding
See Task 2 and Task 3.
3. Neural Network (LSTM+CRF)
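Each sentence is mapped to word embeddings, encoded by a bidirectional LSTM, and projected by a linear layer to per-token emission scores over the tag set; a CRF layer on top models dependencies between adjacent tags. As a sketch of the objective, writing $P_{i, y_i}$ for the emission score of tag $y_i$ at position $i$ and $A_{y', y}$ for the learned transition score from tag $y'$ to tag $y$ (notation introduced here; in the code this is the transition matrix of the CRF class), the score of a tag sequence $y = (y_1, \dots, y_n)$ for a sentence $x$ is

$s(x, y) = \sum_{i=1}^{n} \left( A_{y_{i-1}, y_i} + P_{i, y_i} \right) + A_{y_n, \langle end \rangle}$, with $y_0 = \langle begin \rangle$,

and training minimizes the negative log-likelihood

$\mathcal{L} = -\left( s(x, y) - \log \sum_{y'} e^{s(x, y')} \right)$,

where the log-partition term is computed with the forward algorithm and decoding uses the Viterbi algorithm; these correspond to true_prob, total_prob, and predict in the CRF class below.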
4. Experiment Settings and Code
- Samples: train.txt and test.txt
- Feature extraction: random / GloVe embedding
- Learning rate: $10^{-3}$
- $l_h, l_f$ (BiLSTM hidden size and word-embedding dimension): 50
- Batch size: 128 (the driver script at the bottom of the code actually uses 100)
- Training epochs: 50
Code
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import torch
import torch.nn as nn
def pre_process(data):
sentences = list()
tags = list()
sentence = list()
tag = list()
for line in data:
if line == '\n':
if sentence:
sentences.append(sentence)
tags.append(tag)
sentence = list()
tag = list()
else:
elements = line.split()
if elements[0] == '-DOCSTART-':
continue
sentence.append(elements[0].upper())
tag.append(elements[-1])
if sentence:
sentences.append(sentence)
tags.append(tag)
return list(zip(sentences, tags))
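# Vocabulary helper: builds word and tag dictionaries over train + test, collects a GloVe vector (or zeros)
# for every word, and converts sentences / tag sequences into id lists. Tag ids 0/1/2 are reserved for
# <pad>/<begin>/<end>; it is also used with trained_dict=None for the randomly initialized model.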
class Glove_embedding():
def __init__(self, train_zip, test_zip, trained_dict=None):
if trained_dict is None:
trained_dict = dict()
self.dict_words = dict()
self.trained_dict = trained_dict
train_zip.sort(key=lambda x: len(x[0]))
test_zip.sort(key=lambda x: len(x[0]))
self.train_x, self.train_y = zip(*train_zip)
self.test_x, self.test_y = zip(*test_zip)
self.train_x_matrix = list()
self.train_y_matrix = list()
self.test_x_matrix = list()
self.test_y_matrix = list()
self.len_words = 1
self.len_tag = 3
self.longest = 0
self.embedding = list()
self.tag_dict = {'<pad>': 0, '<begin>': 1, '<end>': 2}
def get_words(self):
self.embedding.append([0]*50)
for term in self.train_x:
for word in term:
if word not in self.dict_words:
self.dict_words[word] = len(self.dict_words) + 1
if word in self.trained_dict:
self.embedding.append(self.trained_dict[word])
else:
self.embedding.append([0]*50)
for term in self.test_x:
for word in term:
if word not in self.dict_words:
self.dict_words[word] = len(self.dict_words) + 1
if word in self.trained_dict:
self.embedding.append(self.trained_dict[word])
else:
self.embedding.append([0]*50)
for tags in self.train_y:
for tag in tags:
if tag not in self.tag_dict:
self.tag_dict[tag] = len(self.tag_dict)
for tags in self.test_y:
for tag in tags:
if tag not in self.tag_dict:
self.tag_dict[tag] = len(self.tag_dict)
self.len_tag = len(self.tag_dict)
self.len_words = len(self.dict_words) + 1
def get_id(self):
for term in self.train_x:
item = [self.dict_words[word] for word in term]
self.longest = max(self.longest, len(item))
self.train_x_matrix.append(item)
for term in self.test_x:
item = [self.dict_words[word] for word in term]
self.longest = max(self.longest, len(item))
self.test_x_matrix.append(item)
for tags in self.train_y:
item = [self.tag_dict[tag] for tag in tags]
self.train_y_matrix.append(item)
for tags in self.test_y:
item = [self.tag_dict[tag] for tag in tags]
self.test_y_matrix.append(item)
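# Dataset wrapper and collate function: pads variable-length sentences and tag sequences in a batch to the
# same length with id 0 (<pad>), so padded positions can later be masked via (tag != 0).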
class ClsDataset(Dataset):
def __init__(self, sentence, tag):
self.sentence = sentence
self.tag = tag
def __getitem__(self, item):
return self.sentence[item], self.tag[item]
def __len__(self):
return len(self.tag)
def collate_fn(batch_data):
sentence, tag = zip(*batch_data)
sentences = [torch.LongTensor(sent) for sent in sentence]
padded_sents = pad_sequence(sentences, batch_first=True, padding_value=0)
tags = [torch.LongTensor(t) for t in tag]
padded_tags = pad_sequence(tags, batch_first=True, padding_value=0)
    return padded_sents, padded_tags
def get_batch(x, y, batch_size):
dataset = ClsDataset(x, y)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False, drop_last=True, collate_fn=collate_fn)
return dataloader
import torch.nn as nn
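# BiLSTM + CRF tagger: embedding -> dropout -> bidirectional LSTM -> linear emission scores per token;
# forward() returns the CRF negative log-likelihood, predict() returns the Viterbi-decoded tag sequences.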
class Named_Entity_Recognition(nn.Module):
def __init__(self, len_feature, len_words, len_hidden, type_num, pad_id, start_id, end_id, weight=None, drop_out=0.5):
super(Named_Entity_Recognition, self).__init__()
self.len_feature = len_feature
self.len_words = len_words
self.len_hidden = len_hidden
self.dropout = nn.Dropout(drop_out)
if weight is None:
x = nn.init.xavier_normal_(torch.Tensor(len_words, len_feature))
self.embedding = nn.Embedding(num_embeddings=len_words, embedding_dim=len_feature, _weight=x).cuda()
else:
self.embedding = nn.Embedding(num_embeddings=len_words, embedding_dim=len_feature, _weight=weight).cuda()
self.lstm = nn.LSTM(input_size=len_feature, hidden_size=len_hidden, batch_first=True, bidirectional=True).cuda()
self.fc = nn.Linear(2*len_hidden, type_num).cuda()
self.crf = CRF(type_num, pad_id, start_id, end_id).cuda()
def forward(self, x, tags, mask):
mask = mask.int()
x = self.embedding(x)
x = self.dropout(x)
self.lstm.flatten_parameters()
x, _ = self.lstm(x)
scores = self.fc(x)
loss = self.crf(scores, tags, mask)
return loss
def predict(self, x, mask):
mask = mask.int()
x = self.embedding(x)
x = self.dropout(x)
self.lstm.flatten_parameters()
x, _ = self.lstm(x)
scores = self.fc(x)
return self.crf.predict(scores, mask)
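# Linear-chain CRF with a learned transition matrix (invalid transitions are discouraged with -10000 scores
# at initialization). true_prob scores the gold tag path, total_prob computes the log-partition with the
# forward algorithm, and predict performs Viterbi decoding under the padding mask.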
class CRF(nn.Module):
def __init__(self, type_num, pad_id, start_id, end_id):
super(CRF, self).__init__()
self.type_num = type_num
self.pad_id = pad_id
self.start_id = start_id
self.end_id = end_id
transition = torch.zeros(type_num, type_num)
transition[:, start_id] = -10000.0
transition[end_id, :] = -10000.0
transition[:, pad_id] = -10000.0
transition[pad_id, :] = -10000.0
transition[pad_id, pad_id] = 0.0
transition[pad_id, : end_id] = 0.0
        # Registered as a Parameter so the transition scores are trained; it is moved to the GPU together
        # with the CRF module in the model's __init__.
        self.transition = nn.Parameter(transition)
def forward(self, scores, tags, mask):
true_prob = self.true_prob(scores, tags, mask)
total_prob = self.total_prob(scores, mask)
return -torch.sum(true_prob - total_prob)
def true_prob(self, scores, tags, mask):
batch_size, sequence_len = tags.shape
true_prob = torch.zeros(batch_size).cuda()
first_tag = tags[:, 0]
last_tag_index = mask.sum(1) - 1
last_tag = torch.gather(tags, 1, last_tag_index.unsqueeze(1)).squeeze(1)
tran_score = self.transition[self.start_id, first_tag]
tag_score = torch.gather(scores[:, 0], 1, first_tag.unsqueeze(1)).squeeze(1)
true_prob += tran_score + tag_score
for i in range(1, sequence_len):
non_pad = mask[:, i]
pre_tag = tags[:, i-1]
curr_tag = tags[:, i]
tran_score = self.transition[pre_tag, curr_tag]
tag_score = torch.gather(scores[:, i], 1, curr_tag.unsqueeze(1)).squeeze(1)
true_prob += tran_score * non_pad + tag_score * non_pad
true_prob += self.transition[last_tag, self.end_id]
return true_prob
def total_prob(self, scores, mask):
batch_size, sequence_len, num_tags = scores.shape
log_sum_exp_prob = self.transition[self.start_id, :].unsqueeze(0) + scores[:, 0]
for i in range(1, sequence_len):
every_log_sum_exp_prob = list()
for j in range(num_tags):
tran_score = self.transition[:, j].unsqueeze(0)
tag_score = scores[:, i, j].unsqueeze(1)
prob = tran_score + tag_score + log_sum_exp_prob
every_log_sum_exp_prob.append(torch.logsumexp(prob, dim=1))
new_prob = torch.stack(every_log_sum_exp_prob).t()
non_pad = mask[:, i].unsqueeze(-1)
log_sum_exp_prob = non_pad * new_prob + (1 - non_pad) * log_sum_exp_prob
tran_score = self.transition[:, self.end_id].unsqueeze(0)
return torch.logsumexp(log_sum_exp_prob + tran_score, dim=1)
def predict(self, scores, mask):
batch_size, sequence_len, num_tags = scores.shape
total_prob = self.transition[self.start_id, :].unsqueeze(0) + scores[:, 0]
tags = torch.cat([torch.tensor(range(num_tags)).view(1, -1, 1) for _ in range(batch_size)], dim=0).cuda()
for i in range(1, sequence_len):
new_prob = torch.zeros(batch_size, num_tags).cuda()
new_tag = torch.zeros(batch_size, num_tags, 1).cuda()
for j in range(num_tags):
prob = total_prob + self.transition[:, j].unsqueeze(0) + scores[:, i, j].unsqueeze(1)
max_prob, max_tag = torch.max(prob, dim=1)
new_prob[:, j] = max_prob
new_tag[:, j, 0] = max_tag
non_pad = mask[:, i].unsqueeze(-1)
total_prob = non_pad * new_prob + (1-non_pad) * total_prob
non_pad = non_pad.unsqueeze(-1)
temp_tag = torch.cat([torch.tensor(range(num_tags)).view(1, -1, 1) for _ in range(batch_size)], dim=0).cuda()
            # Append the current tag index at valid positions and <pad> at padded positions (kept integer-typed).
            append_tag = (non_pad * temp_tag + (1 - non_pad) * self.pad_id).long()
            new_tag = new_tag.long()
            # Padded steps carry meaningless back-pointers; keep each tag's existing path unchanged there.
            identity = torch.arange(num_tags).view(1, -1, 1).cuda()
            new_tag = non_pad * new_tag + (1 - non_pad) * identity
            pre_tag = tags[[[i] * num_tags for i in range(batch_size)], new_tag[:, :, 0], :]
            tags = torch.cat([pre_tag, append_tag], dim=-1)
prob = total_prob + self.transition[:, self.end_id].unsqueeze(0)
_, max_tag = torch.max(prob, dim=1)
return tags[[i for i in range(batch_size)], max_tag]
import matplotlib.pyplot as plt
import torch
from torch import optim
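# Training / evaluation loop. The reported accuracy is sentence-level exact match: a sentence counts as
# correct only if every position (padding included, which always matches) is predicted correctly.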
def NN_embdding(model, train, test, learning_rate, iter_times, batch_size):
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
train_loss_record = list()
test_loss_record = list()
train_record = list()
test_record = list()
for iteration in range(iter_times):
model.train()
for i, batch in enumerate(train):
x, y = batch
x = x.cuda()
y = y.cuda()
mask = (y != 0).cuda()
loss = model(x, y, mask).cuda()
optimizer.zero_grad()
loss.backward()
optimizer.step()
model.eval()
train_acc = list()
test_acc = list()
train_loss = 0
test_loss = 0
for i, batch in enumerate(train):
x, y = batch
x = x.cuda()
y = y.cuda()
mask = (y != 0).cuda()
loss = model(x, y, mask).cuda()
train_loss += loss.item() / batch_size / y.shape[1]
pred = model.predict(x, mask)
acc = (pred == y).float()
len_batch, len_seq = acc.shape
points = torch.ones((1, len_batch)).cuda()
for j in range(len_seq):
points *= acc[:, j]
            train_acc.append(points.mean().item())
for i, batch in enumerate(test):
x, y = batch
x = x.cuda()
y = y.cuda()
mask = (y != 0).cuda()
loss = model(x, y, mask).cuda()
test_loss += loss.item() / batch_size / y.shape[1]
pred = model.predict(x, mask)
acc = (pred == y).float()
len_batch, len_seq = acc.shape
points = torch.ones((1, len_batch)).cuda()
for j in range(len_seq):
points *= acc[:, j]
            test_acc.append(points.mean().item())
trains_acc = sum(train_acc) / len(train_acc)
tests_acc = sum(test_acc) / len(test_acc)
train_loss_record.append(train_loss / len(train))
test_loss_record.append(test_loss / len(test))
train_record.append(trains_acc)
test_record.append(tests_acc)
print('----------Iteration', iteration + 1, '----------')
print('Train Loss:', train_loss / len(train))
print('Test Loss :', test_loss / len(test))
print('Train accuracy:', trains_acc)
print('Test accuracy :', tests_acc)
return train_loss_record, test_loss_record, train_record, test_record
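# Trains the randomly initialized and the GloVe-initialized models and plots train/test loss and accuracy curves.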
def NN_plot(random_embedding, glove_embedding, len_feature, len_hidden, learning_rate, batch_size, iter_times):
train_random = get_batch(random_embedding.train_x_matrix, random_embedding.train_y_matrix, batch_size)
test_random = get_batch(random_embedding.test_x_matrix, random_embedding.test_y_matrix, batch_size)
train_glove = get_batch(glove_embedding.train_x_matrix, glove_embedding.train_y_matrix, batch_size)
test_glove = get_batch(glove_embedding.test_x_matrix, glove_embedding.test_y_matrix, batch_size)
random_model = Named_Entity_Recognition(len_feature, random_embedding.len_words, len_hidden,
random_embedding.len_tag, 0, 1, 2)
    glove_model = Named_Entity_Recognition(len_feature, glove_embedding.len_words, len_hidden,
                                           glove_embedding.len_tag, 0, 1, 2, weight=torch.tensor(glove_embedding.embedding, dtype=torch.float))
trl_ran, tsl_ran, tra_ran, tea_ran = NN_embdding(random_model, train_random, test_random, learning_rate, iter_times, batch_size)
trl_glo, tsl_glo, tra_glo, tea_glo = NN_embdding(glove_model, train_glove, test_glove, learning_rate, iter_times, batch_size)
x = list(range(1, iter_times + 1))
plt.subplot(2, 2, 1)
plt.plot(x, trl_ran, 'r--', label='random')
plt.plot(x, trl_glo, 'g--', label='glove')
plt.legend(fontsize=10)
plt.title('Train Loss')
plt.xlabel('Iterations')
plt.ylabel('Loss')
plt.subplot(2, 2, 2)
plt.plot(x, tsl_ran, 'r--', label='random')
plt.plot(x, tsl_glo, 'g--', label='glove')
plt.legend(fontsize=10)
plt.title('Test Loss')
plt.xlabel('Iterations')
plt.ylabel('Loss')
plt.subplot(2, 2, 3)
plt.plot(x, tra_ran, 'r--', label='random')
plt.plot(x, tra_glo, 'g--', label='glove')
plt.legend(fontsize=10)
plt.title('Train Accuracy')
plt.xlabel('Iterations')
plt.ylabel('Accuracy')
plt.ylim(0, 1)
plt.subplot(2, 2, 4)
plt.plot(x, tea_ran, 'r--', label='random')
plt.plot(x, tea_glo, 'g--', label='glove')
plt.legend(fontsize=10)
plt.title('Test Accuracy')
plt.xlabel('Iterations')
plt.ylabel('Accuracy')
plt.ylim(0, 1)
plt.tight_layout()
fig = plt.gcf()
fig.set_size_inches(8, 8, forward=True)
plt.savefig('main_plot.jpg')
plt.show()
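# Driver script: fix the random seeds, load the CoNLL 2003 train/test files and the GloVe 6B 50d vectors,
# build the two embedding setups, and run training for both models.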
import random
import numpy as np
random.seed(2021)
np.random.seed(2021)
torch.cuda.manual_seed(2021)
torch.manual_seed(2021)
with open('../input/conll003-englishversion/train.txt', 'r') as f:
temp = f.readlines()
data = temp[2:]
train_zip = pre_process(data)
with open('../input/conll003-englishversion/test.txt', 'r') as f:
temp = f.readlines()
data = temp[2:]
test_zip = pre_process(data)
with open('../input/glove6b50dtxt/glove.6B.50d.txt', 'rb') as f:
lines = f.readlines()
trained_dict = dict()
n = len(lines)
for i in range(n):
line = lines[i].split()
trained_dict[line[0].decode('utf-8').upper()] = [float(line[j]) for j in range(1, 51)]
random_embedding = Glove_embedding(train_zip, test_zip, trained_dict=None)
random_embedding.get_words()
random_embedding.get_id()
glove_embedding = Glove_embedding(train_zip, test_zip, trained_dict=trained_dict)
glove_embedding.get_words()
glove_embedding.get_id()
iter_times = 50
learning_rate = 0.001
batch_size = 100
NN_plot(random_embedding, glove_embedding, 50, 50, learning_rate, batch_size, iter_times)