Background
Previous posts, [NLP] Named Entity Recognition: IDCNN-CRF Paper Reading and Summary and [NLP] NER with PyTorch Lightning and BiLSTM-CRF, covered the theory and implemented the related models. Browsing GitHub, virtually all IDCNN implementations are in TensorFlow, so here is a PyTorch one, along with a look at how well it works. The source code has been uploaded to my GitHub: https://github.com/Htring/IDCNN-CRF_NER_PL; feel free to take a look.
Data Source
The data for this program comes from https://github.com/luopeixiang/named_entity_recognition.
So that the seqeval toolkit can be used to evaluate the model, tags beginning with "M-" in the raw data are rewritten as "I-".
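A minimal preprocessing sketch of that rewrite, assuming the raw files hold one whitespace-separated "token tag" pair per line with blank lines between sentences (the convert_m_tags helper and its file paths are illustrative, not part of the original repo):
def convert_m_tags(src_path, dst_path):
    """Rewrite BMES-style 'M-' tags as 'I-' so seqeval can parse them."""
    with open(src_path, encoding='utf8') as reader, \
            open(dst_path, 'w', encoding='utf8') as writer:
        for line in reader:
            line = line.strip()
            if not line:  # keep the blank lines that separate sentences
                writer.write('\n')
                continue
            token, tag = line.split()
            if tag.startswith('M-'):
                tag = 'I-' + tag[2:]
            writer.write(f'{token} {tag}\n')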
Program Structure
The program follows the same design as my previous posts and consists of the following four modules:
- Data processing module: dataloader.py
- Model implementation module: idcnn.py
- Model training wrapper module: idcnn_crf_pl.py
- Model training and model usage module: trainner.py
Data Processing Module
The data processing module reuses the code from the earlier BiLSTM NER implementation:
import json
import os
from typing import Optional, Any
import pytorch_lightning as pl
from pytorch_lightning.utilities.types import TRAIN_DATALOADERS, EVAL_DATALOADERS
from torchtext.legacy import data, datasets
class NERDataModule(pl.LightningDataModule):
def __init__(self, data_dir="data/corpus", batch_size=128, experiment=False):
super().__init__()
self.data_path = data_dir
self.batch_size = batch_size
self.experiment = experiment
self.setup()
def setup(self, stage: Optional[str] = None) -> None:
WORD = data.Field(batch_first=True, include_lengths=True)
TAG = data.Field(batch_first=True, include_lengths=True)
train_set, val_set, test_set = datasets.UDPOS.splits(path=self.data_path,
train='train.char.bmes',
validation='dev.char.bmes',
test='test.char.bmes',
fields=(('word', WORD), ('tag', TAG)),
separator=' ')
if self.experiment:
train_set.examples = train_set.examples[: 1000]
val_set.examples = val_set.examples[: 1000]
test_set.examples = test_set.examples[: 1000]
WORD.build_vocab(train_set.word, val_set.word, test_set.word)
TAG.build_vocab(train_set.tag, val_set.tag, test_set.tag)
self.train_iter, self.val_iter, self.test_iter = data.BucketIterator.splits(
(train_set, val_set, test_set),
batch_size=self.batch_size,
sort_within_batch=True,
shuffle=True
)
self.char2idx = WORD.vocab.stoi
self.id2char = WORD.vocab.itos
self.tag2idx = TAG.vocab.stoi
self.idx2tag = {index: value for index, value in enumerate(TAG.vocab.itos)}
self.tag_size = len(TAG.vocab.stoi)
self.word_size = len(WORD.vocab.stoi)
self.vocab_size = self.word_size
def on_before_batch_transfer(self, batch: Any, dataloader_idx: int) -> Any:
x = batch.word[0]
y = batch.tag[0]
real_length = batch.word[1]
return x, y, real_length
def train_dataloader(self) -> TRAIN_DATALOADERS:
return self.train_iter
def test_dataloader(self) -> EVAL_DATALOADERS:
return self.test_iter
def val_dataloader(self) -> EVAL_DATALOADERS:
return self.val_iter
def save_dict(self, data_dir):
with open(os.path.join(data_dir, "index2tag.txt"), 'w', encoding='utf8') as writer:
json.dump(self.idx2tag, writer, ensure_ascii=False)
with open(os.path.join(data_dir, "token2index.txt"), 'w', encoding='utf8') as writer:
json.dump(self.char2idx, writer, ensure_ascii=False)
Model Implementation Module
The implementation is as follows:
from argparse import ArgumentParser
import torch
from torch import nn
import torch.nn.functional as F
class IDCNN(nn.Module):
@staticmethod
def add_model_specific_args(parent_parser):
parser = ArgumentParser(parents=[parent_parser], add_help=False)
parser.add_argument("--lr", type=float, default=5e-03)
parser.add_argument("--block", type=int, default=1)
parser.add_argument('--hidden_dim', type=int, default=32)
parser.add_argument('--data_path', type=str, default="data/corpus")
parser.add_argument("--dropout", type=float, default=0.5)
parser.add_argument("--weight_decay", type=float, default=9e-3)
parser.add_argument("--char_embedding_size", type=int, default=60)
parser.add_argument("--experiment", type=bool, default=False)
return parser
def __init__(self,
token_vocab_size,
num_labels,
token_embedding_dim=128,
cnn_kernel_size=3,
cnn_num_filters=128,
input_dropout=0.5,
middle_dropout=0.2,
hidden_dropout=0.2,
blocks=1,
dilation_l=None,
embedding_pad_idx=0,
drop_penalty=1e-4
):
super().__init__()
if dilation_l is None:
dilation_l = [1, 2, 1]
self.num_blocks = blocks
self.dilation_l = dilation_l
self.drop_penalty = drop_penalty
self.num_labels = num_labels
self.padding_idx = embedding_pad_idx
self.token_embedding_dim = token_embedding_dim
self.token_embedding = nn.Embedding(token_vocab_size,
self.token_embedding_dim,
padding_idx=embedding_pad_idx)
self.filters = cnn_num_filters
        padding_word = cnn_kernel_size // 2  # 'same' padding for the stem convolution
self.conv0 = nn.Conv1d(in_channels=token_embedding_dim,
out_channels=self.filters,
kernel_size=cnn_kernel_size,
padding=padding_word)
self.cov_layers = nn.ModuleList([
nn.Conv1d(in_channels=cnn_num_filters,
out_channels=cnn_num_filters,
kernel_size=cnn_kernel_size,
padding=padding_word*dilation,
dilation=dilation) for dilation in dilation_l
])
self.conv_layers_size = len(self.cov_layers)
self.dense = nn.Linear(in_features=(cnn_num_filters*blocks),
out_features=num_labels)
self.i_drop = nn.Dropout(input_dropout)
self.m_drop = nn.Dropout(middle_dropout)
self.h_drop = nn.Dropout(hidden_dropout)
def forward(self, feature):
feature = self.token_embedding(feature)
feature = self.i_drop(feature)
feature = feature.permute(0, 2, 1)
conv0 = self.conv0(feature)
conv0 = F.relu(conv0)
conv_layer = conv0
conv_outputs = []
for _ in range(self.num_blocks):
for j, mdv in enumerate(self.cov_layers):
conv_layer = mdv(conv_layer)
conv_layer = F.relu(conv_layer)
if j == self.conv_layers_size - 1:
conv_layer = self.m_drop(conv_layer)
conv_outputs.append(conv_layer)
layer_concat = torch.cat(conv_outputs, 1)
layer_concat = layer_concat.permute(0, 2, 1)
return self.dense(layer_concat)
A note on this implementation: after the embedding layer, the sentence first passes through an ordinary convolution (conv0) that transforms the embedded features into the input expected by the dilated layers. When applying the dilated convolutions, the sequence length must stay unchanged, because every token ultimately needs a score per label; this is why each input is padded with padding = (kernel_size // 2) * dilation.
Within each block the dilated convolutions are applied in turn, and the output of the last dilation layer is taken as that block's output. Finally the outputs of all blocks are concatenated and fed through a fully connected layer. Looks simple, doesn't it? No wonder PyTorch implementations are so scarce online: it is just too simple.
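A tiny sanity check (illustrative only) of that padding rule: with padding = (kernel_size // 2) * dilation, a dilated Conv1d preserves the sequence length for any dilation:
import torch
from torch import nn

seq = torch.randn(2, 128, 50)  # (batch, channels, seq_len)
for dilation in (1, 2, 4):
    conv = nn.Conv1d(128, 128, kernel_size=3,
                     padding=(3 // 2) * dilation, dilation=dilation)
    assert conv(seq).shape[-1] == 50  # length unchanged for every dilation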
Wrapping the Model for Training
Here, as before, PyTorch Lightning is used to wrap the model training. The code is as follows:
import torch
from typing import Union, Dict, List, Optional
from pytorch_lightning.utilities.types import STEP_OUTPUT
from seqeval.metrics import classification_report
from seqeval.metrics import f1_score
from pytorch_lightning import LightningModule
from torch.optim import RAdam
from torch import nn, Tensor
from torchcrf import CRF
from model.idcnn import IDCNN
class IDCNN_CRF(LightningModule):
def __init__(self, hparams):
super().__init__()
self.hyper_params = hparams
self.lr = hparams.lr
        # note: this embedding is defined but never used in forward; IDCNN builds its own
        self.word_emb = nn.Embedding(self.hyper_params.vocab_size, self.hyper_params.char_embedding_size)
self.id2char = self.hyper_params.id2char
self.idx2tag = self.hyper_params.idx2tag
self.idcnn = IDCNN(token_vocab_size=len(self.id2char),
num_labels=len(self.idx2tag),
token_embedding_dim=self.hyper_params.char_embedding_size,
blocks=self.hyper_params.block
)
self.crf = CRF(num_tags=self.hyper_params.tag_size, batch_first=True)
self.dropout = nn.Dropout(self.hyper_params.dropout)
def configure_optimizers(self):
"""
配置优化器
:return:
"""
optimizer = RAdam(self.parameters(),
lr=self.lr,
weight_decay=self.hyper_params.weight_decay)
return optimizer
def forward_train(self, sentences_idx, tags_idx):
"""
model train
:param sentences_idx:
:param tags_idx:
:return:
"""
feats = self.idcnn(sentences_idx)
mask = tags_idx != 1
loss = self.crf(feats, tags_idx, mask=mask, reduction='mean')
return -loss
def _get_batch_info(self, batch):
this_batch_size = batch.word[0].size()[0]
sentences_idx = batch.word[0].view(this_batch_size, -1)
tags = batch.tag[0].view(this_batch_size, -1)
sentences_length = batch.word[1]
return sentences_idx, tags, sentences_length
def forward(self, sentences_idx):
"""
模型落地推理
:param sentences_idx:
:return:
"""
return self._decode(sentences_idx)
def _decode(self, sentences_idx):
"""
模型实际预测函数
:param sentences_idx:
:return:
"""
feats = self.idcnn(sentences_idx)
results = self.crf.decode(feats)
result_tensor = []
for result in results:
result_tensor.append(torch.tensor(result))
return torch.stack(result_tensor)
def training_step(self, batch, batch_idx, optimizer_idx=None) -> Union[int,
Dict[str, Union[Tensor, Dict[str, Tensor]]]]:
"""
模型训练的前向传播过程
:param batch:批次数据
:param batch_idx:
:param optimizer_idx:
:return:
"""
sentences_idx, tags, sentences_length = batch
loss = self.forward_train(sentences_idx, tags)
res = {"log": {"loss": loss}, "loss": loss}
return res
def validation_step(self, batch, batch_idx) -> Union[int, Dict[str, Union[Tensor, Dict[str, Tensor]]]]:
"""
开发集数据验证过程
:param batch: 批次数据
:param batch_idx:
:return:
"""
sentences_idx, tags, sentences_lengths = batch
loss = self.forward_train(sentences_idx, tags)
loss = loss.mean()
return {"sentence_lengths": sentences_lengths, 'sentence': sentences_idx, "target": tags,
"pred": self._decode(sentences_idx), "loss": loss}
def validation_epoch_end(self, outputs: Union[List[Dict[str, Tensor]],
List[List[Dict[str, Tensor]]]]) -> Dict[str, Dict[str, Tensor]]:
"""
验证数据集
:param outputs: 所有batch预测结果 validation_step的返回值构成的一个list
:return:
"""
return self._decode_epoch_end(outputs)
def _decode_epoch_end(self, outputs: Union[List[Dict[str, Tensor]],
List[List[Dict[str, Tensor]]]]) -> Dict[str, Dict[str, Tensor]]:
"""
对批次预测的结果进行整理,评估对应的结果
:return:
"""
ner_results = []
gold_list, pred_list = [], []
for batch_result in outputs:
batch_size = batch_result['sentence_lengths'].shape[0]
for i in range(batch_size):
res = []
sentence_gold, sentence_pred = [], []
for j in range(batch_result['sentence_lengths'][i].item()):
char = self.id2char[batch_result['sentence'][i][j]]
gold = self.idx2tag.get(batch_result['target'][i][j].item())
pred = self.idx2tag.get(batch_result['pred'][i][j].item())
if gold == "<pad>":
break
res.append(" ".join([char, gold, pred]))
sentence_gold.append(gold)
sentence_pred.append(pred)
ner_results.append(res)
gold_list.append(sentence_gold)
pred_list.append(sentence_pred)
print("\n", classification_report(gold_list, pred_list))
f1 = torch.tensor(f1_score(gold_list, pred_list))
tqdm_dict = {'val_f1': f1}
results = {"progress_bar": tqdm_dict, "log": {'val_f1': f1, "step": self.current_epoch}}
self.log("val_f1", f1)
return results
def test_step(self, batch, batch_idx) -> Optional[STEP_OUTPUT]:
"""
程序测试模块
:param batch:
:param batch_idx:
:return:
"""
sentences_idx, tags, sentences_lengths = batch
loss = self.forward_train(sentences_idx, tags)
loss = loss.mean()
return {"sentence_lengths": sentences_lengths,
'sentence': sentences_idx, "target": tags,
"pred": self._decode(sentences_idx), "loss": loss}
def test_epoch_end(self, outputs: Union[List[Dict[str, Tensor]],
List[List[Dict[str, Tensor]]]]) -> Dict[str, Dict[str, Tensor]]:
"""
测试集的评估
:param outputs:测试集batch预测完成结果
:return:
"""
return self._decode_epoch_end(outputs)
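A side note on the sign flip in forward_train: torchcrf's CRF.forward returns the log-likelihood of the gold tag sequence (here averaged over the batch via reduction='mean'), so it must be negated to act as a loss, while CRF.decode runs Viterbi decoding. A tiny standalone sketch with made-up shapes:
import torch
from torchcrf import CRF

crf = CRF(num_tags=5, batch_first=True)
emissions = torch.randn(2, 7, 5)            # (batch, seq_len, num_tags)
tags = torch.randint(0, 5, (2, 7))
log_likelihood = crf(emissions, tags, reduction='mean')
loss = -log_likelihood                      # higher likelihood -> lower loss
best_paths = crf.decode(emissions)          # list of tag-index lists (Viterbi)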
Model Training and Model Usage Module
The training and usage driver is shown below; if you have read my earlier code, it needs no further explanation.
import json
import os
from argparse import ArgumentParser
import torch
from pytorch_lightning import Trainer
from model.idcnn_crf_pl import IDCNN_CRF
from pytorch_lightning.callbacks import ModelCheckpoint, LearningRateMonitor
import pytorch_lightning as pl
from dataloader import NERDataModule
from model.idcnn import IDCNN
pl.seed_everything(2022)
def train(args):
path_prefix = "model_save"
os.makedirs(path_prefix, exist_ok=True)
ner_dm = NERDataModule(data_dir=args.data_path, batch_size=args.batch_size)
args.tag_size = ner_dm.tag_size
args.vocab_size = ner_dm.vocab_size
args.id2char = ner_dm.id2char
args.idx2tag = ner_dm.idx2tag
if args.load_pre:
model = IDCNN_CRF.load_from_checkpoint(args.ckpt_path, hparams=args)
else:
model = IDCNN_CRF(args)
lr_logger = LearningRateMonitor()
checkpoint_callback = ModelCheckpoint(save_top_k=3,
monitor="val_f1",
mode="max",
dirpath=path_prefix,
filename="ner-{epoch:03d}-{val_f1:.3f}", )
trainer = Trainer.from_argparse_args(args, callbacks=[lr_logger,
checkpoint_callback],
gpus=1,
max_epochs=500)
if args.train:
trainer.fit(model=model, datamodule=ner_dm)
if args.test:
trainer.test(model, ner_dm)
    if args.save_state_dict:
        # persist the token/index and index/tag dictionaries next to the checkpoints
        ner_dm.save_dict(path_prefix)
def model_use(param):
model_dir = os.path.dirname(param.ckpt_path)
def _load_dict():
with open(os.path.join(model_dir, "token2index.txt"), 'r', encoding='utf8') as reader:
t2i_dict: dict = json.load(reader)
t2i_dict = {token: int(index) for token, index in t2i_dict.items()}
with open(os.path.join(model_dir, 'index2tag.txt'), 'r', encoding='utf8') as reader:
i2t_dict: dict = json.load(reader)
i2t_dict = {int(index): tag for index, tag in i2t_dict.items()}
return t2i_dict, i2t_dict
def num_data(content: str, token2index: dict):
number_data = [token2index.get(char, token2index.get("<unk>")) for char in content]
return number_data
token2index, index2tag = _load_dict()
param.tag_size = len(index2tag)
param.vocab_size = len(token2index)
param.idx2tag = index2tag
param.id2char = {index: char for index, char in enumerate(token2index.keys())}
model = IDCNN_CRF.load_from_checkpoint(param.ckpt_path, hparams=param)
test_data = "常建良,男,"
input_data = torch.tensor([num_data(test_data, token2index)], dtype=torch.long)
predict = model(input_data)[0]
result = []
for predict_id in predict:
result.append(index2tag.get(predict_id.item()))
print(predict)
print(result)
if __name__ == '__main__':
parser = ArgumentParser()
parser.add_argument("--batch_size", type=int, default=128)
parser.add_argument("--load_pre", default=True, action="store_true")
parser.add_argument("--ckpt_path", type=str, default="model_save/ner-epoch=151-val_f1=0.934.ckpt")
parser.add_argument("--test", action="store_true", default=True)
parser.add_argument("--train", action="store_true", default=False)
parser.add_argument("--save_state_dict", default=True, action="store_true")
parser = IDCNN.add_model_specific_args(parser)
params = parser.parse_args()
model_use(params)
Model Performance
Picking the best-performing ckpt, which in my training run is ner-epoch=151-val_f1=0.934.ckpt, its results on the test set are as follows:
Testing: 100%|██████████| 4/4 [00:04<00:00, 1.06s/it]
              precision    recall  f1-score   support

        CONT       1.00      1.00      1.00        28
         EDU       0.97      0.96      0.96       112
         LOC       0.80      0.67      0.73         6
        NAME       0.98      0.98      0.98       112
         ORG       0.89      0.92      0.90       553
         PRO       0.76      0.79      0.78        33
        RACE       1.00      1.00      1.00        14
       TITLE       0.92      0.93      0.93       772

   micro avg       0.92      0.93      0.92      1630
   macro avg       0.92      0.91      0.91      1630
weighted avg       0.92      0.93      0.92      1630
--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'val_f1': 0.9232643118148598}
--------------------------------------------------------------------------------
Testing: 100%|██████████| 4/4 [00:08<00:00, 2.18s/it]
Process finished with exit code 0
The F1 score reaches 0.923, compared with 0.928 for BiLSTM-CRF: the two are very close.
Model Usage
For inference on unseen data, the model needs the two dictionaries saved during training. The inference procedure is (see the sketch after this list):
- Encode the text to be tagged, converting each character to its index
- Run model inference
- Decode the predicted indices back into tag strings
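Condensed, those three steps look like the following sketch (it assumes token2index, index2tag, and model have been loaded as in model_use above):
text = "常建良,男,"
ids = torch.tensor([[token2index.get(ch, token2index["<unk>"]) for ch in text]])  # encode
pred = model(ids)[0]                           # infer: Viterbi-decoded tag ids
tags = [index2tag[i.item()] for i in pred]     # decode ids back to tag strings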
以"常建良,男,"为例,模型输出结果如下:
tensor([11, 8, 8, 2, 2, 2])
['B-NAME', 'I-NAME', 'I-NAME', 'O', 'O', 'O']
Summary
The idea behind IDCNN-CRF is fairly simple, and it is not hard to implement either. In real engineering settings, if computational efficiency matters, IDCNN is worth considering for NER tasks. Of course, the implementation above may not be perfect; issues on GitHub are welcome.