#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import json
|
|
import math
|
|
from pathlib import Path
|
|
from typing import Iterable, Tuple
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
|
|
def load_calib(calib_path: Path) -> dict:
    """Parse a calibration JSON file and verify the mandatory intrinsics exist.

    Args:
        calib_path: Location of the UTF-8 encoded calibration JSON file.

    Returns:
        The decoded JSON content, unchanged.

    Raises:
        KeyError: If any of ``focal_u``, ``focal_v``, ``cu`` or ``cv`` is absent.
    """
    calib = json.loads(calib_path.read_text(encoding="utf-8"))

    # Collect every absent key so the error reports all of them at once.
    absent = []
    for name in ("focal_u", "focal_v", "cu", "cv"):
        if name not in calib:
            absent.append(name)

    if absent:
        raise KeyError(f"{calib_path} missing required keys: {absent}")
    return calib
|
|
|
|
|
|
def infer_image_size(calib: dict) -> Tuple[int, int]:
    """Return the calibrated image size as ``(width, height)``.

    Each dimension is looked up under ``image_width`` / ``image_height`` first and
    under ``img_width`` / ``img_height`` second. The fallback is used both when
    the primary key is absent and when it is present but holds ``None`` (JSON
    ``null``), so a null primary entry no longer hides a usable fallback.

    Args:
        calib: Decoded calibration dictionary.

    Returns:
        ``(width, height)`` converted with ``int()``.

    Raises:
        ValueError: If either dimension has no non-null value under any key.
    """

    def _first_present(*keys):
        # First non-None value among the given keys, or None if there is none.
        for key in keys:
            value = calib.get(key)
            if value is not None:
                return value
        return None

    width = _first_present("image_width", "img_width")
    height = _first_present("image_height", "img_height")
    if width is None or height is None:
        raise ValueError("Calibration JSON is missing image_width/image_height.")
    return int(width), int(height)
|
|
|
|
|
|
def compute_vanishing_point(calib: dict) -> Tuple[float, float]:
    """Derive a vanishing point from the principal point and camera angles.

    ``yaw`` and ``pitch`` are read from ``calib`` (default ``0.0``) and passed
    through ``math.radians``, i.e. they are treated as degrees.

    Args:
        calib: Calibration dictionary with ``cu``, ``cv``, ``focal_u`` and
            ``focal_v``; ``yaw`` and ``pitch`` are optional.

    Returns:
        ``(vx, vy)`` where ``vx = cu + focal_u * tan(yaw)`` and
        ``vy = cv - focal_v * tan(pitch)``.
    """
    principal_u, focal_u = calib["cu"], calib["focal_u"]
    yaw_rad = math.radians(calib.get("yaw", 0.0))
    vanish_x = principal_u + focal_u * math.tan(yaw_rad)

    principal_v, focal_v = calib["cv"], calib["focal_v"]
    pitch_rad = math.radians(calib.get("pitch", 0.0))
    vanish_y = principal_v - focal_v * math.tan(pitch_rad)

    return vanish_x, vanish_y
|
|
|
|
|
|
def iter_images(images_dir: Path, glob_pattern: str) -> Iterable[Path]:
    """Yield the regular files in ``images_dir`` matching ``glob_pattern``, sorted.

    Args:
        images_dir: Directory to search.
        glob_pattern: Pattern handed to ``Path.glob``.

    Yields:
        Matching paths in sorted order; non-file matches are left out.

    Raises:
        FileNotFoundError: If ``images_dir`` is not an existing directory. As this
            is a generator, the error surfaces when iteration starts.
    """
    if not images_dir.is_dir():
        raise FileNotFoundError(f"Images directory not found: {images_dir}")

    candidates = sorted(images_dir.glob(glob_pattern))
    yield from (candidate for candidate in candidates if candidate.is_file())
