【NLP】IDCNN-CRF Named Entity Recognition (NER) Implemented in PyTorch

Background

Earlier posts, 【NLP】命名实体识别——IDCNN-CRF论文阅读与总结 (a reading and summary of the IDCNN-CRF paper) and 【NLP】基于Pytorch lightning与BiLSTM-CRF的NER实现, already covered the related models. Looking around on GitHub, IDCNN is almost always implemented in TensorFlow, so here I implement it in PyTorch and see how it performs. The source code has been uploaded to my GitHub: https://github.com/Htring/IDCNN-CRF_NER_PL. Feel free to take a look.

Data Source

The data used by this program comes from: https://github.com/luopeixiang/named_entity_recognition.

So that the model can be evaluated with the seqeval toolkit, tags starting with "M-" in the original data are rewritten as "I-".
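For reference, a minimal conversion sketch (my own, assuming the corpus files are whitespace-separated "char tag" lines as in the source repository; the file path is illustrative):

def bmes_to_bio_line(line: str) -> str:
    """Rewrite an 'M-' tag prefix as 'I-' so seqeval's BIO scheme applies."""
    parts = line.rstrip("\n").split(" ")
    if len(parts) == 2 and parts[1].startswith("M-"):
        parts[1] = "I-" + parts[1][2:]
    return " ".join(parts)

with open("data/corpus/train.char.bmes", encoding="utf8") as reader:
    converted = [bmes_to_bio_line(line) for line in reader]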

Program Structure

The program keeps the same structure as my previous posts and consists of the following four modules:

Data processing module: dataloader.py
Model implementation module: idcnn.py
Training wrapper module: idcnn_crf_pl.py
Model training and usage module: trainner.py

Data Processing Module

The data processing module reuses the code from the earlier BiLSTM NER post, as follows:

import json
import os
from typing import Optional, Any
import pytorch_lightning as pl
from pytorch_lightning.utilities.types import TRAIN_DATALOADERS, EVAL_DATALOADERS
from torchtext.legacy import data, datasets


class NERDataModule(pl.LightningDataModule):

    def __init__(self, data_dir="data/corpus", batch_size=128, experiment=False):
        super().__init__()
        self.data_path = data_dir
        self.batch_size = batch_size
        self.experiment = experiment
        self.setup()

    def setup(self, stage: Optional[str] = None) -> None:
        WORD = data.Field(batch_first=True, include_lengths=True)
        TAG = data.Field(batch_first=True, include_lengths=True)

        # torchtext's UDPOS.splits is reused here purely as a generic
        # sequence-tagging loader; the custom BMES files are passed in explicitly
        train_set, val_set, test_set = datasets.UDPOS.splits(path=self.data_path,
                                                             train='train.char.bmes',
                                                             validation='dev.char.bmes',
                                                             test='test.char.bmes',
                                                             fields=(('word', WORD), ('tag', TAG)),
                                                             separator=' ')
        if self.experiment:
            train_set.examples = train_set.examples[: 1000]
            val_set.examples = val_set.examples[: 1000]
            test_set.examples = test_set.examples[: 1000]

        WORD.build_vocab(train_set.word, val_set.word, test_set.word)
        TAG.build_vocab(train_set.tag, val_set.tag, test_set.tag)
        self.train_iter, self.val_iter, self.test_iter = data.BucketIterator.splits(
            (train_set, val_set, test_set),
            batch_size=self.batch_size,
            sort_within_batch=True,
            shuffle=True
        )
        self.char2idx = WORD.vocab.stoi
        self.id2char = WORD.vocab.itos
        self.tag2idx = TAG.vocab.stoi
        self.idx2tag = {index: value for index, value in enumerate(TAG.vocab.itos)}

        self.tag_size = len(TAG.vocab.stoi)
        self.word_size = len(WORD.vocab.stoi)
        self.vocab_size = self.word_size

    def on_before_batch_transfer(self, batch: Any, dataloader_idx: int) -> Any:
        # unpack the torchtext batch into (token ids, tag ids, real lengths)
        x = batch.word[0]
        y = batch.tag[0]
        real_length = batch.word[1]
        return x, y, real_length

    def train_dataloader(self) -> TRAIN_DATALOADERS:
        return self.train_iter

    def test_dataloader(self) -> EVAL_DATALOADERS:
        return self.test_iter

    def val_dataloader(self) -> EVAL_DATALOADERS:
        return self.val_iter

    def save_dict(self, data_dir):
        with open(os.path.join(data_dir, "index2tag.txt"), 'w', encoding='utf8') as writer:
            json.dump(self.idx2tag, writer, ensure_ascii=False)

        with open(os.path.join(data_dir, "token2index.txt"), 'w', encoding='utf8') as writer:
            json.dump(self.char2idx, writer, ensure_ascii=False)
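For reference, a minimal usage sketch of this data module (assuming the three .char.bmes files sit under data/corpus):

dm = NERDataModule(data_dir="data/corpus", batch_size=32)
print(dm.vocab_size, dm.tag_size)
# raw torchtext batches expose each field as an attribute; include_lengths=True
# makes batch.word a (token_ids, lengths) pair
batch = next(iter(dm.train_dataloader()))
tokens, lengths = batch.word
print(tokens.shape, lengths.shape)  # (batch_size, max_len), (batch_size,)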

Model Implementation Module

The implementation is as follows:

from argparse import ArgumentParser
import torch
from torch import nn
import torch.nn.functional as F


class IDCNN(nn.Module):

    @staticmethod
    def add_model_specific_args(parent_parser):
        parser = ArgumentParser(parents=[parent_parser], add_help=False)
        parser.add_argument("--lr", type=float, default=5e-03)
        parser.add_argument("--block", type=int, default=1)
        parser.add_argument('--hidden_dim', type=int, default=32)
        parser.add_argument('--data_path', type=str, default="data/corpus")
        parser.add_argument("--dropout", type=float, default=0.5)
        parser.add_argument("--weight_decay", type=float, default=9e-3)
        parser.add_argument("--char_embedding_size", type=int, default=60)
        parser.add_argument("--experiment", type=bool, default=False)
        return parser

    def __init__(self,
                 token_vocab_size,
                 num_labels,
                 token_embedding_dim=128,
                 cnn_kernel_size=3,
                 cnn_num_filters=128,
                 input_dropout=0.5,
                 middle_dropout=0.2,
                 hidden_dropout=0.2,
                 blocks=1,
                 dilation_l=None,
                 embedding_pad_idx=0,
                 drop_penalty=1e-4
                 ):
        super().__init__()
        if dilation_l is None:
            dilation_l = [1, 2, 1]
        self.num_blocks = blocks
        self.dilation_l = dilation_l
        self.drop_penalty = drop_penalty
        self.num_labels = num_labels
        self.padding_idx = embedding_pad_idx
        self.token_embedding_dim = token_embedding_dim
        self.token_embedding = nn.Embedding(token_vocab_size,
                                            self.token_embedding_dim,
                                            padding_idx=embedding_pad_idx)
        self.filters = cnn_num_filters
        # 'same' padding: kernel_size // 2 keeps the sequence length unchanged
        padding_word = int(cnn_kernel_size / 2)
        self.conv0 = nn.Conv1d(in_channels=token_embedding_dim,
                               out_channels=self.filters,
                               kernel_size=cnn_kernel_size,
                               padding=padding_word)
        # one dilated conv per entry in dilation_l; the padding scales with the
        # dilation so every layer preserves the sequence length
        self.cov_layers = nn.ModuleList([
            nn.Conv1d(in_channels=cnn_num_filters,
                      out_channels=cnn_num_filters,
                      kernel_size=cnn_kernel_size,
                      padding=padding_word * dilation,
                      dilation=dilation) for dilation in dilation_l
        ])
        self.conv_layers_size = len(self.cov_layers)
        self.dense = nn.Linear(in_features=(cnn_num_filters*blocks),
                               out_features=num_labels)
        self.i_drop = nn.Dropout(input_dropout)
        self.m_drop = nn.Dropout(middle_dropout)
        self.h_drop = nn.Dropout(hidden_dropout)

    def forward(self, feature):
        feature = self.token_embedding(feature)
        feature = self.i_drop(feature)
        feature = feature.permute(0, 2, 1)
        conv0 = self.conv0(feature)
        conv0 = F.relu(conv0)
        conv_layer = conv0
        conv_outputs = []
        for _ in range(self.num_blocks):
            for j, mdv in enumerate(self.cov_layers):
                conv_layer = mdv(conv_layer)
                conv_layer = F.relu(conv_layer)
                if j == self.conv_layers_size - 1:
                    conv_layer = self.m_drop(conv_layer)
                    conv_outputs.append(conv_layer)
        layer_concat = torch.cat(conv_outputs, 1)
        layer_concat = layer_concat.permute(0, 2, 1)
        return self.dense(layer_concat)
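A quick shape check on the module above (a standalone sketch; the vocabulary size and label count are made up):

import torch

model = IDCNN(token_vocab_size=3000, num_labels=10)
tokens = torch.randint(0, 3000, (4, 60))  # (batch, seq_len)
scores = model(tokens)
print(scores.shape)  # torch.Size([4, 60, 10]): one score per token per label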

A note on this implementation: after the embedding layer, the sentence first passes through an ordinary convolution (conv0) that transforms the embedded features into the input of the dilated layers. When applying the dilated convolutions, take care that the sequence length stays unchanged after each convolution, because every token ultimately needs a score for each label; this is why the inputs are padded.

Within each block the dilated convolutions run in sequence, and the output of the last dilated layer is taken as that block's output. Finally, the outputs of all blocks are concatenated and passed through a fully connected layer. Looks simple, doesn't it? No wonder there are so few PyTorch implementations online: it really is that simple.
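To make the padding choice concrete, here is a tiny standalone sanity check (my own, assuming kernel_size 3 as in the defaults) confirming that padding = dilation * (kernel_size // 2) keeps the sequence length fixed:

import torch
from torch import nn

x = torch.randn(2, 128, 50)  # (batch, channels, seq_len)
for dilation in (1, 2, 4):
    conv = nn.Conv1d(128, 128, kernel_size=3,
                     padding=dilation * (3 // 2), dilation=dilation)
    # L_out = L_in + 2*padding - dilation*(kernel_size - 1) = L_in
    assert conv(x).shape[-1] == 50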

Wrapping the Model Training

As before, PyTorch Lightning is used to wrap the model training. The code is as follows:

import torch
from typing import Union, Dict, List, Optional
from pytorch_lightning.utilities.types import STEP_OUTPUT
from seqeval.metrics import classification_report
from seqeval.metrics import f1_score
from pytorch_lightning import LightningModule
from torch.optim import RAdam
from torch import nn, Tensor
from torchcrf import CRF
from model.idcnn import IDCNN


class IDCNN_CRF(LightningModule):

    def __init__(self, hparams):
        super().__init__()
        self.hyper_params = hparams
        self.lr = hparams.lr
        # note: this embedding appears unused; IDCNN builds its own token embedding
        self.word_emb = nn.Embedding(self.hyper_params.vocab_size, self.hyper_params.char_embedding_size)

        self.id2char = self.hyper_params.id2char
        self.idx2tag = self.hyper_params.idx2tag
        self.idcnn = IDCNN(token_vocab_size=len(self.id2char),
                           num_labels=len(self.idx2tag),
                           token_embedding_dim=self.hyper_params.char_embedding_size,
                           blocks=self.hyper_params.block
                           )
        self.crf = CRF(num_tags=self.hyper_params.tag_size, batch_first=True)
        self.dropout = nn.Dropout(self.hyper_params.dropout)

    def configure_optimizers(self):
        """
        配置优化器
        :return:
        """
        optimizer = RAdam(self.parameters(),
                          lr=self.lr,
                          weight_decay=self.hyper_params.weight_decay)
        return optimizer

    def forward_train(self, sentences_idx, tags_idx):
        """
        Training forward pass: returns the negative CRF log-likelihood as the loss.
        :param sentences_idx:
        :param tags_idx:
        :return:
        """
        feats = self.idcnn(sentences_idx)
        # torchtext's vocabulary maps <pad> to index 1, so this masks padding
        mask = tags_idx != 1
        loss = self.crf(feats, tags_idx, mask=mask, reduction='mean')
        # torchcrf returns the log-likelihood; negate it to minimize
        return -loss

    def _get_batch_info(self, batch):
        this_batch_size = batch.word[0].size()[0]
        sentences_idx = batch.word[0].view(this_batch_size, -1)
        tags = batch.tag[0].view(this_batch_size, -1)
        sentences_length = batch.word[1]
        return sentences_idx, tags, sentences_length

    def forward(self, sentences_idx):
        """
        模型落地推理
        :param sentences_idx:
        :return:
        """
        return self._decode(sentences_idx)

    def _decode(self, sentences_idx):
        """
        模型实际预测函数
        :param sentences_idx:
        :return:
        """
        feats = self.idcnn(sentences_idx)
        results = self.crf.decode(feats)
        result_tensor = []
        for result in results:
            result_tensor.append(torch.tensor(result))
        return torch.stack(result_tensor)

    def training_step(self, batch, batch_idx, optimizer_idx=None) -> Union[int,
                                                                           Dict[str, Union[Tensor, Dict[str, Tensor]]]]:
        """
        模型训练的前向传播过程
        :param batch:批次数据
        :param batch_idx:
        :param optimizer_idx:
        :return:
        """
        sentences_idx, tags, sentences_length = batch
        loss = self.forward_train(sentences_idx, tags)
        res = {"log": {"loss": loss}, "loss": loss}
        return res

    def validation_step(self, batch, batch_idx) -> Union[int, Dict[str, Union[Tensor, Dict[str, Tensor]]]]:
        """
        开发集数据验证过程
        :param batch: 批次数据
        :param batch_idx:
        :return:
        """
        sentences_idx, tags, sentences_lengths = batch
        loss = self.forward_train(sentences_idx, tags)
        loss = loss.mean()
        return {"sentence_lengths": sentences_lengths, 'sentence': sentences_idx, "target": tags,
                "pred": self._decode(sentences_idx), "loss": loss}

    def validation_epoch_end(self, outputs: Union[List[Dict[str, Tensor]],
                                                  List[List[Dict[str, Tensor]]]]) -> Dict[str, Dict[str, Tensor]]:
        """
        验证数据集
        :param outputs: 所有batch预测结果 validation_step的返回值构成的一个list
        :return:
        """
        return self._decode_epoch_end(outputs)

    def _decode_epoch_end(self, outputs: Union[List[Dict[str, Tensor]],
                                               List[List[Dict[str, Tensor]]]]) -> Dict[str, Dict[str, Tensor]]:
        """
        对批次预测的结果进行整理,评估对应的结果
        :return:
        """
        ner_results = []
        gold_list, pred_list = [], []  # 原始标签以及模型预测结果
        for batch_result in outputs:
            batch_size = batch_result['sentence_lengths'].shape[0]
            for i in range(batch_size):
                res = []  # char gold pred
                sentence_gold, sentence_pred = [], []
                for j in range(batch_result['sentence_lengths'][i].item()):
                    char = self.id2char[batch_result['sentence'][i][j]]
                    gold = self.idx2tag.get(batch_result['target'][i][j].item())
                    pred = self.idx2tag.get(batch_result['pred'][i][j].item())
                    if gold == "<pad>":
                        break
                    res.append(" ".join([char, gold, pred]))
                    sentence_gold.append(gold)
                    sentence_pred.append(pred)
                ner_results.append(res)
                gold_list.append(sentence_gold)
                pred_list.append(sentence_pred)
        print("\n", classification_report(gold_list, pred_list))
        f1 = torch.tensor(f1_score(gold_list, pred_list))
        tqdm_dict = {'val_f1': f1}
        results = {"progress_bar": tqdm_dict, "log": {'val_f1': f1, "step": self.current_epoch}}
        self.log("val_f1", f1)
        return results

    def test_step(self, batch, batch_idx) -> Optional[STEP_OUTPUT]:
        """
        程序测试模块
        :param batch:
        :param batch_idx:
        :return:
        """
        sentences_idx, tags, sentences_lengths = batch
        loss = self.forward_train(sentences_idx, tags)
        loss = loss.mean()
        return {"sentence_lengths": sentences_lengths,
                'sentence': sentences_idx, "target": tags,
                "pred": self._decode(sentences_idx), "loss": loss}

    def test_epoch_end(self, outputs: Union[List[Dict[str, Tensor]],
                                            List[List[Dict[str, Tensor]]]]) -> Dict[str, Dict[str, Tensor]]:
        """
        测试集的评估
        :param outputs:测试集batch预测完成结果
        :return:
        """
        return self._decode_epoch_end(outputs)

Model Training and Model Usage Module

The training and usage code is as follows; if you have read the code from my earlier posts, it needs little extra explanation.

import json
import os
from argparse import ArgumentParser
import torch
from pytorch_lightning import Trainer
from model.idcnn_crf_pl import IDCNN_CRF
from pytorch_lightning.callbacks import ModelCheckpoint, LearningRateMonitor
import pytorch_lightning as pl
from dataloader import NERDataModule
from model.idcnn import IDCNN

pl.seed_everything(2022)


def train(args):
    path_prefix = "model_save"
    os.makedirs(path_prefix, exist_ok=True)

    ner_dm = NERDataModule(data_dir=args.data_path, batch_size=args.batch_size)
    args.tag_size = ner_dm.tag_size
    args.vocab_size = ner_dm.vocab_size
    args.id2char = ner_dm.id2char
    args.idx2tag = ner_dm.idx2tag
    if args.load_pre:
        model = IDCNN_CRF.load_from_checkpoint(args.ckpt_path, hparams=args)
    else:
        model = IDCNN_CRF(args)
    lr_logger = LearningRateMonitor()
    checkpoint_callback = ModelCheckpoint(save_top_k=3,
                                          monitor="val_f1",
                                          mode="max",
                                          dirpath=path_prefix,
                                          filename="ner-{epoch:03d}-{val_f1:.3f}", )
    trainer = Trainer.from_argparse_args(args, callbacks=[lr_logger,
                                                          checkpoint_callback],
                                         gpus=1,
                                         max_epochs=500)

    if args.train:
        trainer.fit(model=model, datamodule=ner_dm)

    if args.test:
        trainer.test(model, ner_dm)

    if args.save_state_dict:
        ner_dm.save_dict(path_prefix)


def model_use(param):
    model_dir = os.path.dirname(param.ckpt_path)

    def _load_dict():
        with open(os.path.join(model_dir, "token2index.txt"), 'r', encoding='utf8') as reader:
            t2i_dict: dict = json.load(reader)
        t2i_dict = {token: int(index) for token, index in t2i_dict.items()}
        with open(os.path.join(model_dir, 'index2tag.txt'), 'r', encoding='utf8') as reader:
            i2t_dict: dict = json.load(reader)
        i2t_dict = {int(index): tag for index, tag in i2t_dict.items()}
        return t2i_dict, i2t_dict

    def num_data(content: str, token2index: dict):
        number_data = [token2index.get(char, token2index.get("<unk>")) for char in content]
        return number_data

    token2index, index2tag = _load_dict()
    param.tag_size = len(index2tag)
    param.vocab_size = len(token2index)
    param.idx2tag = index2tag
    param.id2char = {index: char for index, char in enumerate(token2index.keys())}
    model = IDCNN_CRF.load_from_checkpoint(param.ckpt_path, hparams=param)

    test_data = "常建良,男,"
    # encode
    input_data = torch.tensor([num_data(test_data, token2index)], dtype=torch.long)
    # predict
    predict = model(input_data)[0]
    result = []
    # decode
    for predict_id in predict:
        result.append(index2tag.get(predict_id.item()))
    print(predict)
    print(result)


if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument("--batch_size", type=int, default=128)
    parser.add_argument("--load_pre", default=True, action="store_true")
    parser.add_argument("--ckpt_path", type=str, default="model_save/ner-epoch=151-val_f1=0.934.ckpt")
    parser.add_argument("--test", action="store_true", default=True)
    parser.add_argument("--train", action="store_true", default=False)
    parser.add_argument("--save_state_dict", default=True, action="store_true")
    parser = IDCNN.add_model_specific_args(parser)
    params = parser.parse_args()
    # train(params)
    model_use(params)

Model Performance

Selecting the best checkpoint from my training run, ner-epoch=151-val_f1=0.934.ckpt, its results on the test set are as follows:

Testing: 100%|██████████| 4/4 [00:04<00:00,  1.06s/it]
               precision    recall  f1-score   support

        CONT       1.00      1.00      1.00        28
         EDU       0.97      0.96      0.96       112
         LOC       0.80      0.67      0.73         6
        NAME       0.98      0.98      0.98       112
         ORG       0.89      0.92      0.90       553
         PRO       0.76      0.79      0.78        33
        RACE       1.00      1.00      1.00        14
       TITLE       0.92      0.93      0.93       772

   micro avg       0.92      0.93      0.92      1630
   macro avg       0.92      0.91      0.91      1630
weighted avg       0.92      0.93      0.92      1630

--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'val_f1': 0.9232643118148598}
--------------------------------------------------------------------------------
Testing: 100%|██████████| 4/4 [00:08<00:00,  2.18s/it]

Process finished with exit code 0

The F1 score reaches 0.923, versus 0.928 for BiLSTM-CRF; the two are very close.

Model Usage

When running inference on unseen data, the model needs the two dictionaries saved during training. The inference procedure is:

  1. Encode the text to be tagged into token ids
  2. Run model inference
  3. Decode the model's predictions back into tags

以"常建良,男,"为例,模型输出结果如下:

tensor([11,  8,  8,  2,  2,  2])
['B-NAME', 'I-NAME', 'I-NAME', 'O', 'O', 'O']
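If entity spans are wanted rather than per-token tags, a small helper (hypothetical, not part of the repository) can group the B-/I- tags back into entities:

def tags_to_spans(text: str, tags: list):
    """Group BIO tags into (entity_text, label) spans."""
    spans, start, label = [], None, None
    for i, tag in enumerate(tags + ["O"]):  # the sentinel flushes the last span
        if (tag == "O" or tag.startswith("B-")) and label is not None:
            spans.append((text[start:i], label))
            start, label = None, None
        if tag.startswith("B-"):
            start, label = i, tag[2:]
    return spans

print(tags_to_spans("常建良,男,", ['B-NAME', 'I-NAME', 'I-NAME', 'O', 'O', 'O']))
# [('常建良', 'NAME')]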

Summary

The idea behind IDCNN-CRF is fairly simple, and implementing it is not hard either. In real engineering settings, if computational efficiency is a priority, IDCNN is worth considering for NER. Of course, the implementation above may not be perfect, and issues on my GitHub are welcome.
