#!/usr/bin/env python3 import argparse import json import math from pathlib import Path from typing import Iterable, Tuple import cv2 import numpy as np def load_calib(calib_path: Path) -> dict: with calib_path.open("r", encoding="utf-8") as file: calib = json.load(file) required = ["focal_u", "focal_v", "cu", "cv"] missing = [key for key in required if key not in calib] if missing: raise KeyError(f"{calib_path} missing required keys: {missing}") return calib def infer_image_size(calib: dict) -> Tuple[int, int]: width = calib.get("image_width", calib.get("img_width")) height = calib.get("image_height", calib.get("img_height")) if width is None or height is None: raise ValueError("Calibration JSON is missing image_width/image_height.") return int(width), int(height) def compute_vanishing_point(calib: dict) -> Tuple[float, float]: vx = calib["cu"] + calib["focal_u"] * math.tan(math.radians(calib.get("yaw", 0.0))) vy = calib["cv"] - calib["focal_v"] * math.tan(math.radians(calib.get("pitch", 0.0))) return vx, vy def iter_images(images_dir: Path, glob_pattern: str) -> Iterable[Path]: if not images_dir.is_dir(): raise FileNotFoundError(f"Images directory not found: {images_dir}") for path in sorted(images_dir.glob(glob_pattern)): if path.is_file(): yield path def compute_simul_calib( calib_params: dict, ori_img_size: Tuple[int, int], target_size: Tuple[int, int], crop_center_x: float, crop_center_y: float, target_fx: float, ) -> dict: fx_orig = calib_params["focal_u"] fy_orig = calib_params["focal_v"] cx_orig = calib_params["cu"] cy_orig = calib_params["cv"] distort_coeffs = calib_params.get("distort_coeffs", []) ori_w, ori_h = ori_img_size target_w, target_h = target_size k_orig = np.array( [[fx_orig, 0.0, cx_orig], [0.0, fy_orig, cy_orig], [0.0, 0.0, 1.0]], dtype=np.float64, ) d = np.array(distort_coeffs[:4], dtype=np.float64) if len(distort_coeffs) >= 4 else np.zeros(4, dtype=np.float64) k_new = cv2.fisheye.estimateNewCameraMatrixForUndistortRectify( k_orig, d, (ori_w, ori_h), np.eye(3), balance=0.0, ) fx_undistorted = k_new[0, 0] fy_undistorted = k_new[1, 1] cx_undistorted = k_new[0, 2] cy_undistorted = k_new[1, 2] dist_point = np.array([[[crop_center_x, crop_center_y]]], dtype=np.float32) undist_point = cv2.fisheye.undistortPoints(dist_point, k_orig, d, P=k_new) center_x_undist = float(undist_point[0, 0, 0]) center_y_undist = float(undist_point[0, 0, 1]) max_w = min(center_x_undist * 2.0, (ori_w - center_x_undist) * 2.0) max_h = min(center_y_undist * 2.0, (ori_h - center_y_undist) * 2.0) gcd = math.gcd(target_w, target_h) ratio_w = target_w // gcd ratio_h = target_h // gcd k_from_w = int(max_w / ratio_w) k_from_h = int(max_h / ratio_h) k_max = min(k_from_w, k_from_h) if k_max <= 0: raise ValueError( f"Unable to find a valid crop for target size {target_size} around center " f"({crop_center_x:.2f}, {crop_center_y:.2f})." ) # Unlike the dataloader training path, this offline export tool should also # work when the valid no-black-margin undistorted crop is smaller than the # requested output size. In that case we keep the largest valid crop and # upscale it back to the target size instead of failing. k = k_max crop_w = k * ratio_w crop_h = k * ratio_h crop_x1 = int(center_x_undist - crop_w / 2) crop_y1 = int(center_y_undist - crop_h / 2) crop_x2 = crop_x1 + crop_w crop_y2 = crop_y1 + crop_h scale_x = target_w / crop_w scale_y = target_h / crop_h if not np.isclose(scale_x, scale_y): raise ValueError(f"Non-uniform resize is not supported: scale_x={scale_x}, scale_y={scale_y}") scaled_fx = scale_x * fx_undistorted scaled_fy = scale_y * fy_undistorted scaled_cx = (cx_undistorted - crop_x1) * scale_x scaled_cy = (cy_undistorted - crop_y1) * scale_y return { "fx": float(scaled_fx), "fy": float(scaled_fy), "cx": float(scaled_cx), "cy": float(scaled_cy), "distort_coeffs": [], "depth_scale": float(scaled_fx / target_fx), "crop_bounds": (int(crop_x1), int(crop_y1), int(crop_x2), int(crop_y2)), "scale": (float(scale_x), float(scale_y)), "K_undistorted": k_new, "crop_center_undistorted": (center_x_undist, center_y_undist), "fx_to_target_scale": float(target_fx / scaled_fx), } def apply_simul_transform( img: np.ndarray, simul_calib: dict, calib_params: dict, target_size: Tuple[int, int], ) -> np.ndarray: fx = calib_params["focal_u"] fy = calib_params["focal_v"] cx = calib_params["cu"] cy = calib_params["cv"] distort_coeffs = calib_params.get("distort_coeffs", []) k_orig = np.array( [[fx, 0.0, cx], [0.0, fy, cy], [0.0, 0.0, 1.0]], dtype=np.float64, ) d = np.array(distort_coeffs[:4], dtype=np.float64) if len(distort_coeffs) >= 4 else np.zeros(4, dtype=np.float64) k_new = simul_calib["K_undistorted"] img_undistorted = cv2.fisheye.undistortImage(img, k_orig, d, Knew=k_new) crop_x1, crop_y1, crop_x2, crop_y2 = simul_calib["crop_bounds"] img_cropped = img_undistorted[crop_y1:crop_y2, crop_x1:crop_x2] return cv2.resize(img_cropped, target_size, interpolation=cv2.INTER_LINEAR) def build_output_calib(calib: dict, simul_calib: dict, target_size: Tuple[int, int]) -> dict: target_w, target_h = target_size output_calib = dict(calib) output_calib["focal_u"] = simul_calib["fx"] output_calib["focal_v"] = simul_calib["fy"] output_calib["cu"] = simul_calib["cx"] output_calib["cv"] = simul_calib["cy"] output_calib["distort_coeffs"] = [] output_calib["image_width"] = int(target_w) output_calib["image_height"] = int(target_h) output_calib["img_width"] = int(target_w) output_calib["img_height"] = int(target_h) output_calib["simul_transform"] = { "crop_bounds": list(simul_calib["crop_bounds"]), "scale": list(simul_calib["scale"]), "depth_scale": simul_calib["depth_scale"], "fx_to_target_scale": simul_calib["fx_to_target_scale"], } return output_calib def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Undistort decoded PDCL clip images using the same virtual-calibration logic as dataloaders3d.py." ) parser.add_argument("--case-dir", type=str, default="", help="Case directory containing images/ and calib/L2_calib/camera4.json") parser.add_argument("--images-dir", type=str, default="", help="Input image directory. Used when --case-dir is not set.") parser.add_argument("--calib-file", type=str, default="", help="Input camera4.json path. Used when --case-dir is not set.") parser.add_argument("--output-dir", type=str, required=True, help="Output directory for undistorted data.") parser.add_argument("--glob", type=str, default="*.png", help="Image glob pattern inside images-dir. Default: *.png") parser.add_argument("--target-size", nargs=2, type=int, default=None, metavar=("W", "H"), help="Output image size. Default: use source calibration size.") parser.add_argument("--crop-center-x", type=float, default=None, help="Crop center x in distorted image. Default: image center.") parser.add_argument("--crop-center-y", type=float, default=None, help="Crop center y in distorted image. Default: vanishing point y.") parser.add_argument("--target-fx", type=float, default=None, help="Reference target focal length. Default: use source focal_u.") parser.add_argument("--overwrite", action="store_true", help="Overwrite existing output images.") return parser.parse_args() def resolve_inputs(args: argparse.Namespace) -> Tuple[Path, Path]: if args.case_dir: case_dir = Path(args.case_dir).resolve() images_dir = case_dir / "images" calib_path = case_dir / "calib" / "L2_calib" / "camera4.json" return images_dir, calib_path if not args.images_dir or not args.calib_file: raise ValueError("Either --case-dir or both --images-dir and --calib-file are required.") return Path(args.images_dir).resolve(), Path(args.calib_file).resolve() def main() -> int: args = parse_args() images_dir, calib_path = resolve_inputs(args) output_dir = Path(args.output_dir).resolve() output_images_dir = output_dir / "images" output_calib_dir = output_dir / "calib" / "L2_calib" output_images_dir.mkdir(parents=True, exist_ok=True) output_calib_dir.mkdir(parents=True, exist_ok=True) calib = load_calib(calib_path) ori_img_size = infer_image_size(calib) target_size = tuple(args.target_size) if args.target_size else ori_img_size vp_x, vp_y = compute_vanishing_point(calib) crop_center_x = args.crop_center_x if args.crop_center_x is not None else ori_img_size[0] / 2.0 crop_center_y = args.crop_center_y if args.crop_center_y is not None else vp_y target_fx = args.target_fx if args.target_fx is not None else calib["focal_u"] simul_calib = compute_simul_calib( calib_params=calib, ori_img_size=ori_img_size, target_size=target_size, crop_center_x=crop_center_x, crop_center_y=crop_center_y, target_fx=target_fx, ) output_calib = build_output_calib(calib, simul_calib, target_size) image_paths = list(iter_images(images_dir, args.glob)) if not image_paths: raise FileNotFoundError(f"No images matched {args.glob} in {images_dir}") print(f"Input images : {images_dir}") print(f"Input calib : {calib_path}") print(f"Output dir : {output_dir}") print(f"Image count : {len(image_paths)}") print(f"Target size : {target_size[0]}x{target_size[1]}") print(f"Crop center : ({crop_center_x:.2f}, {crop_center_y:.2f})") print(f"Crop bounds : {simul_calib['crop_bounds']}") print(f"Resize scale : ({simul_calib['scale'][0]:.6f}, {simul_calib['scale'][1]:.6f})") print(f"Output fx/fy : ({simul_calib['fx']:.4f}, {simul_calib['fy']:.4f})") print(f"Output cx/cy : ({simul_calib['cx']:.4f}, {simul_calib['cy']:.4f})") ok_count = 0 for idx, image_path in enumerate(image_paths, start=1): out_path = output_images_dir / image_path.name if out_path.exists() and not args.overwrite: print(f"[{idx}/{len(image_paths)}] SKIP exists: {out_path}") continue image = cv2.imread(str(image_path), cv2.IMREAD_COLOR) if image is None: print(f"[{idx}/{len(image_paths)}] SKIP unreadable: {image_path}") continue transformed = apply_simul_transform( img=image, simul_calib=simul_calib, calib_params=calib, target_size=target_size, ) if not cv2.imwrite(str(out_path), transformed): print(f"[{idx}/{len(image_paths)}] FAIL write: {out_path}") continue ok_count += 1 print(f"[{idx}/{len(image_paths)}] OK -> {out_path}") output_calib_path = output_calib_dir / "camera4.json" with output_calib_path.open("w", encoding="utf-8") as file: json.dump(output_calib, file, indent=2, ensure_ascii=False) manifest = { "input_images": str(images_dir), "input_calib": str(calib_path), "output_images": str(output_images_dir), "output_calib": str(output_calib_path), "source_image_size": list(ori_img_size), "target_size": list(target_size), "source_vanishing_point": [vp_x, vp_y], "crop_center": [crop_center_x, crop_center_y], "crop_bounds": list(simul_calib["crop_bounds"]), "scale": list(simul_calib["scale"]), "output_intrinsics": { "focal_u": simul_calib["fx"], "focal_v": simul_calib["fy"], "cu": simul_calib["cx"], "cv": simul_calib["cy"], }, "processed_images": ok_count, "total_images": len(image_paths), } manifest_path = output_dir / "undistort_manifest.json" with manifest_path.open("w", encoding="utf-8") as file: json.dump(manifest, file, indent=2, ensure_ascii=False) print(f"Completed: {ok_count}/{len(image_paths)} image(s) written.") print(f"Calib : {output_calib_path}") print(f"Manifest : {manifest_path}") return 0 if ok_count > 0 else 1 if __name__ == "__main__": raise SystemExit(main())