IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 人工智能 -> 【NLP】文本匹配——Simple and Effective Text Matching with Richer Alignment Features(RE2)模型实现 -> 正文阅读

[人工智能]【NLP】文本匹配——Simple and Effective Text Matching with Richer Alignment Features(RE2)模型实现

背景

前文【NLP】文本匹配——Simple and Effective Text Matching with Richer Alignment Features阅读与总结(RE2)已经简要地介绍了RE2的原理,下面就参照着原文及其网上的开源代码进行该模型的复现。为了能够验证模型是否复现成功,并且我更偏向做中文的相关任务,对比开源项目:https://github.com/zhaogaofeng611/TextMatch在对应数据的复现结果,其在测试集上的ACC为:0.8391.
该论文pytorch版源码如下:https://github.com/alibaba-edu/simple-effective-text-matching-pytorch
感觉源码书写的挺好,就阅读了源码以及对其进行适当的修改以适应使用pytorch_lightning的训练模式。大家有兴趣的也可以看看源码。

原文整体来说介绍相对简单,但是在复现中一些细节问题很重要。实现源码已上传到我的github上:https://github.com/Htring/RE2_Text_Similarity_PL.

RE2实现

沿袭以往的实现思路,程序依然分为一下模块:

  • 数据处理模块dataloader
  • 模型实现模块
  • pytorch_lightning 训练封装模块
  • 模型训练和使用模块

下面就跟着论文中的介绍来实现该模型。

数据处理模块

数据处理模块与以往很相似,这里就不过多介绍了,直接看源码:

import json
import os
from typing import Optional, List, Dict
import pytorch_lightning as pl
import torch
from torch.utils.data import Dataset, DataLoader
import jieba


def jieba_cut(content: str):
    return [word for word in jieba.cut(content) if word]


def char_cut(content: str):
    return [char for char in list(content) if char]


class NLIDataSet(Dataset):

    def __init__(self, data_list, word2index, tag2index, max_length):
        self.word2index = word2index
        self.tag2index = tag2index
        self.max_length = max_length
        self.data_list = self._num_data(data_list)

    def _num_data(self, data_list):
        num_data_list = []

        def num_data(sentence):
            _num_data = []
            for char in sentence:
                _num_data.append(self.word2index.get(char))
            if len(sentence) > self.max_length:
                _num_data = _num_data[: self.max_length]
            else:
                _num_data = _num_data + [self.word2index.get("<pad>")] * (self.max_length - len(sentence))
            return _num_data

        for dict_data in data_list:
            sentence1, sentence2 = dict_data["sentence1"], dict_data["sentence2"]
            sen1_len, sen2_len = len(sentence1), len(sentence2)
            # 有一个为空时跳过
            if not (sen2_len and sen1_len):
                continue
            sentence1_num = num_data(sentence1)
            sentence2_num = num_data(sentence2)
            num_data_list.append([sentence1_num, sentence2_num, self.tag2index.get(dict_data["gold_label"])])
        return num_data_list

    def __getitem__(self, index):
        return self.data_list[index]

    def __len__(self):
        return len(self.data_list)


