Series articles
- How to fine-tune a small model from a large model (BART), with code
- Evaluation methods for automatic text summarization: the Pyramid method
- PyTorch: Chinese automatic summarization with a BART model
Abstract
Fine-tune a BART model for Chinese automatic summarization. See article 1 in this series for how to fine-tune BART. This post provides the dataset and the trained model. The results show that the model does learn to summarize, but it is still poor at choosing an appropriate point to stop generating.
Implementation
Data preparation
First, install the required packages:
! python --version
! pip install datasets transformers rouge-score jieba
! pip install --upgrade ipywidgets tqdm
! jupyter nbextension enable --py widgetsnbextension
! pip install fire
Import the packages and the dataset. The dataset is the one I uploaded (see the download section at the end):
from ipywidgets import IntProgress
import tqdm
from datasets import load_dataset
dataset = load_dataset('json', data_files='nlpcc_data.json', field='data')
Next, process the data:
dataset["train"]
def flatten(example):
return {
"document": example["content"],
"summary": example["title"],
"id":"0"
}
dataset = dataset["train"].map(flatten, remove_columns=["title", "content"])
Next, load the tokenizer and set the checkpoint of the full BART model that will be fine-tuned:
TokenModel = "bert-base-chinese"
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(TokenModel)
model_checkpoint = "facebook/bart-large-cnn"
if model_checkpoint in ["t5-small", "t5-base", "t5-large", "t5-3b", "t5-11b"]:
prefix = "summarize: "
else:
prefix = ""
max_input_length = 1024
max_target_length = 256
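Note that the bert-base-chinese tokenizer segments Chinese text at the character level, so max_input_length = 1024 is roughly a budget of 1024 characters. A quick check (a sketch; the output below is what I would expect from the standard vocabulary):
print(tokenizer.tokenize("自动文本摘要"))   # expected: ['自', '动', '文', '本', '摘', '要']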
def preprocess_function(examples):
    inputs = [prefix + doc for doc in examples["document"]]
    model_inputs = tokenizer(inputs, max_length=max_input_length, truncation=True)
    with tokenizer.as_target_tokenizer():
        labels = tokenizer(examples["summary"], max_length=max_target_length, truncation=True)
    model_inputs["labels"] = labels["input_ids"]
    return model_inputs
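As a quick sanity check (a sketch, not required for training), you can run the preprocessing on a single example and confirm that the lengths respect the limits above:
sample = preprocess_function({"document": [dataset[0]["document"]], "summary": [dataset[0]["summary"]]})
print(len(sample["input_ids"][0]), len(sample["labels"][0]))   # capped at 1024 and 256 respectively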
dataset
raw_datasets = dataset
tokenized_datasets = raw_datasets.map(preprocess_function, batched=True)
Split the data into train / validation / test sets and tokenize again:
import datasets
train_data_txt, validation_data_txt = dataset.train_test_split(test_size=0.1).values()
train_data_txt, test_data_txt = train_data_txt.train_test_split(test_size=0.1).values()
dd = datasets.DatasetDict({"train": train_data_txt, "validation": validation_data_txt, "test": test_data_txt})
raw_datasets = dd
tokenized_datasets = raw_datasets.map(preprocess_function, batched=True)
The processed dataset:
raw_datasets
import datasets
import random
import pandas as pd
from IPython.display import display, HTML
def show_random_elements(dataset, num_examples=5):
    assert num_examples <= len(dataset), "Can't pick more elements than there are in the dataset."
    picks = []
    for _ in range(num_examples):
        pick = random.randint(0, len(dataset) - 1)
        while pick in picks:
            pick = random.randint(0, len(dataset) - 1)
        picks.append(pick)
    df = pd.DataFrame(dataset[picks])
    for column, typ in dataset.features.items():
        if isinstance(typ, datasets.ClassLabel):
            df[column] = df[column].transform(lambda i: typ.names[i])
    display(HTML(df.to_html()))
Preview the data:
show_random_elements(raw_datasets["train"])
Install transformers (skip this if it was already installed above):
! pip install transformers
from transformers import AutoModelForSeq2SeqLM, DataCollatorForSeq2Seq, Seq2SeqTrainingArguments, Seq2SeqTrainer
model = AutoModelForSeq2SeqLM.from_pretrained(model_checkpoint)
import warnings
from pathlib import Path
from typing import List, Tuple, Union
import fire
from torch import nn
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, PreTrainedModel
from transformers.utils import logging
logger = logging.get_logger(__name__)
Extract part of the model (a smaller student) to fine-tune
def copy_layers(src_layers: nn.ModuleList, dest_layers: nn.ModuleList, layers_to_copy: List[int]) -> None:
    layers_to_copy = nn.ModuleList([src_layers[i] for i in layers_to_copy])
    assert len(dest_layers) == len(layers_to_copy), f"{len(dest_layers)} != {len(layers_to_copy)}"
    dest_layers.load_state_dict(layers_to_copy.state_dict())
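A toy illustration of what copy_layers does (a sketch only; plain nn.Linear layers stand in for transformer blocks):
src = nn.ModuleList([nn.Linear(4, 4) for _ in range(6)])    # pretend 6-layer teacher
dest = nn.ModuleList([nn.Linear(4, 4) for _ in range(3)])   # pretend 3-layer student
copy_layers(src, dest, [0, 2, 5])   # dest now holds copies of teacher layers 0, 2 and 5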
LAYERS_TO_COPY = {
    12: {
        1: [0],
        2: [0, 6],
        3: [0, 6, 11],
        4: [0, 4, 8, 11],
        6: [0, 2, 4, 7, 9, 11],
        9: [0, 1, 2, 4, 5, 7, 9, 10, 11],
        12: list(range(12)),
    },
    16: {
        1: [0],
        2: [0, 15],
        3: [0, 8, 15],
        4: [0, 5, 10, 15],
        6: [0, 3, 6, 9, 12, 15],
        8: [0, 2, 4, 6, 8, 10, 12, 15],
        9: [0, 1, 3, 5, 7, 9, 11, 13, 15],
        12: [0, 1, 2, 3, 4, 5, 6, 7, 9, 11, 13, 15],
        16: list(range(16)),
    },
    6: {1: [0], 2: [0, 5], 3: [0, 2, 5], 4: [0, 1, 3, 5], 6: list(range(6))},
}
LAYERS_TO_SUPERVISE = {
    6: {1: [5], 2: [3, 5], 3: [1, 4, 5], 4: [1, 2, 4, 5]},
    12: {1: [11], 2: [5, 11], 3: [3, 7, 11], 6: [1, 3, 5, 8, 10, 11]},
    16: {1: [15], 4: [4, 9, 12, 15], 8: [1, 3, 5, 7, 9, 11, 13, 15]},
}
def create_student_by_copying_alternating_layers(
    teacher: Union[str, PreTrainedModel],
    save_path: Union[str, Path] = "student",
    e: Union[int, None] = None,
    d: Union[int, None] = None,
    copy_first_teacher_layers=False,
    e_layers_to_copy=None,
    d_layers_to_copy=None,
    **extra_config_kwargs
) -> Tuple[PreTrainedModel, List[int], List[int]]:
    _msg = "encoder_layers and decoder_layers cannot be both None-- you would just have an identical teacher."
    assert (e is not None) or (d is not None), _msg
    if isinstance(teacher, str):
        AutoTokenizer.from_pretrained(teacher).save_pretrained(save_path)
        teacher = AutoModelForSeq2SeqLM.from_pretrained(teacher).eval()
    else:
        assert isinstance(teacher, PreTrainedModel), f"teacher must be a model or string got type {type(teacher)}"
    init_kwargs = teacher.config.to_diff_dict()
    try:
        teacher_e, teacher_d = teacher.config.encoder_layers, teacher.config.decoder_layers
        if e is None:
            e = teacher_e
        if d is None:
            d = teacher_d
        init_kwargs.update({"encoder_layers": e, "decoder_layers": d})
    except AttributeError:  # T5-style configs name the layer counts differently
        teacher_e, teacher_d = teacher.config.num_layers, teacher.config.num_decoder_layers
        if e is None:
            e = teacher_e
        if d is None:
            d = teacher_d
        init_kwargs.update({"num_layers": e, "num_decoder_layers": d})
    init_kwargs.update(extra_config_kwargs)
    # Build the student from the teacher config (with fewer layers), then copy the full
    # teacher state dict; strict=False drops the teacher layers the student does not have.
    student_cfg = teacher.config_class(**init_kwargs)
    student = AutoModelForSeq2SeqLM.from_config(student_cfg)
    info = student.load_state_dict(teacher.state_dict(), strict=False)
    assert info.missing_keys == [], info.missing_keys
    if copy_first_teacher_layers:
        e_layers_to_copy, d_layers_to_copy = list(range(e)), list(range(d))
        logger.info(
            f"Copied encoder layers {e_layers_to_copy} and decoder layers {d_layers_to_copy}. Saving them to {save_path}"
        )
        student.save_pretrained(save_path)
        return student, e_layers_to_copy, d_layers_to_copy
    # Decide which teacher layers to copy; the hardcoded tables keep the first and last layers where possible.
    if e_layers_to_copy is None:
        e_layers_to_copy: List[int] = pick_layers_to_copy(e, teacher_e)
    if d_layers_to_copy is None:
        d_layers_to_copy: List[int] = pick_layers_to_copy(d, teacher_d)
    try:
        copy_layers(teacher.model.encoder.layers, student.model.encoder.layers, e_layers_to_copy)
        copy_layers(teacher.model.decoder.layers, student.model.decoder.layers, d_layers_to_copy)
    except AttributeError:  # for T5, model.encoder.layers is called encoder.block
        copy_layers(teacher.encoder.block, student.encoder.block, e_layers_to_copy)
        copy_layers(teacher.decoder.block, student.decoder.block, d_layers_to_copy)
    logger.info(
        f"Copied encoder layers {e_layers_to_copy} and decoder layers {d_layers_to_copy}. Saving them to {save_path}"
    )
    student.config.init_metadata = dict(
        teacher_type=teacher.config.model_type,
        copied_encoder_layers=e_layers_to_copy,
        copied_decoder_layers=d_layers_to_copy,
    )
    student.save_pretrained(save_path)
    return student, e_layers_to_copy, d_layers_to_copy
def pick_layers_to_copy(n_student, n_teacher):
    try:
        val = LAYERS_TO_COPY[n_teacher][n_student]
        return val
    except KeyError:
        if n_student != n_teacher:
            warnings.warn(
                f"no hardcoded layers to copy for teacher {n_teacher} -> student {n_student}, defaulting to first {n_student}"
            )
        return list(range(n_student))
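For example, with the 12-layer bart-large-cnn teacher, a 3-layer student copies layers 0, 6 and 11 from the table above, while sizes missing from the table fall back to the first n_student layers (a quick check):
print(pick_layers_to_copy(3, 12))   # [0, 6, 11], taken from LAYERS_TO_COPY
print(pick_layers_to_copy(5, 12))   # [0, 1, 2, 3, 4], fallback with a warning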
# Build the student: keep all 12 encoder layers and copy decoder layers [0, 6, 11] from the 12-layer teacher.
model, list_en, list_de = create_student_by_copying_alternating_layers(model, 'trian.pth', 12, 3)
batch_size = 2
args = Seq2SeqTrainingArguments(
    output_dir="results",
    num_train_epochs=1,
    do_train=True,
    do_eval=True,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    warmup_steps=500,
    weight_decay=0.1,
    label_smoothing_factor=0.1,
    predict_with_generate=True,
    logging_dir="logs",
    logging_steps=50,
    save_total_limit=3,
)
data_collator = DataCollatorForSeq2Seq(tokenizer, model=model)
import jieba
import numpy as np
from datasets import load_metric

# The `metric` used below is ROUGE from the datasets library (backed by the rouge-score package installed earlier).
metric = load_metric("rouge")

def compute_metrics(eval_pred):
    predictions, labels = eval_pred
    decoded_preds = tokenizer.batch_decode(predictions, skip_special_tokens=True)
    labels = np.where(labels != -100, labels, tokenizer.pad_token_id)
    decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)
    # ROUGE expects whitespace-separated tokens, so segment the Chinese text with jieba first.
    decoded_preds = ["\n".join(jieba.cut(pred.strip())) for pred in decoded_preds]
    decoded_labels = ["\n".join(jieba.cut(label.strip())) for label in decoded_labels]
    result = metric.compute(predictions=decoded_preds, references=decoded_labels, use_stemmer=True)
    result = {key: value.mid.fmeasure * 100 for key, value in result.items()}
    prediction_lens = [np.count_nonzero(pred != tokenizer.pad_token_id) for pred in predictions]
    result["gen_len"] = np.mean(prediction_lens)
    return {k: round(v, 4) for k, v in result.items()}
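For reference, jieba.cut splits a sentence into words, which are then joined with newlines so that ROUGE treats each word as a token. A quick illustration (the exact segmentation depends on jieba's dictionary):
print(list(jieba.cut("今天天气不错")))   # e.g. ['今天', '天气', '不错']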
trainer = Seq2SeqTrainer(
    model,
    args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    data_collator=data_collator,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
)
Training and the training platform
Start training. I used Huawei Cloud, since Colab's free GPU quota is not enough to complete the training.
trainer.train()
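After training, you can also score the held-out test split with the same ROUGE metrics (a sketch; the exact numbers depend on your run):
trainer.evaluate(eval_dataset=tokenized_datasets["test"], metric_key_prefix="test")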
Saving and loading the model
import torch
torch.save(model.state_dict(), "BART.pth")
model.load_state_dict(torch.load("BART.pth"))
def generate_summary(test_samples, model):
    inputs = tokenizer(
        test_samples,
        padding="max_length",
        truncation=True,
        max_length=max_input_length,
        return_tensors="pt",
    )
    input_ids = inputs.input_ids.to(model.device)
    attention_mask = inputs.attention_mask.to(model.device)
    outputs = model.generate(input_ids, attention_mask=attention_mask)
    output_str = tokenizer.batch_decode(outputs, skip_special_tokens=True)
    return outputs, output_str
Testing
We can run a quick test:
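A minimal sketch (assuming the model and the splits from above are still in memory; the generated summary will vary from run to run):
# Summarize one article from the held-out test split and compare with the reference title.
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)
sample = raw_datasets["test"][0]
_, predicted = generate_summary([sample["document"]], model)
print("Reference:", sample["summary"])
print("Predicted:", predicted[0])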
The model has learned part of the summarization ability, but it does not stop by itself where the summary should end, so there is still room for improvement.
Dataset and model available for download
Dataset: nlpcc_data.json. Trained model: the BART model, including the network parameters.