python储备知识补充
OS操作补充
(1)os.path.abspath(__file__)&os.path.dirname
import os

# Step 1: the absolute path of this source file itself.
BASE_DIR = os.path.abspath(__file__)
print(BASE_DIR)

# Step 2: strip the file name to get the directory containing this file.
# BASE_DIR already holds the absolute file path, so it is reused here.
BASE_DIR = os.path.dirname(BASE_DIR)
print(BASE_DIR)

# Dataset location expressed relative to this file's directory.
dataset_dir = os.path.join(BASE_DIR, "data", "RMB_data")
(2)os.walk(filedir)
import os

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
dataset_dir = os.path.abspath(os.path.join(BASE_DIR, "data", "RMB_data"))
print(dataset_dir)

# os.walk() returns a generator that yields one (root, dirs, files) tuple per
# directory it visits, so the generator itself cannot be unpacked into three
# names. Take only its first item, which describes dataset_dir itself. The
# fallback tuple keeps the demo running when dataset_dir does not exist
# (os.walk then yields nothing).
root, dirs, files = next(os.walk(dataset_dir), (dataset_dir, [], []))
print(root, type(root))    # str: the directory being visited
print(dirs, type(dirs))    # list: names of its sub-directories
print(files, type(files))  # list: names of the files directly inside it
(3)lambda匿名函数&x.endswith()&filter()&list()
import os
import random
import shutil

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
dataset_dir = os.path.abspath(os.path.join(BASE_DIR, "data", "RMB_data"))
print(dataset_dir)

# os.walk() yields one (root, dirs, files) tuple per visited directory; it must
# be iterated, not unpacked directly (the generator is not a 3-tuple).
for root, dirs, files in os.walk(dataset_dir):
    for sub_dir in dirs:
        # All entries of the sub-directory ...
        imgs = os.listdir(os.path.join(root, sub_dir))
        # ... reduced to the names ending in '.jpg'. filter() is lazy, so it is
        # wrapped in list() to materialize the result.
        imgs = list(filter(lambda x: x.endswith('.jpg'), imgs))
        print(imgs)
DataLoader机制详解
案例
数据集划分
import os
import random
import shutil
BASE_DIR = os.path.dirname(os.path.abspath(__file__))


def makedir(new_dir):
    """Create directory ``new_dir``, including missing parents, if it is absent.

    Calling it for a directory that already exists is a no-op.

    :param new_dir: path of the directory to create
    :raises FileExistsError: if ``new_dir`` exists but is not a directory
    """
    # exist_ok=True replaces the separate "exists?" check, so there is no
    # window between checking and creating in which another process could
    # create the directory and make makedirs fail.
    os.makedirs(new_dir, exist_ok=True)
if __name__ == '__main__':
    # Split every class folder of the dataset into train/valid/test copies.
    import sys  # local import: the script's top-level imports do not include sys

    dataset_dir = os.path.abspath(os.path.join(BASE_DIR, "data", "RMB_data"))
    split_dir = os.path.abspath(os.path.join(BASE_DIR, "data", "rmb_split"))
    train_dir = os.path.join(split_dir, "train")
    valid_dir = os.path.join(split_dir, "valid")
    test_dir = os.path.join(split_dir, "test")

    train_pct = 0.8
    valid_pct = 0.1
    test_pct = 0.1  # informational only: the test split receives the remainder

    for root, dirs, files in os.walk(dataset_dir):
        for sub_dir in dirs:
            class_dir = os.path.join(root, sub_dir)
            imgs = [name for name in os.listdir(class_dir) if name.endswith('.jpg')]
            img_count = len(imgs)

            # Abort before doing any work for a class folder without images.
            if img_count == 0:
                print("{}目录下,无图片,请检查".format(class_dir))
                # Non-zero status: this is an error exit, not a success.
                sys.exit(1)

            random.shuffle(imgs)
            train_point = int(img_count * train_pct)
            valid_point = int(img_count * (train_pct + valid_pct))

            # Consecutive slices of the shuffled list, in copy order.
            splits = (
                (train_dir, imgs[:train_point]),
                (valid_dir, imgs[train_point:valid_point]),
                (test_dir, imgs[valid_point:]),
            )
            for split_root, split_imgs in splits:
                if not split_imgs:
                    # Do not create an output folder that would stay empty.
                    continue
                out_dir = os.path.join(split_root, sub_dir)
                makedir(out_dir)
                for img_name in split_imgs:
                    # Read from the directory that was actually listed
                    # (root/sub_dir), so nested class folders resolve correctly.
                    src_path = os.path.join(class_dir, img_name)
                    target_path = os.path.join(out_dir, img_name)
                    shutil.copy(src_path, target_path)

            print('Class:{}, train:{}, valid:{}, test:{}'.format(sub_dir, train_point, valid_point - train_point,
                                                                 img_count - valid_point))
