循环神经网络LSTM实现电影情感分类
一.数据集:
为了对前面的word embedding这种常用的文本向量化的方法进行巩固,这里我们会完成一个文本情感分类的案例
现在我们有一个经典的数据集IMDB 数据集,地址:http://ai.stanford.edu/~amaas/data/sentiment/ ,这是一份包含了5万条流行电影的评论数据,其中训练集25000条,测试集25000条。数据格式如下:
下图左边为文件名,文件名包含两部分,分别是序号和情感评分(评分1-4为neg,7-10为pos,数据集中不包含评分为5-6的中性评论),右边为评论内容
但本次实验从简设计只实现二分类,即实现积极 和消极 的预测
二.实现流程
- 准备数据集
- 构建模型
- 模型训练
- 模型评估
三.数据集准备
import pickle
import torch
from torch.utils.data import DataLoader, Dataset
import os
import re
# Data preparation for the IMDB movie-review dataset.
# Review text is serialized to integer ids with the pickled "W2S" vocabulary
# object loaded below (presumably a Word2Sequence instance -- confirm against
# the script that wrote ws.pkl).

# Root directory of the extracted aclImdb dataset.
data_base_path = r"./aclImdb"

# NOTE(security): pickle can execute arbitrary code while loading. Only load a
# vocabulary file that you produced yourself.
# The context manager closes the file handle as soon as loading is done.
with open("./models/ws.pkl", "rb") as _ws_file:
    ws = pickle.load(_ws_file)

# Length every review is padded/truncated to (passed to ws.transform in collate_fn).
Max_Len = 40
train_batch_size = 512
test_batch_size = 1024
def tokenize(text):
    """Split raw review text into lowercase word tokens.

    HTML-like tags (``<...>``) are replaced by a space, then every character
    listed in ``filters`` is replaced by a space, and the result is split on
    whitespace.

    :param text: raw review text
    :return: list of lowercase tokens; an empty list for empty or blank text
    """
    filters = ['!', '"', '#', '$', '%', '&', '(', ')', '*', '+', ',', '-', '.', '/', ':', ';', '<', '=', '>',
               '?', '@', '[', '\\', ']', '^', '_', '`', '{', '|', '}', '~', '\t', '\n', '\x97', '\x96', '”',
               '“', ]
    text = re.sub(r"<.*?>", " ", text)
    # Escape every entry and match them through a character class, so that
    # regex metacharacters such as '$', '^', ']' and '\' are treated as the
    # literal characters to strip instead of anchors / escape prefixes.
    filter_pattern = "[" + "".join(re.escape(ch) for ch in filters) + "]"
    text = re.sub(filter_pattern, " ", text)
    # str.split() already discards surrounding whitespace, so only lowercase.
    return [token.lower() for token in text.split()]
class ImdbDataset(Dataset):
    """Dataset over the ``pos`` and ``neg`` review files of one IMDB split.

    Each item is ``(tokens, label)`` where ``tokens`` is the tokenized review
    text and ``label`` is 0 for a file inside a ``neg`` directory and 1
    otherwise.
    """

    def __init__(self, train=True):
        """Collect the paths of all ``.txt`` review files of the chosen split.

        :param train: use the ``train`` sub-directory when true, else ``test``
        """
        super(ImdbDataset, self).__init__()
        # os.path.join instead of a hard-coded backslash keeps the paths
        # valid on non-Windows systems as well.
        self.train_data_path = os.path.join(data_base_path, 'train')
        self.test_data_path = os.path.join(data_base_path, 'test')
        self.data_path = self.train_data_path if train else self.test_data_path
        self.temp_data_path = [os.path.join(self.data_path, 'pos'), os.path.join(self.data_path, 'neg')]
        self.total_file_path_list = []
        for path in self.temp_data_path:
            self.total_file_path_list.extend(
                os.path.join(path, name) for name in os.listdir(path) if name.endswith('.txt')
            )

    def __getitem__(self, index):
        """Return ``(tokens, label)`` for the review file at ``index``."""
        path = self.total_file_path_list[index]
        # The label is the name of the directory holding the file; taking it
        # through os.path works with either path separator.
        label_str = os.path.basename(os.path.dirname(path))
        label = 0 if label_str == 'neg' else 1
        # Close the file promptly; one file is opened per sample.
        with open(path, encoding='utf-8') as review_file:
            content = tokenize(review_file.read())
        return content, label

    def __len__(self):
        """Return the number of review files in the split."""
        return len(self.total_file_path_list)