class NLIDataModule(pl.LightningDataModule):

    def __init__(self, data_dir="corpus/chinese-snli-c", max_length=50, batch_size=3):
        super().__init__()
        self.data_path = data_dir
        self.batch_size = batch_size
        self.max_length = max_length
        self.train_data_set, self.dev_data_set, self.test_data_set = None, None, None
        self.tag2idx, self.token2index = None, None
        self.setup()

    def _load_data(self, file_path) -> List[Dict]:
        data_list = []
        with open(file_path, 'r', encoding='utf8') as reader:
            for line in reader:
                line = line.strip()
                if not line:
                    continue
                json_data: dict = json.loads(line)
                json_data["sentence1"] = char_cut(json_data["sentence1"])
                json_data["sentence2"] = char_cut(json_data["sentence2"])
                data_list.append(json_data)
        return data_list

    def setup(self, stage: Optional[str] = None) -> None:
        train_data_list = self._load_data(os.path.join(self.data_path, "train.txt"))
        dev_data_list = self._load_data(os.path.join(self.data_path, "dev.txt"))
        test_data_list = self._load_data(os.path.join(self.data_path, "test.txt"))

        self.char2idx = {"<pad>": 0, "<unk>": 1}
        self.tag2idx = {}
        for data_list in [train_data_list, dev_data_list, test_data_list]:
            for dict_data in data_list:
                for words in [dict_data["sentence1"], dict_data["sentence2"]]:
                    for word in words:
                        if word not in self.char2idx:
                            self.char2idx[word] = len(self.char2idx)
                if dict_data["gold_label"] not in self.tag2idx:
                    self.tag2idx[dict_data['gold_label']] = len(self.tag2idx)

        self.idx2char = {index: char for char, index in self.char2idx.items()}
        self.idx2tag = {index: value for value, index in self.tag2idx.items()}
        self.tag_size = len(self.tag2idx)
        self.vocab_size = len(self.char2idx)
        self.train_data_set = NLIDataSet(train_data_list, self.char2idx, self.tag2idx, self.max_length)
        self.dev_data_set = NLIDataSet(dev_data_list, self.char2idx, self.tag2idx, self.max_length)
        self.test_data_set = NLIDataSet(test_data_list, self.char2idx, self.tag2idx, self.max_length)

    @staticmethod
    def collate_fn(batch):
        sen1, sen2, y = [], [], []
        for simple in batch:
            sen1.append(simple[0])
            sen2.append(simple[1])
            y.append(simple[-1])
        sen1_t = torch.tensor(sen1, dtype=torch.long)
        sen2_t = torch.tensor(sen2, dtype=torch.long)
        y_t = torch.tensor(y, dtype=torch.long)
        return sen1_t, sen2_t, y_t

    def train_dataloader(self):
        return DataLoader(self.train_data_set, batch_size=self.batch_size, collate_fn=self.collate_fn)

    def test_dataloader(self):
        return DataLoader(self.test_data_set, batch_size=self.batch_size, collate_fn=self.collate_fn)

    def val_dataloader(self):
        return DataLoader(self.dev_data_set, batch_size=self.batch_size, collate_fn=self.collate_fn)

    def save_dict(self, data_dir):
        with open(os.path.join(data_dir, "index2tag.txt"), 'w', encoding='utf8') as writer:
            json.dump(self.idx2tag, writer, ensure_ascii=False)

        with open(os.path.join(data_dir, "token2index.txt"), 'w', encoding='utf8') as writer:
            json.dump(self.char2idx, writer, ensure_ascii=False)

模型实现模块

由于论文中很多模块会有多种处理方式,源码采用注册的方式去获取对应的模块。这种方式算是一种设计模块吧,值得学习一下。该种方式借助了一个装饰器函数,实现如下:

def register(name=None, registry=None):
    """
    将某个函数获这某个类注册到某各地方,装饰器函数
    :param name: 注册的函数别名
    :param registry: 注册保存的对象
    :return: registered fun
    """
    def decorator(fn, registration_name=None):
        module_name = registration_name or fn.__name__
        if module_name in registry:
            raise LookupError(f"module {module_name} already registered.")
        registry[module_name] = fn
        return fn

    return lambda fn: decorator(fn, name)

第一层是比较常规的处理方式,即将文本转成embedding。文中使用的是基于word embedding进行输入,即论文中语句的切分方式是词语。个人觉得可以根据不同语言特征选取合适的语句切分方式。这里先记作 x ( 0 ) x^{(0)} x(0)。我们也可以根据实际情形对embedding层进行改写,如下:

from collections import OrderedDict
import torch
from torch import nn
import torch.nn.functional as F


class Embedding(nn.Module):

    __doc__ = """ 改写的embedding """

    def __init__(self, args):
        super().__init__()
        self.fix_embeddings = args.fix_embeddings
        self.embedding = nn.Embedding(args.num_vocab, args.embedding_dim, padding_idx=0)
        self.dropout = args.dropout

    def set_(self, value):
        self.embedding.weight.requires_grad = not self.fix_embeddings
        self.embedding.load_state_dict(OrderedDict({'weight': torch.tensor(value)}))

    def forward(self, x):
        x = self.embedding(x)
        x = F.dropout(x, self.dropout, self.training)
        return x

源码中使用的激活函数也都是Gelu,也对线性函数进行重写,如下:

class GeLU(nn.Module):
    __doc__ = """ gelu激活函数 """

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return 0.5 * x * (1. + torch.tanh(x * 0.7978845608 * (1. + 0.044715 * x * x)))


class Linear(nn.Module):
    __doc__ = """ 改写的Linear层 """

    def __init__(self, in_features:int, out_features:int, activations=False):
        super().__init__()
        linear = nn.Linear(in_features, out_features)
        nn.init.normal_(linear.weight, std=math.sqrt((2. if activations else 1.) / in_features))
        nn.init.zeros_(linear.bias)
        modules = [nn.utils.weight_norm(linear)]
        if activations:
            modules.append(GeLU())
        self.model = nn.Sequential(*modules)

    def forward(self, x:torch.Tensor) -> torch.Tensor:
        return self.model(x)