|
|
|
|
|
|
def compute_simul_calib(
    calib_params: dict,
    ori_img_size: Tuple[int, int],
    target_size: Tuple[int, int],
    crop_center_x: float,
    crop_center_y: float,
    target_fx: float,
) -> dict:
    """Compute the crop, resize scale and intrinsics of the undistorted output.

    Steps:
      1. Build the source camera matrix and take the first four distortion
         coefficients (zeros when fewer than four are given).
      2. Ask OpenCV's fisheye module for a new camera matrix (``balance=0.0``).
      3. Map the requested crop centre from distorted to undistorted pixels.
      4. Find the largest crop with the target aspect ratio that stays within
         ``[0, ori_w] x [0, ori_h]`` around that centre.
      5. Scale the crop to ``target_size`` and adjust the intrinsics to match.

    Args:
        calib_params: Calibration dictionary with ``focal_u``, ``focal_v``, ``cu``
            and ``cv``; ``distort_coeffs`` is optional.
        ori_img_size: Source image size as ``(width, height)``.
        target_size: Output image size as ``(width, height)``; both positive.
        crop_center_x: Crop centre x in the distorted image.
        crop_center_y: Crop centre y in the distorted image.
        target_fx: Reference focal length for the depth scale; must be non-zero.

    Returns:
        Dictionary with ``fx``, ``fy``, ``cx``, ``cy``, an empty
        ``distort_coeffs`` list, ``depth_scale`` (``fx / target_fx``),
        ``crop_bounds`` as ``(x1, y1, x2, y2)``, ``scale``, ``K_undistorted``,
        ``crop_center_undistorted`` and ``fx_to_target_scale``.

    Raises:
        ValueError: If ``target_size`` is not strictly positive, ``target_fx`` is
            zero, no valid crop exists around the centre, or the resize would be
            non-uniform.
    """
    ori_w, ori_h = ori_img_size
    target_w, target_h = target_size

    # Validate before any heavy work. A non-positive dimension would otherwise
    # surface as ZeroDivisionError or a nonsensical crop, and a zero target_fx
    # would put an infinite depth_scale into the output calibration.
    if target_w <= 0 or target_h <= 0:
        raise ValueError(f"Target size must be positive, got {target_size}.")
    if target_fx == 0:
        raise ValueError("target_fx must be non-zero.")

    fx_orig = calib_params["focal_u"]
    fy_orig = calib_params["focal_v"]
    cx_orig = calib_params["cu"]
    cy_orig = calib_params["cv"]
    distort_coeffs = calib_params.get("distort_coeffs", [])
    if distort_coeffs is None:
        # A JSON null is treated the same as a missing entry.
        distort_coeffs = []

    k_orig = np.array(
        [[fx_orig, 0.0, cx_orig], [0.0, fy_orig, cy_orig], [0.0, 0.0, 1.0]],
        dtype=np.float64,
    )
    if len(distort_coeffs) >= 4:
        d = np.array(distort_coeffs[:4], dtype=np.float64)
    else:
        d = np.zeros(4, dtype=np.float64)

    k_new = cv2.fisheye.estimateNewCameraMatrixForUndistortRectify(
        k_orig,
        d,
        (ori_w, ori_h),
        np.eye(3),
        balance=0.0,
    )

    fx_undistorted = k_new[0, 0]
    fy_undistorted = k_new[1, 1]
    cx_undistorted = k_new[0, 2]
    cy_undistorted = k_new[1, 2]

    # Express the requested crop centre in undistorted pixel coordinates.
    dist_point = np.array([[[crop_center_x, crop_center_y]]], dtype=np.float32)
    undist_point = cv2.fisheye.undistortPoints(dist_point, k_orig, d, P=k_new)
    center_x_undist = float(undist_point[0, 0, 0])
    center_y_undist = float(undist_point[0, 0, 1])

    # Largest extent that is symmetric about the centre and inside the image.
    max_w = min(center_x_undist * 2.0, (ori_w - center_x_undist) * 2.0)
    max_h = min(center_y_undist * 2.0, (ori_h - center_y_undist) * 2.0)

    # Reduce the target size to its smallest integer aspect ratio, so the crop
    # is an integer multiple k of (ratio_w, ratio_h).
    gcd = math.gcd(target_w, target_h)
    ratio_w = target_w // gcd
    ratio_h = target_h // gcd

    k_from_w = int(max_w / ratio_w)
    k_from_h = int(max_h / ratio_h)
    k_max = min(k_from_w, k_from_h)
    if k_max <= 0:
        raise ValueError(
            f"Unable to find a valid crop for target size {target_size} around center "
            f"({crop_center_x:.2f}, {crop_center_y:.2f})."
        )

    # Unlike the dataloader training path, this offline export tool should also
    # work when the valid no-black-margin undistorted crop is smaller than the
    # requested output size. In that case we keep the largest valid crop and
    # upscale it back to the target size instead of failing.
    k = k_max
    crop_w = k * ratio_w
    crop_h = k * ratio_h

    crop_x1 = int(center_x_undist - crop_w / 2)
    crop_y1 = int(center_y_undist - crop_h / 2)
    crop_x2 = crop_x1 + crop_w
    crop_y2 = crop_y1 + crop_h

    scale_x = target_w / crop_w
    scale_y = target_h / crop_h
    if not np.isclose(scale_x, scale_y):
        raise ValueError(f"Non-uniform resize is not supported: scale_x={scale_x}, scale_y={scale_y}")

    # Cropping shifts the principal point; resizing scales focal lengths and
    # the principal point alike.
    scaled_fx = scale_x * fx_undistorted
    scaled_fy = scale_y * fy_undistorted
    scaled_cx = (cx_undistorted - crop_x1) * scale_x
    scaled_cy = (cy_undistorted - crop_y1) * scale_y

    return {
        "fx": float(scaled_fx),
        "fy": float(scaled_fy),
        "cx": float(scaled_cx),
        "cy": float(scaled_cy),
        "distort_coeffs": [],
        "depth_scale": float(scaled_fx / target_fx),
        "crop_bounds": (int(crop_x1), int(crop_y1), int(crop_x2), int(crop_y2)),
        "scale": (float(scale_x), float(scale_y)),
        "K_undistorted": k_new,
        "crop_center_undistorted": (center_x_undist, center_y_undist),
        "fx_to_target_scale": float(target_fx / scaled_fx),
    }
