keras版本链接
导包
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from resnets_utils import *
Dataset类
class MyDataset(Dataset):
def __init__(self, x, y):
super(MyDataset, self).__init__()
assert x.shape[0] == y.shape[0]
self.x = x
self.y = y
def __len__(self):
return self.x.shape[0]
def __getitem__(self, item):
return self.x[item], self.y[item]
Flatten类
class Flatten(nn.Module):
def __init__(self, start_dim=1, end_dim=-1):
super(Flatten, self).__init__()
self.start_dim = start_dim
self.end_dim = end_dim
def forward(self, input):
return input.flatten(self.start_dim, self.end_dim)
The identity block
class IdentityBlock(nn.Module):
def __init__(self, channels, f):
super(IdentityBlock, self).__init__()
channel1, channel2, channel3, channel4 = channels
self.conv = nn.Sequential(
nn.Conv2d(in_channels=channel1, out_channels=channel2, kernel_size=1, stride=1, padding=0),
nn.BatchNorm2d(num_features=channel2),
nn.ReLU(),
nn.Conv2d(in_channels=channel2, out_channels=channel3, kernel_size=f, stride=1, padding=(f - 1) // 2),
nn.BatchNorm2d(num_features=channel3),
nn.ReLU(),
nn.Conv2d(in_channels=channel3, out_channels=channel4, kernel_size=1, stride=1, padding=0),
nn.BatchNorm2d(num_features=channel4),
)
def forward(self, input):
x_shortcut = input
x = self.conv(input)
x = x_shortcut + x
x = F.relu(x)
return x
The convolutional block
class ConvolutionalBlock(nn.Module):
def __init__(self, channels, f, s):
super(ConvolutionalBlock, self).__init__()
channel1, channel2, channel3, channel4 = channels
self.conv1 = nn.Sequential(
nn.Conv2d(in_channels=channel1, out_channels=channel2, kernel_size=1, stride=s, padding=0),
nn.BatchNorm2d(num_features=channel2),
nn.ReLU(),
nn.Conv2d(in_channels=channel2, out_channels=channel3, kernel_size=f, stride=1, padding=(f - 1) // 2),
nn.BatchNorm2d(num_features=channel3),
nn.ReLU(),
nn.Conv2d(in_channels=channel3, out_channels=channel4, kernel_size=1, stride=1, padding=0),
nn.BatchNorm2d(num_features=channel4)
)
self.conv2 = nn.Sequential(
nn.Conv2d(in_channels=channel1, out_channels=channel4, kernel_size=1, stride=s, padding=0),
nn.BatchNorm2d(num_features=channel4)
)
def forward(self, input):
x = self.conv1(input)
x_shortcut = self.conv2(input)
x = x + x_shortcut
x = F.relu(x)
return x
ResNet50
class ResNet50(nn.Module):
def __init__(self, classes=6):
super(ResNet50, self).__init__()
self.net = nn.Sequential(
nn.ZeroPad2d(padding=(3, 3, 3, 3)),
nn.Conv2d(in_channels=3, out_channels=64, kernel_size=7, stride=2, padding=0),
nn.BatchNorm2d(num_features=64),
nn.ReLU(),
nn.MaxPool2d(kernel_size=3, stride=2),
ConvolutionalBlock(channels=[64, 64, 64, 256], f=3, s=1),
IdentityBlock(channels=[256, 64, 64, 256], f=3),
IdentityBlock(channels=[256, 64, 64, 256], f=3),
ConvolutionalBlock(channels=[256, 128, 128, 512], f=3, s=2),
IdentityBlock(channels=[512, 128, 128, 512], f=3),
IdentityBlock(channels=[512, 128, 128, 512], f=3),
IdentityBlock(channels=[512, 128, 128, 512], f=3),
ConvolutionalBlock(channels=[512, 256, 256, 1024], f=3, s=2),
IdentityBlock(channels=[1024, 256, 256, 1024], f=3),
IdentityBlock(channels=[1024, 256, 256, 1024], f=3),
IdentityBlock(channels=[1024, 256, 256, 1024], f=3),
IdentityBlock(channels=[1024, 256, 256, 1024], f=3),
IdentityBlock(channels=[1024, 256, 256, 1024], f=3),
ConvolutionalBlock(channels=[1024, 512, 512, 2048], f=3, s=2),
IdentityBlock(channels=[2048, 512, 512, 2048], f=3),
IdentityBlock(channels=[2048, 512, 512, 2048], f=3),
nn.AvgPool2d(kernel_size=2),
Flatten(),
nn.Linear(2048, classes),
)
def forward(self, input):
x = self.net(input)
return x
加载数据集和预处理
X_train_orig, Y_train_orig, X_test_orig, Y_test_orig, classes = load_dataset()
X_train = X_train_orig / 255.
X_test = X_test_orig / 255.
X_train = np.transpose(X_train, [0, 3, 1, 2])
X_test = np.transpose(X_test, [0, 3, 1, 2])
Y_train = Y_train_orig.T
Y_test = Y_test_orig.T
print("number of training examples = " + str(X_train.shape[0]))
print("number of test examples = " + str(X_test.shape[0]))
print("X_train shape: " + str(X_train.shape))
print("Y_train shape: " + str(Y_train.shape))
print("X_test shape: " + str(X_test.shape))
print("Y_test shape: " + str(Y_test.shape))
构建网络、优化器、损失函数
model = ResNet50()
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()
epochs = 2
batch_size = 32
train_dataset = MyDataset(X_train, Y_train)
train_data = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
训练
model.train()
for epoch in range(epochs):
for i, (x, y) in enumerate(train_data):
x = x.float()
y = y.long().squeeze()
optimizer.zero_grad()
y_hat = model(x)
loss = criterion(y_hat, y)
loss.backward()
optimizer.step()
测试
model.eval()
with torch.no_grad():
x = torch.tensor(X_test).float()
y = torch.tensor(Y_test).long().squeeze()
y_hat = model(x)
loss = criterion(y_hat, y)
print("Loss = ", loss.item())
y_hat = torch.argmax(y_hat, dim=-1)
correct_prediction = y_hat == y
test_accuracy = torch.sum(correct_prediction).float() / y.shape[0]
print("Test Accuracy = ", test_accuracy.item())
|