模型搭建
lenet.py
import torch.nn as nn
import torch.nn.functional as F
class LeNet(nn.Module):
    """LeNet-style CNN: two 5x5 convolutions followed by three linear layers.

    ``fc1`` expects 16*5*5 flattened features, which is what a 3x32x32 input
    produces after the two conv + 2x2 max-pool stages.

    :param classes: number of output logits produced by the last layer
    """

    def __init__(self, classes):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=6, kernel_size=5)
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
        self.fc1 = nn.Linear(in_features=16 * 5 * 5, out_features=120)
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        self.fc3 = nn.Linear(in_features=84, out_features=classes)

    def forward(self, x):
        """Return the raw class scores (no softmax) for input batch ``x``."""
        feat = F.max_pool2d(F.relu(self.conv1(x)), 2)
        feat = F.max_pool2d(F.relu(self.conv2(feat)), 2)
        # Flatten everything except the batch dimension.
        flat = feat.view(feat.size(0), -1)
        hidden = F.relu(self.fc1(flat))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)

    def initialize_weights(self):
        """Re-initialize all sub-modules in place.

        Conv2d: Xavier-normal weights, zero bias. BatchNorm2d: weight 1,
        bias 0. Linear: weights drawn from N(0, 0.1), zero bias.
        """
        for layer in self.modules():
            if isinstance(layer, nn.Conv2d):
                nn.init.xavier_normal_(layer.weight.data)
                if layer.bias is not None:
                    layer.bias.data.zero_()
            elif isinstance(layer, nn.BatchNorm2d):
                layer.weight.data.fill_(1)
                layer.bias.data.zero_()
            elif isinstance(layer, nn.Linear):
                nn.init.normal_(layer.weight.data, 0, 0.1)
                layer.bias.data.zero_()