|
|
|
|
|
|
def apply_simul_transform(
    img: np.ndarray,
    simul_calib: dict,
    calib_params: dict,
    target_size: Tuple[int, int],
) -> np.ndarray:
    """Undistort ``img``, cut out the precomputed crop and resize it.

    Args:
        img: Source image passed to ``cv2.fisheye.undistortImage``.
        simul_calib: Dictionary providing ``K_undistorted`` (the new camera
            matrix) and ``crop_bounds`` as ``(x1, y1, x2, y2)``.
        calib_params: Source calibration with ``focal_u``, ``focal_v``, ``cu``
            and ``cv``; ``distort_coeffs`` is optional.
        target_size: Output size handed to ``cv2.resize``.

    Returns:
        The cropped region resized to ``target_size`` with linear interpolation.
    """
    focal_u, focal_v, center_u, center_v = (
        calib_params[key] for key in ("focal_u", "focal_v", "cu", "cv")
    )
    raw_coeffs = calib_params.get("distort_coeffs", [])

    camera_matrix = np.array(
        [
            [focal_u, 0.0, center_u],
            [0.0, focal_v, center_v],
            [0.0, 0.0, 1.0],
        ],
        dtype=np.float64,
    )

    # Only the first four coefficients are used; with fewer, fall back to zeros.
    if len(raw_coeffs) >= 4:
        coeffs = np.array(raw_coeffs[:4], dtype=np.float64)
    else:
        coeffs = np.zeros(4, dtype=np.float64)

    new_camera_matrix = simul_calib["K_undistorted"]
    undistorted = cv2.fisheye.undistortImage(img, camera_matrix, coeffs, Knew=new_camera_matrix)

    left, top, right, bottom = simul_calib["crop_bounds"]
    region = undistorted[slice(top, bottom), slice(left, right)]
    return cv2.resize(region, target_size, interpolation=cv2.INTER_LINEAR)
