在DETR源码笔记(一)中,阅读到了main()函数的第二部分构建模型部分,接下来是搭建transformer部分。
搭建Transformer
在此之前,建议先看一下transformer理解_在努力的松鼠的博客-CSDN博客,再接着往下看。
def build(args):
    # Excerpt of the model-building entry point: it creates the backbone and
    # the transformer and wires them into a DETR model. The function
    # continues beyond this snippet.
    # NOTE(review): both branches of the conditional below evaluate to 2, and
    # the "coco_panoptic" override assigns 2 as well, so num_classes is always
    # 2 here regardless of args.dataset_file.
    num_classes = 2 if args.dataset_file != 'coco' else 2
    if args.dataset_file == "coco_panoptic":
        num_classes = 2
    device = torch.device(args.device)
    # Build the backbone network.
    backbone = build_backbone(args)
    # Build the transformer.
    transformer = build_transformer(args)
    model = DETR(
        backbone,
        transformer,
        num_classes=num_classes,
        num_queries=args.num_queries,
        aux_loss=args.aux_loss,
    )
看build_transformer(args):
def build_transformer(args):
    """Create the ``Transformer`` from the parsed command-line options.

    The option names on ``args`` are mapped onto the constructor keywords
    below. Intermediate decoder outputs are always requested
    (``return_intermediate_dec=True``).
    """
    options = dict(
        d_model=args.hidden_dim,
        dropout=args.dropout,
        nhead=args.nheads,
        dim_feedforward=args.dim_feedforward,
        num_encoder_layers=args.enc_layers,
        num_decoder_layers=args.dec_layers,
        normalize_before=args.pre_norm,
        return_intermediate_dec=True,
    )
    return Transformer(**options)
实质调用的是Transformer()。d_model: transformer输入通道数(特征维度), nhead: 多头注意力头数, num_encoder_layers: encoder层数, num_decoder_layers: decoder层数, dim_feedforward: 前馈网络(FFN)隐藏层的维数。
class Transformer(nn.Module):
    """Encoder-decoder transformer.

    Args:
        d_model: feature width of the tokens handled by every layer.
        nhead: number of attention heads.
        num_encoder_layers: number of stacked encoder layers.
        num_decoder_layers: number of stacked decoder layers.
        dim_feedforward: width handed to each layer's feed-forward network.
        dropout: dropout probability handed to each layer.
        activation: name of the feed-forward activation function.
        normalize_before: when True the layers normalise before attention/FFN
            (pre-norm) and the encoder receives one extra final LayerNorm;
            otherwise the encoder has no final norm.
        return_intermediate_dec: forwarded to the decoder as
            ``return_intermediate``.
    """

    def __init__(self, d_model=512, nhead=8, num_encoder_layers=6,
                 num_decoder_layers=6, dim_feedforward=2048, dropout=0.1,
                 activation="relu", normalize_before=False,
                 return_intermediate_dec=False):
        super().__init__()

        # Positional arguments shared by the encoder and decoder layer
        # constructors.
        layer_args = (d_model, nhead, dim_feedforward,
                      dropout, activation, normalize_before)

        # Encoder: one prototype layer handed to the encoder stack. With
        # pre-norm the last layer's output is not normalised, so a closing
        # LayerNorm is appended.
        enc_layer = TransformerEncoderLayer(*layer_args)
        final_encoder_norm = None
        if normalize_before:
            final_encoder_norm = nn.LayerNorm(d_model)
        self.encoder = TransformerEncoder(enc_layer, num_encoder_layers,
                                          final_encoder_norm)

        # Decoder: same recipe, but the closing LayerNorm is always present.
        dec_layer = TransformerDecoderLayer(*layer_args)
        final_decoder_norm = nn.LayerNorm(d_model)
        self.decoder = TransformerDecoder(
            dec_layer,
            num_decoder_layers,
            final_decoder_norm,
            return_intermediate=return_intermediate_dec,
        )

        # Re-initialise the weights of everything built above.
        self._reset_parameters()

        self.d_model = d_model
        self.nhead = nhead

    def _reset_parameters(self):
        """Xavier-uniform initialise every parameter with more than one dim."""
        for weight in (p for p in self.parameters() if p.dim() > 1):
            nn.init.xavier_uniform_(weight)

    def forward(self, src, mask, query_embed, pos_embed):
        """Run the encoder and decoder over a feature map.

        Args:
            src: 4-D feature map, unpacked as (N, C, H, W).
            mask: per-image mask; flattened from dim 1 on and used as the
                key padding mask for encoder and decoder.
            query_embed: 2-D query embedding; broadcast over the batch and
                used as ``query_pos`` in the decoder.
            pos_embed: positional encoding, flattened the same way as ``src``.

        Returns:
            A tuple ``(decoder output with dims 1 and 2 swapped,
            encoder memory reshaped back to (N, C, H, W))``.
        """
        bs, c, h, w = src.shape

        # Flatten NxCxHxW to HWxNxC for both the features and their
        # positional encoding.
        tokens = src.flatten(2).permute(2, 0, 1)
        pos_tokens = pos_embed.flatten(2).permute(2, 0, 1)
        # One copy of the query embedding per batch element.
        queries = query_embed.unsqueeze(1).repeat(1, bs, 1)
        padding_mask = mask.flatten(1)

        # The decoder input starts as zeros with the same shape as the
        # queries; the query embedding itself enters as query_pos.
        tgt = torch.zeros_like(queries)

        memory = self.encoder(tokens, src_key_padding_mask=padding_mask,
                              pos=pos_tokens)
        hs = self.decoder(tgt, memory, memory_key_padding_mask=padding_mask,
                          pos=pos_tokens, query_pos=queries)

        decoded = hs.transpose(1, 2)
        feature_map = memory.permute(1, 2, 0).view(bs, c, h, w)
        return decoded, feature_map
