Polyp-PVT: Polyp Segmentation with Pyramid Vision Transformers(GitHub 仓库:DengPingFan/Polyp-PVT,https://github.com/DengPingFan/Polyp-PVT)
这个项目很简单,数据准备好就能跑,下面简单说下数据准备和小改动。
1.数据准备
这里我以建筑为例进行测试,注意这是二分类网络。
一级目录
?
二级目录 (train)
三级目录(train),注意标签的值是0和255
二级目录(val):这个项目的验证设计还是比较好的,考虑了验证模型的泛化能力。我这里放了两种类型的数据:谷歌数据和高分数据。其中最终的test评价数据用的还是高分数据,因为我的训练数据(train里面的数据)是高分数据。
?三级目录(val)
?四级目录(val)
注意:train、val、images、masks、test这几个文件夹的名字请不要更改,保持一致,除非你愿意去代码里找到对应的地方一并修改。
2.训练
下面是我用的train,其实基本没有改动,就是改了数据路径,还有就是train函数有一个地方不合理,稍微改了下,多了一个val_list参数来输入验证文件,原始的不太好用。
import torch
from torch.autograd import Variable
import os
import argparse
from datetime import datetime
from lib.pvt import PolypPVT
from utils.dataloader import get_loader, test_dataset
from utils.utils import clip_gradient, adjust_lr, AvgMeter
import torch.nn.functional as F
import numpy as np
import logging
import matplotlib.pyplot as plt
def structure_loss(pred, mask):
    """Return the edge-weighted BCE plus weighted IoU loss, averaged to a scalar.

    Args:
        pred: Raw logits (sigmoid is applied here). Must be 4-D, because the
            reductions run over dims 2 and 3.
        mask: Ground-truth tensor with the same shape as ``pred``.
            Assumed to hold values in [0, 1] -- TODO confirm against the loader.

    Returns:
        A 0-dim tensor: the mean of (weighted BCE + weighted IoU) over the
        dims left after reducing dims 2 and 3.
    """
    # Weight each pixel by how much it differs from its 31x31 neighbourhood
    # mean, so pixels near a mask edge count more than flat regions.
    weit = 1 + 5 * torch.abs(
        F.avg_pool2d(mask, kernel_size=31, stride=1, padding=15) - mask)

    # `reduction='none'` keeps the per-pixel loss. The former `reduce='none'`
    # hit the legacy boolean flag: the non-empty string is truthy and selects
    # mean reduction, so the BCE collapsed to a scalar and `weit` cancelled
    # out of the ratio below.
    wbce = F.binary_cross_entropy_with_logits(pred, mask, reduction='none')
    wbce = (weit * wbce).sum(dim=(2, 3)) / weit.sum(dim=(2, 3))

    pred = torch.sigmoid(pred)
    inter = ((pred * mask) * weit).sum(dim=(2, 3))
    union = ((pred + mask) * weit).sum(dim=(2, 3))
    # The +1 terms keep the ratio defined when both prediction and mask are empty.
    wiou = 1 - (inter + 1) / (union - inter + 1)
    return (wbce + wiou).mean()
def test(model, path, dataset, testsize=352):
    """Evaluate ``model`` on one dataset folder and return its mean Dice score.

    Images are read from ``<path>/<dataset>/images/`` and masks from
    ``<path>/<dataset>/masks/``. The model is switched to eval mode and is
    NOT switched back; the caller must call ``model.train()`` again.

    Args:
        model: Callable returning two prediction tensors for one image batch.
        path: Root directory containing the dataset folders.
        dataset: Name of the dataset sub-folder to evaluate.
        testsize: Size passed to ``test_dataset``. Defaults to 352, the value
            that used to be hard-coded here.

    Returns:
        Mean over all samples of the per-image Dice score, where each
        per-image score is rounded to 4 decimals before averaging.

    Raises:
        ValueError: If the masks folder is empty.
    """
    data_path = os.path.join(path, dataset)
    image_root = '{}/images/'.format(data_path)
    gt_root = '{}/masks/'.format(data_path)
    model.eval()

    num1 = len(os.listdir(gt_root))
    if num1 == 0:
        # Previously this surfaced as a bare ZeroDivisionError at the return.
        raise ValueError('no mask files found in {}'.format(gt_root))

    test_loader = test_dataset(image_root, gt_root, testsize)
    smooth = 1
    DSC = 0.0
    # Inference only: skip autograd bookkeeping to save memory and time.
    with torch.no_grad():
        for _ in range(num1):
            image, gt, name = test_loader.load_data()
            gt = np.asarray(gt, np.float32)
            gt /= (gt.max() + 1e-8)  # scale the mask to [0, 1]
            image = image.cuda()

            res, res1 = model(image)
            # Sum both outputs and resize to the ground-truth resolution.
            res = F.interpolate(res + res1, size=gt.shape, mode='bilinear',
                                align_corners=False)
            res = res.sigmoid().cpu().numpy().squeeze()
            # Min-max normalise the prediction to [0, 1].
            res = (res - res.min()) / (res.max() - res.min() + 1e-8)

            # Soft Dice on the flattened maps.
            intersection = np.reshape(res, (-1)) * np.reshape(gt, (-1))
            dice = (2 * intersection.sum() + smooth) / (res.sum() + gt.sum() + smooth)
            # Keep the original 4-decimal rounding so reported numbers match.
            DSC += float('{:.4f}'.format(dice))
    return DSC / num1
def train(train_loader, model, optimizer, epoch, test_path, val_list):
    """Run one multi-scale training epoch, checkpoint the model, then validate it.

    Reads the module-level ``opt`` and ``total_step``. Appends to the
    module-level ``dict_plot`` and updates the module-level ``best``.

    Args:
        train_loader: Iterable yielding ``(images, gts)`` batches.
        model: Network returning two prediction tensors ``(P1, P2)``.
        optimizer: Optimizer stepped once per batch and scale.
        epoch: Current epoch number, used in logs and checkpoint file names.
        test_path: Root directory of the validation datasets. It must also
            contain a ``test`` sub-folder, which drives best-model selection.
        val_list: Names of extra validation sub-folders under ``test_path``.
            Each name must already be a key of ``dict_plot``.
    """
    global best, dict_plot
    model.train()
    size_rates = [0.75, 1, 1.25]
    loss_P2_record = AvgMeter()

    for i, pack in enumerate(train_loader, start=1):
        for rate in size_rates:
            optimizer.zero_grad()
            # ---- data prepare ----
            # Unpack again for every scale so each resize starts from the
            # original batch rather than an already-resized one.
            images, gts = pack
            images = images.cuda()
            gts = gts.cuda()
            # ---- rescale ----
            # Snap the scaled size to a multiple of 32.
            trainsize = int(round(opt.trainsize * rate / 32) * 32)
            if rate != 1:
                images = F.interpolate(images, size=(trainsize, trainsize),
                                       mode='bilinear', align_corners=True)
                gts = F.interpolate(gts, size=(trainsize, trainsize),
                                    mode='bilinear', align_corners=True)
            # ---- forward ----
            P1, P2 = model(images)
            # ---- loss function ----
            loss_P1 = structure_loss(P1, gts)
            loss_P2 = structure_loss(P2, gts)
            loss = loss_P1 + loss_P2
            # ---- backward ----
            loss.backward()
            clip_gradient(optimizer, opt.clip)
            optimizer.step()
            # ---- recording loss ----
            # Only the native-scale pass feeds the running loss.
            if rate == 1:
                loss_P2_record.update(loss_P2.data, opt.batchsize)
        # ---- train visualization ----
        if i % 20 == 0 or i == total_step:
            print('{} Epoch [{:03d}/{:03d}], Step [{:04d}/{:04d}], '
                  ' lateral-5: {:0.4f}]'.
                  format(datetime.now(), epoch, opt.epoch, i, total_step,
                         loss_P2_record.show()))

    # ---- save a checkpoint for this epoch ----
    save_path = opt.train_save
    os.makedirs(save_path, exist_ok=True)
    torch.save(model.state_dict(), save_path + str(epoch) + 'PolypPVT.pth')

    # ---- validate on every listed dataset (every epoch) ----
    for dataset in val_list:
        dataset_dice = test(model, test_path, dataset)
        logging.info('epoch: {}, dataset: {}, dice: {}'.format(epoch, dataset, dataset_dice))
        print(dataset, ': ', dataset_dice)
        dict_plot[dataset].append(dataset_dice)

    # ---- choose the best model from the fixed 'test' folder ----
    meandice = test(model, test_path, 'test')
    dict_plot['test'].append(meandice)
    if meandice > best:
        best = meandice
        torch.save(model.state_dict(), save_path + 'PolypPVT.pth')
        torch.save(model.state_dict(), save_path + str(epoch) + 'PolypPVT-best.pth')
        print('##############################################################################best', best)
        logging.info('##############################################################################best:{}'.format(best))
def plot_train(dict_plot=None, name=None):
    """Plot the per-epoch Dice curves of the given datasets and save ``eval.png``.

    Args:
        dict_plot: Mapping from dataset name to its list of Dice scores.
        name: Sequence of dataset names (keys of ``dict_plot``) to draw.

    Does nothing when either argument is ``None``.
    """
    if dict_plot is None or name is None:
        return

    color = ['red', 'lawngreen', 'lime', 'gold', 'm', 'plum', 'blue']
    line = ['-', "--"]
    # Reference scores drawn as horizontal lines -- presumably a baseline
    # method's Dice on each dataset; confirm the source before relying on them.
    transfuse = {'CVC-300': 0.902, 'CVC-ClinicDB': 0.918, 'Kvasir': 0.918,
                 'CVC-ColonDB': 0.773, 'ETIS-LaribPolypDB': 0.733, 'test': 0.83}

    # Draw on a fresh figure so repeated calls (e.g. once per epoch) do not
    # pile duplicate curves and legend entries onto the same axes.
    fig = plt.figure()
    for i, key in enumerate(name):
        # Cycle colours so more than len(color) series cannot raise IndexError.
        series_color = color[i % len(color)]
        plt.plot(dict_plot[key], label=key, color=series_color,
                 linestyle=line[(i + 1) % 2])
        # Custom datasets have no reference score; skip the baseline line
        # instead of failing with KeyError.
        if key in transfuse:
            plt.axhline(y=transfuse[key], color=series_color, linestyle='-')
    plt.xlabel("epoch")
    plt.ylabel("dice")
    plt.title('Train')
    plt.legend()
    plt.savefig('eval.png')
    plt.close(fig)
if __name__ == '__main__':
    def _str2bool(value):
        """Parse a command-line boolean such as 'true', '0' or 'no'."""
        lowered = value.lower()
        if lowered in ('true', '1', 'yes', 'y'):
            return True
        if lowered in ('false', '0', 'no', 'n'):
            return False
        raise argparse.ArgumentTypeError(
            'expected a boolean value, got {!r}'.format(value))

    # Per-dataset Dice history that train() appends to. The keys must be the
    # sub-folder names of your own validation directory, plus 'test'.
    dict_plot = {'GF2': [], 'GG': [], 'test': []}
    # Validation sub-folders evaluated after every epoch. Leave 'test' out:
    # train() always evaluates the 'test' folder on its own.
    data_lists = ['GF2', 'GG']
    # Name of the sub-folder under ./model_pth/ that receives the checkpoints.
    model_name = 'gf_build'

    parser = argparse.ArgumentParser()
    parser.add_argument('--epoch', type=int,
                        default=100, help='epoch number')
    parser.add_argument('--lr', type=float,
                        default=1e-3, help='learning rate')
    parser.add_argument('--optimizer', type=str,
                        default='AdamW', help='choosing optimizer AdamW or SGD')
    # Without an explicit type, any value given on the command line (even
    # "False") arrived as a non-empty, truthy string.
    parser.add_argument('--augmentations', type=_str2bool,
                        default=True, help='choose to do random flip rotation')
    parser.add_argument('--batchsize', type=int,
                        default=4, help='training batch size')
    parser.add_argument('--trainsize', type=int,
                        default=448, help='training dataset size')
    parser.add_argument('--clip', type=float,
                        default=0.5, help='gradient clipping margin')
    parser.add_argument('--decay_rate', type=float,
                        default=0.1, help='decay rate of learning rate')
    # Default is 200 (not 50): that is the value the training loop used to
    # hard-code, so a default run keeps the same schedule now that the flag
    # is actually honoured.
    parser.add_argument('--decay_epoch', type=int,
                        default=200, help='every n epochs decay learning rate')
    parser.add_argument('--train_path', type=str,
                        default='./dataset/build/train/',  # path to your own data
                        help='path to train dataset')
    parser.add_argument('--test_path', type=str,
                        default='./dataset/build/val/',  # path to your own data
                        help='path to testing Kvasir dataset')
    parser.add_argument('--train_save', type=str,
                        default='./model_pth/' + model_name + '/')
    opt = parser.parse_args()

    logging.basicConfig(filename='train_log.log',
                        format='[%(asctime)s-%(filename)s-%(levelname)s:%(message)s]',
                        level=logging.INFO, filemode='a', datefmt='%Y-%m-%d %I:%M:%S %p')

    # ---- build models ----
    # To pin a GPU, call torch.cuda.set_device(<index>) before this line.
    model = PolypPVT().cuda()
    best = 0  # best 'test' Dice so far; updated by train()
    params = model.parameters()
    if opt.optimizer == 'AdamW':
        optimizer = torch.optim.AdamW(params, opt.lr, weight_decay=1e-4)
    else:
        optimizer = torch.optim.SGD(params, opt.lr, weight_decay=1e-4, momentum=0.9)
    print(optimizer)

    image_root = '{}/images/'.format(opt.train_path)
    gt_root = '{}/masks/'.format(opt.train_path)
    train_loader = get_loader(image_root, gt_root, batchsize=opt.batchsize, trainsize=opt.trainsize,
                              augmentations=opt.augmentations)
    total_step = len(train_loader)  # read by train() for progress logging

    print("#" * 20, "Start Training", "#" * 20)
    # Inclusive upper bound: range(1, opt.epoch) stopped one epoch short of
    # the requested count.
    for epoch in range(1, opt.epoch + 1):
        # Pass the parsed schedule flags; they were previously ignored in
        # favour of the literals 0.1 and 200.
        adjust_lr(optimizer, opt.lr, epoch, opt.decay_rate, opt.decay_epoch)
        train(train_loader, model, optimizer, epoch, opt.test_path, data_lists)
    # To draw eval.png, call plot_train(dict_plot, data_lists + ['test']).
另外,utils/dataloader.py脚本里的数据加载函数下面红框改成和我一致,不然就算你打开了数据增强也不起作用
改完上面以后,命令行运行下面命令就开始训练了。
python -W ignore Train.py
3.测试
测试其实没什么好说的,把训练好的模型路径给一下,数据路径给一下就行了,命令行运行
python -W ignore Test.py
下面是我用的Test.py:
import torch
import torch.nn.functional as F
import numpy as np
import os, argparse
# from scipy import misc
from lib.pvt import PolypPVT
from utils.dataloader import test_dataset
import cv2
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # Default is 352 (not 448): that is the size the loader call used to
    # hard-code, so a default run produces the same maps now that the flag
    # is actually honoured. Pass --testsize 448 to infer at the training size.
    parser.add_argument('--testsize', type=int, default=352, help='testing size')
    parser.add_argument('--pth_path', type=str, default='./model_pth/gf_build/PolypPVT.pth')
    opt = parser.parse_args()

    model = PolypPVT()
    model.load_state_dict(torch.load(opt.pth_path))
    model.cuda()
    model.eval()

    for _data_name in ['GF2', 'GG', 'test']:
        ##### put data_path here #####
        data_path = './dataset/build/val/{}'.format(_data_name)
        ##### save_path #####
        save_path = './result_map/PolypPVT/{}/'.format(_data_name)
        os.makedirs(save_path, exist_ok=True)

        image_root = '{}/images/'.format(data_path)
        gt_root = '{}/masks/'.format(data_path)
        num1 = len(os.listdir(gt_root))
        test_loader = test_dataset(image_root, gt_root, opt.testsize)

        # Inference only: skip autograd bookkeeping to save memory and time.
        with torch.no_grad():
            for _ in range(num1):
                image, gt, name = test_loader.load_data()
                # The mask is only needed for its shape, i.e. the output size.
                gt = np.asarray(gt, np.float32)
                image = image.cuda()

                P1, P2 = model(image)
                # Sum both outputs and resize to the ground-truth resolution.
                res = F.interpolate(P1 + P2, size=gt.shape, mode='bilinear',
                                    align_corners=False)
                res = res.sigmoid().cpu().numpy().squeeze()
                # Min-max normalise to [0, 1] before scaling to 0-255.
                res = (res - res.min()) / (res.max() - res.min() + 1e-8)
                cv2.imwrite(save_path + name, res * 255)
        print(_data_name, 'Finish!')
4.测试结果
我这里跑了80个epoch,但其实在第50个epoch以后loss就不再下降了,后面继续训练也没什么太大的意义。模型总的来说还是不太容易出效果,可能用于遥感还需要做很多调整吧。
?
|