DETR Source Code Notes (II)

In DETR源码笔记(一) (part one of these notes), we got through the second part of the main() function, which builds the model; the next step is building the transformer.

Building the Transformer

Before diving in, I recommend first reading transformer理解_在努力的松鼠的博客-CSDN博客, then continuing here.

def build(args):
    # hard-coded to 2 classes here (apparently a custom 2-class setup);
    # upstream DETR uses 20 / 91 here, and 250 for coco_panoptic
    num_classes = 2 if args.dataset_file != 'coco' else 2
    if args.dataset_file == "coco_panoptic":
        num_classes = 2
    device = torch.device(args.device)
    #build the backbone network
    backbone = build_backbone(args)
    #build the transformer
    transformer = build_transformer(args)

    model = DETR(
        backbone,
        transformer,
        num_classes=num_classes,
        num_queries=args.num_queries,
        aux_loss=args.aux_loss,
    )

Now look at build_transformer(args):

def build_transformer(args):
    return Transformer(
        d_model=args.hidden_dim,
        dropout=args.dropout,
        nhead=args.nheads,
        dim_feedforward=args.dim_feedforward,
        num_encoder_layers=args.enc_layers,
        num_decoder_layers=args.dec_layers,
        normalize_before=args.pre_norm,
        return_intermediate_dec=True,
    )

What this actually calls is Transformer(). d_model: the transformer's input channel count; nhead: number of attention heads; num_encoder_layers / num_decoder_layers: number of encoder / decoder layers; dim_feedforward: hidden width of the feed-forward network (FFN).
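As a quick sanity check, here is a minimal sketch of my own (not from the post) that runs the Transformer class shown below on dummy tensors, assuming the class and its encoder/decoder companions from models/transformer.py are in scope; the DETR defaults hidden_dim=256 and 100 queries are used, while the 25×34 feature-map size is arbitrary:

import torch

transformer = Transformer(d_model=256, nhead=8, num_encoder_layers=6,
                          num_decoder_layers=6, dim_feedforward=2048,
                          return_intermediate_dec=True)

bs, c, h, w = 2, 256, 25, 34                      # projected backbone feature map
src = torch.rand(bs, c, h, w)
mask = torch.zeros(bs, h, w, dtype=torch.bool)    # False = no padded pixels
query_embed = torch.rand(100, c)                  # stand-in for the learned queries
pos_embed = torch.rand(bs, c, h, w)               # stand-in for the positional encoding

hs, memory = transformer(src, mask, query_embed, pos_embed)
print(hs.shape)      # torch.Size([6, 2, 100, 256]): one output per decoder layer
print(memory.shape)  # torch.Size([2, 256, 25, 34]): encoder memory, reshaped back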

class Transformer(nn.Module):

    def __init__(self, d_model=512, nhead=8, num_encoder_layers=6,
                 num_decoder_layers=6, dim_feedforward=2048, dropout=0.1,
                 activation="relu", normalize_before=False,
                 return_intermediate_dec=False):
        super().__init__()
        #d_model: the transformer's input feature dimension
        #the encoder is a stack of identical layers; the layer count comes from
        #num_encoder_layers, and each layer is built by TransformerEncoderLayer()
        #dim_feedforward: hidden width of the feed-forward network
        #build a single encoder layer
        encoder_layer = TransformerEncoderLayer(d_model, nhead, dim_feedforward,
                                                dropout, activation, normalize_before)
        #normalize_before: normalize before (pre-norm) or after (post-norm) the
        #attention/FFN sub-layers; with pre-norm the stack needs one extra LayerNorm
        #after the last layer's FFN output
        encoder_norm = nn.LayerNorm(d_model) if normalize_before else None
        #stack num_encoder_layers copies of encoder_layer to build the encoder
        self.encoder = TransformerEncoder(encoder_layer, num_encoder_layers, encoder_norm)
        #decoder: num_decoder_layers is the number of decoder layers
        decoder_layer = TransformerDecoderLayer(d_model, nhead, dim_feedforward,
                                                dropout, activation, normalize_before)
        decoder_norm = nn.LayerNorm(d_model)
        #build the decoder
        self.decoder = TransformerDecoder(decoder_layer, num_decoder_layers, decoder_norm,
                                          return_intermediate=return_intermediate_dec)
        #initialize model parameters
        self._reset_parameters()

        self.d_model = d_model
        self.nhead = nhead

    #re-initialize parameters: Xavier-uniform init for every parameter with more than one dimension
    def _reset_parameters(self):
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def forward(self, src, mask, query_embed, pos_embed):
        # src: transformer input; mask: image padding mask; query_embed: learnable
        # decoder query embeddings; pos_embed: positional encoding
        # flatten NxCxHxW to HWxNxC
        bs, c, h, w = src.shape
        #build the encoder input
        src = src.flatten(2).permute(2, 0, 1)
        #flatten the positional encoding the same way
        pos_embed = pos_embed.flatten(2).permute(2, 0, 1)
        query_embed = query_embed.unsqueeze(1).repeat(1, bs, 1)
        #flatten the input mask
        mask = mask.flatten(1)
        # torch.zeros_like: returns an all-zero tensor with the same shape as its argument.
        # tgt is the initial guess for the targets to predict. Since nothing is known at the
        # start, it is zero-initialized and progressively refined in the decoder; what is
        # actually learned is the query embedding, which captures dataset-wide statistics
        # of the objects, while tgt is re-initialized on every forward pass.
        # tgt can also be read as the previous decoder layer's output, shape (100, N, 256);
        # for the first layer tgt = torch.zeros_like(query_embed), i.e. a zero matrix.
        # query_pos is the learnable output-position vector; as I understand it, this
        # parameter is shared across decoder layers and provides global attention,
        # query_pos shape = (100, N, 256)
        tgt = torch.zeros_like(query_embed)
        #encoder output
        memory = self.encoder(src, src_key_padding_mask=mask, pos=pos_embed)
        #decoder output; with return_intermediate_dec=True this contains every decoder
        #layer's output
        hs = self.decoder(tgt, memory, memory_key_padding_mask=mask,
                          pos=pos_embed, query_pos=query_embed)
        return hs.transpose(1, 2), memory.permute(1, 2, 0).view(bs, c, h, w)
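To make the layout transform at the top of forward() concrete, here is a tiny standalone demo of the NxCxHxW → HWxNxC flattening:

import torch

src = torch.rand(2, 256, 25, 34)            # (N, C, H, W) from the backbone
flat = src.flatten(2).permute(2, 0, 1)      # (H*W, N, C)
print(flat.shape)                           # torch.Size([850, 2, 256])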

Encoder

First, TransformerEncoderLayer() builds a single encoder layer; since every layer is identical, the remaining ones are simply copies. Depending on where layer normalization is applied, there are two variants (pre-norm and post-norm):

class TransformerEncoderLayer(nn.Module):

    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,
                 activation="relu", normalize_before=False):
        super().__init__()
        #multi-head self-attention layer. In the first layer the input is the backbone
        #output plus the positional encoding; in every later layer it is the previous
        #layer's output
        #d_model: the transformer feature dimension, i.e. the input channel count
        #nhead: number of attention heads, typically 8
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        # Implementation of Feedforward model: FFN
        #nn.Linear(in_features, out_features): fully connected layer; out_features is also
        #the number of neurons in that layer
        self.linear1 = nn.Linear(d_model, dim_feedforward)#expand to dim_feedforward
        #nn.Dropout(dropout): during training, zeroes neuron outputs with probability
        #`dropout`; usually applied after fully connected layers to reduce overfitting
        #note that nn.Dropout scales the surviving activations by 1/(1-dropout)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)#project back to d_model
        #transformers normally use layer normalization: nn.LayerNorm(normalized_shape,
        #eps=1e-05, elementwise_affine=True, device=None, dtype=None)
        #   normalized_shape: dimension(s) to normalize over; eps: added to the variance
        #   to avoid dividing by zero
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        #activation function: one of relu, gelu, glu; relu by default
        self.activation = _get_activation_fn(activation)
        #whether to normalize before or after the attention/FFN sub-layers
        self.normalize_before = normalize_before
    #add the positional encoding
    def with_pos_embed(self, tensor, pos: Optional[Tensor]):
        return tensor if pos is None else tensor + pos

    #post-norm: normalize after self-attention and after the FFN
    def forward_post(self,
                     src,
                     src_mask: Optional[Tensor] = None,
                     src_key_padding_mask: Optional[Tensor] = None,
                     pos: Optional[Tensor] = None):
        # add the positional encoding to query and key; value does not need it
        q = k = self.with_pos_embed(src, pos)
        # self-attention
        src2 = self.self_attn(q, k, value=src, attn_mask=src_mask,
                              key_padding_mask=src_key_padding_mask)[0]
        src = src + self.dropout1(src2)
        # normalize after self-attention
        src = self.norm1(src)
        # FFN
        src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
        src = src + self.dropout2(src2)
        # normalize after the FFN
        src = self.norm2(src)
        return src

    # pre-norm: normalize before self-attention and before the FFN
    def forward_pre(self, src,
                    src_mask: Optional[Tensor] = None,
                    src_key_padding_mask: Optional[Tensor] = None,
                    pos: Optional[Tensor] = None):
        #normalize before self-attention
        src2 = self.norm1(src)
        #add the positional encoding
        q = k = self.with_pos_embed(src2, pos)
        #self-attention
        src2 = self.self_attn(q, k, value=src2, attn_mask=src_mask,
                              key_padding_mask=src_key_padding_mask)[0]
        src = src + self.dropout1(src2)
        #normalize before the FFN
        src2 = self.norm2(src)
        #FFN
        src2 = self.linear2(self.dropout(self.activation(self.linear1(src2))))
        src = src + self.dropout2(src2)
        return src

    def forward(self, src,
                src_mask: Optional[Tensor] = None,
                src_key_padding_mask: Optional[Tensor] = None,
                pos: Optional[Tensor] = None):
        # choose pre-norm or post-norm
        if self.normalize_before:
            return self.forward_pre(src, src_mask, src_key_padding_mask, pos)
        return self.forward_post(src, src_mask, src_key_padding_mask, pos)
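For reference, the _get_activation_fn helper used above is defined at the bottom of the same models/transformer.py (F is torch.nn.functional):

def _get_activation_fn(activation):
    """Return an activation function given a string"""
    if activation == "relu":
        return F.relu
    if activation == "gelu":
        return F.gelu
    if activation == "glu":
        return F.glu
    raise RuntimeError(F"activation should be relu/gelu, not {activation}.")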

The encoder is then assembled from the layer built above:
self.encoder = TransformerEncoder(encoder_layer, num_encoder_layers, encoder_norm)
class TransformerEncoder(nn.Module):

    def __init__(self, encoder_layer, num_layers, norm=None):
        super().__init__()
        #clone encoder_layer until there are num_layers copies;
        #every encoder layer is identical, so copying is enough
        self.layers = _get_clones(encoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm

    def forward(self, src,
                mask: Optional[Tensor] = None,
                src_key_padding_mask: Optional[Tensor] = None,
                pos: Optional[Tensor] = None):
        output = src
        #run through all layers to get the encoder output
        for layer in self.layers:
            output = layer(output, src_mask=mask,
                           src_key_padding_mask=src_key_padding_mask, pos=pos)

        if self.norm is not None:
            output = self.norm(output)

        return output
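The _get_clones helper (also in models/transformer.py) simply deep-copies the layer N times into an nn.ModuleList:

import copy
from torch import nn

def _get_clones(module, N):
    return nn.ModuleList([copy.deepcopy(module) for i in range(N)])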

Decoder

Back in Transformer(), the decoder is built in much the same way as the encoder:

#decoder: num_decoder_layers is the number of decoder layers
decoder_layer = TransformerDecoderLayer(d_model, nhead, dim_feedforward,
                                                dropout, activation, normalize_before)
decoder_norm = nn.LayerNorm(d_model)
self.decoder = TransformerDecoder(decoder_layer, num_decoder_layers, decoder_norm,
                                          return_intermediate=return_intermediate_dec)

Note, however, that the decoder takes one extra input: the query embedding. query_pos is a learnable output-position vector; this parameter is shared across all decoder layers and provides global attention.

I think of it as the predictions themselves: initialized to zero at the start and then progressively refined in the transformer decoder. For more detail I recommend this source-code walkthrough: 目标检测的跨界之星DETR(四)、Detection with Transformer - 简书.
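Putting those pieces together, here is a small sketch of mine (mirroring Transformer.forward above) of how the decoder's two extra inputs are prepared:

import torch
import torch.nn as nn

num_queries, bs, hidden_dim = 100, 2, 256
query_embed = nn.Embedding(num_queries, hidden_dim)     # learned, shared across images
q = query_embed.weight.unsqueeze(1).repeat(1, bs, 1)    # (100, bs, 256)
tgt = torch.zeros_like(q)                               # the decoder starts from zeros
print(q.shape, tgt.shape)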

Building DETR

With the transformer built, we return to the build(args) function at the top; next the DETR model itself is assembled, wiring the backbone and the transformer together.

model = DETR(
        backbone,
        transformer,
        num_classes=num_classes,
        num_queries=args.num_queries,
        aux_loss=args.aux_loss,
    )
#masks: whether there is a segmentation task; not considered for now
if args.masks:
     model = DETRsegm(model)
class DETR(nn.Module):
    """ This is the DETR module that performs object detection """
    def __init__(self, backbone, transformer, num_classes, num_queries, aux_loss=False):
        """ Initializes the model.
        Parameters:
            backbone: torch module of the backbone to be used. See backbone.py
            transformer: torch module of the transformer architecture. See transformer.py
            num_classes: number of object classes
            num_queries: number of object queries, ie detection slot. This is the maximal number of objects
                         DETR can detect in a single image. For COCO, we recommend 100 queries.
            aux_loss: True if auxiliary decoding losses (loss at each decoder layer) are to be used.
        """
        super().__init__()
        self.num_queries = num_queries
        self.transformer = transformer
        hidden_dim = transformer.d_model#transformer output channel count
        #a fully connected layer after the decoder produces the classification logits
        self.class_embed = nn.Linear(hidden_dim, num_classes + 1)
        #an MLP regresses the boxes
        self.bbox_embed = MLP(hidden_dim, hidden_dim, 4, 3)
        #decoder query input: predict num_queries objects per image
        self.query_embed = nn.Embedding(num_queries, hidden_dim)
        #input pre-processing for the transformer: the backbone outputs num_channels
        #(2048) channels, which a 1x1 convolution reduces to hidden_dim
        self.input_proj = nn.Conv2d(backbone.num_channels, hidden_dim, kernel_size=1)
        self.backbone = backbone
        self.aux_loss = aux_loss

    def forward(self, samples: NestedTensor):
        """?The forward expects a NestedTensor, which consists of:
               - samples.tensor: batched images, of shape [batch_size x 3 x H x W]
               - samples.mask: a binary mask of shape [batch_size x H x W], containing 1 on padded pixels

            It returns a dict with the following elements:
               - "pred_logits": the classification logits (including no-object) for all queries.
                                Shape= [batch_size x num_queries x (num_classes + 1)]
               - "pred_boxes": The normalized boxes coordinates for all queries, represented as
                               (center_x, center_y, height, width). These values are normalized in [0, 1],
                               relative to the size of each individual image (disregarding possible padding).
                               See PostProcess for information on how to retrieve the unnormalized bounding box.
               - "aux_outputs": Optional, only returned when auxilary losses are activated. It is a list of
                                dictionnaries containing the two above keys for each decoder layer.
        """
        if not isinstance(samples, NestedTensor):
            samples = NestedTensor.from_tensor_list(samples)
        #the backbone returns feature maps and positional encodings
        features, pos = self.backbone(samples)
        #split the feature map from its mask
        src, mask = features[-1].decompose()
        #transformer output
        hs = self.transformer(self.input_proj(src), mask, self.query_embed.weight, pos[-1])[0]
        #classification logits
        outputs_class = self.class_embed(hs)
        #box predictions
        outputs_coord = self.bbox_embed(hs).sigmoid()
        out = {'pred_logits': outputs_class[-1], 'pred_boxes': outputs_coord[-1]}
        #collect the intermediate decoder layers' outputs for the auxiliary losses
        if self.aux_loss:
            out['aux_outputs'] = [{'pred_logits': a, 'pred_boxes': b}
                                  for a, b in zip(outputs_class[:-1], outputs_coord[:-1])]
        return out
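For reference, the MLP used for bbox_embed is a plain multi-layer perceptron defined in the same models/detr.py (F is torch.nn.functional); with MLP(hidden_dim, hidden_dim, 4, 3) it amounts to three Linear layers 256→256→256→4 with ReLU in between:

class MLP(nn.Module):
    """ Very simple multi-layer perceptron (also called FFN)"""

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
        super().__init__()
        self.num_layers = num_layers
        h = [hidden_dim] * (num_layers - 1)
        self.layers = nn.ModuleList(nn.Linear(n, k) for n, k in zip([input_dim] + h, h + [output_dim]))

    def forward(self, x):
        for i, layer in enumerate(self.layers):
            x = F.relu(layer(x)) if i < self.num_layers - 1 else x
        return x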

Loss Computation and Hungarian Matching Against GT

With the whole DETR model built, we return to build(). Because DETR's predictions are unordered and come out as a set, we need a function that matches predictions against the GT, to decide which GT boxes were detected and classified correctly before computing the loss. The matcher uses the Hungarian algorithm, a maximum-matching algorithm for bipartite graphs; you could also try the KM (Kuhn-Munkres) algorithm, its weighted extension.

趣写算法系列之--匈牙利算法_Dark_Scope的博客-CSDN博客_匈牙利算法

This blog post explains the algorithm in quite an entertaining way.
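DETR's HungarianMatcher delegates the actual assignment to scipy.optimize.linear_sum_assignment; here is a toy run on a made-up 3-query / 3-GT cost matrix to show what the matcher produces:

import numpy as np
from scipy.optimize import linear_sum_assignment

cost = np.array([[0.9, 0.1, 0.5],      # cost[i][j]: cost of assigning query i to GT j
                 [0.4, 0.8, 0.2],
                 [0.3, 0.6, 0.7]])
row_ind, col_ind = linear_sum_assignment(cost)
print(row_ind, col_ind)                # [0 1 2] [1 2 0]: query i -> GT col_ind[i]
print(cost[row_ind, col_ind].sum())    # 0.6, the minimal total cost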


    #Hungarian matcher that matches predictions against the GT
    matcher = build_matcher(args)
    #the loss combines a classification loss, a box regression loss, and a GIoU loss
    #per-loss weights
    weight_dict = {'loss_ce': 1, 'loss_bbox': args.bbox_loss_coef} #default 5
    weight_dict['loss_giou'] = args.giou_loss_coef #default 2
    if args.masks:
        weight_dict["loss_mask"] = args.mask_loss_coef
        weight_dict["loss_dice"] = args.dice_loss_coef
    # TODO this is a hack
    #with aux_loss set, losses are also computed for each intermediate decoder layer's
    #predictions, so matching weights must be registered for them too
    if args.aux_loss:
        aux_weight_dict = {}
        for i in range(args.dec_layers - 1):
            aux_weight_dict.update({k + f'_{i}': v for k, v in weight_dict.items()})
        weight_dict.update(aux_weight_dict)

    losses = ['labels', 'boxes', 'cardinality']
    if args.masks:
        losses += ["masks"]
    # SetCriterion() builds the loss criterion
    criterion = SetCriterion(num_classes, matcher=matcher, weight_dict=weight_dict,
                             eos_coef=args.eos_coef, losses=losses)
    criterion.to(device)
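To see what the aux-loss bookkeeping produces, here is a toy run of the renaming loop above, with made-up coefficients and dec_layers=6 (so five intermediate layers):

weight_dict = {'loss_ce': 1, 'loss_bbox': 5, 'loss_giou': 2}
aux_weight_dict = {}
for i in range(6 - 1):                 # dec_layers - 1
    aux_weight_dict.update({k + f'_{i}': v for k, v in weight_dict.items()})
weight_dict.update(aux_weight_dict)
print(sorted(weight_dict))             # 'loss_bbox', 'loss_bbox_0', ..., 'loss_giou_4'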

SetCriterion() computes each individual loss and returns them. Only the classification-loss part is annotated below; the other losses follow the same pattern and are easy to read on your own.

class SetCriterion(nn.Module):
    """ This class computes the loss for DETR.
    The process happens in two steps:
        1) we compute hungarian assignment between ground truth boxes and the outputs of the model
        2) we supervise each pair of matched ground-truth / prediction (supervise class and box)
    """
    def __init__(self, num_classes, matcher, weight_dict, eos_coef, losses):
        """ Create the criterion.
        Parameters:
            num_classes: number of object categories, omitting the special no-object category
            matcher: module able to compute a matching between targets and proposals
            weight_dict: dict containing as key the names of the losses and as values their relative weight.
            eos_coef: relative classification weight applied to the no-object category
            losses: list of all the losses to be applied. See get_loss for list of available losses.
        """
        super().__init__()
        self.num_classes = num_classes #number of classes
        self.matcher = matcher #matching function; Hungarian matching here
        self.weight_dict = weight_dict #per-loss weights
        self.eos_coef = eos_coef #classification loss weight for the background class
        self.losses = losses #list of losses to compute
        #object classes get classification weight 1; the background class gets eos_coef
        empty_weight = torch.ones(self.num_classes + 1)
        empty_weight[-1] = self.eos_coef
        #store empty_weight in a buffer
        self.register_buffer('empty_weight', empty_weight)

    
    def forward(self, outputs, targets):
        """ This performs the loss computation.
        Parameters:
             outputs: dict of tensors, see the output specification of the model for the format
             targets: list of dicts, such that len(targets) == batch_size.
                      The expected keys in each dict depends on the losses applied, see each loss' doc
        """
        # targets: list of GT dicts
        # [{'boxes': ..., 'labels': ..., ...}, {...}, ...]
        # outputs: DETR model output
        # {'pred_logits': (b, num_queries, num_classes),
        #  'pred_boxes': (b, num_queries, 4),
        #  'aux_outputs': [{'pred_logits': ..., 'pred_boxes': ...}, {...}, ...]}
        # keep only the last decoder layer's results
        outputs_without_aux = {k: v for k, v in outputs.items() if k != 'aux_outputs'}

        # Retrieve the matching between the outputs of the last layer and the targets
        #Hungarian matching returns a list with one (query indices, GT indices) tuple
        #per image, e.g. [([58, 75, 92], [1, 0, 2]), (...), (...), (...)]
        indices = self.matcher(outputs_without_aux, targets)

        # Compute the average number of target boxes across all nodes, for normalization purposes
        #count the target boxes in the whole batch
        num_boxes = sum(len(t["labels"]) for t in targets)
        num_boxes = torch.as_tensor([num_boxes], dtype=torch.float, device=next(iter(outputs.values())).device)
        #used during distributed training
        if is_dist_avail_and_initialized():
            torch.distributed.all_reduce(num_boxes)
        # torch.clamp(input, min, max) clamps each element of input into [min, max] and returns a new tensor
        num_boxes = torch.clamp(num_boxes / get_world_size(), min=1).item()

        # Compute all the requested losses
        #each loss is computed via get_loss()
        losses = {}
        for loss in self.losses:
            losses.update(self.get_loss(loss, outputs, targets, indices, num_boxes))

        # In case of auxiliary losses, we repeat this process with the output of each intermediate layer.
        # optionally compute the losses for the intermediate decoder layers as well
        if 'aux_outputs' in outputs:
            for i, aux_outputs in enumerate(outputs['aux_outputs']):
                indices = self.matcher(aux_outputs, targets)
                for loss in self.losses:
                    if loss == 'masks':
                        # Intermediate masks losses are too costly to compute, we ignore them.
                        continue
                    kwargs = {}
                    if loss == 'labels':
                        # Logging is enabled only for the last layer
                        kwargs = {'log': False}
                    l_dict = self.get_loss(loss, aux_outputs, targets, indices, num_boxes, **kwargs)
                    l_dict = {k + f'_{i}': v for k, v in l_dict.items()}
                    losses.update(l_dict)

        return losses
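A toy illustration of the num_boxes normalization above: box counts are summed over the batch, averaged across processes, and clamped so the denominator never drops below 1 (a single process is assumed here, so the world size is 1):

import torch

targets = [{"labels": torch.tensor([1, 0, 2])}, {"labels": torch.tensor([2])}]
num_boxes = sum(len(t["labels"]) for t in targets)        # 4
num_boxes = torch.as_tensor([num_boxes], dtype=torch.float)
num_boxes = torch.clamp(num_boxes / 1, min=1).item()      # get_world_size() == 1
print(num_boxes)                                          # 4.0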

Classification Loss

#classification loss: cross-entropy
    def loss_labels(self, outputs, targets, indices, num_boxes, log=True):
        """Classification loss (NLL)
        targets dicts must contain the key "labels" containing a tensor of dim [nb_target_boxes]
        """
        assert 'pred_logits' in outputs
        #take the classification logits from the DETR output, key 'pred_logits';
        #size in the author's run: 4*100*6
        src_logits = outputs['pred_logits']
        #batch indices and per-image query indices, e.g.
        #(tensor([0, 0, 0, 1, 1, 2, 2, 2, 3, 3, 3]), tensor([58, 75, 92, 75, 92, 58, 75, 92, 58, 75, 92])), 2*11
        idx = self._get_src_permutation_idx(indices)
        #labels of the matched targets, concatenated in indices order, e.g.
        #tensor([2, 2, 1, 2, 1, 2, 2, 1, 0, 0, 1], device='cuda:0'), 11
        target_classes_o = torch.cat([t["labels"][J] for t, (_, J) in zip(targets, indices)])
        # 4*100, initialized to the background class
        target_classes = torch.full(src_logits.shape[:2], self.num_classes,
                                    dtype=torch.int64, device=src_logits.device)
        target_classes[idx] = target_classes_o
        #cross-entropy
        loss_ce = F.cross_entropy(src_logits.transpose(1, 2), target_classes, self.empty_weight)
        losses = {'loss_ce': loss_ce}

        if log:
            # TODO this should probably be a separate loss, not hacked in this one here
            losses['class_error'] = 100 - accuracy(src_logits[idx], target_classes_o)[0]
        return losses
    
    def _get_src_permutation_idx(self, indices):
        # permute predictions following indices
        # torch.full_like(input, value) returns a tensor shaped like input, filled with value
        #batch index of each matched target, e.g. tensor([0, 0, 0, 1, 1, 2, 2, 2, 3, 3, 3])
        batch_idx = torch.cat([torch.full_like(src, i) for i, (src, _) in enumerate(indices)])
        #query index of each matched target within its image, e.g. tensor([58, 75, 92, 75, 92, 58, 75, 92, 58, 75, 92])
        src_idx = torch.cat([src for (src, _) in indices])
        return batch_idx, src_idx
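Finally, a toy reproduction of _get_src_permutation_idx with made-up match indices for a batch of two images, showing how the per-image tuples are merged into flat (batch, query) index tensors:

import torch

indices = [(torch.tensor([58, 75, 92]), torch.tensor([1, 0, 2])),
           (torch.tensor([75, 92]),     torch.tensor([0, 1]))]

batch_idx = torch.cat([torch.full_like(src, i) for i, (src, _) in enumerate(indices)])
src_idx = torch.cat([src for (src, _) in indices])
print(batch_idx)   # tensor([0, 0, 0, 1, 1])
print(src_idx)     # tensor([58, 75, 92, 75, 92])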
