Project layout
train_nerf.py
- Build the model
model = RadianceFieldRenderer(
    image_size=cfg.data.image_size,
    n_pts_per_ray=cfg.raysampler.n_pts_per_ray,
    n_pts_per_ray_fine=cfg.raysampler.n_pts_per_ray_fine,
    n_rays_per_image=cfg.raysampler.n_rays_per_image,
    min_depth=cfg.raysampler.min_depth,
    max_depth=cfg.raysampler.max_depth,
    stratified=cfg.raysampler.stratified,
    stratified_test=cfg.raysampler.stratified_test,
    chunk_size_test=cfg.raysampler.chunk_size_test,
    n_harmonic_functions_xyz=cfg.implicit_function.n_harmonic_functions_xyz,
    n_harmonic_functions_dir=cfg.implicit_function.n_harmonic_functions_dir,
    n_hidden_neurons_xyz=cfg.implicit_function.n_hidden_neurons_xyz,
    n_hidden_neurons_dir=cfg.implicit_function.n_hidden_neurons_dir,
    n_layers_xyz=cfg.implicit_function.n_layers_xyz,
    density_noise_std=cfg.implicit_function.density_noise_std,
    visualization=cfg.visualization.visdom,
)
- Resume from a checkpoint
# These stay at their defaults unless a checkpoint is found.
optimizer_state_dict = None
start_epoch = 0
stats = None
checkpoint_path = os.path.join(hydra.utils.get_original_cwd(), cfg.checkpoint_path)
if len(cfg.checkpoint_path) > 0:
    checkpoint_dir = os.path.split(checkpoint_path)[0]
    os.makedirs(checkpoint_dir, exist_ok=True)
    if cfg.resume and os.path.isfile(checkpoint_path):
        print(f"Resuming from checkpoint {checkpoint_path}.")
        loaded_data = torch.load(checkpoint_path)
        model.load_state_dict(loaded_data["model"])
        stats = pickle.loads(loaded_data["stats"])
        print(f" => resuming from epoch {stats.epoch}.")
        optimizer_state_dict = loaded_data["optimizer"]
        start_epoch = stats.epoch
- Initialize the optimizer
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=cfg.optimizer.lr,
)
- Load the optimizer state dict (when resuming)
if optimizer_state_dict is not None:
    optimizer.load_state_dict(optimizer_state_dict)
    optimizer.last_epoch = start_epoch
- Initialize the stats object (when training from scratch)
if stats is None:
    stats = Stats(
        ["loss", "mse_coarse", "mse_fine", "psnr_coarse", "psnr_fine", "sec/it"],
    )
- Define the learning-rate decay
def lr_lambda(epoch):
    return cfg.optimizer.lr_scheduler_gamma ** (
        epoch / cfg.optimizer.lr_scheduler_step_size
    )
- Learning-rate scheduling
lr_scheduler = torch.optim.lr_scheduler.LambdaLR(
    optimizer, lr_lambda, last_epoch=start_epoch - 1, verbose=False
)
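Note that `lr_lambda` divides by the step size as a float, so the decay is a smooth per-epoch exponential rather than a staircase schedule. A quick numeric check with illustrative values (the real `gamma` and `step_size` come from `cfg.optimizer`):

```python
gamma, step_size = 0.1, 450  # illustrative values; read from cfg.optimizer in practice

for epoch in (0, 225, 450, 900):
    # Multiplier applied to the base learning rate at this epoch.
    print(epoch, gamma ** (epoch / step_size))
# 0 -> 1.0, 225 -> ~0.316, 450 -> 0.1, 900 -> 0.01
```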
- Load the training and validation data
train_dataset, val_dataset, _ = get_nerf_datasets(
    dataset_name=cfg.data.dataset_name,
    image_size=cfg.data.image_size,
)
- Set up the training and validation dataloaders
train_dataloader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=1,
    shuffle=True,
    num_workers=0,
    collate_fn=trivial_collate,
)
val_dataloader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=1,
    num_workers=0,
    collate_fn=trivial_collate,
    sampler=torch.utils.data.RandomSampler(
        val_dataset,
        replacement=True,
        num_samples=cfg.optimizer.max_epochs,
    ),
)
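Both loaders use `batch_size=1` together with `trivial_collate`, so each batch arrives as the raw list of dataset entries rather than default-collated tensors (the camera objects cannot be stacked by the default collate). A minimal sketch of such a collate function; the project ships its own helper, so treat this body as an assumption:

```python
def trivial_collate(batch):
    # Return the list of samples unchanged; each element is a dict
    # holding "image", "camera", and "camera_idx".
    return batch
```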
- Train the model; see the sketch below.
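The rest of train_nerf.py is the epoch loop. A condensed sketch under the definitions above; device transfer, stats logging, validation, checkpointing, and the camera-hash/ray-precaching path are elided, and the exact loss composition is an assumption based on the metrics returned by `forward()`:

```python
for epoch in range(start_epoch, cfg.optimizer.max_epochs):
    model.train()
    for batch in train_dataloader:
        image, camera, camera_idx = batch[0].values()
        optimizer.zero_grad()
        # forward() returns (out, metrics); summing the coarse and fine
        # MSE terms gives both render passes a gradient signal.
        nerf_out, metrics = model(None, camera, image)
        loss = metrics["mse_coarse"] + metrics["mse_fine"]
        loss.backward()
        optimizer.step()
    lr_scheduler.step()  # one exponential-decay step per epoch
```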
nerf/dataset.py
get_nerf_datasets()
Parameters:
dataset_name: str,
image_size: Tuple[int, int],
data_root: str = DEFAULT_DATA_ROOT,
autodownload: bool = True,
Returns: Tuple[Dataset, Dataset, Dataset]
- Build cameras_path and image_path, download the data if missing, then load and rescale the images:
cameras_path = os.path.join(data_root, dataset_name + ".pth")
image_path = cameras_path.replace(".pth", ".png")
if autodownload and any(not os.path.isfile(p) for p in (cameras_path, image_path)):
    download_data((dataset_name,), data_root=data_root)
train_data = torch.load(cameras_path)
n_cameras = train_data["cameras"]["R"].shape[0]
# Temporarily lift PIL's size guard: all views are stored in one very tall image.
_image_max_image_pixels = Image.MAX_IMAGE_PIXELS
Image.MAX_IMAGE_PIXELS = None
images = torch.FloatTensor(np.array(Image.open(image_path))) / 255.0
images = torch.stack(torch.chunk(images, n_cameras, dim=0))[..., :3]
Image.MAX_IMAGE_PIXELS = _image_max_image_pixels
scale_factors = [s_new / s for s, s_new in zip(images.shape[1:3], image_size)]
if abs(scale_factors[0] - scale_factors[1]) > 1e-3:
    raise ValueError(
        "Non-isotropic scaling is not allowed. Consider changing the 'image_size' argument."
    )
scale_factor = sum(scale_factors) * 0.5
if scale_factor != 1.0:
    print(f"Rescaling dataset (factor={scale_factor})")
    images = torch.nn.functional.interpolate(
        images.permute(0, 3, 1, 2),
        size=tuple(image_size),
        mode="bilinear",
    ).permute(0, 2, 3, 1)
- Split train_data into three parts to obtain the training, validation, and test sets, then return them.
train_dataset, val_dataset, test_dataset = [
    ListDataset(
        [
            {"image": images[i], "camera": cameras[i], "camera_idx": int(i)}
            for i in idx
        ]
    )
    for idx in [train_idx, val_idx, test_idx]
]
return train_dataset, val_dataset, test_dataset
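For illustration, a usage sketch (the dataset name and image size are illustrative; "lego" is one of the scenes the project can auto-download):

```python
train_dataset, val_dataset, test_dataset = get_nerf_datasets(
    dataset_name="lego",
    image_size=(800, 800),
)
entry = train_dataset[0]
print(entry["image"].shape)  # (800, 800, 3), RGB values in [0, 1]
print(entry["camera_idx"])   # index of this view's camera
```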
nerf/nerf_renderer.py
This file implements the RadianceFieldRenderer class, which inherits from torch.nn.Module. The forward rendering pass works as follows:
- For a given input camera, the rendering rays are produced by NeRFRaysampler.
- In training mode, the rays are a set of `n_rays_per_image` random 2D locations on the image grid.
- In evaluation mode, the rays correspond to the full image grid. They are further split into chunks of size `chunk_size_test` to prevent out-of-memory errors.
__init__() constructor
Parameters:
image_size: Tuple[int, int],
n_pts_per_ray: int,
n_pts_per_ray_fine: int,
n_rays_per_image: int,
min_depth: float,
max_depth: float,
stratified: bool,
stratified_test: bool,
chunk_size_test: int,
n_harmonic_functions_xyz: int = 6,
n_harmonic_functions_dir: int = 4,
n_hidden_neurons_xyz: int = 256,
n_hidden_neurons_dir: int = 128,
n_layers_xyz: int = 8,
append_xyz: Tuple[int] = (5,),
density_noise_std: float = 0.0,
visualization: bool = False,
Attributes:
self._renderer = torch.nn.ModuleDict()
self._implicit_function = torch.nn.ModuleDict()
self._density_noise_std = density_noise_std
self._chunk_size_test = chunk_size_test
self._image_size = image_size
self.visualization = visualization
Define the raymarcher:
raymarcher = EmissionAbsorptionNeRFRaymarcher()
Get the image size:
image_height, image_width = image_size
**Choose the raysampler for each render pass:**
for render_pass in ("coarse", "fine"):
if render_pass == "coarse":
raysampler = NeRFRaysampler(
n_pts_per_ray=n_pts_per_ray,
min_depth=min_depth,
max_depth=max_depth,
stratified=stratified,
stratified_test=stratified_test,
n_rays_per_image=n_rays_per_image,
image_height=image_height,
image_width=image_width,
)
elif render_pass == "fine":
raysampler = ProbabilisticRaysampler(
n_pts_per_ray=n_pts_per_ray_fine,
stratified=stratified,
stratified_test=stratified_test,
)
Initialize the renderer for this pass (still inside the loop):
    self._renderer[render_pass] = ImplicitRenderer(
        raysampler=raysampler,
        raymarcher=raymarcher,
    )
Instantiate the network (also per pass):
    self._implicit_function[render_pass] = NeuralRadianceField(
        n_harmonic_functions_xyz=n_harmonic_functions_xyz,
        n_harmonic_functions_dir=n_harmonic_functions_dir,
        n_hidden_neurons_xyz=n_hidden_neurons_xyz,
        n_hidden_neurons_dir=n_hidden_neurons_dir,
        n_layers_xyz=n_layers_xyz,
        append_xyz=append_xyz,
    )
precache_rays()
Parameters:
cache_cameras: List[CamerasBase],
cache_camera_hashes: List[str],

self._renderer["coarse"].raysampler.precache_rays(
    cache_cameras,
    cache_camera_hashes,
)
_process_ray_chunk()
Parameters:
camera_hash: Optional[str],
camera: CamerasBase,
image: torch.Tensor,
chunk_idx: int,
Returns: dict
out: `dict` containing the outputs of the rendering:
`rgb_coarse`: The result of the coarse rendering pass.
`rgb_fine`: The result of the fine rendering pass.
`rgb_gt`: The corresponding ground-truth RGB values.
Use self._renderer to obtain the rendered RGB values, the ray weights, etc.:
for renderer_pass in ("coarse", "fine"):
(rgb, weights), ray_bundle_out = self._renderer[renderer_pass](
cameras=camera,
volumetric_function=self._implicit_function[renderer_pass],
chunksize=self._chunk_size_test,
chunk_idx=chunk_idx,
density_noise_std=(self._density_noise_std if self.training else 0.0),
input_ray_bundle=coarse_ray_bundle,
ray_weights=coarse_weights,
camera_hash=camera_hash,
)
if renderer_pass == "coarse":
rgb_coarse = rgb
coarse_ray_bundle = ray_bundle_out
coarse_weights = weights
if image is not None:
rgb_gt = sample_images_at_mc_locs(
image[..., :3][None],
ray_bundle_out.xys,
)
else:
rgb_gt = None
elif renderer_pass == "fine":
rgb_fine = rgb
else:
raise ValueError(f"No such rendering pass {renderer_pass}")
forward()
Performs the coarse and fine rendering passes from the viewpoint of the input camera. The renders are compared against the ground truth using mean squared error (MSE) and peak signal-to-noise ratio (PSNR).
- In training mode, a random subset of the image's rays is rendered.
- Outside training mode, the whole image is rendered; to avoid out-of-memory errors, the rays are sampled and rendered in chunks of size chunksize.
Parameters:
camera_hash: Optional[str],
camera: CamerasBase,
image: torch.Tensor,
Returns: Tuple[dict, dict]
- out: dict containing the outputs of the rendering:
  - rgb_coarse: the result of the coarse rendering pass.
  - rgb_fine: the result of the fine rendering pass.
  - rgb_gt: the corresponding ground-truth RGB values.
  The shape of the rgb tensors depends on the mode:
  - in training mode, all three rgb tensors have shape (batch_size, n_rays_per_image, 3);
  - otherwise, all three have shape (batch_size, image_size[0], image_size[1], 3).
- metrics: dict of error metrics comparing the fine and coarse renders to the ground truth:
  - mse_coarse: MSE between the coarse render and the ground truth
  - mse_fine: MSE between the fine render and the ground truth
  - psnr_coarse: PSNR between the coarse render and the ground truth
  - psnr_fine: PSNR between the fine render and the ground truth
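calc_mse and calc_psnr are small helpers from the project's utils; a minimal sketch consistent with the metrics above (treat the exact bodies as assumptions):

```python
import torch

def calc_mse(x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
    # Mean squared error between a render and the ground truth.
    return torch.mean((x - y) ** 2)

def calc_psnr(x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
    # Peak signal-to-noise ratio; for images in [0, 1] this is -10*log10(MSE).
    return -10.0 * torch.log10(calc_mse(x, y))
```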
When not in training mode, get the number of chunks:
if not self.training:
    n_chunks = self._renderer["coarse"].raysampler.get_n_chunks(
        self._chunk_size_test,
        camera.R.shape[0],
    )
else:
    n_chunks = 1
Compute the output of each chunk:
chunk_outputs = [
    self._process_ray_chunk(
        camera_hash,
        camera,
        image,
        chunk_idx,
    )
    for chunk_idx in range(n_chunks)
]
For a full-image render pass, concatenate the output chunks and reshape them to the image size:
if not self.training:
    out = {
        k: torch.cat(
            [ch_o[k] for ch_o in chunk_outputs],
            dim=1,
        ).view(-1, *self._image_size, 3)
        if chunk_outputs[0][k] is not None
        else None
        for k in ("rgb_fine", "rgb_coarse", "rgb_gt")
    }
else:
    out = chunk_outputs[0]
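For intuition about the reshape, a shape walkthrough with assumed sizes (a 100x200 image rendered in 4 chunks of 5000 rays each):

```python
import torch

chunks = [torch.rand(1, 5000, 3) for _ in range(4)]  # per-chunk rgb outputs
full = torch.cat(chunks, dim=1)                      # (1, 20000, 3): every ray in the image
image = full.view(-1, 100, 200, 3)                   # (1, H, W, 3): folded back into an image
print(image.shape)                                   # torch.Size([1, 100, 200, 3])
```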
Compute the error metrics:
metrics = {}
if image is not None:
    for render_pass in ("coarse", "fine"):
        for metric_name, metric_fun in zip(
            ("mse", "psnr"), (calc_mse, calc_psnr)
        ):
            metrics[f"{metric_name}_{render_pass}"] = metric_fun(
                out["rgb_" + render_pass][..., :3],
                out["rgb_gt"][..., :3],
            )
return out, metrics
visualize_nerf_outputs()
Visualizes the outputs of RadianceFieldRenderer.
Parameters:
nerf_out: dict,
output_cache: List,
viz: Visdom,
visdom_env: str
Show the training images:
ims = torch.stack([o["image"] for o in output_cache])
ims = torch.cat(list(ims), dim=1)
viz.image(
    ims.permute(2, 0, 1),
    env=visdom_env,
    win="images",
    opts={"title": "train_images"},
)
Show the coarse render, fine render, and ground truth side by side:
ims_full = torch.cat(
    [
        nerf_out[imvar][0].permute(2, 0, 1).detach().cpu().clamp(0.0, 1.0)
        for imvar in ("rgb_coarse", "rgb_fine", "rgb_gt")
    ],
    dim=2,
)
viz.image(
    ims_full,
    env=visdom_env,
    win="images_full",
    opts={"title": "coarse | fine | target"},
)
Make a 3D plot of the training cameras and the rays they emit:
camera_trace = {
    f"camera_{ci:03d}": o["camera"].cpu() for ci, o in enumerate(output_cache)
}
ray_pts_trace = {
    f"ray_pts_{ci:03d}": Pointclouds(
        ray_bundle_to_ray_points(o["coarse_ray_bundle"])
        .detach()
        .cpu()
        .view(1, -1, 3)
    )
    for ci, o in enumerate(output_cache)
}
plotly_plot = plot_scene(
    {
        "training_scene": {
            **camera_trace,
            **ray_pts_trace,
        },
    },
    pointcloud_max_points=5000,
    pointcloud_marker_size=1,
    camera_scale=0.3,
)
viz.plotlyplot(plotly_plot, env=visdom_env, win="scenes")
implicit_function.py —— class NeuralRadianceField()
**Parameters**
n_harmonic_functions_xyz: int = 6,
n_harmonic_functions_dir: int = 4,
n_hidden_neurons_xyz: int = 256,
n_hidden_neurons_dir: int = 128,
n_layers_xyz: int = 8,
append_xyz: Tuple[int] = (5,),
use_multiple_streams: bool = True,
**kwargs,
Constructor
self.harmonic_embedding_xyz = HarmonicEmbedding(n_harmonic_functions_xyz)
self.harmonic_embedding_dir = HarmonicEmbedding(n_harmonic_functions_dir)
embedding_dim_xyz = n_harmonic_functions_xyz * 2 * 3 + 3
embedding_dim_dir = n_harmonic_functions_dir * 2 * 3 + 3
self.mlp_xyz = MLPWithInputSkips(
    n_layers_xyz,
    embedding_dim_xyz,
    n_hidden_neurons_xyz,
    embedding_dim_xyz,
    n_hidden_neurons_xyz,
    input_skips=append_xyz,
)
self.intermediate_linear = torch.nn.Linear(
    n_hidden_neurons_xyz, n_hidden_neurons_xyz
)
_xavier_init(self.intermediate_linear)
self.density_layer = torch.nn.Linear(n_hidden_neurons_xyz, 1)
_xavier_init(self.density_layer)
self.density_layer.bias.data[:] = 0.0
self.color_layer = torch.nn.Sequential(
    LinearWithRepeat(
        n_hidden_neurons_xyz + embedding_dim_dir, n_hidden_neurons_dir
    ),
    torch.nn.ReLU(True),
    torch.nn.Linear(n_hidden_neurons_dir, 3),
    torch.nn.Sigmoid(),
)
self.use_multiple_streams = use_multiple_streams
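The `* 2 * 3 + 3` in the embedding dimensions comes from the harmonic (positional) encoding: each of the 3 input coordinates is expanded into a sin and a cos at every frequency, and the raw coordinates are appended as well. With the defaults above:

```python
n_harmonic_functions_xyz = 6  # defaults from the constructor signature
n_harmonic_functions_dir = 4

# 3 coords x (sin + cos) x n frequencies, plus the 3 raw coords:
embedding_dim_xyz = n_harmonic_functions_xyz * 2 * 3 + 3  # = 39
embedding_dim_dir = n_harmonic_functions_dir * 2 * 3 + 3  # = 27
```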
_get_densities()
This function takes the features predicted by self.mlp_xyz and converts them to raw_densities with self.density_layer. The raw_densities are then re-weighted by the depth step sizes (deltas) and mapped into the [0, 1] range as 1 minus the negative exponential of deltas * relu(raw_densities).
Parameters:
features: torch.Tensor,
depth_values: torch.Tensor,
density_noise_std: float,
Returns: torch.Tensor. Body:
raw_densities = self.density_layer(features)
deltas = torch.cat(
    (
        depth_values[..., 1:] - depth_values[..., :-1],
        1e10 * torch.ones_like(depth_values[..., :1]),
    ),
    dim=-1,
)[..., None]
if density_noise_std > 0.0:
    raw_densities = (
        raw_densities + torch.randn_like(raw_densities) * density_noise_std
    )
densities = 1 - (-deltas * torch.relu(raw_densities)).exp()
return densities
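The last line is the standard NeRF opacity conversion alpha = 1 - exp(-delta * sigma), where sigma = relu(raw_densities) and delta is the spacing to the next sample; the 1e10 sentinel makes the final segment effectively opaque wherever the density is positive. A numeric illustration with assumed values:

```python
import torch

sigma = torch.tensor([0.0, 0.5, 5.0])    # assumed post-ReLU densities
delta = torch.tensor([0.1, 0.1, 1e10])   # sample spacings; last is the sentinel

alpha = 1 - torch.exp(-delta * sigma)
print(alpha)  # tensor([0.0000, 0.0488, 1.0000])
```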
_get_colors()
Predicts the RGB color of each point.
Parameters:
features: torch.Tensor,
rays_directions: torch.Tensor
Returns: torch.Tensor
rays_directions_normed = torch.nn.functional.normalize(rays_directions, dim=-1)
rays_embedding = self.harmonic_embedding_dir(rays_directions_normed)
return self.color_layer((self.intermediate_linear(features), rays_embedding))
_get_densities_and_colors()
Computes densities and colors for the whole batch at once.
Parameters:
features: torch.Tensor,
ray_bundle: RayBundle,
density_noise_std: float
Returns: Tuple[torch.Tensor, torch.Tensor]
Two outputs: rays_densities, of shape (minibatch, ..., num_points_per_ray, 1), giving the opacity of each ray point; and rays_colors, of shape (minibatch, ..., num_points_per_ray, 3), giving the color of each ray point.
if self.use_multiple_streams and features.is_cuda:
    current_stream = torch.cuda.current_stream(features.device)
    other_stream = torch.cuda.Stream(features.device)
    other_stream.wait_stream(current_stream)
    with torch.cuda.stream(other_stream):
        rays_densities = self._get_densities(
            features, ray_bundle.lengths, density_noise_std
        )
    rays_colors = self._get_colors(features, ray_bundle.directions)
    current_stream.wait_stream(other_stream)
else:
    rays_densities = self._get_densities(
        features, ray_bundle.lengths, density_noise_std
    )
    rays_colors = self._get_colors(features, ray_bundle.directions)
return rays_densities, rays_colors
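A note on the design: the density head and the color head both consume the same `features` tensor and are independent of each other, so running them on two CUDA streams lets the GPU overlap the work. The first `wait_stream` fences the side stream until `features` is ready; the second fences the default stream until the densities are computed, so both outputs are safe to use afterwards.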
forward()
Parameters:
ray_bundle: RayBundle,
density_noise_std: float = 0.0,
**kwargs,
ray_bundle contains the ray origins, directions, and lengths:
- origins: shape (minibatch, ..., 3)
- directions: shape (minibatch, ..., 3)
- lengths: shape (minibatch, ..., num_points_per_ray), the depths at which the rays are sampled
Returns: Tuple[torch.Tensor, torch.Tensor]
- rays_densities: a tensor of shape (minibatch, ..., num_points_per_ray, 1) denoting the opacity of each ray point.
- rays_colors: a tensor of shape (minibatch, ..., num_points_per_ray, 3) denoting the color of each ray point.
Convert the ray parametrization to points in world coordinates, embed them, and run the MLP:
rays_points_world = ray_bundle_to_ray_points(ray_bundle)
embeds_xyz = self.harmonic_embedding_xyz(rays_points_world)
features = self.mlp_xyz(embeds_xyz, embeds_xyz)
rays_densities, rays_colors = self._get_densities_and_colors(
features, ray_bundle, density_noise_std
)
return rays_densities, rays_colors
implicit_function.py —— class MLPWithInputSkips()
Implements NeRF's multi-layer perceptron with input skip connections.
Constructor
def __init__(
    self,
    n_layers: int,
    input_dim: int,
    output_dim: int,
    skip_dim: int,
    hidden_dim: int,
    input_skips: Tuple[int] = (),
):
    """
    Args:
        n_layers: The number of linear layers of the MLP.
        input_dim: The number of channels of the input tensor.
        output_dim: The number of channels of the output.
        skip_dim: The number of channels of the tensor `z` appended when
            evaluating the skip layers.
        hidden_dim: The number of hidden units of the MLP.
        input_skips: The list of layer indices at which we append the skip
            tensor `z`.
    """
    super().__init__()
    layers = []
    for layeri in range(n_layers):
        if layeri == 0:
            dimin = input_dim
            dimout = hidden_dim
        elif layeri in input_skips:
            dimin = hidden_dim + skip_dim
            dimout = hidden_dim
        else:
            dimin = hidden_dim
            dimout = hidden_dim
        linear = torch.nn.Linear(dimin, dimout)
        _xavier_init(linear)
        layers.append(torch.nn.Sequential(linear, torch.nn.ReLU(True)))
    self.mlp = torch.nn.ModuleList(layers)
    self._input_skips = set(input_skips)
forward()
def forward(self, x: torch.Tensor, z: torch.Tensor) -> torch.Tensor:
    """
    Args:
        x: The input tensor of shape `(..., input_dim)`.
        z: The input skip tensor of shape `(..., skip_dim)` which is appended
            to layers whose indices are specified by `input_skips`.
    Returns:
        y: The output tensor of shape `(..., output_dim)`.
    """
    y = x
    for li, layer in enumerate(self.mlp):
        if li in self._input_skips:
            y = torch.cat((y, z), dim=-1)
        y = layer(y)
    return y
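A hedged usage example matching the NeuralRadianceField constructor above (an 8-layer trunk with a skip at layer 5, as in the original NeRF paper; the tensor sizes are illustrative):

```python
import torch

embedding_dim_xyz = 39  # harmonic embedding of xyz, see above
mlp = MLPWithInputSkips(
    n_layers=8,
    input_dim=embedding_dim_xyz,
    output_dim=256,
    skip_dim=embedding_dim_xyz,
    hidden_dim=256,
    input_skips=(5,),
)
embeds = torch.rand(2, 1024, 64, embedding_dim_xyz)  # (batch, rays, pts, channels)
features = mlp(embeds, embeds)                       # (2, 1024, 64, 256)
```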
Other files
- raymarcher.py
- raysampler.py
- stats.py
- utils.py
- eval_video_utils.py
References
https://github.com/facebookresearch/pytorch3d/tree/main/projects/nerf