def collate_fn(batch):
    """Collate a list of ``(tokens, label)`` samples into two LongTensors.

    Every token list is converted with ``ws.transform(..., max_len=Max_Len)``
    so the rows can be stacked into one tensor.

    :param batch: list of ``(tokens, label)`` pairs as produced by the dataset
    :return: ``(content, labels)`` LongTensors
    """
    content, labels = zip(*batch)
    content = torch.LongTensor([ws.transform(tokens, max_len=Max_Len) for tokens in content])
    labels = torch.LongTensor(labels)
    # The per-batch debug print was removed: it dumped whole tensors for every
    # batch and drowned the training progress output.
    return content, labels
def get_dataloader(train=True, batch_size=train_batch_size):
    """Build a shuffled DataLoader over the IMDB train or test split.

    :param train: select the training split when true, the test split otherwise
    :param batch_size: number of reviews per batch
    :return: DataLoader that collates samples with ``collate_fn``
    """
    return DataLoader(
        dataset=ImdbDataset(train),
        batch_size=batch_size,
        shuffle=True,
        collate_fn=collate_fn,
    )
if __name__ == '__main__':
    # Smoke test: print the first training batch only, then stop.
    for idx, first_batch in enumerate(get_dataloader()):
        sentence, label = first_batch
        print("idx:", idx)
        print("sentence:", sentence)
        print("label:", label)
        break
import numpy as np
class Word2Sequence():
    """Text serialization: map words to integer ids and back.

    Usage: call ``fit`` once per tokenized sentence to count words, call
    ``build_vocab`` to assign ids, then use ``transform`` /
    ``inverse_transform``. Ids 0 and 1 are reserved for the unknown-word tag
    and the padding tag.
    """
    UNK_TAG = "UNK"
    PAD_TAG = "PAD"
    UNK = 0
    PAD = 1

    def __init__(self):
        # word -> id; starts with only the two reserved tags
        self.dict = {
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD
        }
        self.fited = False
        self.build_vocabd = False
        # word -> number of occurrences seen by fit()
        self.count = {}

    def to_index(self, word):
        """word -> index (``UNK`` for a word that is not in the vocabulary)."""
        assert self.fited == True, "必须先进行fit操作"
        return self.dict.get(word, self.UNK)

    def to_word(self, index):
        """index -> word (``UNK_TAG`` for an id that is not in the vocabulary)."""
        assert self.fited, "必须先进行fit操作"
        # inversed_dict only exists once build_vocab has run; fail with a
        # clear message instead of an AttributeError.
        assert self.build_vocabd, "必须先进行build_vocab操作"
        return self.inversed_dict.get(index, self.UNK_TAG)

    def __len__(self):
        """Return the vocabulary size, reserved tags included."""
        return len(self.dict)

    def fit(self, sentence):
        """Add the words of one tokenized sentence to the frequency counts.

        :param sentence: [word1, word2, word3, ...]
        """
        for word in sentence:
            self.count[word] = self.count.get(word, 0) + 1
        self.fited = True

    def build_vocab(self, min_count=5, max_count=None, max_features=None):
        """Build the word <-> id dictionaries from the collected counts.

        The filters below are applied to ``self.count`` in place, so the
        counts of dropped words are discarded.

        :param min_count: drop words seen fewer times than this (None = no limit)
        :param max_count: drop words seen more times than this (None = no limit)
        :param max_features: keep only this many most frequent words (None = all)
        :return: None
        """
        if min_count is not None:
            self.count = {k: v for k, v in self.count.items() if v >= min_count}
        if max_count is not None:
            self.count = {k: v for k, v in self.count.items() if v <= max_count}
        if max_features is not None:
            temp = sorted(self.count.items(), key=lambda x: x[-1], reverse=True)[:max_features]
            self.count = dict(temp)

        # Start again from the reserved tags: re-assigning ``len(self.dict)``
        # to words that already have an id would give several words the same id.
        self.dict = {
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD
        }
        for word in self.count:
            if word in self.dict:
                # A corpus word equal to a reserved tag must not steal its id.
                continue
            self.dict[word] = len(self.dict)
        self.inversed_dict = dict(zip(self.dict.values(), self.dict.keys()))
        self.build_vocabd = True

    def transform(self, sentence, max_len=None):
        """Convert a tokenized sentence into a list of ids.

        :param sentence: [word1, word2, word3, ...]
        :param max_len: when given, pad with ``PAD`` or truncate to this length
        :return: list of ids; unknown words map to ``UNK``
        """
        assert self.fited, "必须先进行fit操作"
        assert self.build_vocabd, "必须先进行build_vocab操作"
        # Work on a list copy so tuples and other iterables can be padded too.
        sentence = list(sentence)
        if max_len is not None:
            if max_len > len(sentence):
                sentence = sentence + [self.PAD_TAG] * (max_len - len(sentence))
            else:
                sentence = sentence[:max_len]
        return [self.dict.get(word, self.UNK) for word in sentence]

    def inverse_transform(self, indices):
        """Convert a list of ids back into words.

        :param indices: [1, 2, 3, ...]
        :return: [word1, word2, ...]; ids that are not in the vocabulary map to
            ``UNK_TAG``, the same fallback ``to_word`` uses
        """
        assert self.fited, "必须先进行fit操作"
        assert self.build_vocabd, "必须先进行build_vocab操作"
        return [self.inversed_dict.get(idx, self.UNK_TAG) for idx in indices]
