代码是根据《动手学深度学习》(PyTorch版)写的,对应原书的4.10节。
用到的函数我都写在代码中了,只要环境没问题,应该能直接运行。
采用的是Adam优化、均方根误差loss、K折交叉验证。
数据集我也发了,点我查看就好。
import hashlib
import os
import tarfile
import zipfile
import requests
import numpy as py
import pandas as pd
from IPython import display
import torch
from torch import nn
import matplotlib.pyplot as plt
# Registry of downloadable datasets: name -> (url, sha1_hash), filled in further down the file.
DATA_HUB = {}
# Base URL that the dataset file names are appended to.
DATA_URL = 'http://d2l-data.s3-accelerate.amazonaws.com/'
def download(name, cache_dir=os.path.join('data')):
    """Download a file registered in DATA_HUB and return its local path.

    Args:
        name: Key into DATA_HUB; the entry is a ``(url, sha1_hash)`` pair.
        cache_dir: Directory the file is stored in (created if missing).

    Returns:
        Path of the local file, named after the last URL component.

    Raises:
        AssertionError: If ``name`` is not registered in DATA_HUB.
        requests.HTTPError: If the server answers with an error status.
    """
    assert name in DATA_HUB, f"{name} 不存在于 {DATA_HUB}."
    url, sha1_hash = DATA_HUB[name]
    os.makedirs(cache_dir, exist_ok=True)
    fname = os.path.join(cache_dir, url.split('/')[-1])
    if os.path.exists(fname):
        # Hash the cached copy in 1 MiB chunks so large files are not loaded at once.
        sha1 = hashlib.sha1()
        with open(fname, 'rb') as f:
            while True:
                data = f.read(1048576)
                if not data:
                    break
                sha1.update(data)
        if sha1.hexdigest() == sha1_hash:
            return fname  # Cache hit: the file on disk matches the expected hash
    print(f'正在从{url}下载{fname}...')
    with requests.get(url, stream=True, verify=True) as r:
        # Fail loudly on 4xx/5xx instead of saving an error page as the dataset.
        r.raise_for_status()
        with open(fname, 'wb') as f:
            # Actually stream the body; r.content would buffer the whole file in memory.
            for chunk in r.iter_content(chunk_size=1048576):
                f.write(chunk)
    return fname
def download_extract(name, folder=None):
    """Download a zip/tar archive registered in DATA_HUB and extract it.

    The archive is extracted into the directory that holds the downloaded file.

    Args:
        name: Key into DATA_HUB, forwarded to ``download``.
        folder: Optional sub-directory name to return instead of the default.

    Returns:
        ``<download dir>/<folder>`` when ``folder`` is given, otherwise the
        archive path with its final extension removed.

    Raises:
        AssertionError: If the file extension is not ``.zip``, ``.tar`` or ``.gz``.
    """
    fname = download(name)
    base_dir = os.path.dirname(fname)
    data_dir, ext = os.path.splitext(fname)
    if ext == '.zip':
        opener = zipfile.ZipFile
    elif ext in ('.tar', '.gz'):
        opener = tarfile.open
    else:
        # Raised explicitly (same exception type as the former `assert False`) so the
        # check still fires under `python -O` instead of failing later on an unbound name.
        raise AssertionError('only zip/tar archives can be extracted')
    # The context manager closes the archive handle even if extraction fails.
    # NOTE(review): extractall trusts member paths inside the archive; only use on trusted data.
    with opener(fname, 'r') as fp:
        fp.extractall(base_dir)
    return os.path.join(base_dir, folder) if folder else data_dir
def load_array(data_arrays, batch_size, is_train=True):
    """Build a PyTorch DataLoader over the given tensors.

    Args:
        data_arrays: Iterable of tensors that is unpacked into a TensorDataset.
        batch_size: Number of samples per batch.
        is_train: When true the loader shuffles the samples.

    Returns:
        A ``torch.utils.data.DataLoader`` over the wrapped tensors.
    """
    tensor_dataset = torch.utils.data.TensorDataset(*data_arrays)
    loader = torch.utils.data.DataLoader(tensor_dataset, batch_size, shuffle=is_train)
    return loader
