Background
The previous post, 【NLP】文本匹配——Simple and Effective Text Matching with Richer Alignment Features阅读与总结(RE2), gave a brief introduction to the principles of RE2. This post reproduces the model by following the original paper and the open-source code available online. To verify whether the reproduction is successful, and because I prefer working on Chinese tasks, I compare against the open-source project https://github.com/zhaogaofeng611/TextMatch on the corresponding data, where its reported test-set accuracy is 0.8391. The official PyTorch implementation of the paper is here: https://github.com/alibaba-edu/simple-effective-text-matching-pytorch. The source code is well written, so I read through it and made moderate modifications to adapt it to a pytorch_lightning training workflow. If you are interested, the original source code is worth a look.
The paper itself is relatively brief, but several details matter a great deal when reproducing it. My implementation has been uploaded to my GitHub: https://github.com/Htring/RE2_Text_Similarity_PL.
RE2 Implementation
Following the structure of my previous implementations, the program is again divided into the following modules:
- Data processing module (dataloader)
- Model implementation module
- pytorch_lightning training wrapper module
- Model training and usage module
The implementation below follows the description in the paper.
Data Processing Module
The data processing module is very similar to those in previous posts, so I will not go into detail here; the source code is as follows:
import json
import os
from typing import Optional, List, Dict
import pytorch_lightning as pl
import torch
from torch.utils.data import Dataset, DataLoader
import jieba
def jieba_cut(content: str):
return [word for word in jieba.cut(content) if word]
def char_cut(content: str):
return [char for char in list(content) if char]
class NLIDataSet(Dataset):
def __init__(self, data_list, word2index, tag2index, max_length):
self.word2index = word2index
self.tag2index = tag2index
self.max_length = max_length
self.data_list = self._num_data(data_list)
def _num_data(self, data_list):
num_data_list = []
def num_data(sentence):
_num_data = []
for char in sentence:
                _num_data.append(self.word2index.get(char, self.word2index.get("<unk>")))  # fall back to <unk> for unseen tokens
if len(sentence) > self.max_length:
_num_data = _num_data[: self.max_length]
else:
_num_data = _num_data + [self.word2index.get("<pad>")] * (self.max_length - len(sentence))
return _num_data
for dict_data in data_list:
sentence1, sentence2 = dict_data["sentence1"], dict_data["sentence2"]
sen1_len, sen2_len = len(sentence1), len(sentence2)
if not (sen2_len and sen1_len):
continue
sentence1_num = num_data(sentence1)
sentence2_num = num_data(sentence2)
num_data_list.append([sentence1_num, sentence2_num, self.tag2index.get(dict_data["gold_label"])])
return num_data_list
def __getitem__(self, index):
return self.data_list[index]
def __len__(self):
return len(self.data_list)
class NLIDataModule(pl.LightningDataModule):
def __init__(self, data_dir="corpus/chinese-snli-c", max_length=50, batch_size=3):
super().__init__()
self.data_path = data_dir
self.batch_size = batch_size
self.max_length = max_length
self.train_data_set, self.dev_data_set, self.test_data_set = None, None, None
self.tag2idx, self.token2index = None, None
self.setup()
def _load_data(self, file_path) -> List[Dict]:
data_list = []
with open(file_path, 'r', encoding='utf8') as reader:
for line in reader:
line = line.strip()
if not line:
continue
json_data: dict = json.loads(line)
json_data["sentence1"] = char_cut(json_data["sentence1"])
json_data["sentence2"] = char_cut(json_data["sentence2"])
data_list.append(json_data)
return data_list
def setup(self, stage: Optional[str] = None) -> None:
train_data_list = self._load_data(os.path.join(self.data_path, "train.txt"))
dev_data_list = self._load_data(os.path.join(self.data_path, "dev.txt"))
test_data_list = self._load_data(os.path.join(self.data_path, "test.txt"))
self.char2idx = {"<pad>": 0, "<unk>": 1}
self.tag2idx = {}
for data_list in [train_data_list, dev_data_list, test_data_list]:
for dict_data in data_list:
for words in [dict_data["sentence1"], dict_data["sentence2"]]:
for word in words:
if word not in self.char2idx:
self.char2idx[word] = len(self.char2idx)
if dict_data["gold_label"] not in self.tag2idx:
self.tag2idx[dict_data['gold_label']] = len(self.tag2idx)
self.idx2char = {index: char for char, index in self.char2idx.items()}
self.idx2tag = {index: value for value, index in self.tag2idx.items()}
self.tag_size = len(self.tag2idx)
self.vocab_size = len(self.char2idx)
self.train_data_set = NLIDataSet(train_data_list, self.char2idx, self.tag2idx, self.max_length)
self.dev_data_set = NLIDataSet(dev_data_list, self.char2idx, self.tag2idx, self.max_length)
self.test_data_set = NLIDataSet(test_data_list, self.char2idx, self.tag2idx, self.max_length)
@staticmethod
def collate_fn(batch):
sen1, sen2, y = [], [], []
for simple in batch:
sen1.append(simple[0])
sen2.append(simple[1])
y.append(simple[-1])
sen1_t = torch.tensor(sen1, dtype=torch.long)
sen2_t = torch.tensor(sen2, dtype=torch.long)
y_t = torch.tensor(y, dtype=torch.long)
return sen1_t, sen2_t, y_t
def train_dataloader(self):
return DataLoader(self.train_data_set, batch_size=self.batch_size, collate_fn=self.collate_fn)
def test_dataloader(self):
return DataLoader(self.test_data_set, batch_size=self.batch_size, collate_fn=self.collate_fn)
def val_dataloader(self):
return DataLoader(self.dev_data_set, batch_size=self.batch_size, collate_fn=self.collate_fn)
def save_dict(self, data_dir):
with open(os.path.join(data_dir, "index2tag.txt"), 'w', encoding='utf8') as writer:
json.dump(self.idx2tag, writer, ensure_ascii=False)
with open(os.path.join(data_dir, "token2index.txt"), 'w', encoding='utf8') as writer:
json.dump(self.char2idx, writer, ensure_ascii=False)
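Before moving on, here is a quick usage sketch of the data module (the corpus path and batch size are just examples):
# Illustrative usage; assumes train.txt/dev.txt/test.txt exist under the corpus directory.
dm = NLIDataModule(data_dir="corpus/chinese-snli-c", max_length=50, batch_size=32)
print("vocab size:", dm.vocab_size, "num classes:", dm.tag_size)
sen1, sen2, y = next(iter(dm.train_dataloader()))
print(sen1.shape, sen2.shape, y.shape)   # torch.Size([32, 50]) torch.Size([32, 50]) torch.Size([32])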
Model Implementation Module
Since many of the modules described in the paper come in several variants, the original source code uses a registration mechanism to look up the desired module by name. This is essentially a design pattern and is worth learning. It relies on a decorator function, implemented as follows:
def register(name=None, registry=None):
"""
将某个函数获这某个类注册到某各地方,装饰器函数
:param name: 注册的函数别名
:param registry: 注册保存的对象
:return: registered fun
"""
def decorator(fn, registration_name=None):
module_name = registration_name or fn.__name__
if module_name in registry:
raise LookupError(f"module {module_name} already registered.")
registry[module_name] = fn
return fn
return lambda fn: decorator(fn, name)
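To make the mechanism concrete, here is a minimal, hypothetical usage sketch (the names registry, Foo and bar are invented for illustration):
from functools import partial

# Bind a concrete registry to the decorator, then register modules by name.
registry = {}
register_here = partial(register, registry=registry)

@register_here('foo')
class Foo:
    pass

@register_here('bar')
def bar():
    return "bar"

# Registered objects can now be looked up by alias, e.g. from a config string.
assert registry['foo'] is Foo
assert registry['bar'] is bar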
The first layer is fairly conventional: it converts the text into embeddings. The paper uses word embeddings as input, i.e. sentences are segmented into words. In my view, the segmentation granularity can be chosen according to the language; for Chinese I use characters here. Denote this embedded input as $x^{(0)}$. The embedding layer can also be rewritten to fit our needs, as follows:
from collections import OrderedDict
import torch
from torch import nn
import torch.nn.functional as F
class Embedding(nn.Module):
__doc__ = """ 改写的embedding """
def __init__(self, args):
super().__init__()
self.fix_embeddings = args.fix_embeddings
self.embedding = nn.Embedding(args.num_vocab, args.embedding_dim, padding_idx=0)
self.dropout = args.dropout
def set_(self, value):
self.embedding.weight.requires_grad = not self.fix_embeddings
self.embedding.load_state_dict(OrderedDict({'weight': torch.tensor(value)}))
def forward(self, x):
x = self.embedding(x)
x = F.dropout(x, self.dropout, self.training)
return x
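If pretrained character or word vectors are available, they can be loaded through set_. A minimal sketch, assuming the same args namespace used elsewhere in this post and a hypothetical pretrained matrix whose row order matches the vocabulary:
import numpy as np

# Hypothetical pretrained matrix: one row per token, embedding_dim columns.
pretrained = np.random.randn(args.num_vocab, args.embedding_dim).astype("float32")
embedding = Embedding(args)
embedding.set_(pretrained)   # copies the weights; requires_grad follows args.fix_embeddings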
The activation function used throughout the source code is GELU, and the linear layer is also rewritten, as follows:
import math

class GeLU(nn.Module):
    __doc__ = """ GELU activation function (tanh approximation) """
def forward(self, x: torch.Tensor) -> torch.Tensor:
return 0.5 * x * (1. + torch.tanh(x * 0.7978845608 * (1. + 0.044715 * x * x)))
class Linear(nn.Module):
__doc__ = """ 改写的Linear层 """
def __init__(self, in_features:int, out_features:int, activations=False):
super().__init__()
linear = nn.Linear(in_features, out_features)
nn.init.normal_(linear.weight, std=math.sqrt((2. if activations else 1.) / in_features))
nn.init.zeros_(linear.bias)
modules = [nn.utils.weight_norm(linear)]
if activations:
modules.append(GeLU())
self.model = nn.Sequential(*modules)
def forward(self, x:torch.Tensor) -> torch.Tensor:
return self.model(x)
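As a quick sanity check (not part of the model), the tanh approximation above stays very close to PyTorch's built-in exact GELU:
# Compare the GeLU approximation with torch's exact GELU.
x = torch.linspace(-3, 3, steps=7)
print(torch.max(torch.abs(GeLU()(x) - F.gelu(x))))   # difference is on the order of 1e-3 or smaller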
The first thing each block does is run an encoder over the sequence to compute contextual features; the encoder's input and output are then concatenated and fed into the alignment layer. The encoder itself is a multi-layer convolutional network. Define the input and output of the $n$-th block as follows: input $x^{(n)}=(x_1^{(n)},x_2^{(n)},\cdots,x_l^{(n)})$ and output $o^{(n)}=(o_1^{(n)},o_2^{(n)},\cdots,o_l^{(n)})$, where $o^{(0)}$ is defined as a zero vector. The input of the first block is simply $x^{(0)}$; from the second block onward, i.e. for $n \geq 2$, the block input is constructed as:
$$x_{i}^{(n)}=\left[x_{i}^{(1)} ; o_{i}^{(n-1)}+o_{i}^{(n-2)}\right]$$
The encoder is implemented with convolutional neural networks. To support several kernel sizes at once while keeping the output dimension consistent across kernels, the 1-D convolution is also rewritten; the implementation is as follows:
from typing import Collection

class Conv1d(nn.Module):
    __doc__ = """ Customized 1-D convolution supporting multiple kernel sizes """
def __init__(self, in_channels, out_channels, kernel_sizes: Collection[int]):
super().__init__()
assert all(k % 2 == 1 for k in kernel_sizes), 'only support odd kernel sizes'
assert out_channels % len(kernel_sizes) == 0, 'out channels must be dividable by kernels'
out_channels = out_channels // len(kernel_sizes)
convs = []
for kernel_size in kernel_sizes:
conv = nn.Conv1d(in_channels,
out_channels,
kernel_size,
padding=(kernel_size - 1) // 2)
nn.init.normal_(conv.weight, std=math.sqrt(2. / (in_channels * kernel_size)))
nn.init.zeros_(conv.bias)
convs.append(nn.Sequential(nn.utils.weight_norm(conv), GeLU()))
self.model = nn.ModuleList(convs)
    def forward(self, x):
        # x: (batch, channels, length); concatenate the per-kernel outputs along the channel
        # dimension so that the total number of output channels adds back up to out_channels
        return torch.cat([encoder(x) for encoder in self.model], dim=1)
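As a quick, illustrative shape check of the rewritten convolution (the sizes below are made up), a (batch, channels, length) input keeps its length, and the per-kernel outputs stack back up to out_channels:
# Shape sanity check for the multi-kernel Conv1d (illustrative sizes only).
conv = Conv1d(in_channels=128, out_channels=150, kernel_sizes=(1, 3, 5))
x = torch.randn(2, 128, 50)          # (batch, channels, sequence length)
print(conv(x).shape)                 # torch.Size([2, 150, 50]): 3 kernels x 50 channels each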
The encoder itself is implemented as follows:
class Encoder(nn.Module):
__doc__ = """ 编码器 """
def __init__(self, args, input_size):
super().__init__()
self.dropout = args.dropout
self.encoders = nn.ModuleList(
[
Conv1d(in_channels=input_size if i == 0 else args.hidden_size,
out_channels=args.hidden_size,
kernel_sizes=args.kernel_sizes) for i in range(args.enc_layers)
]
)
def forward(self, x: torch.Tensor, mask: torch.Tensor):
x = x.transpose(1, 2)
mask = mask.transpose(1, 2)
for i, encoder in enumerate(self.encoders):
x.masked_fill_(~mask, 0.)
if i > 0:
x = F.dropout(x, self.dropout, self.training)
x = encoder(x)
x = F.dropout(x, self.dropout, self.training)
return x.transpose(1, 2)
Note that the input dimension of the first block differs from that of the later blocks. The paper also applies dropout in many places to reduce overfitting. How the per-block encoder input size comes about is sketched below.
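For reference, a small, purely illustrative sketch of the encoder input size per block when the augmented-residual connection (introduced below) is used; the actual repo wires this up inside its network module:
# Illustrative only: encoder input size per block under the augmented-residual connection.
# Block 1 sees the raw embeddings; later blocks see [embeddings ; fused hidden state].
def encoder_input_size(block_index: int, embedding_dim: int, hidden_size: int) -> int:
    if block_index == 0:
        return embedding_dim
    return embedding_dim + hidden_size

print(encoder_input_size(0, 300, 150))   # 300 for the first block
print(encoder_input_size(1, 300, 150))   # 450 for every later block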
The model then stacks $N$ blocks in series for feature extraction; the blocks share the same structure but have independent parameters, and there are several options for connecting consecutive blocks. The connection implementations are as follows:
import math
import torch
from torch import nn
from functools import partial
from .utils import register
from . import Linear
registry = {}
register = partial(register, registry=registry)
@register('none')
class NullConnection(nn.Module):
def __init__(self):
super().__init__()
def forward(self, x, _, __):
return x
@register("residual")
class Residual(nn.Module):
def __init__(self, args):
super().__init__()
self.linear = Linear(args.embedding_dim, args.hidden_size)
def forward(self, x: torch.Tensor, res: torch.Tensor, index: int):
if index == 1:
res = self.linear(res)
return (x + res) * math.sqrt(0.5)
@register('aug')
class AugmentedResidual(nn.Module):
def __init__(self, _):
super().__init__()
def forward(self, x: torch.Tensor, res: torch.Tensor, index: int):
if index == 1:
return torch.cat([x, res], dim=-1)
hidden_size = x.size(-1)
x = (res[:, :, : hidden_size] + x) * math.sqrt(0.5)
return torch.cat([x, res[:, :, hidden_size:]], dim=-1)
For the plain residual connection, the embedding $x^{(0)}$ first has to be projected with a linear layer to the dimension expected by the residual addition; the sum is then scaled by $\sqrt{0.5}$ to keep its variance stable. Connections between blocks are only applied from the second block onward. In the augmented-residual variant, the original embedding is always kept at the tail of the concatenated vector. A shape walk-through is sketched below.
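To make the dimensions concrete, here is a small, illustrative walk-through of AugmentedResidual (embedding_dim=300 and hidden_size=150 are assumed values):
# Illustrative shape walk-through for AugmentedResidual (hypothetical sizes).
aug = AugmentedResidual(None)
emb = torch.randn(2, 50, 300)            # res for the 2nd block: the original embeddings x^(1)
o1 = torch.randn(2, 50, 150)             # output of block 1
x2 = aug(o1, emb, 1)                     # (2, 50, 450): [o1 ; embeddings]
o2 = torch.randn(2, 50, 150)             # output of block 2
x3 = aug(o2, x2, 2)                      # (2, 50, 450): [(o1 + o2) * sqrt(0.5) ; embeddings]
print(x2.shape, x3.shape)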
After encoding comes a key step: aligning the semantic features of the two sequences.
Alignment Layer
The alignment layer processes the two sentences and computes feature interactions in an attention-like way; the fusion layer later merges the alignment layer's input and output. The paper offers two choices for the projection $F$ used when computing the similarity between the two sequences: an identity function or a single-layer feed-forward network. With the identity function the attention score is simply the dot product of the two vectors, $e_{ij}=F(a_i)^{\top}F(b_j)$, and each position is then represented by the attention-weighted sum of the other sequence; the single-layer feed-forward variant applies a fully connected projection before the dot product. The alignment layer aligns both sentences, and its implementation is as follows:
import math
import torch
from torch import nn
import torch.nn.functional as F
from functools import partial
from .utils import register
from . import Linear
registry = {}
register = partial(register, registry=registry)
@register("identity")
class Alignment(nn.Module):
def __init__(self, args, _):
super().__init__()
self.temperature = nn.Parameter(torch.tensor(1 / math.sqrt(args.hidden_size)))
def _attention(self, a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
return torch.matmul(a, b.transpose(1, 2)) * self.temperature
def forward(self, a: torch.Tensor, b: torch.Tensor, mask_a: torch.Tensor, mask_b: torch.Tensor):
attention = self._attention(a, b)
mask = torch.matmul(mask_a.float(), mask_b.transpose(1, 2).float())
mask = mask.bool()
attention.masked_fill_(~mask, -1e4)
        attention_a = F.softmax(attention, dim=1)   # normalize over positions of a
        attention_b = F.softmax(attention, dim=2)   # normalize over positions of b
        feature_a = torch.matmul(attention_b, b)                    # (batch, len_a, dim): b aligned to a
        feature_b = torch.matmul(attention_a.transpose(1, 2), a)    # (batch, len_b, dim): a aligned to b
return feature_a, feature_b
@register("linear")
class MappedAlignment(Alignment):
def __init__(self, args, input_size):
super().__init__(args, input_size)
self.projection = nn.Sequential(
nn.Dropout(args.dropout),
Linear(input_size, args.hidden_size, activations=True)
)
def _attention(self, a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
a = self.projection(a)
b = self.projection(b)
return super()._attention(a, b)
Note that the attention scores are also scaled by a learnable temperature initialized to $1/\sqrt{d}$.
Once the aligned feature vectors are obtained, they need to be fused with the vectors from before alignment.
Fusion Layer
The output of the fusion layer is the output of the block; it either becomes the input of the next block or goes into the pooling layer. The fusion code is as follows:
import torch
from torch import nn
from functools import partial
from .utils import register
from . import Linear
import torch.nn.functional as F
registry = {}
register = partial(register, registry=registry)
@register('simple')
class Fusion(nn.Module):
def __init__(self, args, input_size):
super().__init__()
self.fusion = Linear(input_size * 2, args.hidden_size, activations=True)
def forward(self, x, align):
return self.fusion(torch.cat([x, align], dim=-1))
@register('full')
class FulFusion(nn.Module):
def __init__(self, args, input_size):
super().__init__()
self.dropout = args.dropout
self.fusion1 = Linear(input_size*2, args.hidden_size, activations=True)
self.fusion2 = Linear(input_size*2, args.hidden_size, activations=True)
self.fusion3 = Linear(input_size*2, args.hidden_size, activations=True)
self.fusion = Linear(args.hidden_size * 3, args.hidden_size, activations=True)
def forward(self, x: torch.Tensor, align: torch.Tensor):
g1 = self.fusion1(torch.cat([x, align], dim=-1))
g2 = self.fusion2(torch.cat([x, x - align], dim=-1))
g3 = self.fusion3(torch.cat([x, x * align], dim=-1))
g = F.dropout(torch.cat([g1, g2, g3], dim=-1), self.dropout, self.training)
return self.fusion(g)
When fusing, keep in mind that the fusion input dimension differs between the first block and the later blocks.
Pooling Layer
The pooling layer turns the block output into a fixed-length vector, which is then fed to the prediction layer. This part is fairly simple; a masked max-over-time pooling is implemented as follows:
from torch import nn
import torch
class Pooling(nn.Module):
def forward(self, x:torch.Tensor, mask: torch.Tensor):
return x.masked_fill_(~mask, -float('inf')).max(dim=1)[0]
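A quick illustrative check of the pooling shapes (the sizes are made up):
# Masked max pooling: (batch, length, hidden) -> (batch, hidden).
pool = Pooling()
x = torch.randn(2, 50, 150)                      # block output
mask = torch.ones(2, 50, 1, dtype=torch.bool)    # True for real tokens, False for padding
print(pool(x, mask).shape)                       # torch.Size([2, 150])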
After pooling, feature extraction for the two sentences is complete, and their feature vectors can be fed to the prediction layer to make the final prediction.
Prediction Layer
With the feature vectors of the two sentences computed by the layers above, a prediction layer can be built as a classification head. The paper proposes several ways of combining the two vectors for prediction; see the code below:
import torch
from torch import nn
from functools import partial
from . import Linear
from .utils import register
registry = {}
register = partial(register, registry=registry)
@register('simple')
class Prediction(nn.Module):
def __init__(self, args, input_features=2):
super().__init__()
self.dense = nn.Sequential(
nn.Dropout(args.dropout),
Linear(args.hidden_size * input_features, args.hidden_size, activations=True),
nn.Dropout(args.dropout),
Linear(args.hidden_size, args.num_classes),
)
def forward(self, a: torch.Tensor, b: torch.Tensor):
return self.dense(torch.cat([a, b], dim=-1))
@register('full')
class AdvancedPrediction(Prediction):
def __init__(self, args):
super().__init__(args, input_features=4)
def forward(self, a: torch.Tensor, b: torch.Tensor):
return self.dense(torch.cat([a, b, a - b, a * b], dim=-1))
@register('symmetric')
class SymmetricPrediction(AdvancedPrediction):
def forward(self, a: torch.Tensor, b: torch.Tensor):
return self.dense(torch.cat([a, b, (a - b).abs(), a * b], dim=-1))
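The snippets above cover the individual modules. For orientation, here is a simplified, hypothetical sketch of how they can be wired together into a full forward pass, assuming the classes defined above (Embedding, Encoder, Alignment, FulFusion, AugmentedResidual, Pooling, AdvancedPrediction) are importable in one scope; the real network module in the repo handles registries, masks and configuration more carefully:
from types import SimpleNamespace

# Hypothetical hyper-parameters, chosen only for this sketch.
args = SimpleNamespace(num_vocab=5000, embedding_dim=300, hidden_size=150,
                       dropout=0.2, fix_embeddings=False, enc_layers=2,
                       kernel_sizes=(3,), blocks=2, num_classes=2)

class RE2Sketch(nn.Module):
    __doc__ = """ Illustrative wiring of the modules above; not the repo's actual network.py """

    def __init__(self, args):
        super().__init__()
        self.embedding = Embedding(args)
        self.connection = AugmentedResidual(args)
        self.blocks = nn.ModuleList()
        for i in range(args.blocks):
            block_input = args.embedding_dim if i == 0 else args.embedding_dim + args.hidden_size
            enc_out = block_input + args.hidden_size   # encoder input concatenated with encoder output
            self.blocks.append(nn.ModuleDict({
                'encoder': Encoder(args, block_input),
                'alignment': Alignment(args, enc_out),
                'fusion': FulFusion(args, enc_out),
            }))
        self.pooling = Pooling()
        self.prediction = AdvancedPrediction(args)

    def forward(self, a, b):
        mask_a = (a != 0).unsqueeze(2)   # padding index is assumed to be 0
        mask_b = (b != 0).unsqueeze(2)
        a, b = self.embedding(a), self.embedding(b)
        res_a, res_b = a, b
        for i, block in enumerate(self.blocks):
            if i > 0:                    # connections between blocks start from the second block
                a = self.connection(a, res_a, i)
                b = self.connection(b, res_b, i)
                res_a, res_b = a, b
            a_enc = block['encoder'](a, mask_a)
            b_enc = block['encoder'](b, mask_b)
            a = torch.cat([a, a_enc], dim=-1)
            b = torch.cat([b, b_enc], dim=-1)
            align_a, align_b = block['alignment'](a, b, mask_a, mask_b)
            a = block['fusion'](a, align_a)
            b = block['fusion'](b, align_b)
        a = self.pooling(a, mask_a)
        b = self.pooling(b, mask_b)
        return self.prediction(a, b)

model = RE2Sketch(args)
logits = model(torch.randint(1, args.num_vocab, (2, 50)),
               torch.randint(1, args.num_vocab, (2, 50)))
print(logits.shape)   # torch.Size([2, 2])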
pytorch_lightning Training Wrapper Module
This part has been covered at length in previous posts, so I will not repeat it here; please refer to the source code. A minimal sketch of what such a wrapper looks like is given below.
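For completeness, a minimal, hypothetical pytorch_lightning wrapper sketch (the module in my repo logs more metrics and exposes more hyper-parameters):
import torch
import pytorch_lightning as pl
from torch import nn

class RE2LightningSketch(pl.LightningModule):
    __doc__ = """ Minimal illustrative wrapper; not the exact module used in the repo """

    def __init__(self, model, lr=1e-3):
        super().__init__()
        self.model = model
        self.lr = lr
        self.criterion = nn.CrossEntropyLoss()

    def training_step(self, batch, batch_idx):
        sen1, sen2, y = batch
        loss = self.criterion(self.model(sen1, sen2), y)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        sen1, sen2, y = batch
        self.log("val_loss", self.criterion(self.model(sen1, sen2), y), prog_bar=True)

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.lr)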
Model Training and Usage Module
No pretrained character vectors were used during training. After training for 30 epochs, the results on the test set are as follows:
Testing: 100%|██████████| 42/42 [00:17<00:00, 2.49it/s]
precision recall f1-score support
0 0.77 0.95 0.85 6250
1 0.93 0.72 0.81 6250
accuracy 0.83 12500
macro avg 0.85 0.83 0.83 12500
weighted avg 0.85 0.83 0.83 12500
--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'accuracy': 0.8340799808502197,
'f1_score': 0.8340800404548645,
'recall': 0.8340799808502197,
'val_loss': 0.5524728894233704}
--------------------------------------------------------------------------------
Testing: 100%|██████████| 42/42 [00:18<00:00, 2.28it/s]
Although there is a small gap between an accuracy of 0.834 and 0.839, the model can be considered reproduced overall. Note also that the original paper does not report a baseline on this corpus.
Summary
The paper is well structured overall, and the model design has some similarities to ESIM; to some extent it can be seen as an improvement on ESIM, with ESIM using a BiLSTM for feature extraction while this paper uses CNNs. Beyond the paper itself, reading the reproduction code shows that theory is only one side of the story: the small tricks used in the implementation are also worth learning and accumulating. This is a 2019 paper, and the Transformer had already been published in 2017; some similar ideas can be seen here as well. Next I will start writing about the Transformer and BERT. Stay tuned!