if __name__ == '__main__':
    # Small demo: fit two toy sentences, build the vocabulary, then print
    # the results of the conversion helpers.
    w2s = Word2Sequence()
    for demo_sentence in (["你", "好", "么"], ["你", "好", "哦"]):
        w2s.fit(demo_sentence)
    w2s.build_vocab(min_count=1)
    print(w2s.dict)
    print(w2s.fited)
    print(w2s.transform(["你", "好", "嘛"]))
    print(w2s.transform(["你好嘛"], max_len=10))
    print(w2s.inverse_transform([5, 2, 4]))
    print(len(w2s))
略
四.模型构建
import os
import torch
import numpy as np
from torch import nn, optim
from DataSet import get_dataloader, ws, Max_Len, test_batch_size
import torch.nn.functional as F
from tqdm import tqdm
class IMDBLstmmodel(nn.Module):
    """LSTM sentiment classifier for IMDB reviews (pos / neg).

    Pipeline: token ids -> embedding -> stacked (by default bidirectional)
    LSTM -> final hidden state of the last layer -> two fully connected
    layers -> log-softmax over the two classes.
    """

    def __init__(self, vocab_size=None, padding_idx=None, hidden_size=64,
                 embedding_dim=200, num_layer=2, bidirectional=True, dropout=0.5):
        """Create the layers.

        :param vocab_size: number of embedding rows; ``len(ws)`` when None
        :param padding_idx: embedding padding index; ``ws.PAD`` when None
        :param hidden_size: LSTM hidden size per direction
        :param embedding_dim: size of each word embedding
        :param num_layer: number of stacked LSTM layers
        :param bidirectional: run the LSTM in both directions
        :param dropout: dropout between LSTM layers
        """
        super(IMDBLstmmodel, self).__init__()
        # Fall back to the module-level vocabulary only when the caller gave
        # no explicit sizes, so the default construction is unchanged.
        if vocab_size is None:
            vocab_size = len(ws)
        if padding_idx is None:
            padding_idx = ws.PAD
        self.hidden_size = hidden_size
        self.embedding_dim = embedding_dim
        self.num_layer = num_layer
        # Attribute spelling kept as in the original for compatibility.
        self.bidriectional = bidirectional
        self.bi_num = 2 if self.bidriectional else 1
        self.dropout = dropout
        self.embedding = nn.Embedding(vocab_size, self.embedding_dim, padding_idx=padding_idx)
        self.lstm = nn.LSTM(
            input_size=self.embedding_dim,
            hidden_size=self.hidden_size,
            num_layers=self.num_layer,
            bidirectional=self.bidriectional,
            # Inter-layer dropout is meaningless for a single layer and makes
            # nn.LSTM emit a warning.
            dropout=self.dropout if self.num_layer > 1 else 0.0,
        )
        self.fc = nn.Linear(self.hidden_size * self.bi_num, 20)
        self.fc2 = nn.Linear(20, 2)

    def forward(self, x):
        """Return per-class log-probabilities.

        :param x: LongTensor of token ids, batch first: (batch, seq_len)
        :return: tensor of shape (batch, 2) holding log-softmax scores
        """
        x = self.embedding(x)
        # nn.LSTM is used with its default sequence-first layout.
        x = x.permute(1, 0, 2)
        _, (h_n, _) = self.lstm(x)
        if self.bidriectional:
            # Last layer: h_n[-2] is the forward direction, h_n[-1] the backward one.
            out = torch.cat([h_n[-2, :, :], h_n[-1, :, :]], dim=-1)
        else:
            # Unidirectional: only the last layer's final state matches fc's input size.
            out = h_n[-1, :, :]
        out = self.fc(out)
        out = F.relu(out)
        out = self.fc2(out)
        return F.log_softmax(out, dim=-1)