def download_all():
    """Download every file currently registered in DATA_HUB."""
    for dataset_name in DATA_HUB:
        download(dataset_name)
# Register the Kaggle house-price CSVs in DATA_HUB as (url, sha1) pairs so download() can fetch and cache them
DATA_HUB['kaggle_house_train'] = (DATA_URL + 'kaggle_house_pred_train.csv', '585e9cc93e70b39160e7921475f9bcd7d31219ce')
DATA_HUB['kaggle_house_test'] = (DATA_URL + 'kaggle_house_pred_test.csv', 'fa19780a7b011d9b009e8bff8e99922a8ee2eb90')
# Load the training and test CSV files
train_data = pd.read_csv(download('kaggle_house_train'))
test_data = pd.read_csv(download('kaggle_house_test'))
# Inspect the dataset shapes
# print(train_data.shape)
# print(test_data.shape)
# Peek at the first four and last two features plus the label for the first four rows
# print(train_data.iloc[0:4, [0, 1, 2, 3, -3, -2, -1]])
# The first column is an ID with no predictive value, so it is dropped from both frames.
# The last training column is the SalePrice label and is excluded as well. The test CSV is
# expected to have no label column (Kaggle test split), so only the ID is dropped there;
# slicing it with 1:-1 would silently discard its last real feature.
all_features = pd.concat((train_data.iloc[:, 1:-1], test_data.iloc[:, 1:]))
# print(all_features.iloc[0:4, [0, 1, 2, 3, -3, -2, -1]])
# Standardize every non-object (numeric) column to zero mean and unit variance
numeric_features = all_features.dtypes[all_features.dtypes != 'object'].index
all_features[numeric_features] = all_features[numeric_features].apply(lambda x: (x - x.mean()) / (x.std()))
# After standardization each numeric column has mean 0, so missing values are filled with 0 (the mean)
all_features[numeric_features] = all_features[numeric_features].fillna(0)
# dummy_na=True treats a missing value as its own category and creates an indicator column for it.
# dtype=float keeps the one-hot columns numeric: newer pandas emits bool columns by default, which
# turns `.values` into an object array that torch.tensor cannot convert.
all_features = pd.get_dummies(all_features, dummy_na=True, dtype=float)
# print(all_features.shape)
# Extract the NumPy arrays via .values and convert them to float32 tensors for training
n_train = train_data.shape[0]
train_features = torch.tensor(all_features[:n_train].values, dtype=torch.float32)
test_features = torch.tensor(all_features[n_train:].values, dtype=torch.float32)
train_labels = torch.tensor(train_data.SalePrice.values.reshape(-1, 1), dtype=torch.float32)
# Setup for a simple linear baseline: mean-squared-error criterion and the model's input width
loss = nn.MSELoss()
# Number of feature columns per sample
in_features = train_features.size(1)
def get_net():
    """Return a freshly initialized linear model mapping ``in_features`` inputs to one output."""
    linear_layer = nn.Linear(in_features, 1)
    return nn.Sequential(linear_layer)
# Root-mean-squared error measured between log-predictions and log-labels
def log_rmse(net, features, labels):
    """Return the RMSE between ``log(net(features))`` and ``log(labels)`` as a Python float.

    Predictions are clamped to the range [1, inf) before the logarithm so that
    values below 1 cannot destabilize it.
    """
    predictions = net(features)
    # Clamp returns a new tensor with every element limited to [1, inf)
    clipped_pred = predictions.clamp(min=1, max=float('inf'))
    mean_squared_log_error = loss(clipped_pred.log(), labels.log())
    return mean_squared_log_error.sqrt().item()
