Model Architecture
Paper: https://arxiv.org/pdf/2104.09224.pdf
Key innovations:
- Dual-branch structure: each branch uses a ResNet as its feature extractor; at several stages the intermediate feature maps are fused by a transformer, upsampled, and added back into the backbones;
- A global 512-dimensional semantic feature vector is produced;
- An MLP is used for multi-trajectory path planning;
The Model (model.py)
- Trajectory prediction is not covered here: the PID controller and the GRU waypoint-prediction network are out of scope.
- The global-vector extraction network is introduced top-down.
The TransFuser main network (only the first of the four fusion stages is shown; layers 2-4 repeat the same pattern):
import math

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models


class Encoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        # Builds the image/lidar backbones, the anchor-grid avgpool, and
        # the four GPT fusion stages (transformer1..transformer4); elided here.
        ...

    def forward(self, image_tensor, lidar_tensor, velocity):
        bz = lidar_tensor.shape[0] // self.config.seq_len
        # Stem of each ResNet branch
        image_features = self.image_encoder.features.conv1(image_tensor)
        image_features = self.image_encoder.features.bn1(image_features)
        image_features = self.image_encoder.features.relu(image_features)
        image_features = self.image_encoder.features.maxpool(image_features)
        lidar_features = self.lidar_encoder._model.conv1(lidar_tensor)
        lidar_features = self.lidar_encoder._model.bn1(lidar_features)
        lidar_features = self.lidar_encoder._model.relu(lidar_features)
        lidar_features = self.lidar_encoder._model.maxpool(lidar_features)
        # Stage 1: ResNet layer1 in both branches
        image_features = self.image_encoder.features.layer1(image_features)
        lidar_features = self.lidar_encoder._model.layer1(lidar_features)
        # Pool both maps down to the anchor grid to form transformer tokens
        image_embd_layer1 = self.avgpool(image_features)
        lidar_embd_layer1 = self.avgpool(lidar_features)
        image_features_layer1, lidar_features_layer1 = self.transformer1(image_embd_layer1, lidar_embd_layer1, velocity)
        # Upsample the fused maps back to stage-1 resolution and add them in (residual fusion)
        image_features_layer1 = F.interpolate(image_features_layer1, scale_factor=8, mode='bilinear')
        lidar_features_layer1 = F.interpolate(lidar_features_layer1, scale_factor=8, mode='bilinear')
        image_features = image_features + image_features_layer1
        lidar_features = lidar_features + lidar_features_layer1
        # ... the same pool -> fuse -> upsample -> add pattern repeats for layer2/3/4 (elided)
        # Global average pooling reduces each branch to one feature vector per frame
        image_features = self.image_encoder.features.avgpool(image_features)
        image_features = torch.flatten(image_features, 1)
        image_features = image_features.view(bz, self.config.n_views * self.config.seq_len, -1)
        lidar_features = self.lidar_encoder._model.avgpool(lidar_features)
        lidar_features = torch.flatten(lidar_features, 1)
        lidar_features = lidar_features.view(bz, self.config.seq_len, -1)
        # Sum image and lidar vectors into a single fused feature
        fused_features = torch.cat([image_features, lidar_features], dim=1)
        fused_features = torch.sum(fused_features, dim=1)
        return fused_features
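A minimal, hypothetical driver for the encoder, assuming the elided __init__ above is filled in as in the original repository; GlobalConfig below is a stand-in for the repo's config object, not part of the code above:

class GlobalConfig:
    n_views = 1        # camera views
    seq_len = 1        # input frames
    vert_anchors = 8   # transformer token grid height
    horz_anchors = 8   # transformer token grid width

model = Encoder(GlobalConfig())
image = torch.randn(1, 3, 256, 256)   # one RGB frame
lidar = torch.randn(1, 2, 256, 256)   # two-channel BEV projection
velocity = torch.randn(1)             # scalar ego speed per sample
fused = model(image, lidar, velocity)
print(fused.shape)  # with all four fusion stages as in the repo: torch.Size([1, 512])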
The GPT module
- Here the authors adapt GPT, originally a text-prediction (language) model, as the fusion transformer inside the Encoder; it stacks n_layer transformer blocks.
The relevant hyperparameters (config.py):
n_embd = 512       # token embedding width
block_exp = 4      # MLP expansion factor inside each Block
n_layer = 8        # transformer Blocks per GPT module
n_head = 4         # attention heads
n_scale = 4
embd_pdrop = 0.1   # embedding dropout
resid_pdrop = 0.1  # residual dropout
attn_pdrop = 0.1   # attention dropout
n_views = 1        # number of camera views
seq_len = 1        # number of input frames
pred_len = 4       # number of predicted waypoints
class GPT(nn.Module):
    def __init__(self, n_embd, n_head, block_exp, n_layer,
                 vert_anchors, horz_anchors, seq_len,
                 embd_pdrop, attn_pdrop, resid_pdrop, config):
        super().__init__()
        self.n_embd = n_embd
        self.seq_len = seq_len
        self.vert_anchors = vert_anchors
        self.horz_anchors = horz_anchors
        self.config = config
        # Learned positional embedding over all image + lidar tokens
        self.pos_emb = nn.Parameter(torch.zeros(1, (self.config.n_views + 1) * seq_len * vert_anchors * horz_anchors, n_embd))
        # Projects the scalar ego velocity into the embedding space
        self.vel_emb = nn.Linear(1, n_embd)
        self.drop = nn.Dropout(embd_pdrop)
        # n_layer stacked transformer blocks
        self.blocks = nn.Sequential(*[Block(n_embd, n_head,
                                            block_exp, attn_pdrop, resid_pdrop)
                                      for layer in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd)
        self.block_size = seq_len
        self.apply(self._init_weights)

    def _init_weights(self, module):
        # GPT-style initialization, referenced by self.apply above
        if isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=0.02)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def forward(self, image_tensor, lidar_tensor, velocity):
        bz = lidar_tensor.shape[0] // self.seq_len
        h, w = lidar_tensor.shape[2:4]
        # Flatten both pooled feature maps into one token sequence:
        # (bz, (n_views + 1) * seq_len * h * w, n_embd)
        image_tensor = image_tensor.view(bz, self.config.n_views * self.seq_len, -1, h, w)
        lidar_tensor = lidar_tensor.view(bz, self.seq_len, -1, h, w)
        token_embeddings = torch.cat([image_tensor, lidar_tensor], dim=1).permute(0, 1, 3, 4, 2).contiguous()
        token_embeddings = token_embeddings.view(bz, -1, self.n_embd)
        # Add positional and velocity embeddings to every token
        velocity_embeddings = self.vel_emb(velocity.unsqueeze(1))
        x = self.drop(self.pos_emb + token_embeddings + velocity_embeddings.unsqueeze(1))
        x = self.blocks(x)
        x = self.ln_f(x)
        # Fold the token sequence back into per-modality feature maps
        x = x.view(bz, (self.config.n_views + 1) * self.seq_len, self.vert_anchors, self.horz_anchors, self.n_embd)
        x = x.permute(0, 1, 4, 2, 3).contiguous()
        image_tensor_out = x[:, :self.config.n_views * self.seq_len, :, :, :].contiguous().view(bz * self.config.n_views * self.seq_len, -1, h, w)
        lidar_tensor_out = x[:, self.config.n_views * self.seq_len:, :, :, :].contiguous().view(bz * self.seq_len, -1, h, w)
        return image_tensor_out, lidar_tensor_out
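To make the token bookkeeping concrete, here is a hypothetical standalone call at the stage-4 scale (512 channels, 8x8 anchor grid), reusing the GlobalConfig stand-in from the encoder example:

gpt = GPT(n_embd=512, n_head=4, block_exp=4, n_layer=8,
          vert_anchors=8, horz_anchors=8, seq_len=1,
          embd_pdrop=0.1, attn_pdrop=0.1, resid_pdrop=0.1,
          config=GlobalConfig())
img_map = torch.randn(1, 512, 8, 8)    # pooled image feature map
lidar_map = torch.randn(1, 512, 8, 8)  # pooled lidar feature map
velocity = torch.randn(1)
img_out, lidar_out = gpt(img_map, lidar_map, velocity)
# Internally: (n_views + 1) * seq_len * 8 * 8 = 128 tokens of width 512;
# both maps come back at their input resolution.
print(img_out.shape, lidar_out.shape)  # torch.Size([1, 512, 8, 8]) each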
The Block module
class Block(nn.Module):
    """ Pre-norm transformer block: LayerNorm -> attention/MLP -> residual add. """
    def __init__(self, n_embd, n_head, block_exp, attn_pdrop, resid_pdrop):
        super().__init__()
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)
        self.attn = SelfAttention(n_embd, n_head, attn_pdrop, resid_pdrop)
        self.mlp = nn.Sequential(
            nn.Linear(n_embd, block_exp * n_embd),
            nn.ReLU(True),
            nn.Linear(block_exp * n_embd, n_embd),
            nn.Dropout(resid_pdrop),
        )

    def forward(self, x):
        B, T, C = x.size()
        x = x + self.attn(self.ln1(x))  # attention sub-layer with residual
        x = x + self.mlp(self.ln2(x))   # feed-forward sub-layer with residual
        return x
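Note the pre-norm arrangement (LayerNorm before each sub-layer, as in GPT-2). Because a Block maps (B, T, C) to (B, T, C), n_layer of them can be chained with nn.Sequential, which is exactly what GPT.__init__ does. A quick hypothetical check:

blk = Block(n_embd=512, n_head=4, block_exp=4, attn_pdrop=0.1, resid_pdrop=0.1)
tokens = torch.randn(2, 128, 512)  # (batch, tokens, channels)
print(blk(tokens).shape)           # torch.Size([2, 128, 512]): shape preserved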
The SelfAttention module
class SelfAttention(nn.Module):
    """ Standard multi-head self-attention (no causal mask). """
    def __init__(self, n_embd, n_head, attn_pdrop, resid_pdrop):
        super().__init__()
        assert n_embd % n_head == 0
        # Per-token projections for keys, queries and values
        self.key = nn.Linear(n_embd, n_embd)
        self.query = nn.Linear(n_embd, n_embd)
        self.value = nn.Linear(n_embd, n_embd)
        self.attn_drop = nn.Dropout(attn_pdrop)
        self.resid_drop = nn.Dropout(resid_pdrop)
        self.proj = nn.Linear(n_embd, n_embd)
        self.n_head = n_head

    def forward(self, x):
        B, T, C = x.size()
        # Split channels across heads: (B, n_head, T, C // n_head)
        k = self.key(x).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        q = self.query(x).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        v = self.value(x).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        # Scaled dot-product attention over all T tokens
        att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
        att = F.softmax(att, dim=-1)
        att = self.attn_drop(att)
        y = att @ v
        # Merge the heads back: (B, T, C)
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        y = self.resid_drop(self.proj(y))
        return y
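Unlike a language-model GPT, there is no causal mask: every token may attend to every other, which makes sense because the tokens are spatial grid cells rather than a temporal sequence. A small hypothetical trace of the multi-head reshapes (B=2, T=128, C=512, n_head=4, head size 128):

sa = SelfAttention(n_embd=512, n_head=4, attn_pdrop=0.0, resid_pdrop=0.0)
x = torch.randn(2, 128, 512)
k = sa.key(x).view(2, 128, 4, 128).transpose(1, 2)  # (B, heads, T, head_dim)
att = (k @ k.transpose(-2, -1)) / math.sqrt(k.size(-1))
print(att.shape)    # torch.Size([2, 4, 128, 128]): a T-by-T score map per head
print(sa(x).shape)  # torch.Size([2, 128, 512]): heads merged back into channels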
ImageCNN and LidarEncoder
- These wrap the backbone CNNs that turn the raw image and the LiDAR data (projected to a bird's-eye-view image) into per-stage feature maps; the pooled maps are what the GPT module above treats as tokens.
class ImageCNN(nn.Module):
    def __init__(self, c_dim, normalize=True):
        super().__init__()
        self.normalize = normalize
        # ImageNet-pretrained ResNet34; the classifier head is replaced
        # by an identity so the network returns 512-d features
        self.features = models.resnet34(pretrained=True)
        self.features.fc = nn.Sequential()

    def forward(self, inputs):
        c = 0
        # Sum the features over all input frames/views
        for x in inputs:
            if self.normalize:
                x = normalize_imagenet(x)
            c += self.features(x)
        return c
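ImageCNN.forward calls normalize_imagenet, which is defined elsewhere in model.py; it is the standard per-channel ImageNet normalization, along the lines of:

def normalize_imagenet(x):
    """ Normalize a (B, 3, H, W) image tensor with ImageNet mean/std. """
    x = x.clone()
    x[:, 0] = (x[:, 0] - 0.485) / 0.229
    x[:, 1] = (x[:, 1] - 0.456) / 0.224
    x[:, 2] = (x[:, 2] - 0.406) / 0.225
    return x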
class LidarEncoder(nn.Module):
    def __init__(self, num_classes=512, in_channels=2):
        super().__init__()
        # Plain (not pretrained) ResNet18 with the classifier removed
        self._model = models.resnet18()
        self._model.fc = nn.Sequential()
        # Swap the first conv for one that accepts the 2-channel BEV input,
        # keeping the original kernel/stride/padding; _tmp.bias is None for
        # ResNet, which nn.Conv2d treats as bias=False
        _tmp = self._model.conv1
        self._model.conv1 = nn.Conv2d(in_channels, out_channels=_tmp.out_channels,
                                      kernel_size=_tmp.kernel_size, stride=_tmp.stride,
                                      padding=_tmp.padding, bias=_tmp.bias)

    def forward(self, inputs):
        features = 0
        # Sum the 512-d features over all input BEV frames
        for lidar_data in inputs:
            lidar_feature = self._model(lidar_data)
            features += lidar_feature
        return features
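A quick hypothetical check that the patched first conv accepts the 2-channel BEV input (note that forward expects a list of tensors, like ImageCNN):

enc = LidarEncoder(num_classes=512, in_channels=2)
bev = torch.randn(1, 2, 256, 256)  # e.g. above/below ground-plane height histograms
print(enc([bev]).shape)            # torch.Size([1, 512]): ResNet18 features, fc removed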