|
|
|
|
|
|
def build_output_calib(calib: dict, simul_calib: dict, target_size: Tuple[int, int]) -> dict:
    """Build the calibration dictionary that describes the exported images.

    The result is a shallow copy of ``calib`` in which the intrinsics, the
    distortion coefficients and both spellings of the image size are replaced,
    and a ``simul_transform`` entry recording the applied crop and scale is added.
    ``calib`` itself is not modified.

    Args:
        calib: Source calibration dictionary.
        simul_calib: Dictionary providing ``fx``, ``fy``, ``cx``, ``cy``,
            ``crop_bounds``, ``scale``, ``depth_scale`` and ``fx_to_target_scale``.
        target_size: Output image size as ``(width, height)``.

    Returns:
        The new calibration dictionary.
    """
    out_w, out_h = target_size
    return {
        **calib,
        "focal_u": simul_calib["fx"],
        "focal_v": simul_calib["fy"],
        "cu": simul_calib["cx"],
        "cv": simul_calib["cy"],
        # The exported images are already undistorted.
        "distort_coeffs": [],
        "image_width": int(out_w),
        "image_height": int(out_h),
        "img_width": int(out_w),
        "img_height": int(out_h),
        "simul_transform": {
            "crop_bounds": list(simul_calib["crop_bounds"]),
            "scale": list(simul_calib["scale"]),
            "depth_scale": simul_calib["depth_scale"],
            "fx_to_target_scale": simul_calib["fx_to_target_scale"],
        },
    }
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse the process arguments.

    Returns:
        Namespace with ``case_dir``, ``images_dir``, ``calib_file``,
        ``output_dir``, ``glob``, ``target_size``, ``crop_center_x``,
        ``crop_center_y``, ``target_fx`` and ``overwrite``.
    """
    parser = argparse.ArgumentParser(
        description="Undistort decoded PDCL clip images using the same virtual-calibration logic as dataloaders3d.py."
    )
    register = parser.add_argument

    # Input locations: optional strings defaulting to "".
    input_flags = (
        ("--case-dir", "Case directory containing images/ and calib/L2_calib/camera4.json"),
        ("--images-dir", "Input image directory. Used when --case-dir is not set."),
        ("--calib-file", "Input camera4.json path. Used when --case-dir is not set."),
    )
    for flag, description in input_flags:
        register(flag, type=str, default="", help=description)

    register("--output-dir", type=str, required=True, help="Output directory for undistorted data.")
    register("--glob", type=str, default="*.png", help="Image glob pattern inside images-dir. Default: *.png")
    register(
        "--target-size",
        nargs=2,
        type=int,
        default=None,
        metavar=("W", "H"),
        help="Output image size. Default: use source calibration size.",
    )

    # Optional float overrides; None means "derive from the calibration".
    float_flags = (
        ("--crop-center-x", "Crop center x in distorted image. Default: image center."),
        ("--crop-center-y", "Crop center y in distorted image. Default: vanishing point y."),
        ("--target-fx", "Reference target focal length. Default: use source focal_u."),
    )
    for flag, description in float_flags:
        register(flag, type=float, default=None, help=description)

    register("--overwrite", action="store_true", help="Overwrite existing output images.")
    return parser.parse_args()
|
|
|
|
|
|
def resolve_inputs(args: argparse.Namespace) -> Tuple[Path, Path]:
    """Work out the image directory and calibration file from the CLI arguments.

    When ``--case-dir`` is given it takes precedence and the standard layout
    (``images/`` and ``calib/L2_calib/camera4.json``) beneath it is used.
    Otherwise both ``--images-dir`` and ``--calib-file`` must be supplied.

    Args:
        args: Parsed arguments with ``case_dir``, ``images_dir`` and ``calib_file``.

    Returns:
        ``(images_dir, calib_path)``.

    Raises:
        ValueError: If neither a case directory nor both explicit paths are given.
    """
    if not args.case_dir:
        if args.images_dir and args.calib_file:
            return Path(args.images_dir).resolve(), Path(args.calib_file).resolve()
        raise ValueError("Either --case-dir or both --images-dir and --calib-file are required.")

    case_root = Path(args.case_dir).resolve()
    return case_root / "images", case_root / "calib" / "L2_calib" / "camera4.json"
|
|
|
|
|
|
def main() -> int:
    """Run the export: undistort images, then write calibration and manifest.

    The output directory receives ``images/`` with the transformed images,
    ``calib/L2_calib/camera4.json`` with the adjusted calibration, and
    ``undistort_manifest.json`` summarising the run.

    Images whose output file already exists are skipped unless ``--overwrite`` is
    given. Such images are counted separately and treated as success, so that
    re-running a finished export does not report failure.

    Returns:
        Exit code: 0 if at least one image was written or was already present in
        the output directory, 1 otherwise.

    Raises:
        FileNotFoundError: If no image matches the glob pattern.
    """
    args = parse_args()
    images_dir, calib_path = resolve_inputs(args)
    output_dir = Path(args.output_dir).resolve()
    output_images_dir = output_dir / "images"
    output_calib_dir = output_dir / "calib" / "L2_calib"
    output_images_dir.mkdir(parents=True, exist_ok=True)
    output_calib_dir.mkdir(parents=True, exist_ok=True)

    calib = load_calib(calib_path)
    ori_img_size = infer_image_size(calib)
    target_size = tuple(args.target_size) if args.target_size else ori_img_size
    vp_x, vp_y = compute_vanishing_point(calib)
    # Defaults: horizontal image centre and the vanishing point's y coordinate.
    crop_center_x = args.crop_center_x if args.crop_center_x is not None else ori_img_size[0] / 2.0
    crop_center_y = args.crop_center_y if args.crop_center_y is not None else vp_y
    target_fx = args.target_fx if args.target_fx is not None else calib["focal_u"]

    # The transform depends only on the calibration, so compute it once and
    # reuse it for every image.
    simul_calib = compute_simul_calib(
        calib_params=calib,
        ori_img_size=ori_img_size,
        target_size=target_size,
        crop_center_x=crop_center_x,
        crop_center_y=crop_center_y,
        target_fx=target_fx,
    )
    output_calib = build_output_calib(calib, simul_calib, target_size)

    image_paths = list(iter_images(images_dir, args.glob))
    if not image_paths:
        raise FileNotFoundError(f"No images matched {args.glob} in {images_dir}")
    total = len(image_paths)

    print(f"Input images : {images_dir}")
    print(f"Input calib : {calib_path}")
    print(f"Output dir : {output_dir}")
    print(f"Image count : {total}")
    print(f"Target size : {target_size[0]}x{target_size[1]}")
    print(f"Crop center : ({crop_center_x:.2f}, {crop_center_y:.2f})")
    print(f"Crop bounds : {simul_calib['crop_bounds']}")
    print(f"Resize scale : ({simul_calib['scale'][0]:.6f}, {simul_calib['scale'][1]:.6f})")
    print(f"Output fx/fy : ({simul_calib['fx']:.4f}, {simul_calib['fy']:.4f})")
    print(f"Output cx/cy : ({simul_calib['cx']:.4f}, {simul_calib['cy']:.4f})")

    ok_count = 0
    skipped_existing = 0
    for idx, image_path in enumerate(image_paths, start=1):
        out_path = output_images_dir / image_path.name
        if out_path.exists() and not args.overwrite:
            # Already exported by an earlier run; not a failure.
            skipped_existing += 1
            print(f"[{idx}/{total}] SKIP exists: {out_path}")
            continue

        image = cv2.imread(str(image_path), cv2.IMREAD_COLOR)
        if image is None:
            print(f"[{idx}/{total}] SKIP unreadable: {image_path}")
            continue

        transformed = apply_simul_transform(
            img=image,
            simul_calib=simul_calib,
            calib_params=calib,
            target_size=target_size,
        )
        if not cv2.imwrite(str(out_path), transformed):
            print(f"[{idx}/{total}] FAIL write: {out_path}")
            continue
        ok_count += 1
        print(f"[{idx}/{total}] OK -> {out_path}")

    output_calib_path = output_calib_dir / "camera4.json"
    with output_calib_path.open("w", encoding="utf-8") as file:
        json.dump(output_calib, file, indent=2, ensure_ascii=False)

    manifest = {
        "input_images": str(images_dir),
        "input_calib": str(calib_path),
        "output_images": str(output_images_dir),
        "output_calib": str(output_calib_path),
        "source_image_size": list(ori_img_size),
        "target_size": list(target_size),
        "source_vanishing_point": [vp_x, vp_y],
        "crop_center": [crop_center_x, crop_center_y],
        "crop_bounds": list(simul_calib["crop_bounds"]),
        "scale": list(simul_calib["scale"]),
        "output_intrinsics": {
            "focal_u": simul_calib["fx"],
            "focal_v": simul_calib["fy"],
            "cu": simul_calib["cx"],
            "cv": simul_calib["cy"],
        },
        "processed_images": ok_count,
        "skipped_existing": skipped_existing,
        "total_images": total,
    }
    manifest_path = output_dir / "undistort_manifest.json"
    with manifest_path.open("w", encoding="utf-8") as file:
        json.dump(manifest, file, indent=2, ensure_ascii=False)

    print(f"Completed: {ok_count}/{total} image(s) written.")
    if skipped_existing:
        print(f"Skipped : {skipped_existing} existing image(s) left untouched.")
    print(f"Calib : {output_calib_path}")
    print(f"Manifest : {manifest_path}")
    # Fail only when no output image exists as a result of this or an earlier run.
    return 0 if ok_count + skipped_existing > 0 else 1
|
|
|
|
|
|
# Script entry point: run the exporter and use its return value as exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)
|