每一个block的第一件事就是使用一个encoder去计算这个序列的上下文特征。接着把encoder的输入与输出拼接起来送入到对齐层。该encoder使用的是多层卷积网络。第 n n n个block的输入与输出先定义如下:
输入: x ( n ) = ( x 1 ( n ) , x 2 ( n ) , ? ? , x l ( n ) ) x^{(n)}=(x_1^{(n)},x_2^{(n)},\cdots,x_l^{(n)}) x(n)=(x1(n)?,x2(n)?,?,xl(n)?),输出 o ( n ) = ( o 1 ( n ) , o 2 ( n ) , ? ? , o l ( n ) ) o^{(n)}=(o_1^{(n)},o_2^{(n)},\cdots,o_l^{(n)}) o(n)=(o1(n)?,o2(n)?,?,ol(n)?)。其中 o ( 0 ) o^{(0)} o(0)设置为0向量。第1个block的输入只是 x ( 0 ) x^{(0)} x(0),从第二个开始,即 n ≥ 2 n\geq2 n2开始,block的输入需要满足如下公式:
x i ( n ) = [ x i ( 1 ) ; o i ( n ? 1 ) + o i ( n ? 2 ) ] x_{i}^{(n)}=\left[x_{i}^{(1)} ; o_{i}^{(n-1)}+o_{i}^{(n-2)}\right] xi(n)?=[xi(1)?;oi(n?1)?+oi(n?2)?]
编码器使用的是卷积神经网络去实现的,为了能够适应多个卷积核,以及多个卷积核卷积后的维度要保持一致,这里也对一维卷积进行了改写,相关实现如下:

