《Unsupervised Feature Learning via Non-parametric Instance Discrimination》
2018年的cvpr论文,之后的许多对比学习的工作都有这篇文章的影子。
Introduction
作者通过观察监督学习的实验结果发现,视觉分类任务中,在softmax计算目标属于某一类的概率时,得分排在第二位的与得分排在第一位的类别在视觉上也会更接近,这种现象暗示出,一个好的学习方法能够认为相近的类别有大的相似度,如美洲豹与美洲虎;而不怎么有相似特征的类别,则相似度比较小,如美洲豹与书包。 进而,考虑到数据集中的每个样本实际上都是不一样的,在极端情况下,将每个样本视为一个类别,那么一个优秀的模型也能够按照对类别的相似度建模的方式来对不同样本之间的相似度建模,而如果真的按照样本个数来当做类别数目的话,对于ImageNet这种上百万张图片的数据集,计算softmax会非常的耗时,因此作者借鉴了noise-contrastive estimation (NCE) 中的方法,近似的计算softmax,所以其实本质上,对比学习的损失函数就是想计算softmax,好多博客里面说对比学习的损失函数是在计算softmax,但是没有说为什么要这样设计。
Method
parametric classifier VS non-parametric classifier
假设有
n
n
n张图片
x
1
,
x
2
,
.
.
.
,
x
n
x_1,x_2,...,x_n
x1?,x2?,...,xn?,对它们进行特征提取之后的表示为
v
1
,
v
2
,
.
.
.
,
v
n
v_1,v_2,...,v_n
v1?,v2?,...,vn?,softmax计算已知某个特征
v
v
v属于第
i
i
i类的条件概率为:
P
(
i
∣
v
)
=
exp
?
(
w
i
T
v
)
∑
j
=
1
n
exp
?
(
w
j
T
v
)
P(i|v)=\frac{\exp (w_i^Tv)}{\sum_{j=1}^n\exp(w_j^Tv)}
P(i∣v)=∑j=1n?exp(wjT?v)exp(wiT?v)? 其中,
w
j
T
v
w_j^Tv
wjT?v反映了第
j
j
j类与向量
v
v
v的匹配程度,如果再进一步考虑,
w
j
T
w_j^T
wjT?反应
j
j
j类的信息,如果
w
j
T
v
w_j^Tv
wjT?v的乘积越大,说明
w
j
w_j
wj?与
v
v
v的相似度越高,通过这种计算方式可以在空间上拉大不同类之间的距离,那么是否可以有一种不需要参数的表示形式,也能达到相近的效果呢?也就是说我们可不可以找到网络权重
w
j
T
w_j^T
wjT?的替代呢? 既然
w
j
T
w_j^T
wjT?反应
j
j
j类的信息,那么有什么非参数的信息也同样可以表示第
i
i
i类?所以作者用另一个变量
v
j
v_j
vj?来替代权重,
v
j
v_j
vj?表示第
j
j
j个或第
j
j
j类样本经过特征提取之后的向量,可以认为它跟
w
j
T
w_j^T
wjT?有相似的作用,这样做不仅减少了网络的训练参数量,已训练好的特征在下游任务上也会有更好的泛化能力,上式变为:
P
(
i
∣
v
)
=
exp
?
(
v
i
T
v
/
τ
)
∑
j
=
1
n
exp
?
(
v
j
T
v
/
τ
)
P(i|v)=\frac{\exp (v_i^Tv/\tau)}{\sum_{j=1}^n\exp(v_j^Tv/\tau)}
P(i∣v)=∑j=1n?exp(vjT?v/τ)exp(viT?v/τ)? 其中
τ
\tau
τ用来控制分布的离散程度,它的作用不是我们探究的重点,不做进一步解释。根据监督学习中对于分类问题的损失函数计算:
J
(
θ
)
=
?
∑
i
=
1
n
log
?
P
(
i
∣
f
θ
(
x
i
)
)
J(\theta)=-\sum_{i=1}^n\log P(i|f_{\theta}(x_i))
J(θ)=?i=1∑n?logP(i∣fθ?(xi?))
如何选取不同的
v
j
v_j
vj??
对于整个数据集,初始化一个memory bank中存放每个样本经过特征提取之后的表示
V
=
v
j
V={v_j}
V=vj?,在训练过程中,随着网络逐渐收敛,不停地更新memory bank中的特征表示,这一部分直接看程序吧,语言表达能力有限 (?■_■)
上面的过程存在什么问题?
正如我们在introduction里面提到的,在计算
P
(
i
∣
v
)
P(i|v)
P(i∣v)的时候,分母上需要计算整个数据集中的每一个表征与当前样本表征的乘积,在计算上很耗时,所以作者借鉴了noise-contrastive estimation (NCE) 和negative sampling 来近似计算原始的softmax。 损失函数:
J
N
C
E
(
θ
)
=
?
E
P
d
[
log
?
h
(
i
,
v
)
]
?
m
E
P
n
[
log
?
(
1
?
h
(
i
,
v
′
)
)
]
J_{NCE}(\theta)=-E_{P_d}[\log h(i,v)]-mE_{P_n}[\log (1-h(i,v'))]
JNCE?(θ)=?EPd??[logh(i,v)]?mEPn??[log(1?h(i,v′))] 其中
h
(
i
,
v
)
:
=
P
(
D
=
1
∣
i
,
v
)
=
P
(
i
∣
v
)
P
(
i
∣
v
)
+
m
P
n
(
i
)
h(i,v):=P(D=1|i,v)=\frac{P(i|v)}{P(i|v)+mP_n(i)}
h(i,v):=P(D=1∣i,v)=P(i∣v)+mPn?(i)P(i∣v)?
P
(
i
∣
v
)
=
exp
?
(
v
T
f
i
/
τ
)
Z
i
P(i|v)=\frac{\exp (v^Tf_i/\tau)}{Z_i}
P(i∣v)=Zi?exp(vTfi?/τ)?
Z
i
=
∑
j
=
1
n
exp
?
(
v
j
T
f
j
/
τ
)
Z_i=\sum_{j=1}^n\exp(v_j^Tf_j/\tau)
Zi?=j=1∑n?exp(vjT?fj?/τ) 其中
f
i
f_i
fi?表示当前样本的特征,
v
j
v_j
vj?表示从memory bank中随机选取的特征,
n
n
n为均匀分布的噪声
P
n
=
1
/
n
P_n=1/n
Pn?=1/n,在实际情况下,不选取整个数据库中的样本做负样本,而是用蒙特卡洛来近似:
Z
i
=
n
E
j
[
exp
?
(
v
j
T
f
i
/
τ
)
]
=
n
m
∑
k
=
1
m
exp
?
(
v
j
k
T
f
i
/
τ
)
Z_i=nE_j[\exp(v_j^Tf_i/\tau)]=\frac{n}{m}\sum_{k=1}^m\exp(v_{jk}^Tf_i/\tau)
Zi?=nEj?[exp(vjT?fi?/τ)]=mn?k=1∑m?exp(vjkT?fi?/τ)
如何用训好的特征测试分类器?
假设有一张待测试的图像
x
^
\hat x
x^,计算其特征
f
^
=
f
θ
(
x
^
)
\hat f=f_{\theta}(\hat x)
f^?=fθ?(x^),计算其与memory bank里面所有向量的相似度
s
i
s_i
si?,选择前
k
k
k个相似度最高的样本
N
k
\mathcal{N}_k
Nk?,计算这
k
k
k个样本在每一个类别上的权重
w
c
=
∑
i
∈
N
k
α
i
?
1
(
c
i
=
c
)
w_c=\sum_{i \in \mathcal{N}_k}\alpha_i * 1(c_i=c)
wc?=∑i∈Nk??αi??1(ci?=c),其中
α
i
=
exp
?
(
s
i
/
τ
)
\alpha_i=\exp(s_i/\tau)
αi?=exp(si?/τ),最后权重最高的那个类别为该测试图像的预测类别。
代码
官方的版本写的很啰嗦,这篇文章重点在损失函数的计算过程上,原版包括很多的实验验证,打开一个project的时候一上来找不到重点很难受,并且python和pytorch的维护环境做的不太友好,不同版本之间的pytorch在类上兼容性很差,近几年的深度学习都开始在分布式上面跑程序了,然而实验室还是每人一张卡,每次看到torch.distributed都自动略过 ⊙﹏⊙∥
预训练模型采用resnet-18,跟前几篇套路一样。重点看训练过程(包含损失函数的计算)和测试过程。
import argparse
import pandas as pd
import torch
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import MultiStepLR
from torch.utils.data import DataLoader
from tqdm import tqdm
import utils
from model import Model
'''
好像比官方版写的更像人话的样子
'''
def train(net, data_loader, train_optimizer):
global z
net.train()
total_loss, total_num, train_bar = 0.0, 0, tqdm(data_loader)
for data, target, pos_index in train_bar:
data = data.to('cuda')
train_optimizer.zero_grad()
features = net(data)
idx = torch.randint(high=n, size=(data.size(0), m + 1))
idx[:, 0] = pos_index
samples = torch.index_select(memory_bank, dim=0, index=idx.view(-1)).view(data.size(0), -1, feature_dim)
sim_matrix = torch.bmm(samples.to(device=features.device), features.unsqueeze(dim=-1)).view(data.size(0), -1)
out = torch.exp(sim_matrix / temperature)
if z is None:
z = torch.mean(out, dim=1) * n
z = z.view(z.shape[0], -1).detach()
output = out / z
p_d = (output.select(dim=-1, index=0) / (output.select(dim=-1, index=0) + m / n)).log()
p_n = ((m / n) / (output.narrow(dim=-1, start=1, length=m) + m / n)).log()
loss = - (p_d.sum() + p_n.sum()) / data.size(0)
loss.backward(retain_graph=True)
train_optimizer.step()
pos_samples = samples.select(dim=1, index=0)
pos_samples = features.detach().cpu() * momentum + pos_samples * (1.0 - momentum)
pos_samples = F.normalize(pos_samples, dim=-1)
memory_bank.index_copy_(dim=0, index=pos_index, source=pos_samples)
total_num += data.size(0)
total_loss += loss.item() * data.size(0)
train_bar.set_description('Train Epoch: [{}/{}] Loss: {:.4f}'.format(epoch, epochs, total_loss / total_num))
return total_loss / total_num
def test(net, memory_data_loader, test_data_loader):
net.eval()
total_top1, total_top5, total_num, feature_bank = 0.0, 0.0, 0, []
with torch.no_grad():
for data, target, _ in tqdm(memory_data_loader, desc='Feature extracting'):
feature_bank.append(net(data.to('cuda')))
feature_bank = torch.cat(feature_bank).t().contiguous()
feature_labels = torch.tensor(memory_data_loader.dataset.targets, device=feature_bank.device)
test_bar = tqdm(test_data_loader)
for data, target, _ in test_bar:
data, target = data.to('cuda'), target.to('cuda')
output = net(data)
total_num += data.size(0)
sim_matrix = torch.mm(output, feature_bank)
sim_weight, sim_indices = sim_matrix.topk(k=k, dim=-1)
sim_labels = torch.gather(feature_labels.expand(data.size(0), -1), dim=-1, index=sim_indices)
sim_weight = (sim_weight / temperature).exp()
one_hot_label = torch.zeros(data.size(0) * k, c, device=sim_labels.device)
one_hot_label = one_hot_label.scatter(dim=-1, index=sim_labels.view(-1, 1), value=1.0)
pred_scores = torch.sum(one_hot_label.view(data.size(0), -1, c) * sim_weight.unsqueeze(dim=-1), dim=1)
pred_labels = pred_scores.argsort(dim=-1, descending=True)
total_top1 += torch.sum((pred_labels[:, :1] == target.unsqueeze(dim=-1)).any(dim=-1).float()).item()
total_top5 += torch.sum((pred_labels[:, :5] == target.unsqueeze(dim=-1)).any(dim=-1).float()).item()
test_bar.set_description('Test Epoch: [{}/{}] Acc@1:{:.2f}% Acc@5:{:.2f}%'
.format(epoch, epochs, total_top1 / total_num * 100, total_top5 / total_num * 100))
return total_top1 / total_num * 100, total_top5 / total_num * 100
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Train NPID')
parser.add_argument('--feature_dim', default=128, type=int, help='Feature dim for each image')
parser.add_argument('--m', default=4096, type=int, help='Negative sample number')
parser.add_argument('--temperature', default=0.1, type=float, help='Temperature used in softmax')
parser.add_argument('--momentum', default=0.5, type=float, help='Momentum used for the update of memory bank')
parser.add_argument('--k', default=200, type=int, help='Top k most similar images used to predict the label')
parser.add_argument('--batch_size', default=64, type=int, help='Number of images in each mini-batch')
parser.add_argument('--epochs', default=200, type=int, help='Number of sweeps over the dataset to train')
args = parser.parse_args()
feature_dim, m, temperature, momentum = args.feature_dim, args.m, args.temperature, args.momentum
k, batch_size, epochs = args.k, args.batch_size, args.epochs
train_data = utils.CIFAR10Instance(root='../data', train=True, transform=utils.train_transform, download=True)
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, drop_last=True)
memory_data = utils.CIFAR10Instance(root='../data', train=True, transform=utils.test_transform, download=True)
memory_loader = DataLoader(memory_data, batch_size=batch_size, shuffle=False, drop_last=True)
test_data = utils.CIFAR10Instance(root='../data', train=False, transform=utils.test_transform, download=True)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, drop_last=True)
model = Model(feature_dim).to('cuda')
optimizer = optim.SGD(model.parameters(), lr=0.03, momentum=0.9, weight_decay=5e-4)
print("# trainable model parameters:", sum(param.numel() if param.requires_grad else 0
for param in model.parameters()))
lr_scheduler = MultiStepLR(optimizer, milestones=[int(epochs * 0.6), int(epochs * 0.8)], gamma=0.1)
z, c, n = None, len(memory_data.classes), len(train_data)
memory_bank = F.normalize(torch.randn(n, feature_dim), dim=-1)
results = {'train_loss': [], 'test_acc@1': [], 'test_acc@5': []}
best_acc = 0.0
for epoch in range(1, epochs + 1):
train_loss = train(model, train_loader, optimizer)
results['train_loss'].append(train_loss)
test_acc_1, test_acc_5 = test(model, memory_loader, test_loader)
results['test_acc@1'].append(test_acc_1)
results['test_acc@5'].append(test_acc_5)
data_frame = pd.DataFrame(data=results, index=range(1, epoch + 1))
data_frame.to_csv('results/{}_results.csv'.format(feature_dim), index_label='epoch')
lr_scheduler.step(epoch)
if test_acc_1 > best_acc:
best_acc = test_acc_1
torch.save(model.state_dict(), 'epochs/{}_model.pth'.format(feature_dim))
数据库:
import torchvision.datasets as datasets
from PIL import Image
from torchvision import transforms
class CIFAR10Instance(datasets.CIFAR10):
"""CIFAR10Instance Dataset.
"""
def __getitem__(self, index):
img, target = self.data[index], self.targets[index]
img = Image.fromarray(img)
if self.transform is not None:
img = self.transform(img)
if self.target_transform is not None:
target = self.target_transform(target)
return img, target, index
train_transform = transforms.Compose([
transforms.RandomResizedCrop(32, scale=(0.2, 1.0)),
transforms.RandomApply([transforms.ColorJitter(0.4, 0.4, 0.4, 0.4)], p=0.8),
transforms.RandomGrayscale(p=0.2),
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))])
test_transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))])
|