def train(net, train_features, train_labels, test_features, test_labels, num_epochs,
          learning_rate, weight_decay, batch_size):
    """Train ``net`` with Adam and record the log-RMSE after every epoch.

    Args:
        net: Model to optimize in place.
        train_features, train_labels: Training tensors.
        test_features, test_labels: Held-out tensors; evaluation is skipped
            when ``test_labels`` is None.
        num_epochs: Number of passes over the training data.
        learning_rate, weight_decay: Passed to ``torch.optim.Adam``.
        batch_size: Mini-batch size of the training iterator.

    Returns:
        ``(train_ls, test_ls)``: per-epoch log-RMSE lists; ``test_ls`` stays
        empty when no test labels are given.
    """
    batches = load_array((train_features, train_labels), batch_size)
    # Adam is the optimizer used here
    adam = torch.optim.Adam(net.parameters(), lr=learning_rate, weight_decay=weight_decay)
    evaluate_test = test_labels is not None
    train_history, test_history = [], []
    for _ in range(num_epochs):
        for batch_X, batch_y in batches:
            adam.zero_grad()                          # reset accumulated gradients
            batch_loss = loss(net(batch_X), batch_y)  # forward pass
            batch_loss.backward()                     # back-propagate to get gradients
            adam.step()                               # update the parameters
        # Evaluate on the full sets once per epoch
        train_history.append(log_rmse(net, train_features, train_labels))
        if evaluate_test:
            test_history.append(log_rmse(net, test_features, test_labels))
    return train_history, test_history
# K-fold cross-validation: split the data for fold i
def get_k_folk_data(k, i, X, y):
    """Return the training and validation split for fold ``i`` of ``k``.

    The rows are cut into ``k`` consecutive folds of ``X.shape[0] // k`` rows;
    rows beyond ``k * fold_size`` are not used by any fold.

    Args:
        k: Number of folds, must be greater than 1.
        i: Index of the fold used for validation, ``0 <= i < k``.
        X: Feature tensor indexed as ``X[rows, :]``.
        y: Label tensor indexed as ``y[rows]``.

    Returns:
        ``(X_train, y_train, X_valid, y_valid)``.

    Raises:
        AssertionError: If ``k <= 1`` or ``i`` is outside ``[0, k)``.
    """
    assert k > 1
    # Without this check an out-of-range i only surfaced as an UnboundLocalError at the return.
    assert 0 <= i < k, f'fold index i must be in [0, {k}), got {i}'
    fold_size = X.shape[0] // k  # integer division
    X_train_parts, y_train_parts = [], []
    X_valid, y_valid = None, None
    for j in range(k):
        idx = slice(j * fold_size, (j + 1) * fold_size)  # row range of fold j
        X_part, y_part = X[idx, :], y[idx]
        if j == i:  # validation fold
            X_valid, y_valid = X_part, y_part
        else:  # training fold
            X_train_parts.append(X_part)
            y_train_parts.append(y_part)
    # Concatenate once instead of re-copying the accumulated tensor on every iteration.
    X_train = torch.cat(X_train_parts, 0)
    y_train = torch.cat(y_train_parts, 0)
    return X_train, y_train, X_valid, y_valid
def use_svg_display():
    """Ask IPython to render matplotlib figures as SVG (intended for Jupyter)."""
    # NOTE(review): newer IPython releases may deprecate this helper - confirm against the installed version.
    figure_format = 'svg'
    display.set_matplotlib_formats(figure_format)
def set_figsize(figsize=(3.5, 2.5)):
    """Switch to SVG output and set matplotlib's default figure size.

    Args:
        figsize: Value stored under the ``figure.figsize`` rcParam.
    """
    use_svg_display()
    rc_params = plt.rcParams
    # Default size applied to figures created afterwards
    rc_params['figure.figsize'] = figsize
def set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend):
    """Configure labels, scales, limits, legend and grid of a matplotlib axes.

    Args:
        axes: Axes object to configure.
        xlabel, ylabel: Axis labels.
        xlim, ylim: Axis limits passed to ``set_xlim`` / ``set_ylim``.
        xscale, yscale: Axis scale names such as ``'linear'`` or ``'log'``.
        legend: Sequence of legend labels; nothing is drawn when it is empty or None.
    """
    axes.set_xlabel(xlabel)
    axes.set_ylabel(ylabel)
    axes.set_xscale(xscale)
    axes.set_yscale(yscale)
    axes.set_xlim(xlim)
    axes.set_ylim(ylim)
    # The method is `legend`; the earlier `axes.lagend` typo raised AttributeError,
    # which is why this call had been commented out.
    if legend:
        axes.legend(legend)
    axes.grid()