class Conv1d(nn.Module):
    __doc__ = """ 改写的一维卷积 """

    def __init__(self, in_channels, out_channels, kernel_sizes: Collection[int]):
        super().__init__()
        assert all(k % 2 == 1 for k in kernel_sizes), 'only support odd kernel sizes'
        assert out_channels % len(kernel_sizes) == 0, 'out channels must be dividable by kernels'
        out_channels = out_channels // len(kernel_sizes)
        convs = []
        for kernel_size in kernel_sizes:
            conv = nn.Conv1d(in_channels,
                             out_channels,
                             kernel_size,
                             padding=(kernel_size - 1) // 2)
            nn.init.normal_(conv.weight, std=math.sqrt(2. / (in_channels * kernel_size)))
            nn.init.zeros_(conv.bias)
            convs.append(nn.Sequential(nn.utils.weight_norm(conv), GeLU()))
        self.model = nn.ModuleList(convs)

    def forward(self, x):
        return torch.cat([encoder(x) for encoder in self.model], dim=-1)

编码器实现如下:

class Encoder(nn.Module):

    __doc__ = """ 编码器 """

    def __init__(self, args, input_size):
        super().__init__()
        self.dropout = args.dropout
        self.encoders = nn.ModuleList(
            [
                Conv1d(in_channels=input_size if i == 0 else args.hidden_size,
                       out_channels=args.hidden_size,
                       kernel_sizes=args.kernel_sizes) for i in range(args.enc_layers)
            ]
        )

    def forward(self, x: torch.Tensor, mask: torch.Tensor):
        x = x.transpose(1, 2)  # BxCxL
        mask = mask.transpose(1, 2)
        for i, encoder in enumerate(self.encoders):
            x.masked_fill_(~mask, 0.)
            if i > 0:
                x = F.dropout(x, self.dropout, self.training)
            x = encoder(x)
        x = F.dropout(x, self.dropout, self.training)
        return x.transpose(1, 2)  # BxLxC

需要说明的是,第一个block的输入维度与后面的block输入不同。 文中也在很多地方加上了dropout避免过拟合。

然后就使用多种方式串行连接 N N N个参数独立,结构相同的block进行特征提取。block之间的连接方式有多种选择:

  • 残差连接
  • 增强残差连接

实现代码如下:

import math
import torch
from torch import nn
from functools import partial
from .utils import register
from . import Linear

registry = {}
register = partial(register, registry=registry)


@register('none')
class NullConnection(nn.Module):

    def __init__(self):
        super().__init__()

    def forward(self, x, _, __):
        return x


@register("residual")
class Residual(nn.Module):

    def __init__(self, args):
        super().__init__()
        self.linear = Linear(args.embedding_dim, args.hidden_size)

    def forward(self, x: torch.Tensor, res: torch.Tensor, index: int):
        if index == 1:
            res = self.linear(res)
        return (x + res) * math.sqrt(0.5)


@register('aug')
class AugmentedResidual(nn.Module):

    def __init__(self, _):
        super().__init__()

    def forward(self, x: torch.Tensor, res: torch.Tensor, index: int):
        if index == 1:
            return torch.cat([x, res], dim=-1)  # res is embedding
        hidden_size = x.size(-1)
        x = (res[:, :, : hidden_size] + x) * math.sqrt(0.5)
        return torch.cat([x, res[:, :, hidden_size:]], dim=-1)  # latter half of res is embedding

普通的残差连接则需要将 x ( 0 ) x^{(0)} x(0)转换成增强残差连接所需要的维度即可,可以使用一个线性函数进行转换。这里还进行了平方均值操作。 block之间的连接是从第一个block以后才开始使用。还一直把原始的embedding放到拼接向量的最后。

编码结束之后就是一个重要环节,语义特征对齐。

alignment layer 对齐层

对齐层则是处理两个sentence,使用类似于attention的方式进行特征相关性计算。fusion layer再把alignment的输入与输出进行融合。原文提到在对两个语句进行度相似度计算方法有两个,一个是identity function一个是一个单层的前向传播网络。这个identity function就是使用向量之间的点乘,一个单层的前向传播则可以选取一个全连接网络即可。对齐层需要是对两个sentence进行对齐,对应一个sentence的输入则是:

import math
import torch
from torch import nn
import torch.nn.functional as F
from functools import partial
from .utils import register
from . import Linear

registry = {}
# 将register中registry参数值固定为registry
register = partial(register, registry=registry)


@register("identity")
class Alignment(nn.Module):

    def __init__(self, args, _):
        super().__init__()
        self.temperature = nn.Parameter(torch.tensor(1 / math.sqrt(args.hidden_size)))

    def _attention(self, a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
        return torch.matmul(a, b.transpose(1, 2)) * self.temperature

    def forward(self, a: torch.Tensor, b: torch.Tensor, mask_a: torch.Tensor, mask_b: torch.Tensor):
        attention = self._attention(a, b)
        mask = torch.matmul(mask_a.float(), mask_b.transpose(1, 2).float())
        mask = mask.bool()
        attention.masked_fill_(~mask, -1e4)
        attention_a = F.softmax(attention, dim=1)
        attention_b = F.softmax(attention, dim=2)
        feature_a = torch.matmul(attention_b, b)
        feature_b = torch.matmul(attention_a, a)
        return feature_a, feature_b


@register("linear")
class MappedAlignment(Alignment):

    def __init__(self, args, input_size):
        super().__init__(args, input_size)
        self.projection = nn.Sequential(
            nn.Dropout(args.dropout),
            Linear(input_size, args.hidden_size, activations=True)
        )

    def _attention(self, a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
        a = self.projection(a)
        b = self.projection(b)
        return super()._attention(a, b)

其中在计算attention权重时还进行了缩放操作。

得到对齐之后的特征向量之后就需要将对齐前的向量与对齐后的向量进行融合。

fusion layer 融合层

fusion的输出就是一个block的输出,要么作为下一个block的输入,要么就作为pooling layer的输入。融合程序如下:

import torch
from torch import nn
from functools import partial
from .utils import register
from . import Linear
import torch.nn.functional as F

registry = {}
register = partial(register, registry=registry)


@register('simple')
class Fusion(nn.Module):

    def __init__(self, args, input_size):
        super().__init__()
        self.fusion = Linear(input_size * 2, args.hidden_size, activations=True)

    def forward(self, x, align):
        return self.fusion(torch.cat([x, align], dim=-1))


@register('full')
class FulFusion(nn.Module):
    def __init__(self, args, input_size):
        super().__init__()
        self.dropout = args.dropout
        self.fusion1 = Linear(input_size*2, args.hidden_size, activations=True)
        self.fusion2 = Linear(input_size*2, args.hidden_size, activations=True)
        self.fusion3 = Linear(input_size*2, args.hidden_size, activations=True)
        self.fusion = Linear(args.hidden_size * 3, args.hidden_size, activations=True)

    def forward(self, x: torch.Tensor, align: torch.Tensor):
        g1 = self.fusion1(torch.cat([x, align], dim=-1))
        g2 = self.fusion2(torch.cat([x, x - align], dim=-1))
        g3 = self.fusion3(torch.cat([x, x * align], dim=-1))
        g = F.dropout(torch.cat([g1, g2, g3], dim=-1), self.dropout, self.training)
        return self.fusion(g)

在融合的过程中需要注意不同block 在融合时输入与输入数据维度的差别。

pooling layer 池化层

池化层则是将block的输出,转成固定长度的向量,然后送入到prediction layer。这个相对简单,实现如下:

from torch import nn
import torch

class Pooling(nn.Module):

    def forward(self, x:torch.Tensor, mask: torch.Tensor):
        return x.masked_fill_(~mask, -float('inf')).max(dim=1)[0]

在将特征向量池化处理之后,就可以将两个语句的特征向量送入预测层,语句之间的特征以及提取完毕,开始对两个语句进行预测了。

prediction layer 预测层

两个语句在上面各层计算的特征向量后,就可以构建一个预测层进行一个分类任务了。原论文也提出了多种向量拼接方式进行的prediction,可以看看下面的代码:

import torch
from torch import nn
from functools import partial
from . import Linear
from .utils import register

registry = {}
register = partial(register, registry=registry)


@register('simple')
class Prediction(nn.Module):

    def __init__(self, args, input_features=2):
        super().__init__()
        self.dense = nn.Sequential(
            nn.Dropout(args.dropout),
            Linear(args.hidden_size * input_features, args.hidden_size, activations=True),
            nn.Dropout(args.dropout),
            Linear(args.hidden_size, args.num_classes),
        )

    def forward(self, a: torch.Tensor, b: torch.Tensor):
        return self.dense(torch.cat([a, b], dim=-1))


@register('full')
class AdvancedPrediction(Prediction):

    def __init__(self, args):
        super().__init__(args, input_features=4)

    def forward(self, a: torch.Tensor, b: torch.Tensor):
        return self.dense(torch.cat([a, b, a - b, a * b], dim=-1))


@register('symmetric')
class SymmetricPrediction(AdvancedPrediction):

    def forward(self, a: torch.Tensor, b: torch.Tensor):
        return self.dense(torch.cat([a, b, (a - b).abs(), a * b], dim=-1))

pytorch_lightning 训练封装模块

以往的程序已经做过很多介绍,这里就不在赘述了,查看源码即可。

模型训练和使用模块

模型训练过程中未使用预训练的字符向量,以及训练30个epoch之后在测试集中的效果如下:

Testing: 100%|██████████| 42/42 [00:17<00:00,  2.49it/s]
              precision    recall  f1-score   support

           0       0.77      0.95      0.85      6250
           1       0.93      0.72      0.81      6250

    accuracy                           0.83     12500
   macro avg       0.85      0.83      0.83     12500
weighted avg       0.85      0.83      0.83     12500

--------------------------------------------------------------------------------
DATALOADER:0 TEST RESULTS
{'accuracy': 0.8340799808502197,
 'f1_score': 0.8340800404548645,
 'recall': 0.8340799808502197,
 'val_loss': 0.5524728894233704}
--------------------------------------------------------------------------------
Testing: 100%|██████████| 42/42 [00:18<00:00,  2.28it/s]

虽然acc在0.834与0.839有一点差距,但总体来说复现了。除此之外,在原论文中没有该语料的比较基准。

总结

论文总体结构比较清晰,模型设计的与ESIM有一些相似,可以在一定程度上看成是对ESIM的改良,ESIM使用的BiLSTM进行特征提取,该论文使用的是CNN。除论文之外,当看到论文复现的代码时,才会发现在理论是一方面,代码再实现过程中的用到的一些小trick也值得我们去学习,去积累。这是一篇2019年的论文,transformer在2018年已经发布了。在论文也看到了一些类似的地方。后面我就开始书写transformer以及bert相关的内容。敬请期待!!!

  人工智能 最新文章
2022吴恩达机器学习课程——第二课(神经网
第十五章 规则学习
FixMatch: Simplifying Semi-Supervised Le
数据挖掘Java——Kmeans算法的实现
大脑皮层的分割方法
【翻译】GPT-3是如何工作的
论文笔记:TEACHTEXT: CrossModal Generaliz
python从零学(六)
详解Python 3.x 导入(import)
【答读者问27】backtrader不支持最新版本的
上一篇文章      下一篇文章      查看所有文章
加:2022-05-07 11:11:00  更:2022-05-07 11:12:50 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/4 15:54:41-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码