五. 模型训练
# Build the model and optimizer, resuming from saved checkpoints when present.
model = IMDBLstmmodel()
optimizer = optim.Adam(model.parameters(), lr=0.001)
if os.path.exists('./models/lstm_model.pkl'):
    model.load_state_dict(torch.load('./models/lstm_model.pkl'))
    # The optimizer checkpoint is a separate file; check it on its own so a
    # missing file does not abort start-up after the model was restored.
    if os.path.exists('./models/lstm_optimizer.pkl'):
        optimizer.load_state_dict(torch.load('./models/lstm_optimizer.pkl'))
def train(epoch):
    """Run one training epoch, then save the model and optimizer state.

    :param epoch: epoch number, used only in the progress bar and the log line
    """
    # test() puts the model in eval mode; switch back so dropout is active
    # again while training.
    model.train()
    data_loader = get_dataloader()
    last_idx = len(data_loader) - 1
    progress = tqdm(enumerate(data_loader), total=len(data_loader), ascii=True, desc='第%d轮训练' % epoch)
    for idx, (inputs, label) in progress:
        optimizer.zero_grad()
        output = model(inputs)
        loss = F.nll_loss(output, label)
        loss.backward()
        optimizer.step()
        if idx == last_idx:
            # Report the loss of the final batch of the epoch.
            print('result: 第%d轮次训练,损失%f' % (epoch, loss.item()))
    # Checkpoint once per epoch, after the last optimizer step.
    torch.save(model.state_dict(), "./models/lstm_model.pkl")
    torch.save(optimizer.state_dict(), './models/lstm_optimizer.pkl')
训练效果(这里我已经提前训练了,所以损失已经很低了)
六.模型评估
def test():
    """Evaluate the model on the test split and print mean loss and accuracy.

    Both printed numbers are the mean of the per-batch values.
    """
    model.eval()
    loss_ = []
    acc_ = []
    with torch.no_grad():
        data_loader = get_dataloader(train=False, batch_size=test_batch_size)
        for inputs, label in tqdm(data_loader, total=len(data_loader), ascii=True, desc='模型评估'):
            output = model(inputs)
            loss = F.nll_loss(output, label, reduction="mean")
            loss_.append(loss.item())
            pred = output.max(dim=1)[1]
            # Compare predictions with the true labels; comparing pred with
            # itself always yields 100% accuracy.
            acc_.append(pred.eq(label).float().mean().item())
    print('模型损失%f,平均准确率%f' % (np.mean(loss_), np.mean(acc_)))
准确率在99%以上(注意:该数值是用 pred.eq(pred) 统计得到的,预测值与自身比较恒为100%,并不代表真实准确率;应使用 pred.eq(label) 与真实标签比较后重新评估)