def plot(X, Y=None, xlabel=None, ylabel=None, legend=None, xlim=None, ylim=None, xscale='linear', yscale='linear',
         fmts=('-', 'm--', 'g-.', 'r:'), figsize=(3.5, 2.5), axes=None):
    """Plot one or more curves on a matplotlib axes.

    Args:
        X: x-values (one sequence shared by all curves, or one per curve).
            When ``Y`` is None, ``X`` is treated as the y-values instead.
        Y: y-values: a single 1-D sequence or a list of sequences.
        xlabel, ylabel, xlim, ylim, xscale, yscale, legend: Forwarded to ``set_axes``.
        fmts: Format strings, paired with the curves in order.
        figsize: Figure size forwarded to ``set_figsize``.
        axes: Axes to draw on; the current axes is used when None.
    """
    if legend is None:
        legend = []
    set_figsize(figsize)
    axes = axes if axes is not None else plt.gca()

    def has_one_axis(data):
        """Return True for a 1-D array/tensor or a flat list of scalars."""
        if hasattr(data, "ndim"):
            return data.ndim == 1
        # A list of lists holds several curves, so it must not count as one axis.
        return isinstance(data, list) and (not data or not hasattr(data[0], "__len__"))

    if has_one_axis(X):
        X = [X]
    if Y is None:
        # Only y-values were given: pair each curve with an empty x sequence.
        # (`[[]] + len(X)` raised TypeError; list repetition is what is needed.)
        X, Y = [[]] * len(X), X
    elif has_one_axis(Y):
        Y = [Y]
    if len(X) != len(Y):
        # Reuse the same x-values for every curve
        X = X * len(Y)
    axes.cla()  # clear the active axes; other axes are unaffected
    for x, y, fmt in zip(X, Y, fmts):
        if len(x):
            axes.plot(x, y, fmt)
        else:
            axes.plot(y, fmt)
    set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend)
def k_fold(k, X_train, y_train, num_epochs, learning_rate, weight_decay, batch_size):
    """Run k-fold cross-validation and return the mean final train/validation log-RMSE.

    A fresh network is trained for each fold. The learning curves of the first
    fold are plotted and every fold's final scores are printed.

    Returns:
        ``(mean_train_log_rmse, mean_valid_log_rmse)`` averaged over the ``k`` folds.
    """
    train_total = valid_total = 0
    for i in range(k):
        fold_data = get_k_folk_data(k, i, X_train, y_train)
        net = get_net()
        train_ls, valid_ls = train(net, *fold_data, num_epochs, learning_rate, weight_decay, batch_size)
        # Only the last epoch's score of each fold enters the average
        train_total += train_ls[-1]
        valid_total += valid_ls[-1]
        if i == 0:
            epochs = list(range(1, num_epochs + 1))
            plot(epochs, [train_ls, valid_ls], xlabel='epoch', ylabel='rmse', legend=['train', 'valid'],
                 xlim=[1, num_epochs], yscale='log')
        print(f'fold{i + 1}, train log rmse {float(train_ls[-1]):f}, '
              f'valid log rmse{float(valid_ls[-1]):f}')
    return train_total / k, valid_total / k
# Hyperparameters: number of folds, epochs, learning rate, weight decay and batch size
k, num_epochs, lr, weight_decay, batch_size = 5, 100, 5, 0, 64
# Run k-fold cross-validation on the training set and report the averaged final scores
train_l, valid_l = k_fold(k, train_features, train_labels, num_epochs, lr, weight_decay, batch_size)
print(f'{k}-折验证:平均训练log rmse:{float(train_l):f},'
      f'平均验证log rmse:{float(valid_l):f}')
# Show the learning-curve figure drawn for the first fold
plt.show()