Project repository: GitHub
This is a simple image classification project I wrote, running on PyTorch 1.7.0 with CUDA, covering everything from data loading to training to final deployment.
Without further ado, here is the code:
DataLoader.py
# -*- coding:utf-8 -*-
import os
import numpy as np
from torchvision import transforms, utils
from PIL import Image
from torch.utils.data import Dataset, DataLoader
##################################################
# define dataloader class
##################################################
def default_loader(path):
    # load the image, force 3-channel RGB, and resize to the network input size
    img_pil = Image.open(path).convert('RGB')
    img_pil = img_pil.resize((224, 224))
    normalize = transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )
    preprocess = transforms.Compose([
        # transforms.Resize(256),
        # transforms.CenterCrop(224),
        transforms.ToTensor(),
        normalize
    ])
    img_tensor = preprocess(img_pil)
    return img_tensor
class Trainset(Dataset):
    def __init__(self, file_train, number_train, loader=default_loader):
        self.images = file_train
        self.target = number_train
        self.loader = loader

    def __getitem__(self, index):
        fn = self.images[index]
        img = self.loader(fn)
        target = self.target[index]
        return img, target

    def __len__(self):
        return len(self.images)
def getDataset(path):
    # walk the dataset directory: each sub-folder is one class
    image = []
    label = []
    label_n = []
    label_dict = {}
    for index, files in enumerate(os.listdir(path)):
        for images in os.listdir(os.path.join(path, files)):
            images_path = os.path.join(path, files, images)
            image.append(images_path)
            label.append(files)
            label_n.append(index)
        label_dict[index] = files
    image_len = len(image)
    index = np.arange(image_len)
    np.random.shuffle(index)
    return np.array(image)[index], np.array(label_n)[index], label_dict
DataLoader.py defines the data-loading utilities: it builds the file list from the dataset directory and serves individual samples during training.
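getDataset expects a directory containing one sub-folder per class, with each sub-folder holding that class's images; the returned label_dict maps a class index back to the folder name. A minimal usage sketch (D:\image_file is a placeholder path):

# expected layout (placeholder names):
# D:\image_file\cat\001.jpg, D:\image_file\cat\002.jpg, ...
# D:\image_file\dog\001.jpg, ...
from torch.utils.data import DataLoader
from DataLoader import getDataset, Trainset

x, y, label_dict = getDataset(r'D:\image_file')  # shuffled image paths, class indices, index-to-name map
dataset = Trainset(x, y)
loader = DataLoader(dataset, batch_size=32, shuffle=True)
print(len(dataset), label_dict)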
Trainer.py
# -*- coding:utf-8 -*-
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
import argparse
import os
from DataLoader import getDataset, Trainset
USE_DEFAULT_MODEL = True
##################################################
# define Net class
##################################################
class Net(nn.Module):
    # This is a simple training net; you can use resnet18 or a deeper net instead
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=7)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=7)
        self.conv2_drop = nn.Dropout2d()
        self.conv3 = nn.Conv2d(64, 64, kernel_size=7)
        self.conv3_drop = nn.Dropout2d()
        self.conv4 = nn.Conv2d(64, 64, kernel_size=3)
        self.conv4_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(6400, 1000)
        self.fc2 = nn.Linear(1000, 48)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = F.relu(F.max_pool2d(self.conv3_drop(self.conv3(x)), 2))
        x = F.relu(F.max_pool2d(self.conv4_drop(self.conv4(x)), 2))
        x = x.view(-1, 64 * 10 * 10)  # 64 channels, 10x10 feature map for a 224x224 input
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)
##################################################
# function train
##################################################
def train(args, model, device, trainloader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(trainloader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        target = target.long()
        if USE_DEFAULT_MODEL:
            # criterion is the CrossEntropyLoss created in the __main__ block below
            loss = criterion(output, target)
        else:
            loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(trainloader.dataset),
                100. * batch_idx / len(trainloader), loss.item()))
##################################################
# function test
##################################################
def test(model, device, testloader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in testloader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            target = target.long()
            if USE_DEFAULT_MODEL:
                # criterion returns the batch mean, so scale by the batch size to sum up batch loss
                test_loss += criterion(output, target).item() * data.size(0)
            else:
                test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.max(1, keepdim=True)[1]  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
    test_loss /= len(testloader.dataset)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(testloader.dataset),
        100. * correct / len(testloader.dataset)))
##################################################
# train process
##################################################
if __name__ == '__main__':
    # Training settings
    parser = argparse.ArgumentParser(description='DL Example')
    parser.add_argument('--batch-size', type=int, default=32, metavar='N',
                        help='input batch size for training (default: 32)')
    parser.add_argument('--test-batch-size', type=int, default=100, metavar='N',
                        help='input batch size for testing (default: 100)')
    parser.add_argument('--epochs', type=int, default=100, metavar='N',
                        help='number of epochs to train (default: 100)')
    parser.add_argument('--lr', type=float, default=0.001, metavar='LR',
                        help='learning rate (default: 0.001)')
    parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
                        help='SGD momentum (default: 0.5)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                        help='disables CUDA training')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='random seed (default: 1)')
    parser.add_argument('--log-interval', type=int, default=300, metavar='N',
                        help='how many batches to wait before logging training status')
    # default settings used for training (hard-coded; use parser.parse_args() to read the command line instead)
    args = parser.parse_args(['--epochs', '10', '--log-interval', '100'])
    # path = r'D:\image_file'  # change to your own dataset path
    path = r'D:\T'
    x, y, label_dict = getDataset(path)
    # 80% of the data for training, the remaining 20% for testing
    train_data = x[:int(len(x) * 0.8)]
    test_data = x[int(len(x) * 0.8):]
    train_label = y[:int(len(x) * 0.8)]
    test_label = y[int(len(x) * 0.8):]
    # load data
    train_data = Trainset(train_data, train_label)
    test_data = Trainset(test_data, test_label)
    trainloader = DataLoader(train_data, batch_size=args.batch_size, shuffle=True)
    testloader = DataLoader(test_data, batch_size=args.test_batch_size, shuffle=True)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    if USE_DEFAULT_MODEL:
        from torchvision.models.resnet import resnet18
        model = resnet18(pretrained=True).to(device)
        criterion = nn.CrossEntropyLoss()
    else:
        model = Net().to(device)
    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
    # start training epochs
    for epoch in range(1, args.epochs + 1):
        # train
        train(args, model, device, trainloader, optimizer, epoch)
        # test
        test(model, device, testloader)
    # save model to the output path
    os.makedirs('output', exist_ok=True)
    torch.save(model.state_dict(), 'output/mymodel.pth')
This is the main training script. batch_size should be chosen according to your hardware; on my 3080 Ti, 128 works fine. A simple custom network (Net) is defined here, and the pretrained resnet18 from torchvision can also be loaded as the backbone (controlled by USE_DEFAULT_MODEL). The trained model is saved to the output folder.
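One thing to note: the pretrained resnet18 keeps its original 1000-way ImageNet head in the script above. If you want the output to match your own number of classes (the custom Net uses 48), the usual fix is to replace the final fully connected layer; a minimal sketch, assuming 48 classes (set num_classes to len(label_dict) for your dataset):

import torch.nn as nn
from torchvision.models.resnet import resnet18

num_classes = 48  # assumption: set this to len(label_dict) for your dataset
model = resnet18(pretrained=True)
model.fc = nn.Linear(model.fc.in_features, num_classes)  # swap the 1000-way ImageNet head

If you do this, apply the same change in ModelTransformer.py before calling load_state_dict, otherwise the saved weights will not match the rebuilt model.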
ModelTransformer.py
# -*- coding:utf-8 -*-
from Trainer import Net, USE_DEFAULT_MODEL
import torch
##################################################
# convert the torch model into onnx
##################################################
if __name__ == "__main__":
    # rebuild the network and load the trained weights
    if USE_DEFAULT_MODEL:
        from torchvision.models.resnet import resnet18
        trained_model = resnet18()
    else:
        trained_model = Net()
    trained_model.load_state_dict(torch.load('output/mymodel.pth', map_location='cpu'))
    trained_model.eval()  # switch dropout/batchnorm to inference mode before export
    # prepare a dummy input (its shape must match the training input)
    dummy_input = torch.randn(1, 3, 224, 224)
    # export the onnx model
    torch.onnx.export(trained_model, dummy_input, "output/mymodel.onnx")
Here the trained PyTorch model is converted into a .onnx weight file using PyTorch's ONNX exporter.
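By default the exporter auto-generates tensor names (which is why Detector.py below feeds the input as "input.1"). If you prefer stable names and a flexible batch size, torch.onnx.export accepts input_names, output_names and dynamic_axes; a sketch, assuming trained_model is the model built above:

torch.onnx.export(
    trained_model,
    torch.randn(1, 3, 224, 224),
    "output/mymodel.onnx",
    input_names=["input"],        # fixed input tensor name
    output_names=["logits"],      # fixed output tensor name
    dynamic_axes={"input": {0: "batch"}, "logits": {0: "batch"}},  # allow any batch size
)

With this export, the feed dict in Detector.py would use "input" instead of "input.1".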
Detector.py
# -*- coding:utf-8 -*-
import time
import numpy as np
from PIL import Image
import onnxruntime
from DataLoader import getDataset
##################################################
# convert the input image into the model input format
##################################################
def preprocess(img_file, w, h):
    # convert the input image into a [1, 3, h, w] float32 array
    img = Image.open(img_file).convert('RGB')
    img = img.resize((w, h), Image.BILINEAR)
    img_data = np.array(img)
    img_data = np.transpose(img_data, [2, 0, 1])
    img_data = np.expand_dims(img_data, 0)
    # same normalization as in DataLoader.default_loader
    mean_vec = np.array([0.485, 0.456, 0.406])
    stddev_vec = np.array([0.229, 0.224, 0.225])
    norm_img_data = np.zeros(img_data.shape).astype('float32')
    for i in range(img_data.shape[1]):
        norm_img_data[:, i, :, :] = (img_data[:, i, :, :] / 255 - mean_vec[i]) / stddev_vec[i]
    return norm_img_data.astype('float32'), np.array(img)


if __name__ == "__main__":
    datapath = r'D:\datapath'  # image dataset path (used only to rebuild the index-to-label mapping)
    _, _, data_dict = getDataset(datapath)
    img_file = r'D:\1.png'  # test image
    input_data, raw_data = preprocess(img_file, 224, 224)
    session = onnxruntime.InferenceSession('output/mymodel.onnx')
    session.get_modelmeta()
    startt = time.time()
    # "input.1" is the auto-generated input name from the export;
    # it can be queried with session.get_inputs()[0].name
    results = session.run(None, {"input.1": input_data})
    print("inference time :%0.6f" % (time.time() - startt))
    print('predict label :', data_dict[int(np.argmax(results[0]))])
Running inference with onnxruntime is much faster; the only extra work is converting the input image into the expected format. For 224×224 images, this classification model generally keeps inference time under 10 ms.
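If you also want a confidence score rather than just the predicted class, you can run a softmax over the raw output; a small sketch that reuses results and data_dict from Detector.py above:

import numpy as np

def softmax(x):
    # numerically stable softmax over the class scores
    e = np.exp(x - np.max(x))
    return e / e.sum()

scores = softmax(results[0][0])   # results[0] has shape (1, num_classes)
top = int(np.argmax(scores))
print('predict label :', data_dict[top], 'confidence : %.3f' % scores[top])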