TextCNN
1. File Structure
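The directory listing itself is not reproduced here; the layout below is only a plausible reconstruction inferred from the imports used in the training script of Section 6, so the exact file and package names are assumptions:

```
.
├── data/
│   ├── MR/
│   │   ├── rt-polarity.pos      # positive movie reviews
│   │   └── rt-polarity.neg      # negative movie reviews
│   └── MR_Dataset.py            # MR_Dataset class (Section 3)
├── model.py (or TextCNN.py)     # TextCNN model (Section 4)
├── pytorchtools.py              # EarlyStopping (Section 5)
├── config.py                    # hyper-parameter / argument parsing (Section 6)
├── checkpoints/                 # best model per fold, saved by EarlyStopping
└── main.py                      # training and testing script (Section 6)
```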
2. Corpus (MR)
The MR dataset (movie-review sentences) has two classes: 1. positive reviews 2. negative reviews.
3. Data Processing (MR_Dataset.py)
1. Load the pretrained word vectors 2. Load the dataset 3. Build word2id and pad every sample to the same length 4. Compute the mean and standard deviation of the pretrained vectors 5. Build the embedding matrix 6. Generate the training, validation and test sets
```python
from torch.utils import data
import os
import random
import numpy as np
from gensim.models import KeyedVectors


class MR_Dataset(data.Dataset):
    def __init__(self, state="train", k=0, embedding_type="word2vec"):
        self.path = os.path.abspath('.')
        if "data" not in self.path:
            self.path += "/data"
        # 1. Load the positive and negative samples of the MR dataset
        pos_samples = open(self.path + "/MR/rt-polarity.pos", errors="ignore").readlines()
        neg_samples = open(self.path + "/MR/rt-polarity.neg", errors="ignore").readlines()
        datas = pos_samples + neg_samples
        datas = [sample.split() for sample in datas]
        labels = [1] * len(pos_samples) + [0] * len(neg_samples)
        # 2. Build word2id and pad every sample to the maximum sentence length
        max_sample_length = max([len(sample) for sample in datas])
        word2id = {"<pad>": 0}
        for i, sample in enumerate(datas):
            for j, word in enumerate(sample):
                if word not in word2id:
                    word2id[word] = len(word2id)
                datas[i][j] = word2id[word]
            datas[i] = datas[i] + [0] * (max_sample_length - len(datas[i]))
        self.n_vocab = len(word2id)
        self.word2id = word2id
        # 3. Optionally build the pretrained word2vec embedding matrix
        if embedding_type == "word2vec":
            self.get_word2vec()
        # 4. Shuffle with a fixed seed so every fold sees the same permutation
        c = list(zip(datas, labels))
        random.seed(1)
        random.shuffle(c)
        datas[:], labels[:] = zip(*c)
        # 5. 10-fold split: fold k is the test set; the remaining nine folds are
        #    split 90% / 10% into training and validation sets
        if state == "train":
            self.datas = datas[:int(k * len(datas) / 10)] + datas[int((k + 1) * len(datas) / 10):]
            self.labels = labels[:int(k * len(labels) / 10)] + labels[int((k + 1) * len(labels) / 10):]
            self.datas = np.array(self.datas[0:int(0.9 * len(self.datas))])
            self.labels = np.array(self.labels[0:int(0.9 * len(self.labels))])
        elif state == "valid":
            self.datas = datas[:int(k * len(datas) / 10)] + datas[int((k + 1) * len(datas) / 10):]
            self.labels = labels[:int(k * len(labels) / 10)] + labels[int((k + 1) * len(labels) / 10):]
            self.datas = np.array(self.datas[int(0.9 * len(self.datas)):])
            self.labels = np.array(self.labels[int(0.9 * len(self.labels)):])
        elif state == "test":
            self.datas = np.array(datas[int(k * len(datas) / 10):int((k + 1) * len(datas) / 10)])
            self.labels = np.array(labels[int(k * len(labels) / 10):int((k + 1) * len(labels) / 10)])
    def get_word2vec(self):
        """Build a (vocab_size, 300) embedding matrix from pretrained word2vec vectors.

        Words not covered by the GoogleNews vectors are initialised from a normal
        distribution with the same mean and standard deviation as the covered words.
        """
        if not os.path.exists(self.path + "/word2vec_embedding_mr.npy"):
            print("Reading word2vec Embedding...")
            wvmodel = KeyedVectors.load_word2vec_format("./GoogleNews-vectors-negative300.bin.gz", binary=True)
            tmp = []
            for word, index in self.word2id.items():
                try:
                    tmp.append(wvmodel.get_vector(word))
                except KeyError:
                    pass
            mean = np.mean(np.array(tmp))
            std = np.std(np.array(tmp))
            vocab_size = len(self.word2id)
            embed_size = 300
            # initialise every row from N(mean, std), then overwrite the rows of known words
            embedding_weights = np.random.normal(mean, std, [vocab_size, embed_size])
            for word, index in self.word2id.items():
                try:
                    embedding_weights[index, :] = wvmodel.get_vector(word)
                except KeyError:
                    pass
            np.save(self.path + "/word2vec_embedding_mr.npy", embedding_weights)
        else:
            embedding_weights = np.load(self.path + "/word2vec_embedding_mr.npy")
        self.weight = embedding_weights
    def __getitem__(self, index):
        return self.datas[index], self.labels[index]

    def __len__(self):
        return len(self.datas)


if __name__ == "__main__":
    mr_train_dataset = MR_Dataset()
    print(mr_train_dataset.__len__())
    mr_valid_dataset = MR_Dataset("valid")
    print(mr_valid_dataset.__len__())
    mr_test_dataset = MR_Dataset("test")
    print(mr_test_dataset.__len__())
```
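A quick way to sanity-check the dataset is to wrap it in a DataLoader and look at the batch shapes. A small sketch, assuming the MR files are in ./data/MR and using a non-word2vec embedding_type so the GoogleNews vectors are not needed:

```python
from torch.utils.data import DataLoader

train_set = MR_Dataset(state="train", k=0, embedding_type="rand")  # skip the word2vec matrix
loader = DataLoader(train_set, batch_size=4, shuffle=True)
batch_x, batch_y = next(iter(loader))
print(batch_x.shape)  # (4, max_sample_length), e.g. torch.Size([4, 59]) for MR
print(batch_y.shape)  # torch.Size([4])
```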
4. Model (TextCNN.py)
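The original TextCNN.py is not shown here; below is a minimal sketch of a model that matches how the training script in Section 6 constructs and calls it (`TextCNN(config)`, `model(data)` returning one score per class). The config fields `embed_size`, `filter_num`, `label_num` and `dropout` are assumptions (see the config.py sketch in Section 6); `n_vocab`, `filters` and `embedding_pretrained` are set by the training script itself.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F


class TextCNN(nn.Module):
    def __init__(self, config):
        super(TextCNN, self).__init__()
        if isinstance(config.embedding_pretrained, torch.Tensor):
            # initialise from the word2vec matrix built in MR_Dataset.get_word2vec()
            self.embedding = nn.Embedding.from_pretrained(config.embedding_pretrained, freeze=False)
        else:
            self.embedding = nn.Embedding(config.n_vocab, config.embed_size)
        # one convolution per filter size, each producing config.filter_num feature maps
        self.convs = nn.ModuleList([
            nn.Conv2d(1, config.filter_num, (k, config.embed_size)) for k in config.filters
        ])
        self.dropout = nn.Dropout(config.dropout)
        self.fc = nn.Linear(config.filter_num * len(config.filters), config.label_num)

    def forward(self, x):
        # x: (batch, seq_len) word ids
        x = self.embedding(x)                                     # (batch, seq_len, embed_size)
        x = x.unsqueeze(1)                                        # (batch, 1, seq_len, embed_size)
        # convolution + ReLU + max-over-time pooling for every filter size
        x = [F.relu(conv(x)).squeeze(3) for conv in self.convs]   # (batch, filter_num, seq_len-k+1)
        x = [F.max_pool1d(i, i.size(2)).squeeze(2) for i in x]    # (batch, filter_num)
        x = torch.cat(x, dim=1)                                   # (batch, filter_num * len(filters))
        x = self.dropout(x)
        return self.fc(x)                                         # (batch, label_num) class scores
```

This follows Kim's TextCNN recipe: embed the words, run parallel convolutions with different window sizes, max-pool over time, concatenate, apply dropout, and classify with a final linear layer.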
5. Preventing Overfitting (pytorchtools.py)
The training script in Section 6 imports this EarlyStopping helper from pytorchtools.py: it saves a checkpoint whenever the validation loss improves and stops training once the loss has failed to improve for `patience` consecutive checks.

```python
import numpy as np
import torch


class EarlyStopping:
    """Stops training early if the validation loss does not improve after a given patience."""

    def __init__(self, patience=7, verbose=False, delta=0, cv_index=0):
        """
        Args:
            patience (int): How long to wait after the last improvement of the validation loss.
                            Default: 7
            verbose (bool): If True, prints a message for each validation loss improvement.
                            Default: False
            delta (float): Minimum change in the monitored quantity to qualify as an improvement.
                            Default: 0
            cv_index (int): Index of the current cross-validation fold, used in the checkpoint file name.
                            Default: 0
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.inf
        self.delta = delta
        self.cv_index = cv_index

    def __call__(self, val_loss, model):
        score = -val_loss
        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            # no sufficient improvement: count towards the patience limit
            self.counter += 1
            print('EarlyStopping counter: %d out of %d' % (self.counter, self.patience))
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            # validation loss improved: save a checkpoint and reset the counter
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        """Saves the model when the validation loss decreases."""
        if self.verbose:
            print('Validation loss decreased (%.5f --> %.5f). Saving model ...' % (self.val_loss_min, val_loss))
        torch.save(model.state_dict(), './checkpoints/checkpoint%d.pt' % self.cv_index)
        self.val_loss_min = val_loss
```
6. Training and Testing
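The script below relies on a config module (`import config as argumentparser`) that is not reproduced in this article. A minimal sketch of what config.py could look like, with field names inferred from how they are used below and default values that are only illustrative assumptions:

```python
# config.py -- sketch only; field names are inferred from the training script,
# default values are illustrative assumptions.
import argparse


def ArgumentParser():
    parser = argparse.ArgumentParser()
    parser.add_argument("--seed", type=int, default=1, help="random seed")
    parser.add_argument("--gpu", type=int, default=0, help="GPU id")
    parser.add_argument("--cuda", type=int, default=1, help="1 to use the GPU if available")
    parser.add_argument("--epoch", type=int, default=100, help="number of training epochs")
    parser.add_argument("--batch_size", type=int, default=64, help="mini-batch size")
    parser.add_argument("--learning_rate", type=float, default=1e-3, help="Adam learning rate")
    parser.add_argument("--l2", type=float, default=0.004, help="weight of the L2 penalty")
    parser.add_argument("--dropout", type=float, default=0.5, help="dropout rate (assumed model field)")
    parser.add_argument("--embed_size", type=int, default=300, help="word embedding size (assumed model field)")
    parser.add_argument("--filters", type=str, default="3,4,5", help="comma-separated convolution filter sizes")
    parser.add_argument("--filter_num", type=int, default=100, help="feature maps per filter size (assumed model field)")
    parser.add_argument("--label_num", type=int, default=2, help="number of classes (assumed model field)")
    parser.add_argument("--embedding_type", type=str, default="word2vec", help="word2vec, or anything else for random init")
    parser.add_argument("--use_pretrained_embed", type=int, default=1, help="1 to use the pretrained embedding matrix")
    return parser.parse_args()
```

With something like this in place, the script can be launched with command-line overrides (e.g. `python main.py --batch_size 32`); the training code also sets `config.n_vocab` and `config.embedding_pretrained` on the returned namespace at runtime.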
```python
from pytorchtools import EarlyStopping
import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.optim as optim
from model import TextCNN
from data import MR_Dataset
import numpy as np
import config as argumentparser

# hyper-parameters come from config.py; filter sizes arrive as a comma-separated string
config = argumentparser.ArgumentParser()
config.filters = list(map(int, config.filters.split(",")))
torch.manual_seed(config.seed)
if torch.cuda.is_available():
    torch.cuda.set_device(config.gpu)

# i is the index of the cross-validation fold; only fold 0 is run here
# (see the 10-fold sketch after this section)
i = 0
early_stopping = EarlyStopping(patience=10, verbose=True, cv_index=i)

# training set: the nine folds other than fold i, 90% of which are used for training
training_set = MR_Dataset(state="train", k=i, embedding_type=config.embedding_type)
config.n_vocab = training_set.n_vocab
training_iter = torch.utils.data.DataLoader(dataset=training_set,
                                            batch_size=config.batch_size,
                                            shuffle=True,
                                            num_workers=0)
if config.use_pretrained_embed:
    config.embedding_pretrained = torch.from_numpy(training_set.weight).float()
else:
    config.embedding_pretrained = False

valid_set = MR_Dataset(state="valid", k=i, embedding_type="no")
valid_iter = torch.utils.data.DataLoader(dataset=valid_set,
                                         batch_size=config.batch_size,
                                         shuffle=False,
                                         num_workers=0)
test_set = MR_Dataset(state="test", k=i, embedding_type="no")
test_iter = torch.utils.data.DataLoader(dataset=test_set,
                                        batch_size=config.batch_size,
                                        shuffle=False,
                                        num_workers=0)

model = TextCNN(config)
if config.cuda and torch.cuda.is_available():
    model.cuda()
    if config.use_pretrained_embed:
        config.embedding_pretrained = config.embedding_pretrained.cuda()

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=config.learning_rate)
count = 0
loss_sum = 0
def get_test_result(data_iter, data_set):
    # evaluate the current model on a validation or test set and
    # return the summed loss and the accuracy
    model.eval()
    data_loss = 0
    true_sample_num = 0
    for data, label in data_iter:
        if config.cuda and torch.cuda.is_available():
            data = data.cuda()
            label = label.cuda()
        else:
            data = torch.autograd.Variable(data).long()
        out = model(data)
        loss = criterion(out, autograd.Variable(label.long()))
        data_loss += loss.data.item()
        true_sample_num += np.sum((torch.argmax(out, 1) == label).cpu().numpy())
    acc = true_sample_num / data_set.__len__()
    return data_loss, acc
for epoch in range(config.epoch):
    model.train()
    for data, label in training_iter:
        if config.cuda and torch.cuda.is_available():
            data = data.cuda()
            label = label.cuda()
        else:
            data = torch.autograd.Variable(data).long()
            label = torch.autograd.Variable(label).squeeze()
        out = model(data)
        # L2 penalty on one of the model's weight matrices (parameter index 1)
        l2_loss = config.l2 * torch.sum(torch.pow(list(model.parameters())[1], 2))
        loss = criterion(out, autograd.Variable(label.long())) + l2_loss
        loss_sum += loss.data.item()
        count += 1
        if count % 100 == 0:
            print("epoch", epoch, end=' ')
            print("The loss is: %.5f" % (loss_sum / 100))
            loss_sum = 0
            count = 0
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # evaluate on the validation set once per epoch and check early stopping
    valid_loss, valid_acc = get_test_result(valid_iter, valid_set)
    early_stopping(valid_loss, model)
    print("The valid acc is: %.5f" % valid_acc)
    if early_stopping.early_stop:
        print("Early stopping")
        break

# reload the best checkpoint of this fold and evaluate on the test set
acc = 0
model.load_state_dict(torch.load('./checkpoints/checkpoint%d.pt' % i))
test_loss, test_acc = get_test_result(test_iter, test_set)
print("The test acc is: %.5f" % test_acc)
acc += test_acc / 10   # divided by 10 because the full experiment averages over 10 folds
print("The test acc is: %.5f" % acc)
```
Experimental Results
During training the script prints the average loss every 100 batches. As an example of using the trained model for prediction:
x = "it's so bad"
x = x.split()
x = [training_set.word2id[word] for word in x]
x = np.array(x+[0]*(59-len(x))).reshape([1,-1])
x = torch.autograd.Variable(torch.Tensor(x)).long()
out = model(x)
In this run the score at position 0 is clearly the larger one (about 0.7111), so the predicted label is 0, i.e. a negative review.