References:
- NER principles: https://paddlepedia.readthedocs.io/en/latest/tutorials/natural_language_processing/ner/bilstm_crf.html
- Overview of several NER implementations (introductory): https://zhuanlan.zhihu.com/p/88544122
- TENER paper: https://arxiv.org/pdf/1911.04474.pdf
- Reference GitHub implementation: https://github.com/fastnlp/TENER
- fastNLP (used throughout): https://github.com/fastnlp/fastNLP
- Saving fastNLP models: https://fastnlp.readthedocs.io/zh/latest/fastNLP.io.model_io.html
- The Vocabulary also needs to be saved: https://fastnlp.readthedocs.io/zh/latest/fastNLP.core.vocabulary.html
NER principles
An NER model consists of two parts: a neural encoder (BiLSTM / BERT / Transformer + Linear + softmax) and a CRF layer.
- Neural network layer: input text -> embedding -> feature extraction with an LSTM / Transformer -> softmax gives each character a probability over labels (B-xx, I-xx, O, ...), i.e. a [context_size, tag_size] matrix with one row of softmax label scores per character.
- CRF layer: a learnable tag_size x tag_size transition-score matrix.
- Loss function: the first step gives the [context_size, tag_size] score matrix, i.e. every character x_i gets a probability for each tag. Without a CRF, taking the argmax of each character's probability vector would already yield the output sequence Y. But maximizing each character independently does not maximize the probability of the whole sequence (maximum likelihood), so we need the single most probable path (the y_1 -> y_2 -> y_3 -> ... with the highest probability): the CRF decoding strategy searches all possible paths for the highest-scoring one, and that tag sequence is the model's final output Y. A toy implementation of this loss is sketched after this list.
Suppose there are k tags and the text length is n; there are N = k^n possible paths. If S_i denotes the score of the i-th path, the probability of that tag sequence is
P(S_i) = e^{S_i} / \sum_{j=1}^{N} e^{S_j}
Clearly we want the real (gold) path to have the highest probability, i.e. maximize
P(S_{real}) = e^{S_{real}} / \sum_{i=1}^{N} e^{S_i}
Training therefore keeps pushing P(S_{real}) up; this is the objective, and since a larger objective should correspond to a smaller loss, the loss can be modeled as Loss = -P(S_{real}). Moving to log space:
Loss = -\log ( e^{S_{real}} / \sum_{i=1}^{N} e^{S_i} ) = \log \sum_{i=1}^{N} e^{S_i} - S_{real}
A series of derivations is omitted here; see https://paddlepedia.readthedocs.io/en/latest/tutorials/natural_language_processing/ner/bilstm_crf.html. For an example with only two tags, the accumulated log-sum-exp score up to position x_1 is
\log ( e^{x_{00}+t_{00}+x_{10}} + e^{x_{01}+t_{10}+x_{10}} + e^{x_{00}+t_{01}+x_{11}} + e^{x_{01}+t_{11}+x_{11}} )
where x_{ij} is the emission score of character x_i (the character at index i) for tag_j, i.e. row i of the encoder's softmax output, and t_{ij} is the transition score from tag_i to tag_j.
The loss therefore combines two parts, the encoder's softmax (emission) scores and the CRF's tag-transition scores, and the forward pass runs through both of them.
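To make the loss concrete, here is a minimal sketch of the CRF negative log-likelihood for a single sentence, written directly from Loss = \log \sum_i e^{S_i} - S_{real}. It is only an illustration, not the fastNLP ConditionalRandomField code; crf_nll and the toy tensors are names made up for this note. The sum over all k^n paths is computed with the forward algorithm so it stays tractable.

import torch

def crf_nll(emissions, trans, gold_tags):
    # emissions: [n, k] per-character tag scores from the encoder
    # trans:     [k, k] transition scores, trans[i, j] = score of tag_i -> tag_j
    # gold_tags: list of n gold tag indices
    n, k = emissions.shape
    # S_real: emission scores of the gold tags plus the transitions between them
    gold_score = emissions[0, gold_tags[0]]
    for i in range(1, n):
        gold_score = gold_score + trans[gold_tags[i - 1], gold_tags[i]] + emissions[i, gold_tags[i]]
    # log sum_i e^{S_i} over all k**n paths, via the forward algorithm
    alpha = emissions[0]  # log-space scores of partial paths ending at each tag
    for i in range(1, n):
        alpha = torch.logsumexp(alpha.unsqueeze(1) + trans, dim=0) + emissions[i]
    log_z = torch.logsumexp(alpha, dim=0)
    return log_z - gold_score  # = -log P(S_real)

emissions = torch.randn(4, 2)  # 4 characters, 2 tags
trans = torch.randn(2, 2)
print(crf_nll(emissions, trans, [0, 1, 1, 0]))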
TENER
Below is a walkthrough of the TENER code (a recent NER model whose main idea is a Transformer encoder with the 1/\sqrt{d_k} attention scaling removed, plus relative position-aware attention, followed by softmax + CRF). The CRF is a fixed probabilistic algorithm and stays the same no matter which model sits in front of it, so the directions for improvement are (1) extracting richer text information (the Transformer encoder) and (2) making the tags easier to separate (removing the scaling).
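The "un-scaled" part is easy to see in isolation: standard Transformer attention divides the dot-product logits by \sqrt{d_k}, while TENER keeps them raw, which gives a sharper attention distribution. A toy sketch with made-up tensors (not the model code below):

import math
import torch
import torch.nn.functional as F

q = torch.randn(1, 4, 64)  # toy queries, head_dim = 64
k = torch.randn(1, 4, 64)  # toy keys
logits = q @ k.transpose(-1, -2)
scaled = F.softmax(logits / math.sqrt(64), dim=-1)  # vanilla Transformer attention
unscaled = F.softmax(logits, dim=-1)                # TENER: no /sqrt(d_k)
print(scaled.max().item(), unscaled.max().item())   # the unscaled distribution is typically sharper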
Main training script: train_tener_cn.py
from models.TENER import TENER
from fastNLP import cache_results
from fastNLP import Trainer, GradientClipCallback, WarmupCallback
from torch import optim
from fastNLP import SpanFPreRecMetric, BucketSampler
from fastNLP.embeddings import StaticEmbedding
from fastNLP.io.model_io import ModelSaver, ModelLoader
from modules.pipe import CNNERPipe
import argparse
from modules.callbacks import EvaluateCallback
device = 0
parser = argparse.ArgumentParser()
parser.add_argument('--dataset', type=str, default='resume', choices=['weibo', 'resume', 'ontonotes', 'msra', 'ticket'])
args = parser.parse_args()
dataset = args.dataset
if dataset == 'resume':
n_heads = 4
head_dims = 64
num_layers = 2
lr = 0.0007
attn_type = 'adatrans'
n_epochs = 50
elif dataset == 'weibo':
n_heads = 4
head_dims = 32
num_layers = 1
lr = 0.001
attn_type = 'adatrans'
n_epochs = 100
elif dataset == 'ontonotes':
n_heads = 4
head_dims = 48
num_layers = 2
lr = 0.0007
attn_type = 'adatrans'
n_epochs = 100
elif dataset == 'msra':
n_heads = 6
head_dims = 80
num_layers = 2
lr = 0.0007
attn_type = 'adatrans'
n_epochs = 100
elif dataset == 'ticket':
n_heads = 6
head_dims = 80
num_layers = 2
lr = 0.0007
attn_type = 'adatrans'
n_epochs = 100
pos_embed = None
batch_size = 16
warmup_steps = 0.01
after_norm = 1
model_type = 'transformer'
normalize_embed = True
dropout=0.15
fc_dropout=0.4
encoding_type = 'bio'
name = 'caches/{}_{}_{}_{}.pkl'.format(dataset, model_type, encoding_type, normalize_embed)
d_model = n_heads * head_dims
dim_feedforward = int(2 * d_model)
@cache_results(name, _refresh=False)
def load_data():
if dataset == 'ontonotes':
paths = {'train':'../data/OntoNote4NER/train.char.bmes',
"dev":'../data/OntoNote4NER/dev.char.bmes',
"test":'../data/OntoNote4NER/test.char.bmes'}
min_freq = 2
elif dataset == 'weibo':
paths = {'train': '../data/WeiboNER/train.all.bmes',
'dev':'../data/WeiboNER/dev.all.bmes',
'test':'../data/WeiboNER/test.all.bmes'}
min_freq = 1
elif dataset == 'resume':
paths = {'train': '../data/ResumeNER/train.char.bmes',
'dev':'../data/ResumeNER/dev.char.bmes',
'test':'../data/ResumeNER/test.char.bmes'}
min_freq = 1
elif dataset == 'msra':
paths = {'train': '../data/MSRANER/train_dev.char.bmes',
'dev':'../data/MSRANER/test.char.bmes',
'test':'../data/MSRANER/test.char.bmes'}
min_freq = 2
elif dataset == 'ticket':
paths = {'train': '../data/train.all.bmes',
'dev': '../data/dev.all.bmes',
'test': '../data/test.all.bmes'}
min_freq = 2
data_bundle = CNNERPipe(bigrams=True, encoding_type=encoding_type).process_from_file(paths)
embed = StaticEmbedding(data_bundle.get_vocab('chars'),
model_dir_or_name='../data/gigaword_chn.all.a2b.uni.ite50.vec',
min_freq=1, only_norm_found_vector=normalize_embed, word_dropout=0.01, dropout=0.3)
bi_embed = StaticEmbedding(data_bundle.get_vocab('bigrams'),
model_dir_or_name='../data/gigaword_chn.all.a2b.bi.ite50.vec',
word_dropout=0.02, dropout=0.3, min_freq=min_freq,
only_norm_found_vector=normalize_embed, only_train_min_freq=True)
return data_bundle, embed, bi_embed
data_bundle, embed, bi_embed = load_data()
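# Persist the vocabularies so the online / offline prediction script can rebuild
# exactly the same char / bigram / target index mappings used during training.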
data_bundle.get_vocab('chars').save('../model/chars_vocab_200.npy')
data_bundle.get_vocab('bigrams').save('../model/bigrams_vocab_200.npy')
data_bundle.get_vocab('target').save('../model/target_vocab_200.npy')
model = TENER(tag_vocab=data_bundle.get_vocab('target'), embed=embed, num_layers=num_layers,
d_model=d_model, n_head=n_heads,
feedforward_dim=dim_feedforward, dropout=dropout,
after_norm=after_norm, attn_type=attn_type,
bi_embed=bi_embed,
fc_dropout=fc_dropout,
pos_embed=pos_embed,
scale=attn_type=='transformer')
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
callbacks = []
clip_callback = GradientClipCallback(clip_type='value', clip_value=5)
evaluate_callback = EvaluateCallback(data_bundle.get_dataset('test'))
if warmup_steps>0:
warmup_callback = WarmupCallback(warmup_steps, schedule='linear')
callbacks.append(warmup_callback)
callbacks.extend([clip_callback, evaluate_callback])
trainer = Trainer(data_bundle.get_dataset('train'), model, optimizer, batch_size=batch_size, sampler=BucketSampler(),
num_workers=2, n_epochs=n_epochs, dev_data=data_bundle.get_dataset('dev'),
metrics=SpanFPreRecMetric(tag_vocab=data_bundle.get_vocab('target'), encoding_type=encoding_type),
dev_batch_size=batch_size, callbacks=callbacks, device=None, test_use_tqdm=False,
use_tqdm=True, print_every=300, save_path=None)
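# load_best_model=False keeps the last-epoch weights in `model`;
# the model is then saved manually below, both as a state_dict and as a full pickle.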
trainer.train(load_best_model=False)
state_saver = ModelSaver("../model/ticket_ner_state_dict_200.pkl")
state_saver.save_pytorch(model, param_only=True)
pkl_saver = ModelSaver("../model/ticket_ner_200.pkl")
pkl_saver.save_pytorch(model, param_only=False)
Model definition: TENER.py
from fastNLP.modules import ConditionalRandomField, allowed_transitions
from modules.transformer import TransformerEncoder
from torch import nn
import torch
import torch.nn.functional as F
class TENER(nn.Module):
def __init__(self, tag_vocab, embed, num_layers, d_model, n_head, feedforward_dim, dropout,
after_norm=True, attn_type='adatrans', bi_embed=None,
fc_dropout=0.3, pos_embed=None, scale=False, dropout_attn=None):
"""
:param tag_vocab: fastNLP Vocabulary
:param embed: fastNLP TokenEmbedding
:param num_layers: number of self-attention layers
:param d_model: input size
:param n_head: number of head
:param feedforward_dim: the dimension of ffn
:param dropout: dropout in self-attention
:param after_norm: normalization place
:param attn_type: adatrans, naive
:param pos_embed: type of absolute position embedding, supports 'sin', 'fix' or None; can be None when using relative attention
:param bi_embed: bigram embedding, used in the Chinese scenario
:param fc_dropout: dropout rate before the fc layer
"""
super().__init__()
self.embed = embed
embed_size = self.embed.embed_size
self.bi_embed = None
if bi_embed is not None:
self.bi_embed = bi_embed
embed_size += self.bi_embed.embed_size
self.in_fc = nn.Linear(embed_size, d_model)
self.transformer = TransformerEncoder(num_layers, d_model, n_head, feedforward_dim, dropout,
after_norm=after_norm, attn_type=attn_type,
scale=scale, dropout_attn=dropout_attn,
pos_embed=pos_embed)
self.fc_dropout = nn.Dropout(fc_dropout)
self.out_fc = nn.Linear(d_model, len(tag_vocab))
trans = allowed_transitions(tag_vocab, include_start_end=True)
self.crf = ConditionalRandomField(len(tag_vocab), include_start_end_trans=True, allowed_transitions=trans)
def _forward(self, chars, target, bigrams=None):
mask = chars.ne(0)
chars = self.embed(chars)
if self.bi_embed is not None:
bigrams = self.bi_embed(bigrams)
chars = torch.cat([chars, bigrams], dim=-1)
chars = self.in_fc(chars)
chars = self.transformer(chars, mask)
chars = self.fc_dropout(chars)
chars = self.out_fc(chars)
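# per-character log-probabilities over tags (the emission scores);
# the CRF adds transition scores on top: Viterbi decoding at inference, NLL loss at training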
logits = F.log_softmax(chars, dim=-1)
if target is None:
paths, _ = self.crf.viterbi_decode(logits, mask)
return {'pred': paths}
else:
loss = self.crf(logits, target, mask)
return {'loss': loss}
def forward(self, chars, target, bigrams=None):
return self._forward(chars, target, bigrams)
def predict(self, chars, bigrams=None):
return self._forward(chars, target=None, bigrams=bigrams)
Data preprocessing: pipe.py
from fastNLP.io import Pipe, ConllLoader
from fastNLP.io import DataBundle
from fastNLP.io.pipe.utils import _add_words_field, _indexize
from fastNLP.io.pipe.utils import iob2, iob2bioes
from fastNLP.io.pipe.utils import _add_chars_field
from fastNLP.io.utils import check_loader_paths
from fastNLP.io import Conll2003NERLoader
from fastNLP import Const
def bmeso2bio(tags):
new_tags = []
for tag in tags:
tag = tag.lower()
if tag.startswith('m') or tag.startswith('e'):
tag = 'i' + tag[1:]
if tag.startswith('s'):
tag = 'b' + tag[1:]
new_tags.append(tag)
return new_tags
def bmeso2bioes(tags):
new_tags = []
for tag in tags:
lowered_tag = tag.lower()
if lowered_tag.startswith('m'):
tag = 'i' + tag[1:]
new_tags.append(tag)
return new_tags
class CNNERPipe(Pipe):
def __init__(self, bigrams=False, encoding_type='bmeso'):
super().__init__()
self.bigrams = bigrams
if encoding_type=='bmeso':
self.encoding_func = lambda x:x
elif encoding_type=='bio':
self.encoding_func = bmeso2bio
elif encoding_type == 'bioes':
self.encoding_func = bmeso2bioes
else:
raise RuntimeError("Only support bio, bmeso, bioes")
def process(self, data_bundle: DataBundle):
_add_chars_field(data_bundle, lower=False)
data_bundle.apply_field(self.encoding_func, field_name=Const.TARGET, new_field_name=Const.TARGET)
data_bundle.apply_field(lambda chars:[''.join(['0' if c.isdigit() else c for c in char]) for char in chars],
field_name=Const.CHAR_INPUT, new_field_name=Const.CHAR_INPUT)
input_field_names = [Const.CHAR_INPUT]
if self.bigrams:
data_bundle.apply_field(lambda chars:[c1+c2 for c1,c2 in zip(chars, chars[1:]+['<eos>'])],
field_name=Const.CHAR_INPUT, new_field_name='bigrams')
input_field_names.append('bigrams')
_indexize(data_bundle, input_field_names=input_field_names, target_field_names=Const.TARGET)
input_fields = [Const.TARGET, Const.INPUT_LEN] + input_field_names
target_fields = [Const.TARGET, Const.INPUT_LEN]
for name, dataset in data_bundle.datasets.items():
dataset.add_seq_len(Const.CHAR_INPUT)
data_bundle.set_input(*input_fields)
data_bundle.set_target(*target_fields)
return data_bundle
def process_from_file(self, paths):
paths = check_loader_paths(paths)
loader = ConllLoader(headers=['raw_chars', 'target'])
data_bundle = loader.load(paths)
return self.process(data_bundle)
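For reference, a quick check of what the tag-scheme conversion above does (hypothetical tags, assuming BMESO-style input as in the .bmes files):

print(bmeso2bio(['B-LOC', 'M-LOC', 'E-LOC', 'S-PER', 'O']))
# -> ['b-loc', 'i-loc', 'i-loc', 'b-per', 'o']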
Training
sudo python train_tener_cn.py --dataset ticket
Online serving (and offline prediction)
from __future__ import division
from copy import deepcopy
import math
import torch
import torch.nn.functional as F
from fastNLP.core import Vocabulary
from fastNLP.embeddings import StaticEmbedding
from fastNLP.modules import ConditionalRandomField, allowed_transitions
from torch import nn
class RelativeEmbedding(nn.Module):
def forward(self, input):
"""Input is expected to be of size [bsz x seqlen].
"""
bsz, seq_len = input.size()
max_pos = self.padding_idx + seq_len
if max_pos > self.origin_shift:
weights = self.get_embedding(
max_pos * 2,
self.embedding_dim,
self.padding_idx,
)
weights = weights.to(self._float_tensor)
del self.weights
self.origin_shift = weights.size(0) // 2
self.register_buffer('weights', weights)
positions = torch.arange(-seq_len, seq_len).to(input.device).long() + self.origin_shift
embed = self.weights.index_select(0, positions.long()).detach()
return embed
class RelativeSinusoidalPositionalEmbedding(RelativeEmbedding):
"""This module produces sinusoidal positional embeddings of any length.
Padding symbols are ignored.
"""
def __init__(self, embedding_dim, padding_idx, init_size=1568):
"""
:param embedding_dim: dimension of the embedding at each position
:param padding_idx: index of the padding symbol
:param init_size: number of positions to pre-compute
"""
super().__init__()
self.embedding_dim = embedding_dim
self.padding_idx = padding_idx
assert init_size % 2 == 0
weights = self.get_embedding(
init_size + 1,
embedding_dim,
padding_idx,
)
self.register_buffer('weights', weights)
self.register_buffer('_float_tensor', torch.FloatTensor(1))
def get_embedding(self, num_embeddings, embedding_dim, padding_idx=None):
"""Build sinusoidal embeddings.
This matches the implementation in tensor2tensor, but differs slightly
from the description in Section 3.5 of "Attention Is All You Need".
"""
half_dim = embedding_dim // 2
emb = math.log(10000) / (half_dim - 1)
emb = torch.exp(torch.arange(half_dim, dtype=torch.float) * -emb)
emb = torch.arange(-num_embeddings // 2, num_embeddings // 2, dtype=torch.float).unsqueeze(1) * emb.unsqueeze(0)
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1).view(num_embeddings, -1)
if embedding_dim % 2 == 1:
emb = torch.cat([emb, torch.zeros(num_embeddings, 1)], dim=1)
if padding_idx is not None:
emb[padding_idx, :] = 0
self.origin_shift = num_embeddings // 2 + 1
return emb
class RelativeMultiHeadAttn(nn.Module):
def __init__(self, d_model, n_head, dropout, r_w_bias=None, r_r_bias=None, scale=False):
"""
:param int d_model:
:param int n_head:
:param dropout: dropout applied to the attention map
:param r_w_bias: n_head x head_dim tensor or None; if None, a learnable bias is created internally
:param r_r_bias: n_head x head_dim tensor or None; if None, a learnable bias is created internally
:param scale: whether to divide the attention scores by sqrt(head_dim)
"""
super().__init__()
self.qkv_linear = nn.Linear(d_model, d_model * 3, bias=False)
self.n_head = n_head
self.head_dim = d_model // n_head
self.dropout_layer = nn.Dropout(dropout)
self.pos_embed = RelativeSinusoidalPositionalEmbedding(d_model // n_head, 0, 1200)
if scale:
self.scale = math.sqrt(d_model // n_head)
else:
self.scale = 1
if r_r_bias is None or r_w_bias is None:
self.r_r_bias = nn.Parameter(nn.init.xavier_normal_(torch.zeros(n_head, d_model // n_head)))
self.r_w_bias = nn.Parameter(nn.init.xavier_normal_(torch.zeros(n_head, d_model // n_head)))
else:
self.r_r_bias = r_r_bias
self.r_w_bias = r_w_bias
def forward(self, x, mask):
"""
:param x: batch_size x max_len x d_model
:param mask: batch_size x max_len
:return:
"""
batch_size, max_len, d_model = x.size()
pos_embed = self.pos_embed(mask)
qkv = self.qkv_linear(x)
q, k, v = torch.chunk(qkv, chunks=3, dim=-1)
q = q.view(batch_size, max_len, self.n_head, -1).transpose(1, 2)
k = k.view(batch_size, max_len, self.n_head, -1).transpose(1, 2)
v = v.view(batch_size, max_len, self.n_head, -1).transpose(1, 2)
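# Transformer-XL style decomposition of the attention score into content-content
# and content/position-bias terms; the relative sinusoidal embeddings (pos_embed)
# supply the position information, and _shift / _transpose_shift re-align the
# position axis so each (query, key) entry picks the score at their relative offset.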
rw_head_q = q + self.r_r_bias[:, None]
AC = torch.einsum('bnqd,bnkd->bnqk', [rw_head_q, k])
D_ = torch.einsum('nd,ld->nl', self.r_w_bias, pos_embed)[None, :, None]
B_ = torch.einsum('bnqd,ld->bnql', q, pos_embed)
E_ = torch.einsum('bnqd,ld->bnql', k, pos_embed)
BD = B_ + D_
BDE = self._shift(BD) + self._transpose_shift(E_)
attn = AC + BDE
attn = attn / self.scale
attn = attn.masked_fill(mask[:, None, None, :].eq(0), float('-inf'))
attn = F.softmax(attn, dim=-1)
attn = self.dropout_layer(attn)
v = torch.matmul(attn, v).transpose(1, 2).reshape(batch_size, max_len, d_model)
return v
def _shift(self, BD):
"""
Converts something like
-3 -2 -1 0 1 2
-3 -2 -1 0 1 2
-3 -2 -1 0 1 2
into
0 1 2
-1 0 1
-2 -1 0
:param BD: batch_size x n_head x max_len x 2max_len
:return: batch_size x n_head x max_len x max_len
"""
bsz, n_head, max_len, _ = BD.size()
zero_pad = BD.new_zeros(bsz, n_head, max_len, 1)
BD = torch.cat([BD, zero_pad], dim=-1).view(bsz, n_head, -1, max_len)
BD = BD[:, :, :-1].view(bsz, n_head, max_len, -1)
BD = BD[:, :, :, max_len:]
return BD
def _transpose_shift(self, E):
"""
Converts something like
-3 -2 -1 0 1 2
-30 -20 -10 00 10 20
-300 -200 -100 000 100 200
into
0 -10 -200
1 00 -100
2 10 000
:param E: batch_size x n_head x max_len x 2max_len
:return: batch_size x n_head x max_len x max_len
"""
bsz, n_head, max_len, _ = E.size()
zero_pad = E.new_zeros(bsz, n_head, max_len, 1)
E = torch.cat([E, zero_pad], dim=-1).view(bsz, n_head, -1, max_len)
indice = (torch.arange(max_len) * 2 + 1).to(E.device)
E = E.index_select(index=indice, dim=-2).transpose(-1, -2)
return E
class MultiHeadAttn(nn.Module):
def __init__(self, d_model, n_head, dropout=0.1, scale=False):
"""
:param d_model:
:param n_head:
:param scale: whether to scale the attention scores (divide by sqrt(head_dim))
"""
super().__init__()
assert d_model % n_head == 0
self.n_head = n_head
self.qkv_linear = nn.Linear(d_model, 3 * d_model, bias=False)
self.fc = nn.Linear(d_model, d_model)
self.dropout_layer = nn.Dropout(dropout)
if scale:
self.scale = math.sqrt(d_model // n_head)
else:
self.scale = 1
def forward(self, x, mask):
"""
:param x: bsz x max_len x d_model
:param mask: bsz x max_len
:return:
"""
batch_size, max_len, d_model = x.size()
x = self.qkv_linear(x)
q, k, v = torch.chunk(x, 3, dim=-1)
q = q.view(batch_size, max_len, self.n_head, -1).transpose(1, 2)
k = k.view(batch_size, max_len, self.n_head, -1).permute(0, 2, 3, 1)
v = v.view(batch_size, max_len, self.n_head, -1).transpose(1, 2)
attn = torch.matmul(q, k)
attn = attn / self.scale
attn.masked_fill_(mask=mask[:, None, None].eq(0), value=float('-inf'))
attn = F.softmax(attn, dim=-1)
attn = self.dropout_layer(attn)
v = torch.matmul(attn, v)
v = v.transpose(1, 2).reshape(batch_size, max_len, -1)
v = self.fc(v)
return v
class TransformerLayer(nn.Module):
def __init__(self, d_model, self_attn, feedforward_dim, after_norm, dropout):
"""
:param int d_model: model dimension, typically something like 512
:param self_attn: the self-attention module; given x (batch_size x max_len x d_model) and mask (batch_size x max_len),
it returns batch_size x max_len x d_model
:param int feedforward_dim: dimension of the hidden layer in the FFN
:param bool after_norm: where LayerNorm is applied; if False (pre-norm), the embedding has a residual path straight to the output
:param float dropout: dropout rate used at the three dropout positions
"""
super().__init__()
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.self_attn = self_attn
self.after_norm = after_norm
self.ffn = nn.Sequential(nn.Linear(d_model, feedforward_dim),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(feedforward_dim, d_model),
nn.Dropout(dropout))
def forward(self, x, mask):
"""
:param x: batch_size x max_len x hidden_size
:param mask: batch_size x max_len, positions equal to 0 are padding
:return: batch_size x max_len x hidden_size
"""
residual = x
if not self.after_norm:
x = self.norm1(x)
x = self.self_attn(x, mask)
x = x + residual
if self.after_norm:
x = self.norm1(x)
residual = x
if not self.after_norm:
x = self.norm2(x)
x = self.ffn(x)
x = residual + x
if self.after_norm:
x = self.norm2(x)
return x
class TransformerEncoder(nn.Module):
def __init__(self, num_layers, d_model, n_head, feedforward_dim, dropout, after_norm=True, attn_type='naive',
scale=False, dropout_attn=None, pos_embed=None):
super().__init__()
if dropout_attn is None:
dropout_attn = dropout
self.d_model = d_model
if pos_embed is None:
self.pos_embed = None
elif pos_embed == 'sin':
self.pos_embed = SinusoidalPositionalEmbedding(d_model, 0, init_size=1024)
elif pos_embed == 'fix':
self.pos_embed = LearnedPositionalEmbedding(1024, d_model, 0)
if attn_type == 'transformer':
self_attn = MultiHeadAttn(d_model, n_head, dropout_attn, scale=scale)
elif attn_type == 'adatrans':
self_attn = RelativeMultiHeadAttn(d_model, n_head, dropout_attn, scale=scale)
self.layers = nn.ModuleList([TransformerLayer(d_model, deepcopy(self_attn), feedforward_dim, after_norm, dropout)
for _ in range(num_layers)])
def forward(self, x, mask):
"""
:param x: batch_size x max_len x d_model
:param mask: batch_size x max_len, 1 where there is a real token
:return:
"""
if self.pos_embed is not None:
x = x + self.pos_embed(mask)
for layer in self.layers:
x = layer(x, mask)
return x
def make_positions(tensor, padding_idx):
"""Replace non-padding symbols with their position numbers.
Position numbers begin at padding_idx+1. Padding symbols are ignored.
"""
mask = tensor.ne(padding_idx).int()
return (
torch.cumsum(mask, dim=1).type_as(mask) * mask
).long() + padding_idx
class SinusoidalPositionalEmbedding(nn.Module):
"""This module produces sinusoidal positional embeddings of any length.
Padding symbols are ignored.
"""
def __init__(self, embedding_dim, padding_idx, init_size=1568):
super().__init__()
self.embedding_dim = embedding_dim
self.padding_idx = padding_idx
self.weights = SinusoidalPositionalEmbedding.get_embedding(
init_size,
embedding_dim,
padding_idx,
)
self.register_buffer('_float_tensor', torch.FloatTensor(1))
@staticmethod
def get_embedding(num_embeddings, embedding_dim, padding_idx=None):
"""Build sinusoidal embeddings.
This matches the implementation in tensor2tensor, but differs slightly
from the description in Section 3.5 of "Attention Is All You Need".
"""
half_dim = embedding_dim // 2
emb = math.log(10000) / (half_dim - 1)
emb = torch.exp(torch.arange(half_dim, dtype=torch.float) * -emb)
emb = torch.arange(num_embeddings, dtype=torch.float).unsqueeze(1) * emb.unsqueeze(0)
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1).view(num_embeddings, -1)
if embedding_dim % 2 == 1:
emb = torch.cat([emb, torch.zeros(num_embeddings, 1)], dim=1)
if padding_idx is not None:
emb[padding_idx, :] = 0
return emb
def forward(self, input):
"""Input is expected to be of size [bsz x seqlen]."""
bsz, seq_len = input.size()
max_pos = self.padding_idx + 1 + seq_len
if max_pos > self.weights.size(0):
self.weights = SinusoidalPositionalEmbedding.get_embedding(
max_pos,
self.embedding_dim,
self.padding_idx,
)
self.weights = self.weights.to(self._float_tensor)
positions = make_positions(input, self.padding_idx)
return self.weights.index_select(0, positions.view(-1)).view(bsz, seq_len, -1).detach()
def max_positions(self):
"""Maximum number of supported positions."""
return int(1e5)
class LearnedPositionalEmbedding(nn.Embedding):
"""
This module learns positional embeddings up to a fixed maximum size.
Padding ids are ignored by either offsetting based on padding_idx
or by setting padding_idx to None and ensuring that the appropriate
position ids are passed to the forward function.
"""
def __init__(
self,
num_embeddings: int,
embedding_dim: int,
padding_idx: int,
):
super().__init__(num_embeddings, embedding_dim, padding_idx)
def forward(self, input):
positions = make_positions(input, self.padding_idx)
return super().forward(positions)
class TENER(nn.Module):
def __init__(self, tag_vocab, embed, num_layers, d_model, n_head, feedforward_dim, dropout,
after_norm=True, attn_type='adatrans', bi_embed=None,
fc_dropout=0.3, pos_embed=None, scale=False, dropout_attn=None):
"""
:param tag_vocab: fastNLP Vocabulary
:param embed: fastNLP TokenEmbedding
:param num_layers: number of self-attention layers
:param d_model: input size
:param n_head: number of head
:param feedforward_dim: the dimension of ffn
:param dropout: dropout in self-attention
:param after_norm: normalization place
:param attn_type: adatrans, naive
:param pos_embed: type of absolute position embedding, supports 'sin', 'fix' or None; can be None when using relative attention
:param bi_embed: bigram embedding, used in the Chinese scenario
:param fc_dropout: dropout rate before the fc layer
"""
super().__init__()
self.embed = embed
embed_size = self.embed.embed_size
self.bi_embed = None
if bi_embed is not None:
self.bi_embed = bi_embed
embed_size += self.bi_embed.embed_size
self.in_fc = nn.Linear(embed_size, d_model)
self.transformer = TransformerEncoder(num_layers, d_model, n_head, feedforward_dim, dropout,
after_norm=after_norm, attn_type=attn_type,
scale=scale, dropout_attn=dropout_attn,
pos_embed=pos_embed)
self.fc_dropout = nn.Dropout(fc_dropout)
self.out_fc = nn.Linear(d_model, len(tag_vocab))
trans = allowed_transitions(tag_vocab, include_start_end=True)
self.crf = ConditionalRandomField(len(tag_vocab), include_start_end_trans=True, allowed_transitions=trans)
def _forward(self, chars, target, bigrams=None):
mask = chars.ne(0)
chars = self.embed(chars)
if self.bi_embed is not None:
bigrams = self.bi_embed(bigrams)
chars = torch.cat([chars, bigrams], dim=-1)
chars = self.in_fc(chars)
chars = self.transformer(chars, mask)
chars = self.fc_dropout(chars)
chars = self.out_fc(chars)
logits = F.log_softmax(chars, dim=-1)
if target is None:
paths, _ = self.crf.viterbi_decode(logits, mask)
return {'pred': paths}
else:
loss = self.crf(logits, target, mask)
return {'loss': loss}
def forward(self, chars, target, bigrams=None):
return self._forward(chars, target, bigrams)
def predict(self, chars, bigrams=None):
return self._forward(chars, target=None, bigrams=bigrams)
min_freq = 2
n_heads = 6
head_dims = 80
num_layers = 2
lr = 0.0007
attn_type = 'adatrans'
n_epochs = 100
pos_embed = None
batch_size = 16
warmup_steps = 0.01
after_norm = 1
model_type = 'transformer'
normalize_embed = True
dropout = 0.15
fc_dropout = 0.4
encoding_type = 'bio'
d_model = n_heads * head_dims
dim_feedforward = int(2 * d_model)
chars_vocab = Vocabulary().load('./model/chars_vocab.npy')
chars_vocab_dict = dict(chars_vocab)
embed = StaticEmbedding(chars_vocab,
model_dir_or_name='./data/gigaword_chn.all.a2b.uni.ite50.vec',
min_freq=1, only_norm_found_vector=normalize_embed, word_dropout=0.01, dropout=0.3)
bigrams_vocab = Vocabulary().load('./model/bigrams_vocab.npy')
bigrams_vocab_dict = dict(bigrams_vocab)
bi_embed = StaticEmbedding(bigrams_vocab,
model_dir_or_name='./data/gigaword_chn.all.a2b.bi.ite50.vec',
word_dropout=0.02, dropout=0.3, min_freq=min_freq,
only_norm_found_vector=normalize_embed, only_train_min_freq=True)
chars_unk_index = chars_vocab_dict['<unk>']
bigrams_unk_index = bigrams_vocab_dict['<unk>']
target_vocab = Vocabulary().load('./model/target_vocab.npy')
target_vocab_dict = dict(target_vocab)
target_vocab_dict_reverse = dict({v: k for k, v in target_vocab_dict.items()})
model = TENER(tag_vocab=target_vocab, embed=embed, num_layers=num_layers,
d_model=d_model, n_head=n_heads,
feedforward_dim=dim_feedforward, dropout=dropout,
after_norm=after_norm, attn_type=attn_type,
bi_embed=bi_embed,
fc_dropout=fc_dropout,
pos_embed=pos_embed,
scale=attn_type == 'transformer')
model.load_state_dict(torch.load('./model/ticket_ner_100.pkl'))
model.eval()  # switch off dropout / word_dropout before predicting
chars = ['x', 'x', 'x', 'x']
input_chars = [''.join(['0' if c.isdigit() else c for c in char]) for char in chars]
input_bigrams = [c1 + c2 for c1, c2 in zip(chars, chars[1:] + ['<eos>'])]
tensor_input_chars = torch.Tensor([[chars_vocab_dict[i] if i in chars_vocab_dict else chars_unk_index for i in input_chars]]).long()
tensor_input_bigrams = torch.Tensor([[bigrams_vocab_dict[i] if i in bigrams_vocab_dict else bigrams_unk_index for i in input_bigrams]]).long()
preds = [target_vocab_dict_reverse[i] for i in model.predict(tensor_input_chars, tensor_input_bigrams)['pred'][0].tolist()]
Model loaded successfully (console output / screenshot omitted).
Prediction results (screenshot omitted).
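The predict call returns one BIO tag per character (e.g. 'b-xxx', 'i-xxx', 'o'). To turn these into entity spans, a small helper along the following lines can be used; bio_to_spans is a hypothetical function written for this note, not part of the original scripts.

def bio_to_spans(chars, tags):
    # group per-character BIO tags into (entity_text, label) pairs
    spans, buf, label = [], [], None
    for ch, tag in zip(chars, tags):
        if tag.startswith('b'):
            if buf:
                spans.append((''.join(buf), label))
            buf, label = [ch], tag[2:]
        elif tag.startswith('i') and buf:
            buf.append(ch)
        else:
            if buf:
                spans.append((''.join(buf), label))
            buf, label = [], None
    if buf:
        spans.append((''.join(buf), label))
    return spans

print(bio_to_spans(chars, preds))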