Encoder
首先用TransformerEncoderLayer()建立encoder中的其中一层,因为每层结构都是相同的,后面直接复制就行。根据归一化的前后顺序不同,forward有两种实现方式(forward_post和forward_pre):
class TransformerEncoderLayer(nn.Module):
    """One encoder layer: multi-head self-attention followed by a two-layer FFN.

    Args:
        d_model: feature width of the input tokens.
        nhead: number of attention heads.
        dim_feedforward: hidden width of the feed-forward network.
        dropout: dropout probability used in attention, inside the FFN and on
            both residual branches.
        activation: activation name, resolved by ``_get_activation_fn``.
        normalize_before: selects ``forward_pre`` (LayerNorm before each
            sub-block) instead of ``forward_post`` (LayerNorm after each
            residual sum).
    """

    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,
                 activation="relu", normalize_before=False):
        super().__init__()
        # Multi-head self-attention over d_model-wide tokens.
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)

        # Feed-forward network: d_model -> dim_feedforward -> d_model, with
        # dropout between the two linear layers.
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        # One LayerNorm and one residual-branch dropout per sub-block.
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

        self.activation = _get_activation_fn(activation)
        # Pre-norm vs post-norm switch read by forward().
        self.normalize_before = normalize_before

    def with_pos_embed(self, tensor, pos: Optional[Tensor]):
        """Return ``tensor + pos``, or ``tensor`` itself when ``pos`` is None."""
        if pos is None:
            return tensor
        return tensor + pos

    def _attend(self, x, attn_mask, key_padding_mask, pos):
        """Self-attention where only query and key carry the positional encoding."""
        query = key = self.with_pos_embed(x, pos)
        return self.self_attn(query, key, value=x, attn_mask=attn_mask,
                              key_padding_mask=key_padding_mask)[0]

    def _feed_forward(self, x):
        """linear1 -> activation -> dropout -> linear2."""
        hidden = self.activation(self.linear1(x))
        return self.linear2(self.dropout(hidden))

    def forward_post(self,
                     src,
                     src_mask: Optional[Tensor] = None,
                     src_key_padding_mask: Optional[Tensor] = None,
                     pos: Optional[Tensor] = None):
        """Post-norm variant: normalise after each residual addition."""
        attended = self._attend(src, src_mask, src_key_padding_mask, pos)
        src = self.norm1(src + self.dropout1(attended))
        transformed = self._feed_forward(src)
        src = self.norm2(src + self.dropout2(transformed))
        return src

    def forward_pre(self, src,
                    src_mask: Optional[Tensor] = None,
                    src_key_padding_mask: Optional[Tensor] = None,
                    pos: Optional[Tensor] = None):
        """Pre-norm variant: normalise the input of each sub-block instead."""
        normed = self.norm1(src)
        attended = self._attend(normed, src_mask, src_key_padding_mask, pos)
        src = src + self.dropout1(attended)
        normed = self.norm2(src)
        src = src + self.dropout2(self._feed_forward(normed))
        return src

    def forward(self, src,
                src_mask: Optional[Tensor] = None,
                src_key_padding_mask: Optional[Tensor] = None,
                pos: Optional[Tensor] = None):
        """Dispatch to the pre-norm or post-norm implementation."""
        run = self.forward_pre if self.normalize_before else self.forward_post
        return run(src, src_mask, src_key_padding_mask, pos)
