Preface: With the theory from the previous post in place, it's time to get hands-on. As longtime die-hard fans of Jane Zhang (张靓颖), we'll run our experiment on the "Little Dolphin" (小海豚) herself and see whether an RNN can write love songs in her style.
Scraping the Jane Zhang (张靓颖) lyrics dataset
The lyrics were scraped from QQ Music with a simple crawler. QQ Music is quite forgiving here: it has very few anti-scraping measures.
The scraped data is saved to data.txt:
#!/usr/bin/python
# -*- coding:utf-8 -*-
import requests
import json
import time

f = open('data.txt', 'w+', encoding='utf-8')

def main(page):
    print(page)
    url = 'https://c.y.qq.com/soso/fcgi-bin/client_search_cp'
    data = {'qqmusic_ver': 1298,
            'remoteplace': 'txt.yqq.lyric',
            'inCharset': 'utf8',
            'sem': 1, 'ct': 24, 'catZhida': 1, 'p': page,
            'needNewCode': 0, 'platform': 'yqq',
            'lossless': 0, 'notice': 0, 'format': 'jsonp', 'outCharset': 'utf-8', 'loginUin': 0,
            'jsonpCallback': 'MusicJsonCallback19507963135827455',
            'searchid': '98485846416392878',
            'hostUin': 0, 'n': 10, 'g_tk': 5381, 't': 7,
            'w': '张靓颖中文', 'aggr': 0
            }
    headers = {'content-type': 'application/json',
               'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:22.0) Gecko/20100101 Firefox/22.0'}
    r = requests.get(url, params=data, headers=headers)
    time.sleep(3)  # throttle requests to be polite to the server
    # Strip the JSONP callback wrapper ("MusicJsonCallback...(" and the trailing ")")
    text = r.text[35:-1]
    result = json.loads(text)
    if result['code'] == 0:
        for item in result['data']['lyric']['list']:
            print(item['content'])
            # Strip punctuation and other redundant markup
            temp = item['content'].replace('\\n', '').replace('-', '').replace(')', '').replace('(', '').replace(':', '').replace(':', '').replace('《', '').replace('》', '')
            # Remove lyricist/composer credits and the artist's name
            temp = temp.replace('词', '').replace('曲', '').replace('张靓颖', '').replace('Jane', '').replace('Zhang', '')
            print(temp)
            f.write(temp)

if __name__ == '__main__':
    for i in range(1, 20):
        main(i)
    f.close()
A glance at the filtered output shows that contributors' names were not fully stripped out. That will add some noise, but the data looks usable overall:
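If the leftover names become a problem, one cheap extra pass is to strip any remaining runs of Latin letters, which in this corpus are almost always contributor credits rather than lyrics. A minimal sketch of mine (the data_clean.txt output path is an arbitrary choice, and note this would also delete any genuine English words in the lyrics):

import re

# Remove leftover Latin-letter runs (mostly contributor names).
# Caution: this also removes any genuine English words in the lyrics.
with open('data.txt', encoding='utf-8') as fin:
    text = fin.read()
text = re.sub(r'[A-Za-z]+', '', text)
with open('data_clean.txt', 'w', encoding='utf-8') as fout:
    fout.write(text)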
Preparing the dataset
After loading the dataset, we map every distinct character to an index to build a vocabulary:
def load_data_jay_lyrics():
    '''
    Load the lyrics dataset.
    :return:
    corpus_indices: the corpus encoded as a list of character indices
    char_to_idx: mapping from character to index
    idx_to_char: mapping from index to character
    vocab_size: size of the resulting vocabulary
    '''
    # The function name is kept from the original Jay Chou tutorial;
    # here it loads the 张靓颖 lyrics we crawled above.
    with open('./data/data.txt', encoding='utf-8') as f:
        corpus_chars = f.read()
    # Replace newlines with spaces
    corpus_chars = corpus_chars.replace('\n', ' ').replace('\r', ' ')
    # Keep only the first 10000 characters for training
    corpus_chars = corpus_chars[0:10000]
    # Collect every distinct character, then map each one to an index to build the vocabulary
    idx_to_char = list(set(corpus_chars))
    char_to_idx = dict([(char, i) for i, char in enumerate(idx_to_char)])
    vocab_size = len(char_to_idx)
    corpus_indices = [char_to_idx[char] for char in corpus_chars]
    return corpus_indices, char_to_idx, idx_to_char, vocab_size
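As a quick sanity check of the mapping, you can load the corpus and inspect a short slice (a small test of mine, not part of the original tutorial):

corpus_indices, char_to_idx, idx_to_char, vocab_size = load_data_jay_lyrics()
print('vocab size:', vocab_size)
sample = corpus_indices[:20]
# Decoding the indices should reproduce the original characters
print('chars:', ''.join(idx_to_char[idx] for idx in sample))
print('indices:', sample)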
Then we sample minibatches from the dataset at random positions:
# Random sampling
def data_iter_random(corpus_indices, batch_size, num_steps, device=None):
    # Subtract 1 because the label Y is the input X shifted one step forward
    num_examples = (len(corpus_indices) - 1) // num_steps
    epoch_size = num_examples // batch_size
    example_indices = list(range(num_examples))
    random.shuffle(example_indices)  # shuffle the example order

    # Return the length-num_steps sequence starting at pos
    def _data(pos):
        return corpus_indices[pos: pos + num_steps]

    for i in range(epoch_size):
        # Read batch_size random examples each time
        i = i * batch_size
        batch_indices = example_indices[i: i + batch_size]
        X = [_data(j * num_steps) for j in batch_indices]
        Y = [_data(j * num_steps + 1) for j in batch_indices]
        # yield turns this function into a generator: any function containing
        # yield is treated by the Python interpreter as a generator function.
        # (The original tutorial yielded torch tensors; we return nd.array
        # to stay consistent with the MXNet code used everywhere else.)
        yield nd.array(X, ctx=device), nd.array(Y, ctx=device)
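To see what the sampler yields, I ran it on a toy sequence (this test is my own addition; it assumes random and MXNet's nd are imported as in the full listing below):

my_seq = list(range(30))
for X, Y in data_iter_random(my_seq, batch_size=2, num_steps=6):
    print('X:', X)
    print('Y:', Y)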
Defining the RNN model:
This part simply wraps the built-in Gluon layers, so it is short.
class RNNModel(nn.Block):
    def __init__(self, rnn_layer, vocab_size, **kwargs):
        super(RNNModel, self).__init__(**kwargs)
        self.rnn = rnn_layer
        self.vocab_size = vocab_size
        self.dense = nn.Dense(vocab_size)

    def forward(self, inputs, state):
        # Transpose inputs to (num_steps, batch_size), then one-hot encode
        X = nd.one_hot(inputs.T, self.vocab_size)
        Y, state = self.rnn(X, state)
        # The dense layer first reshapes Y to (num_steps * batch_size, num_hiddens);
        # its output has shape (num_steps * batch_size, vocab_size)
        output = self.dense(Y.reshape((-1, Y.shape[-1])))
        return output, state

    def begin_state(self, *args, **kwargs):
        return self.rnn.begin_state(*args, **kwargs)
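To make the shapes concrete, here is a quick check I added (the batch size 2, the 35 time steps, and the vocab size 1000 are arbitrary stand-ins; it assumes the imports from the full listing below):

vocab_size, num_hiddens = 1000, 256   # arbitrary stand-in values
rnn_layer = rnn.RNN(num_hiddens)
rnn_layer.initialize()
model = RNNModel(rnn_layer, vocab_size)
model.initialize()

X = nd.zeros((2, 35))                 # (batch_size, num_steps)
state = model.begin_state(batch_size=2)
output, new_state = model(X, state)
print(output.shape)                   # (70, 1000), i.e. (num_steps * batch_size, vocab_size)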
Start training
The complete code is as follows:
import random
import math
import time
import mxnet as mx
from mxnet import autograd, gluon, init, nd
from mxnet.gluon import loss as gloss, nn, rnn
def load_data_jay_lyrics():
    '''
    Load the lyrics dataset.
    :return:
    corpus_indices: the corpus encoded as a list of character indices
    char_to_idx: mapping from character to index
    idx_to_char: mapping from index to character
    vocab_size: size of the resulting vocabulary
    '''
    # The function name comes from the original Jay Chou tutorial;
    # here it loads the 张靓颖 lyrics we crawled above.
    with open('./data/data.txt', encoding='utf-8') as f:
        corpus_chars = f.read()
    # Replace newlines with spaces
    corpus_chars = corpus_chars.replace('\n', ' ').replace('\r', ' ')
    # Keep only the first 10000 characters for training
    corpus_chars = corpus_chars[0:10000]
    # Collect every distinct character, then map each one to an index to build the vocabulary
    idx_to_char = list(set(corpus_chars))
    char_to_idx = dict([(char, i) for i, char in enumerate(idx_to_char)])
    vocab_size = len(char_to_idx)
    corpus_indices = [char_to_idx[char] for char in corpus_chars]
    return corpus_indices, char_to_idx, idx_to_char, vocab_size
# Random sampling
def data_iter_random(corpus_indices, batch_size, num_steps, device=None):
    # Subtract 1 because the label Y is the input X shifted one step forward
    num_examples = (len(corpus_indices) - 1) // num_steps
    epoch_size = num_examples // batch_size
    example_indices = list(range(num_examples))
    random.shuffle(example_indices)  # shuffle the example order

    # Return the length-num_steps sequence starting at pos
    def _data(pos):
        return corpus_indices[pos: pos + num_steps]

    for i in range(epoch_size):
        # Read batch_size random examples each time
        i = i * batch_size
        batch_indices = example_indices[i: i + batch_size]
        X = [_data(j * num_steps) for j in batch_indices]
        Y = [_data(j * num_steps + 1) for j in batch_indices]
        # yield turns this function into a generator (nd.array instead of
        # the original torch tensors, to stay consistent with MXNet)
        yield nd.array(X, ctx=device), nd.array(Y, ctx=device)
# Consecutive (adjacent) sampling: neighbouring minibatches are adjacent in
# the corpus, so the hidden state can be carried across batches
def data_iter_consecutive(corpus_indices, batch_size, num_steps, ctx=None):
    corpus_indices = nd.array(corpus_indices, ctx=ctx)
    data_len = len(corpus_indices)
    batch_len = data_len // batch_size
    indices = corpus_indices[0: batch_size*batch_len].reshape((
        batch_size, batch_len))
    epoch_size = (batch_len - 1) // num_steps
    for i in range(epoch_size):
        i = i * num_steps
        X = indices[:, i: i + num_steps]
        Y = indices[:, i + 1: i + num_steps + 1]
        yield X, Y
class RNNModel(nn.Block):
    def __init__(self, rnn_layer, vocab_size, **kwargs):
        super(RNNModel, self).__init__(**kwargs)
        self.rnn = rnn_layer
        self.vocab_size = vocab_size
        self.dense = nn.Dense(vocab_size)

    def forward(self, inputs, state):
        # Transpose inputs to (num_steps, batch_size), then one-hot encode
        X = nd.one_hot(inputs.T, self.vocab_size)
        Y, state = self.rnn(X, state)
        # The dense layer first reshapes Y to (num_steps * batch_size, num_hiddens);
        # its output has shape (num_steps * batch_size, vocab_size)
        output = self.dense(Y.reshape((-1, Y.shape[-1])))
        return output, state

    def begin_state(self, *args, **kwargs):
        return self.rnn.begin_state(*args, **kwargs)
# RNN prediction function
def predict_rnn_gluon(prefix, num_chars, model, vocab_size, ctx, idx_to_char, char_to_idx):
    # Use the model's member function to initialize the hidden state
    state = model.begin_state(batch_size=1, ctx=ctx)
    output = [char_to_idx[prefix[0]]]  # index of the first prefix character
    for t in range(num_chars + len(prefix) - 1):
        X = nd.array([output[-1]], ctx=ctx).reshape((1, 1))
        (Y, state) = model(X, state)  # the forward pass needs no explicit parameters
        if t < len(prefix) - 1:
            # While still inside the prefix, feed the known next character
            output.append(char_to_idx[prefix[t + 1]])
        else:
            # Otherwise append the character with the highest predicted score
            output.append(int(Y.argmax(axis=1).asscalar()))
    return ''.join([idx_to_char[i] for i in output])
def try_gpu():
    # Use the GPU if one is available, otherwise fall back to the CPU
    try:
        ctx = mx.gpu()
        _ = nd.zeros((1,), ctx=ctx)
    except mx.base.MXNetError:
        ctx = mx.cpu()
    return ctx
# Gradient clipping: if the global gradient norm exceeds theta,
# rescale all gradients so the norm equals theta
def grad_clipping(params, theta, ctx):
    norm = nd.array([0], ctx)
    for param in params:
        norm += (param.grad ** 2).sum()
    norm = norm.sqrt().asscalar()
    if norm > theta:
        for param in params:
            param.grad[:] *= theta / norm

# ctx = try_gpu()
# model = RNNModel(rnn_layer, vocab_size)
# model.initialize(force_reinit=True, ctx=ctx)
# predict_rnn_gluon('分开', 10, model, vocab_size, ctx, idx_to_char, char_to_idx)
# RNN training
def train_and_predict_rnn_gluon(model, num_hiddens, vocab_size, ctx,
                                corpus_indices, idx_to_char, char_to_idx,
                                num_epochs, num_steps, lr, clipping_theta,
                                batch_size, pred_period, pred_len, prefixes):
    loss = gloss.SoftmaxCrossEntropyLoss()
    model.initialize(ctx=ctx, force_reinit=True, init=init.Normal(0.01))
    trainer = gluon.Trainer(model.collect_params(), 'sgd',
                            {'learning_rate': lr, 'momentum': 0, 'wd': 0})
    for epoch in range(num_epochs):
        l_sum, n, start = 0.0, 0, time.time()
        data_iter = data_iter_consecutive(corpus_indices, batch_size, num_steps, ctx)
        state = model.begin_state(batch_size=batch_size, ctx=ctx)
        for X, Y in data_iter:
            # Detach the hidden state from the previous minibatch's graph
            for s in state:
                s.detach()
            with autograd.record():
                (output, state) = model(X, state)
                y = Y.T.reshape((-1,))
                l = loss(output, y).mean()
            l.backward()
            # Clip gradients
            params = [p.data() for p in model.collect_params().values()]
            grad_clipping(params, clipping_theta, ctx)
            trainer.step(1)  # the loss is already a mean, so no further averaging
            l_sum += l.asscalar() * y.size
            n += y.size
        if (epoch + 1) % pred_period == 0:
            print('epoch %d, perplexity %f, time %.2f sec' % (
                epoch + 1, math.exp(l_sum / n), time.time() - start))
            for prefix in prefixes:
                print(' -', predict_rnn_gluon(
                    prefix, pred_len, model, vocab_size, ctx, idx_to_char,
                    char_to_idx))
if __name__ == '__main__':
    # Load the lyrics
    (corpus_indices, char_to_idx, idx_to_char,
     vocab_size) = load_data_jay_lyrics()
    # Build a recurrent layer with a single hidden layer of 256 units
    num_hiddens = 256
    rnn_layer = rnn.RNN(num_hiddens)
    rnn_layer.initialize()
    model = RNNModel(rnn_layer, vocab_size)
    ctx = try_gpu()
    # Hyperparameters
    num_epochs, num_steps, batch_size, lr, clipping_theta = 25000, 35, 32, 1e2, 1e-2
    # Prediction period, length of generated lyrics, and prefixes to generate from
    pred_period, pred_len, prefixes = 50, 50, ['快乐', '伤心']
    train_and_predict_rnn_gluon(model, num_hiddens, vocab_size, ctx,
                                corpus_indices, idx_to_char, char_to_idx,
                                num_epochs, num_steps, lr, clipping_theta,
                                batch_size, pred_period, pred_len, prefixes)
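Once training has run, the same predict_rnn_gluon helper can be called on its own to generate a longer passage from any prefix. A usage sketch (the prefix and the 200-character length are arbitrary choices of mine):

# Generate 200 characters of lyrics from an arbitrary two-character prefix
song = predict_rnn_gluon('快乐', 200, model, vocab_size, ctx,
                         idx_to_char, char_to_idx)
print(song)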
The predicted results look like this:
Analysis of the results:
Clearly, our model has overfitted: the generated lyrics contain far too much repetition.
We will look at how to address this in the next post.
This post used MXNet; the next one will switch to the more widely used PyTorch and do more hyperparameter tuning.