代码列表: 1. utils.py
class AverageMeter(object):
def __init__(self):
self.reset()
def reset(self):
self.total = 0
self.avg = 0
self.cnt = 0
def update(self, val, n=1):
self.cnt += n
self.total += val * n
self.avg = self.total / self.cnt
2. vgg.py
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.dygraph import to_variable
from paddle.fluid.dygraph import Conv2D
from paddle.fluid.dygraph import Dropout
from paddle.fluid.dygraph import BatchNorm
from paddle.fluid.dygraph import Pool2D
from paddle.fluid.dygraph import Linear
model_path = {
'vgg16bn': './vgg16_bn',
}
class ConvBNLayer(fluid.dygraph.Layer):
def __init__(self,
num_channels,
num_filters,
filter_size=3,
stride=1,
groups=1,
use_bn=True,
act='relu',
name=None):
super(ConvBNLayer, self).__init__(name)
self.use_bn = use_bn
if use_bn:
self.conv = Conv2D(num_channels=num_channels,
num_filters=num_filters,
filter_size=filter_size,
stride=stride,
padding=(filter_size-1)//2,
groups=groups,
act=None,
bias_attr=None)
self.bn = BatchNorm(num_filters, act=act)
else:
self.conv = Conv2D(num_channels=num_channels,
num_filters=num_filters,
filter_size=filter_size,
stride=stride,
padding=(filter_size-1)//2,
groups=groups,
act=act,
bias_attr=None)
def forward(self, inputs):
y = self.conv(inputs)
if self.use_bn:
y = self.bn(y)
return y
class VGG(fluid.dygraph.Layer):
def __init__(self, layers=16, use_bn=False, num_classes=1000):
super(VGG, self).__init__()
self.layers = layers
self.use_bn = use_bn
supported_layers = [16, 19]
assert layers in supported_layers
if layers == 16:
depth = [2, 2, 3, 3, 3]
elif layers == 19:
depth = [2, 2, 4, 4, 4]
num_channels = [3, 64, 128, 256, 512]
num_filters = [64, 128, 256, 512, 512]
self.layer1 = fluid.dygraph.Sequential(*self.make_layer(num_channels[0], num_filters[0], depth[0], use_bn, name='layer1'))
self.layer2 = fluid.dygraph.Sequential(*self.make_layer(num_channels[1], num_filters[1], depth[1], use_bn, name='layer2'))
self.layer3 = fluid.dygraph.Sequential(*self.make_layer(num_channels[2], num_filters[2], depth[2], use_bn, name='layer3'))
self.layer4 = fluid.dygraph.Sequential(*self.make_layer(num_channels[3], num_filters[3], depth[3], use_bn, name='layer4'))
self.layer5 = fluid.dygraph.Sequential(*self.make_layer(num_channels[4], num_filters[4], depth[4], use_bn, name='layer5'))
self.classifier = fluid.dygraph.Sequential(
Linear(input_dim=512 * 7 * 7, output_dim=4096, act='relu'),
Dropout(),
Linear(input_dim=4096, output_dim=4096, act='relu'),
Dropout(),
Linear(input_dim=4096, output_dim=num_classes))
self.out_dim = 512 * 7 * 7
def forward(self, inputs):
x = self.layer1(inputs)
x = fluid.layers.pool2d(x, pool_size=2, pool_stride=2)
x = self.layer2(x)
x = fluid.layers.pool2d(x, pool_size=2, pool_stride=2)
x = self.layer3(x)
x = fluid.layers.pool2d(x, pool_size=2, pool_stride=2)
x = self.layer4(x)
x = fluid.layers.pool2d(x, pool_size=2, pool_stride=2)
x = self.layer5(x)
x = fluid.layers.pool2d(x, pool_size=2, pool_stride=2)
x = fluid.layers.adaptive_pool2d(x, pool_size=(7,7), pool_type='avg')
x = fluid.layers.reshape(x, shape=[-1, self.out_dim])
x = self.classifier(x)
return x
def make_layer(self, num_channels, num_filters, depth, use_bn, name=None):
layers = []
layers.append(ConvBNLayer(num_channels, num_filters, use_bn=use_bn, name=f'{name}.0'))
for i in range(1, depth):
layers.append(ConvBNLayer(num_filters, num_filters, use_bn=use_bn, name=f'{name}.{i}'))
return layers
def VGG16BN(pretrained=False):
model = VGG(layers=16, use_bn=True)
if pretrained:
model_dict, _ = fluid.load_dygraph(model_path['vgg16bn'])
model.set_dict(model_dict)
return model
def main():
with fluid.dygraph.guard():
x_data = np.random.rand(2, 3, 224, 224).astype(np.float32)
x = to_variable(x_data)
model = VGG16BN()
model.eval()
pred = model(x)
print('vgg16bn: pred.shape = ', pred.shape)
if __name__ == "__main__":
main()
3. basic_transforms.py
import cv2
import numpy as np
class Compose(object):
def __init__(self, transforms):
self.transforms = transforms
def __call__(self, image, label=None):
for t in self.transforms:
image, label = t(image, label)
return image, label
class Normalize(object):
def __init__(self, mean_val, std_val, val_scale=1):
self.mean = np.array(mean_val, dtype=np.float32)
self.std = np.array(std_val, dtype=np.float32)
self.val_scale = 1/255.0 if val_scale==1 else 1
def __call__(self, image, label=None):
image = image.astype(np.float32)
image = image * self.val_scale
image = image - self.mean
image = image * (1 / self.std)
return image, label
class ConvertDataType(object):
def __call__(self, image, label=None):
if label is not None:
label = label.astype(np.int64)
return image.astype(np.float32), label
class Pad(object):
def __init__(self, size, ignore_label=255, mean_val=0, val_scale=1):
factor = 255 if val_scale == 1 else 1
self.size = size
self.ignore_label = ignore_label
self.mean_val=mean_val
if isinstance(self.mean_val, (tuple,list)):
self.mean_val = [int(x* factor) for x in self.mean_val]
else:
self.mean_val = int(self.mean_val * factor)
def __call__(self, image, label=None):
h, w, c = image.shape
pad_h = max(self.size - h, 0)
pad_w = max(self.size - w, 0)
pad_h_half = int(pad_h / 2)
pad_w_half = int(pad_w / 2)
if pad_h > 0 or pad_w > 0:
image = cv2.copyMakeBorder(image,
top=pad_h_half,
left=pad_w_half,
bottom=pad_h - pad_h_half,
right=pad_w - pad_w_half,
borderType=cv2.BORDER_CONSTANT,
value=self.mean_val)
if label is not None:
label = cv2.copyMakeBorder(label,
top=pad_h_half,
left=pad_w_half,
bottom=pad_h - pad_h_half,
right=pad_w - pad_w_half,
borderType=cv2.BORDER_CONSTANT,
value=self.ignore_label)
return image, label
class CenterCrop(object):
def __init__(self, crop_size):
self.crop_h = crop_size
self.crop_w = crop_size
def __call__(self, image, label = None):
h, w, c = image.shape
top = (h - self.crop_h) // 2
left = (w - self.crop_w) // 2
image = image[top:top+self.crop_h, left:left+self.crop_w,:]
if label is not None:
label = label[top:top+self.crop_h, left:left+self.crop_w,:]
return image, label
class Resize(object):
def __init__(self, size):
self.size = size
def __call__(self, image, label=None):
image = cv2.resize(image, (self.size, self.size), interpolation=cv2.INTER_LINEAR)
if label is not None:
label = cv2.resize(label, (self.size, self.size), interpolation=cv2.INTER_NEAREST)
return image,label
class RandomFlip(object):
def __call__(self, image, label = None):
prob_of_filp = np.random.rand()
if prob_of_filp > 0.5:
image = cv2.flip(image, 1)
if label is not None:
label = cv2.flip(label,1)
return image, label
class RandomCrop(object):
def __init__(self, crop_size):
self.crop_size = crop_size
def __call__(self, image, label = None):
h, w, c = image.shape
top = np.random.uniform(h-self.crop_size)
left = np.random.uniform(w-self.crop_size)
assert top >= 0, "Error: crop_size > image height!"
assert left >= 0, "Error: crop_size > image width!"
rect = np.array([int(left),
int(top),
int(left + self.crop_size),
int(top + self.crop_size)])
image = image[rect[1]:rect[3], rect[0]:rect[2], :]
if label is not None:
label = label[rect[1]:rect[3], rect[0]:rect[2], :]
return image,label
class Scale(object):
def __call__(self, image, label=None, scale=1.0):
if not isinstance(scale,(list,tuple)):
scale = [scale, scale]
h, w, c = image.shape
image = cv2.resize(image,
(int(w*scale[0]),int(h*scale[1])),
interpolation=cv2.INTER_LINEAR)
if label is not None:
label = cv2.resize(label,
(int(w*scale[0]),int(h*scale[1])),
interpolation=cv2.INTER_NEAREST)
return image,label
class RandomScale(object):
def __init__(self, min_scale=0.5, max_scale=2.0, step=0.25):
self.min_scale = min_scale
self.max_scale = max_scale
self.step = step
self.scale = Scale()
def __call__(self, image, label=None, scale=1.0):
if self.step == 0:
self.random_scale = np.random.rand.uniform(self.min_scale,
self.max_scale,
1)[0]
else:
num_steps = int((self.max_scale - self.min_scale) / self.step + 1)
scale_factors = np.linspace(self.min_scale,
self.max_scale,
num_steps)
np.random.shuffle(scale_factors)
self.random_scale = scale_factors[0]
image, label = self.scale(image, label, self.random_scale)
return image,label
def main():
image = cv2.imread('./work/dummy_data/JPEGImages/2008_000064.jpg')
label = cv2.imread('./work/dummy_data/GroundTruth_trainval_png/2008_000064.png')
crop_size = 200
augment = Compose([
RandomScale(),
RandomFlip(),
Pad(crop_size, mean_val=[0.485, 0.456, 0.406]),
RandomCrop(crop_size),
ConvertDataType(),
])
new_img, new_label = augment(image,label)
cv2.imwrite('img_new.png', new_img)
cv2.imwrite('label_new.png', new_label)
if __name__ == "__main__":
main()
4. basic_data_preprocessing.py
import basic_transforms as t
class TrainAugmentation():
def __init__(self, image_size, mean_val=0, std_val=1.0):
self.image_size = image_size
self.mean_val = mean_val
self.std_val = std_val
def __call__(self, image, label):
self.augment = t.Compose([
t.RandomScale(),
t.RandomFlip(),
t.ConvertDataType(),
])
return self.augment(image, label)
5. fcn8s.py
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.dygraph import to_variable
from paddle.fluid.dygraph import Conv2D
from paddle.fluid.dygraph import Conv2DTranspose
from paddle.fluid.dygraph import Dropout
from paddle.fluid.dygraph import Pool2D
from vgg import VGG16BN
class FCN8s(fluid.dygraph.Layer):
def __init__(self, num_classes=59):
super(FCN8s, self).__init__()
backbone = VGG16BN(pretrained=False)
self.layer1 = backbone.layer1
self.layer1[0].conv._padding = [100,100]
self.pool1 = Pool2D(pool_size=2, pool_stride=2, ceil_mode=True)
self.layer2 = backbone.layer2
self.pool2 = Pool2D(pool_size=2, pool_stride=2, ceil_mode=True)
self.layer3 = backbone.layer3
self.pool3 = Pool2D(pool_size=2, pool_stride=2, ceil_mode=True)
self.layer4 = backbone.layer4
self.pool4 = Pool2D(pool_size=2, pool_stride=2, ceil_mode=True)
self.layer5 = backbone.layer5
self.pool5 = Pool2D(pool_size=2, pool_stride=2, ceil_mode=True)
self.fc6 = Conv2D(512, 4096, 7, act='relu')
self.fc7 = Conv2D(4096, 4096, 1, act='relu')
self.drop6 = Dropout()
self.drop7 = Dropout()
self.score = Conv2D(4096, num_classes, 1)
self.score_pool3 = Conv2D(256, num_classes, 1)
self.score_pool4 = Conv2D(512, num_classes, 1)
self.up_output = Conv2DTranspose(num_channels=num_classes,
num_filters=num_classes,
filter_size=4,
stride=2,
bias_attr=False)
self.up_pool4 = Conv2DTranspose(num_channels=num_classes,
num_filters=num_classes,
filter_size=4,
stride=2,
bias_attr=False)
self.up_final = Conv2DTranspose(num_channels=num_classes,
num_filters=num_classes,
filter_size=16,
stride=8,
bias_attr=False)
def forward(self, inputs):
x = self.layer1(inputs)
x = self.pool1(x)
x = self.layer2(x)
x = self.pool2(x)
x = self.layer3(x)
x = self.pool3(x)
pool3 = x
x = self.layer4(x)
x = self.pool4(x)
pool4 = x
x = self.layer5(x)
x = self.pool5(x)
x = self.fc6(x)
x = self.drop6(x)
x = self.fc7(x)
x = self.drop7(x)
x = self.score(x)
x = self.up_output(x)
up_output = x
x = self.score_pool4(pool4)
x = x[:, :, 5:5+up_output.shape[2], 5:5+up_output.shape[3]]
up_pool4 = x
x = up_pool4 + up_output
x = self.up_pool4(x)
up_pool4 = x
x = self.score_pool3(pool3)
x = x[:, :, 9:9+up_pool4.shape[2], 9:9+up_pool4.shape[3]]
up_pool3 = x
x = up_pool3 + up_pool4
x = self.up_final(x)
x = x[:, :, 31:31+inputs.shape[2], 31:31+inputs.shape[3]]
return x
6. basic_seg_loss.py
import paddle
import paddle.fluid as fluid
import numpy as np
import cv2
eps = 1e-8
def Basic_SegLoss(preds, labels, ignore_index=255):
n, c, h, w = preds.shape
print(preds.shape)
print(labels.shape)
loss = fluid.layers.softmax_with_cross_entropy(preds, labels,axis=1)
mask = labels!=ignore_index
mask = fluid.layers.cast(mask, 'float32')
loss = loss * mask
avg_loss = fluid.layers.mean(loss) / (fluid.layers.mean(mask) + eps)
return avg_loss
def main():
label = cv2.imread('./work/dummy_data/GroundTruth_trainval_png/2008_000007.png')
label = cv2.cvtColor(label, cv2.COLOR_BGR2GRAY).astype(np.int64)
pred = np.random.uniform(0, 1, (1, 59, label.shape[0], label.shape[1])).astype(np.float32)
label = label[np.newaxis,:,:]
label = label[np.newaxis, :, :, :]
with fluid.dygraph.guard(fluid.CPUPlace()):
pred = fluid.dygraph.to_variable(pred)
label = fluid.dygraph.to_variable(label)
loss = Basic_SegLoss(pred, label)
print(loss)
if __name__ == "__main__":
main()
7. basic_train.py
import os
import paddle
import paddle.fluid as fluid
from paddle.fluid.optimizer import AdamOptimizer
import numpy as np
import cv2
import argparse
from utils import AverageMeter
from basic_seg_loss import Basic_SegLoss
from basic_data_preprocessing import TrainAugmentation
from fcn8s import FCN8s
parser = argparse.ArgumentParser()
parser.add_argument('--net', type=str, default='basic')
parser.add_argument('--lr', type=float, default=0.001)
parser.add_argument('--num_epochs', type=int, default=10)
parser.add_argument('--batch_size', type=int, default=1)
parser.add_argument('--image_folder', type=str, default='./work/dummy_data')
parser.add_argument('--image_list_file', type=str, default='./work/dummy_data/list.txt')
parser.add_argument('--checkpoint_folder', type=str, default='./output')
parser.add_argument('--save_freq', type=int, default=2)
args = parser.parse_args()
def train(dataloader, model, criterion, optimizer, epoch, basic_augmentation):
model.train()
train_loss_meter = AverageMeter()
for batch_id, datas in enumerate(dataloader()):
imgs = [fluid.dygraph.to_variable(_) for _ in datas[0]]
imgs = fluid.layers.concat(imgs, axis=0)
labels = [fluid.dygraph.to_variable(_) for _ in datas[1]]
labels = fluid.layers.concat(labels, axis=0)
out = model(imgs)
loss = criterion(out, labels)
loss.backward()
optimizer.minimize(loss)
optimizer.clear_gradients()
n = imgs.shape[0]
train_loss_meter.update(loss.numpy()[0], n)
print(f"Epoch[{epoch:03d}/{args.num_epochs:03d}], " +
f"Step[{batch_id:04d}, " +
f"Average Loss: {train_loss_meter.avg:4f}")
return train_loss_meter.avg
def train_dataloader(batch_size):
with open(args.image_list_file, 'r') as f:
train_data = [line.strip() for line in f.readlines()]
def reader():
index = np.arange(len(train_data))
mask = np.random.choice(index, batch_size, replace = False)
imgs = []
labels = []
for indexs in mask:
img_path, label_path = train_data[indexs].split()
img_path = os.path.join(args.image_folder, img_path)
label_path = os.path.join(args.image_folder, label_path)
img = cv2.imread(img_path)
img = cv2.resize(img, (512, 512), cv2.INTER_NEAREST)
if len(img.shape) == 3:
img = np.transpose(img, (2,0,1))
img = np.expand_dims(img, axis=0).astype('float32')
label = cv2.imread(label_path).astype('float32')
label = cv2.resize(label, (512, 512), cv2.INTER_NEAREST)
label = cv2.cvtColor(label, cv2.COLOR_BGR2GRAY)
if len(label.shape) == 3:
label = np.transpose(label, (2,0,1))
label = np.expand_dims(label, axis=0)
label = np.expand_dims(label, axis=0).astype('int64')
assert label.shape[2:] == img.shape[2:],'Error'
imgs.append(img)
labels.append(label)
assert len(labels) == len(mask), 'Error'
yield imgs, labels
return reader
def main():
place = paddle.fluid.CPUPlace()
with fluid.dygraph.guard(place):
basic_augmentation = TrainAugmentation(image_size=256)
train_reader = train_dataloader(args.batch_size)
if args.net == "basic":
model = FCN8s(num_classes=59)
else:
raise NotImplementedError(f"args.net: {args.net} is not Supported!")
criterion = Basic_SegLoss
optimizer = AdamOptimizer(learning_rate=args.lr, parameter_list=model.parameters())
for epoch in range(1, args.num_epochs+1):
train_loss = train(train_reader,
model,
criterion,
optimizer,
epoch,
basic_augmentation)
print(f"----- Epoch[{epoch}/{args.num_epochs}] Train Loss: {train_loss:.4f}")
if epoch % args.save_freq == 0 or epoch == args.num_epochs:
model_path = os.path.join(args.checkpoint_folder, f"{args.net}-Epoch-{epoch}-Loss-{train_loss}")
fluid.save_dygraph(model.state_dict(), 'save_model_state_dict')
fluid.save_dygraph(optimizer.state_dict(), 'save_optimizer_state_dict')
print(f'----- Save model: {model_path}.pdparams')
print(f'----- Save optimizer: {model_path}.pdopt')
if __name__ == "__main__":
main()
结果:
aistudio@jupyter-559108-2495737:~$ python ./work/basic_train.py
[1, 59, 512, 512]
[1, 1, 512, 512]
Epoch[001/010], Step[0000, Average Loss: 4.101622
----- Epoch[1/10] Train Loss: 4.1016
[1, 59, 512, 512]
[1, 1, 512, 512]
Epoch[002/010], Step[0000, Average Loss: 4.543366
----- Epoch[2/10] Train Loss: 4.5434
...
|