class LeNet2(nn.Module):
    """LeNet variant assembled from two ``nn.Sequential`` containers.

    ``features`` holds the conv/ReLU/max-pool stack and ``classifier`` holds
    the linear layers.

    :param classes: number of output logits produced by the last layer
    """

    def __init__(self, classes):
        super().__init__()
        conv_stack = [
            nn.Conv2d(3, 6, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(6, 16, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        ]
        linear_stack = [
            nn.Linear(16 * 5 * 5, 120),
            nn.ReLU(),
            nn.Linear(120, 84),
            nn.ReLU(),
            nn.Linear(84, classes),
        ]
        self.features = nn.Sequential(*conv_stack)
        self.classifier = nn.Sequential(*linear_stack)

    def forward(self, x):
        """Return the raw class scores for input batch ``x``."""
        feature_maps = self.features(x)
        # Flatten everything except the batch dimension.
        flattened = feature_maps.view(feature_maps.size(0), -1)
        return self.classifier(flattened)
class LeNet_bn(nn.Module):
    """LeNet with batch normalization after conv1, conv2 and fc1.

    ``fc1`` expects 16*5*5 flattened features, which is what a 3x32x32 input
    produces after the two conv + 2x2 max-pool stages.

    :param classes: number of output logits produced by the last layer
    """

    def __init__(self, classes):
        super(LeNet_bn, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.bn1 = nn.BatchNorm2d(num_features=6)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.bn2 = nn.BatchNorm2d(num_features=16)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.bn3 = nn.BatchNorm1d(num_features=120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, classes)

    def forward(self, x):
        """Return the raw class scores (no softmax) for input batch ``x``."""
        out = self.conv1(x)
        out = self.bn1(out)
        out = F.relu(out)
        out = F.max_pool2d(out, 2)

        out = self.conv2(out)
        out = self.bn2(out)
        out = F.relu(out)
        out = F.max_pool2d(out, 2)

        # Flatten everything except the batch dimension.
        out = out.view(out.size(0), -1)

        out = self.fc1(out)
        out = self.bn3(out)
        out = F.relu(out)

        out = F.relu(self.fc2(out))
        out = self.fc3(out)
        return out

    def initialize_weights(self):
        """Re-initialize all sub-modules in place.

        Conv2d: Xavier-normal weights, zero bias. BatchNorm1d/BatchNorm2d:
        weight 1, bias 0. Linear: weights drawn from N(0, 1), zero bias.
        """
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.xavier_normal_(m.weight.data)
                if m.bias is not None:
                    m.bias.data.zero_()
            # bn3 is a BatchNorm1d, so both batch-norm flavours must be matched
            # here; checking only BatchNorm2d would leave bn3 untouched.
            elif isinstance(m, (nn.BatchNorm1d, nn.BatchNorm2d)):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight.data, 0, 1)
                m.bias.data.zero_()
common_tools.py
import torch
import random
import psutil
import numpy as np
from PIL import Image
import torchvision.transforms as transforms
def transform_invert(img_, transform_train):
    """Undo a torchvision transform pipeline and return a PIL image.

    The input tensor is left untouched; all work is done on a copy.

    :param img_: tensor indexed as (channel, height, width)
    :param transform_train: the ``transforms.Compose`` that produced ``img_``
    :return: PIL image (RGB for 3 channels, single-band for 1 channel)
    :raises Exception: if the channel count is neither 1 nor 3
    """
    # Detached copy: the in-place de-normalization below must not modify the
    # tensor owned by the caller (e.g. a batch that is still being trained on).
    img_ = img_.detach().clone()

    if 'Normalize' in str(transform_train):
        norm_transform = list(filter(lambda x: isinstance(x, transforms.Normalize), transform_train.transforms))
        mean = torch.tensor(norm_transform[0].mean, dtype=img_.dtype, device=img_.device)
        std = torch.tensor(norm_transform[0].std, dtype=img_.dtype, device=img_.device)
        # Inverse of (x - mean) / std, broadcast over height and width.
        img_.mul_(std[:, None, None]).add_(mean[:, None, None])

    # C*H*W --> H*W*C
    img_ = img_.permute(1, 2, 0)

    # Decide on the tensor, then always convert to numpy: the code below needs
    # ndarray methods even when no rescaling is required.
    rescale = 'ToTensor' in str(transform_train) or bool(img_.max() < 1)
    img_ = img_.cpu().numpy()
    if rescale:
        # [0, 1] floats back to the 0-255 range.
        img_ = img_ * 255

    if img_.shape[2] == 3:
        img_ = Image.fromarray(img_.astype('uint8')).convert('RGB')
    elif img_.shape[2] == 1:
        img_ = Image.fromarray(img_.astype('uint8').squeeze())
    else:
        raise Exception("Invalid img shape, expected 1 or 3 in axis 2, but got {}!".format(img_.shape[2]))

    return img_
def set_seed(seed=1):
    """Seed the Python, NumPy and PyTorch random number generators.

    :param seed: integer seed applied to every generator
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed every CUDA device rather than only the current one; the call is
    # silently ignored when CUDA is unavailable.
    torch.cuda.manual_seed_all(seed)
def get_memory_info():
    """Return a one-line summary of system memory usage.

    Reads ``psutil.virtual_memory()`` and reports the used and free amounts
    (bytes divided by 1024**3) together with the usage percentage.

    :return: formatted summary string
    """
    bytes_per_unit = 1024 ** 3
    stats = psutil.virtual_memory()
    return "Usage Memory:{:.2f} G,Percentage: {:.1f}%,Free Memory:{:.2f} G".format(
        stats.used / bytes_per_unit, stats.percent, stats.free / bytes_per_unit)
my_dataset.py
import numpy as np
import torch
import os
import random
from PIL import Image
from torch.utils.data import Dataset
random.seed(1)
# Class-folder name -> integer label for the RMB denomination task.
rmb_label = {"1": 0, "100": 1}


class RMBDataset(Dataset):
    """Dataset for the RMB denomination classification task.

    Images are expected as ``data_dir/<class name>/*.jpg`` where the class
    name is a key of ``rmb_label``.
    """

    def __init__(self, data_dir, transform=None):
        """
        :param data_dir: str, path of the dataset directory
        :param transform: optional callable applied to each PIL image
        :raises Exception: if no ``.jpg`` image is found under ``data_dir``
        """
        # Copy of the module-level mapping, so both stay in agreement without
        # sharing one mutable dict.
        self.label_name = dict(rmb_label)
        self.data_info = self.get_img_info(data_dir)
        self.transform = transform

    def __getitem__(self, index):
        """Return ``(image, label)`` for the sample at ``index``."""
        path_img, label = self.data_info[index]
        img = Image.open(path_img).convert('RGB')

        if self.transform is not None:
            img = self.transform(img)

        return img, label

    def __len__(self):
        return len(self.data_info)

    @staticmethod
    def get_img_info(data_dir):
        """Collect ``(image path, label)`` pairs for every ``.jpg`` under ``data_dir``.

        :param data_dir: str, path of the dataset directory
        :return: list of ``(path, int label)`` tuples
        :raises KeyError: if a folder containing images is not in ``rmb_label``
        :raises Exception: if no image is found at all
        """
        data_info = []
        for root, dirs, _ in os.walk(data_dir):
            for sub_dir in dirs:
                class_dir = os.path.join(root, sub_dir)
                for img_name in os.listdir(class_dir):
                    if not img_name.endswith('.jpg'):
                        continue
                    # The label is looked up per image, so a folder without
                    # images never needs an entry in rmb_label.
                    label = rmb_label[sub_dir]
                    data_info.append((os.path.join(class_dir, img_name), int(label)))

        # Fail early with a clear message, like the other datasets in this
        # file, instead of silently producing an empty dataset.
        if len(data_info) == 0:
            raise Exception("\ndata_dir:{} is a empty dir! Please checkout your path to images!".format(data_dir))
        return data_info
class AntsDataset(Dataset):
    """Two-class image dataset read from ``data_dir/ants`` and ``data_dir/bees``.

    :param data_dir: str, path of the dataset directory
    :param transform: optional callable applied to each PIL image
    """

    def __init__(self, data_dir, transform=None):
        self.label_name = {"ants": 0, "bees": 1}
        self.data_info = self.get_img_info(data_dir)
        self.transform = transform

    def __getitem__(self, index):
        """Return ``(image, label)`` for the sample at ``index``."""
        img_path, target = self.data_info[index]
        image = Image.open(img_path).convert('RGB')
        if self.transform is None:
            return image, target
        return self.transform(image), target

    def __len__(self):
        return len(self.data_info)

    def get_img_info(self, data_dir):
        """Collect ``(image path, label)`` pairs for every ``.jpg`` under ``data_dir``.

        :raises Exception: if no image is found
        """
        samples = []
        for root, dirs, _ in os.walk(data_dir):
            for sub_dir in dirs:
                class_dir = os.path.join(root, sub_dir)
                # The label lookup runs once per image, so a folder without
                # images never needs an entry in label_name.
                samples.extend(
                    (os.path.join(class_dir, fname), int(self.label_name[sub_dir]))
                    for fname in os.listdir(class_dir)
                    if fname.endswith('.jpg')
                )

        if not samples:
            raise Exception("\ndata_dir:{} is a empty dir! Please checkout your path to images!".format(data_dir))
        return samples
class PortraitDataset(Dataset):
    """Image/mask pairs stored side by side as ``<name>.png`` and ``<name>_matte.png``.

    :param data_dir: str, directory containing both images and matte labels
    :param transform: optional callable applied to the image array and to the
        label array; its result is passed to ``torch.from_numpy``
    :param in_size: side length both image and label are resized to
    """

    def __init__(self, data_dir, transform=None, in_size = 224):
        super(PortraitDataset, self).__init__()
        self.data_dir = data_dir
        self.transform = transform
        self.label_path_list = list()
        self.in_size = in_size

        # Fills self.label_path_list.
        self._get_img_path()

    def __getitem__(self, index):
        """Return ``(image, label)`` float tensors for the sample at ``index``.

        The image is channel-first; the label has one leading channel and
        holds 0 for background and 1 for every non-zero matte pixel.
        """
        path_label = self.label_path_list[index]
        # "<name>_matte.png" -> "<name>.png"
        path_img = path_label[:-len("_matte.png")] + ".png"

        img_pil = Image.open(path_img).convert('RGB')
        img_pil = img_pil.resize((self.in_size, self.in_size), Image.BILINEAR)
        img_hwc = np.array(img_pil)
        img_chw = img_hwc.transpose((2, 0, 1))

        label_pil = Image.open(path_label).convert('L')
        # Nearest-neighbour keeps the mask values unblended.
        label_pil = label_pil.resize((self.in_size, self.in_size), Image.NEAREST)
        label_hw = np.array(label_pil)
        # Binarize before adding the channel axis, so the result does not rely
        # on label_chw being a view of label_hw.
        label_hw[label_hw != 0] = 1
        label_chw = label_hw[np.newaxis, :, :]

        if self.transform is not None:
            # Both arrays are numpy ndarrays already; ndarray has no .numpy()
            # method, so the transform is applied to them directly.
            img_chw = self.transform(img_chw)
            label_chw = self.transform(label_chw)

        img_chw_tensor = torch.from_numpy(img_chw).float()
        label_chw_tensor = torch.from_numpy(label_chw).float()

        return img_chw_tensor, label_chw_tensor

    def __len__(self):
        return len(self.label_path_list)

    def _get_img_path(self):
        """Store the shuffled paths of all ``*_matte.png`` files of ``data_dir``.

        :raises Exception: if no matte file is found
        """
        file_list = os.listdir(self.data_dir)
        path_list = [os.path.join(self.data_dir, name) for name in file_list if name.endswith("_matte.png")]
        random.shuffle(path_list)
        if len(path_list) == 0:
            raise Exception("\ndata_dir:{} is a empty dir! Please checkout your path to images!".format(self.data_dir))
        self.label_path_list = path_list
class PennFudanDataset(Dataset):
    """Detection dataset read from ``data_dir/PNGImages`` and ``data_dir/Annotation``.

    :param data_dir: str, dataset root directory
    :param transforms: callable taking and returning ``(img, target)``, or None
    """

    def __init__(self, data_dir, transforms):
        self.data_dir = data_dir
        self.transforms = transforms
        self.img_dir = os.path.join(data_dir, "PNGImages")
        self.txt_dir = os.path.join(data_dir, "Annotation")
        # Image names without the ".png" extension.
        self.names = [name[:-4] for name in os.listdir(self.img_dir) if name.endswith(".png")]

    def __getitem__(self, index):
        """Return the image and its detection target.

        :param index: position of the sample in ``self.names``
        :return: ``(img, target)``; ``target["boxes"]`` is a float tensor with
            one row of four numbers per annotated box and ``target["labels"]``
            a long tensor of ones with one entry per box
        """
        import re  # local import: the file's top-level imports do not include re

        name = self.names[index]
        path_img = os.path.join(self.img_dir, name + ".png")
        path_txt = os.path.join(self.txt_dir, name + ".txt")

        # load img
        img = Image.open(path_img).convert("RGB")

        # load boxes; the context manager closes the annotation file again
        with open(path_txt, "r") as f:
            points = [re.findall(r"\d+", line) for line in f if "Xmin" in line]

        # Only the last four numbers of each matching line are kept.
        boxes_list = [[int(p) for p in point][-4:] for point in points]

        # reshape keeps the (N, 4) layout even when no box was found (N == 0)
        boxes = torch.tensor(boxes_list, dtype=torch.float).reshape(-1, 4)
        labels = torch.ones((boxes.shape[0],), dtype=torch.long)

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels

        if self.transforms is not None:
            img, target = self.transforms(img, target)

        return img, target

    def __len__(self):
        """Return the number of images.

        :raises Exception: if the image directory contains no ``.png`` file
        """
        if len(self.names) == 0:
            raise Exception("\ndata_dir:{} is a empty dir! Please checkout your path to images!".format(self.data_dir))
        return len(self.names)
class CelebADataset(Dataset):
    """Unlabelled image dataset made of the ``.jpg`` files directly inside ``data_dir``.

    :param data_dir: str, directory containing the images
    :param transforms: callable applied to each PIL image, or None
    """

    def __init__(self, data_dir, transforms):
        self.data_dir = data_dir
        self.transform = transforms
        self.img_names = [fname for fname in os.listdir(data_dir) if fname.endswith(".jpg")]

    def __getitem__(self, index):
        """Return the (optionally transformed) RGB image at ``index``."""
        image = Image.open(os.path.join(self.data_dir, self.img_names[index])).convert('RGB')
        if self.transform is None:
            return image
        return self.transform(image)

    def __len__(self):
        """Return the number of images.

        :raises Exception: if ``data_dir`` contains no ``.jpg`` file
        """
        count = len(self.img_names)
        if count == 0:
            raise Exception("\ndata_dir:{} is a empty dir! Please checkout your path to images!".format(self.data_dir))
        return count
架构搭建
import os
import sys

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from matplotlib import pyplot as plt
from torch.utils.data import DataLoader

# Directory containing this script.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

path_lenet = os.path.abspath(os.path.join(BASE_DIR, "model", "lenet.py"))
path_tools = os.path.abspath(os.path.join(BASE_DIR, "tools", "common_tools.py"))

# Project root, two levels above this script. It is derived from the absolute
# BASE_DIR: concatenating os.path.dirname(__file__) with separators yields
# "/../.." (the filesystem root) whenever __file__ is a bare file name.
hello_pytorch_DIR = os.path.abspath(os.path.join(BASE_DIR, "..", ".."))
# Must run before the project imports below so that they can be resolved.
sys.path.append(hello_pytorch_DIR)

from model.lenet import LeNet
from tools.my_dataset import RMBDataset
from tools.common_tools import set_seed
# Fix the random seeds (set_seed's default value) before data and model exist.
set_seed()
rmb_label = {"1": 0, "100": 1}

# ---- hyper-parameters ----
MAX_EPOCH = 10
BATCH_SIZE = 16
LR = 0.01
log_interval = 10  # print the training status every `log_interval` iterations
val_interval = 1   # run validation every `val_interval` epochs

# ---- step 1/5: data ----
split_dir = os.path.abspath(os.path.join(BASE_DIR, "..", "..", "data", "rmb_split"))
if not os.path.exists(split_dir):
    raise Exception(r"数据 {} 不存在, 回到lesson-06\1_split_dataset.py生成数据".format(split_dir))
train_dir = os.path.join(split_dir, "train")
valid_dir = os.path.join(split_dir, "valid")

# Per-channel statistics used by Normalize.
norm_mean = [0.485, 0.456, 0.406]
norm_std = [0.229, 0.224, 0.225]

# Training pipeline: resize, random crop with padding, tensor, normalize.
train_transform = transforms.Compose([
    transforms.Resize(size=(32, 32)),
    transforms.RandomCrop(size=32, padding=4),
    transforms.ToTensor(),
    transforms.Normalize(mean=norm_mean, std=norm_std),
])

# Validation pipeline: same as training but without the random crop.
valid_transform = transforms.Compose([
    transforms.Resize(size=(32, 32)),
    transforms.ToTensor(),
    transforms.Normalize(mean=norm_mean, std=norm_std),
])

train_data = RMBDataset(data_dir=train_dir, transform=train_transform)
valid_data = RMBDataset(data_dir=valid_dir, transform=valid_transform)

# Only the training data is shuffled.
train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(valid_data, batch_size=BATCH_SIZE)

# ---- step 2/5: model ----
net = LeNet(classes=2)
net.initialize_weights()

# ---- step 3/5: loss ----
criterion = nn.CrossEntropyLoss()

# ---- step 4/5: optimizer and learning-rate schedule ----
optimizer = optim.SGD(params=net.parameters(), lr=LR, momentum=0.9)
# Multiply the learning rate by 0.1 every 10 scheduler steps.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

# Loss histories filled by the training loop.
train_curve = []
valid_curve = []
# ---- step 5/5: training ----
for epoch in range(MAX_EPOCH):

    # Running statistics: loss_mean is reset after every log line, while
    # correct/total accumulate over the whole epoch.
    loss_mean = 0.
    correct = 0.
    total = 0.

    net.train()
    for i, data in enumerate(train_loader):
        inputs, labels = data

        # forward
        outputs = net(inputs)

        # backward
        optimizer.zero_grad()
        loss = criterion(outputs, labels)
        loss.backward()

        # update weights
        optimizer.step()

        # classification statistics: index of the largest score per sample
        predicted = torch.max(outputs.data, 1)[1]
        total += labels.size(0)
        correct += (predicted == labels).squeeze().sum().numpy()

        # record the loss of every iteration for the plot
        loss_mean += loss.item()
        train_curve.append(loss.item())

        if (i + 1) % log_interval == 0:
            loss_mean /= log_interval
            print("Training:Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.2%}".format(
                epoch, MAX_EPOCH, i+1, len(train_loader), loss_mean, correct / total))
            loss_mean = 0.

    # advance the learning-rate schedule once per epoch
    scheduler.step()

    # validate the model
    if (epoch + 1) % val_interval != 0:
        continue

    correct_val = 0.
    total_val = 0.
    loss_val = 0.
    net.eval()
    with torch.no_grad():
        for j, data in enumerate(valid_loader):
            inputs, labels = data
            outputs = net(inputs)
            loss = criterion(outputs, labels)

            predicted = torch.max(outputs.data, 1)[1]
            total_val += labels.size(0)
            correct_val += (predicted == labels).squeeze().sum().numpy()

            loss_val += loss.item()

        # mean validation loss over all batches of this epoch
        loss_val_epoch = loss_val / len(valid_loader)
        valid_curve.append(loss_val_epoch)
        print("Valid:\t Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.2%}".format(
            epoch, MAX_EPOCH, j+1, len(valid_loader), loss_val_epoch, correct_val / total_val))
# Training loss was recorded once per iteration.
train_iters = len(train_loader)
train_x = range(len(train_curve))
train_y = train_curve

# Validation loss was recorded once per validated epoch; place each point at
# the index of the last training iteration of that epoch.
iters_per_validation = train_iters * val_interval
valid_x = np.arange(1, len(valid_curve) + 1) * iters_per_validation - 1
valid_y = valid_curve

plt.plot(train_x, train_y, label='Train')
plt.plot(valid_x, valid_y, label='Valid')

plt.xlabel('Iteration')
plt.ylabel('loss value')
plt.legend(loc='upper right')
plt.show()
# ---- inference on the images below <this script's directory>/test_data ----
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
test_dir = os.path.join(BASE_DIR, "test_data")

test_data = RMBDataset(data_dir=test_dir, transform=valid_transform)
# NOTE: the name valid_loader is reused here for the test data.
valid_loader = DataLoader(dataset=test_data, batch_size=1)

# Explicitly switch to evaluation mode and disable gradient tracking, so that
# inference does not depend on the mode the training loop happened to leave.
net.eval()
with torch.no_grad():
    for i, data in enumerate(valid_loader):
        inputs, labels = data

        # forward
        outputs = net(inputs)
        _, predicted = torch.max(outputs.data, 1)

        # batch_size is 1: class index 0 is the 1-yuan note, otherwise 100-yuan
        rmb = 1 if predicted[0].item() == 0 else 100
        print("模型获得{}元".format(rmb))
|