根据前面建立的encoder层来搭建encoder。
# Excerpt repeated from Transformer.__init__: hand the prototype layer, the
# layer count and the optional final norm to the encoder stack.
self.encoder = TransformerEncoder(encoder_layer, num_encoder_layers, encoder_norm)
class TransformerEncoder(nn.Module):
    """Sequential stack of encoder layers with an optional closing norm.

    Args:
        encoder_layer: prototype layer handed to ``_get_clones`` together
            with ``num_layers`` (presumably replicated that many times —
            confirm against ``_get_clones``).
        num_layers: number of layers in the stack.
        norm: optional module applied to the output of the last layer.
    """

    def __init__(self, encoder_layer, num_layers, norm=None):
        super().__init__()
        self.layers = _get_clones(encoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm

    def forward(self, src,
                mask: Optional[Tensor] = None,
                src_key_padding_mask: Optional[Tensor] = None,
                pos: Optional[Tensor] = None):
        """Feed ``src`` through every layer in order, then through ``norm`` if set."""
        hidden = src
        for encoder_layer in self.layers:
            hidden = encoder_layer(
                hidden,
                src_mask=mask,
                src_key_padding_mask=src_key_padding_mask,
                pos=pos,
            )
        return hidden if self.norm is None else self.norm(hidden)
Decoder
回到Transformer(),接着decoder就是和encoder类似的操作了
# Excerpt repeated from Transformer.__init__.
# Decoder; num_decoder_layers is the number of decoder layers.
decoder_layer = TransformerDecoderLayer(d_model, nhead, dim_feedforward,
                                        dropout, activation, normalize_before)
# Unlike the encoder, the decoder's final LayerNorm is created unconditionally.
decoder_norm = nn.LayerNorm(d_model)
self.decoder = TransformerDecoder(decoder_layer, num_decoder_layers, decoder_norm,
                                  return_intermediate=return_intermediate_dec)
但需要注意的是,decoder会多一个输入query embedding,即query_pos,它是可学习的输出位置向量,解码器中的这个参数全局共享,提供全局注意力。
我理解它为我们预测的输出,一开始预测就初始化为0, 需要在transformer decoder中不断refine它,具体的了解推荐一个源码解析目标检测的跨界之星DETR(四)、Detection with Transformer - 简书
DETR搭建
Transformer搭建完后又回到最上面的build(args)函数,紧接着是搭建DETR模型,将Backbone和transformer搭在一起。
# Excerpt from build(args): combine the backbone and the transformer into the
# DETR model.
model = DETR(
    backbone,
    transformer,
    num_classes=num_classes,
    num_queries=args.num_queries,
    aux_loss=args.aux_loss,
)
# masks: whether the segmentation task is enabled; not considered for now.
if args.masks:
    model = DETRsegm(model)
class DETR(nn.Module):
    """DETR detection module: backbone, transformer and two prediction heads."""

    def __init__(self, backbone, transformer, num_classes, num_queries, aux_loss=False):
        """Assemble the model.

        Args:
            backbone: backbone module; must expose ``num_channels`` and, when
                called, return ``(features, pos)``.
            transformer: transformer module; must expose ``d_model``.
            num_classes: number of object classes. The classification head
                emits ``num_classes + 1`` logits (one extra no-object slot).
            num_queries: number of object queries, i.e. the maximum number of
                objects predicted per image.
            aux_loss: when True, forward() also returns the predictions of
                the intermediate decoder layers.
        """
        super().__init__()
        self.num_queries = num_queries
        self.transformer = transformer
        # Width of the transformer features; every head works on this width.
        width = transformer.d_model
        # Classification head: one linear layer on top of the decoder output.
        self.class_embed = nn.Linear(width, num_classes + 1)
        # Box regression head: an MLP producing 4 values per query.
        self.bbox_embed = MLP(width, width, 4, 3)
        # Learned query embeddings, one row per object query.
        self.query_embed = nn.Embedding(num_queries, width)
        # 1x1 convolution reducing the backbone channels to the transformer width.
        self.input_proj = nn.Conv2d(backbone.num_channels, width, kernel_size=1)
        self.backbone = backbone
        self.aux_loss = aux_loss

    def forward(self, samples: NestedTensor):
        """Predict class logits and boxes for a batch of images.

        Args:
            samples: a ``NestedTensor``; any other input is first converted
                with ``NestedTensor.from_tensor_list``.

        Returns:
            A dict with:
              - "pred_logits": classification logits (including no-object)
                of the last decoder layer.
              - "pred_boxes": 4 box values per query from the last decoder
                layer, squashed into [0, 1] by a sigmoid.
                NOTE(review): the original docstring gave the order as
                (center_x, center_y, height, width); confirm against the box
                utilities used by the loss.
              - "aux_outputs": only when ``aux_loss`` is set; a list of dicts
                with the two keys above, one per earlier decoder layer.
        """
        if not isinstance(samples, NestedTensor):
            samples = NestedTensor.from_tensor_list(samples)

        # The backbone yields feature maps and their positional encodings;
        # only the last level is used.
        features, pos = self.backbone(samples)
        src, mask = features[-1].decompose()

        projected = self.input_proj(src)
        # First element of the transformer output: the decoder states.
        hs = self.transformer(projected, mask, self.query_embed.weight, pos[-1])[0]

        outputs_class = self.class_embed(hs)
        outputs_coord = self.bbox_embed(hs).sigmoid()

        out = {'pred_logits': outputs_class[-1], 'pred_boxes': outputs_coord[-1]}
        if self.aux_loss:
            # Collect the predictions of every decoder layer except the last.
            aux = []
            for layer_logits, layer_boxes in zip(outputs_class[:-1], outputs_coord[:-1]):
                aux.append({'pred_logits': layer_logits, 'pred_boxes': layer_boxes})
            out['aux_outputs'] = aux
        return out
LOSS计算和GT匈牙利匹配
DETR整个模型搭建完成后,回到build()中,因为它的预测结果是无序的,是以集合的形式输出,需要准备模型的预测结果与GT的匹配函数,来判断GT是否被检测分类成功,以进行loss计算。匹配函数使用的是匈牙利匹配,一个二分图的最大匹配算法,可以尝试用一下进化版的KM匹配算法试一试。
趣写算法系列之--匈牙利算法_Dark_Scope的博客-CSDN博客_匈牙利算法
这篇博客讲的挺有趣的。
# Excerpt from build(args).
# Hungarian matcher that pairs the predictions with the ground truth.
matcher = build_matcher(args)
# The loss mainly consists of a classification loss, a box regression loss
# and a GIoU loss.
# Weight of each loss term.
weight_dict = {'loss_ce': 1, 'loss_bbox': args.bbox_loss_coef}  # author's note: 5
weight_dict['loss_giou'] = args.giou_loss_coef  # author's note: 2
if args.masks:
    weight_dict["loss_mask"] = args.mask_loss_coef
    weight_dict["loss_dice"] = args.dice_loss_coef
# TODO this is a hack
# With aux_loss enabled, losses are also computed for the predictions of the
# intermediate decoder layers, so each of them needs its own weights; the keys
# get the suffix "_<layer index>".
if args.aux_loss:
    aux_weight_dict = {}
    for i in range(args.dec_layers - 1):
        aux_weight_dict.update({k + f'_{i}': v for k, v in weight_dict.items()})
    weight_dict.update(aux_weight_dict)
losses = ['labels', 'boxes', 'cardinality']
if args.masks:
    losses += ["masks"]
# SetCriterion() builds the loss module.
criterion = SetCriterion(num_classes, matcher=matcher, weight_dict=weight_dict,
                         eos_coef=args.eos_coef, losses=losses)
criterion.to(device)
SetCriterion()计算各个loss后返回,这里只对分类损失部分的源码贴了注释,其他的也是类似操作,可以自己看一下。
class SetCriterion(nn.Module):
    """Loss module for DETR.

    The computation has two steps:
      1) the matcher assigns ground-truth targets to model predictions;
      2) every requested loss is evaluated on the matched pairs.
    """

    def __init__(self, num_classes, matcher, weight_dict, eos_coef, losses):
        """Store the configuration and build the per-class weight vector.

        Args:
            num_classes: number of object categories, without the special
                no-object category.
            matcher: callable ``matcher(outputs, targets)`` returning the
                matching between predictions and targets.
            weight_dict: maps loss names to their relative weights.
            eos_coef: classification weight of the no-object category.
            losses: list of the loss names to evaluate (each is passed to
                ``get_loss``).
        """
        super().__init__()
        self.num_classes = num_classes
        self.matcher = matcher
        self.weight_dict = weight_dict
        self.eos_coef = eos_coef
        self.losses = losses

        # Weight 1 for every real class, eos_coef for the trailing no-object
        # class; kept as a buffer so it moves with the module.
        class_weights = torch.ones(self.num_classes + 1)
        class_weights[-1] = self.eos_coef
        self.register_buffer('empty_weight', class_weights)

    def forward(self, outputs, targets):
        """Compute all requested losses.

        Args:
            outputs: dict of model outputs, e.g. 'pred_logits', 'pred_boxes'
                and optionally 'aux_outputs' (a list of dicts with the same
                two keys, one per intermediate decoder layer).
            targets: list of per-image dicts (``len(targets) == batch_size``);
                each must contain "labels", other keys depend on the losses.

        Returns:
            Dict of loss values. Entries coming from 'aux_outputs' carry the
            suffix ``_<layer index>``.
        """
        # The matcher only sees the last decoder layer's predictions.
        final_outputs = {name: value for name, value in outputs.items()
                         if name != 'aux_outputs'}
        # indices: one (prediction indices, target indices) pair per image.
        indices = self.matcher(final_outputs, targets)

        # Number of target boxes in the batch, averaged over all processes
        # when running distributed and clamped to at least 1; used for
        # normalisation.
        num_boxes = sum(len(t["labels"]) for t in targets)
        num_boxes = torch.as_tensor([num_boxes], dtype=torch.float,
                                    device=next(iter(outputs.values())).device)
        if is_dist_avail_and_initialized():
            torch.distributed.all_reduce(num_boxes)
        num_boxes = torch.clamp(num_boxes / get_world_size(), min=1).item()

        # Losses on the final predictions.
        losses = {}
        for loss in self.losses:
            losses.update(self.get_loss(loss, outputs, targets, indices, num_boxes))

        # Auxiliary losses: repeat matching and loss computation for each
        # intermediate layer. Mask losses are skipped there because they are
        # too costly to compute.
        aux_loss_names = [name for name in self.losses if name != 'masks']
        for i, aux_outputs in enumerate(outputs.get('aux_outputs', ())):
            indices = self.matcher(aux_outputs, targets)
            for loss in aux_loss_names:
                # Logging is enabled only for the last layer.
                kwargs = {'log': False} if loss == 'labels' else {}
                l_dict = self.get_loss(loss, aux_outputs, targets, indices,
                                       num_boxes, **kwargs)
                losses.update({k + f'_{i}': v for k, v in l_dict.items()})

        return losses
分类LOSS计算
#分类loss, 用的交叉熵
def loss_labels(self, outputs, targets, indices, num_boxes, log=True):
    """Classification loss (weighted cross-entropy).

    Every query defaults to the no-object class (index ``self.num_classes``);
    the queries selected by ``indices`` receive the label of their matched
    target. Each dict in ``targets`` must contain the key "labels".

    Returns a dict with 'loss_ce' and, when ``log`` is true, 'class_error'
    (100 minus the accuracy on the matched queries).
    """
    assert 'pred_logits' in outputs
    logits = outputs['pred_logits']

    # (batch index, query index) of every matched prediction.
    matched = self._get_src_permutation_idx(indices)

    # Labels of the matched targets, concatenated in the same order.
    per_image_labels = []
    for target, (_, tgt_idx) in zip(targets, indices):
        per_image_labels.append(target["labels"][tgt_idx])
    matched_labels = torch.cat(per_image_labels)

    # One label per (image, query), initialised to no-object.
    labels = torch.full(logits.shape[:2], self.num_classes,
                        dtype=torch.int64, device=logits.device)
    labels[matched] = matched_labels

    # cross_entropy expects the class dimension at position 1.
    losses = {'loss_ce': F.cross_entropy(logits.transpose(1, 2), labels, self.empty_weight)}

    if log:
        # TODO this should probably be a separate loss, not hacked in this one here
        losses['class_error'] = 100 - accuracy(logits[matched], matched_labels)[0]
    return losses
def _get_src_permutation_idx(self, indices):
# permute predictions following indices
# torch.full_like(input, value),就是将input的形状作为返回结果tensor的形状,值全为value。
#获取对应的目标在batch中的索引tensor([0, 0, 0, 1, 1, 2, 2, 2, 3, 3, 3])
batch_idx = torch.cat([torch.full_like(src, i) for i, (src, _) in enumerate(indices)])
#目标在单张图像所有目标中的索引 tensor([58, 75, 92, 75, 92, 58, 75, 92, 58, 75, 92])
src_idx = torch.cat([src for (src, _) in indices])
return batch_idx, src_idx
|