""" Read calibration parameters for each clip in clip_list.txt and save to a CSV table. Usage: python tools/pdcl_inference/read_calib_from_clips.py \ --clip-list-file tools/pdcl_inference/clip_list.txt \ --output calib_table.csv Calibration source priority (same as run_batch_merged_infer.py): 1. Nearby camera4.json found by resolve_calib_file_for_clip() 2. camera4.json embedded as MCAP attachment """ import argparse import json import math import os import sys import traceback from pathlib import Path from typing import List, Optional import pandas as pd FILE = Path(__file__).resolve() ROOT = FILE.parents[2] if str(ROOT) not in sys.path: sys.path.append(str(ROOT)) os.environ["STS_UID"] = "dis-uploader" os.environ["STS_SECRET_KEY"] = "277310cc09724d315514a79701fecb0f" try: from dotenv import load_dotenv load_dotenv() except ImportError: pass from tools.pdcl_inference.pdcl_clip_service import PDCLClipService # --------------------------------------------------------------------------- # Calibration helpers (mirrors run_batch_merged_infer.py logic) # --------------------------------------------------------------------------- CALIB_ATTACHMENT_NAMES = ( "sigmastar.1/calibs/camera4.json", "test_data/calibs/camera4.json", "calibs/camera4.json", ) def resolve_calib_file_for_clip(clip_path: str) -> Optional[Path]: """Search for camera4.json in directories near the clip file (same as run_batch_merged_infer.py).""" base = Path(clip_path).resolve() search_dir = base.parent candidates = [] for _ in range(4): candidates.extend([ search_dir / "calibs" / "camera4.json", search_dir / "test_data" / "calibs" / "camera4.json", search_dir / "L2_calib" / "camera4.json", search_dir / "calib" / "L2_calib" / "camera4.json", ]) parent = search_dir.parent if parent == search_dir: break search_dir = parent for p in candidates: if p.exists(): return p return None def read_calib_from_mcap_attachment(clip_path: str) -> Optional[dict]: """Extract camera4.json from MCAP attachments.""" try: from pdcl_pyclip.reader import ClipReader except ImportError: return None try: reader = ClipReader(clip_path) for attachment in reader.iter_attachments(): if attachment.name in CALIB_ATTACHMENT_NAMES: return json.loads(attachment.data.decode("utf-8")) except Exception: pass return None def get_calib_for_clip(clip_path: str) -> tuple[Optional[dict], str]: """Return (calib_dict, source_description). Returns (None, error_msg) on failure.""" # Priority 1: nearby file nearby = resolve_calib_file_for_clip(clip_path) if nearby is not None: try: with open(nearby) as f: return json.load(f), f"file:{nearby}" except Exception as e: pass # fall through to MCAP attachment # Priority 2: MCAP attachment if clip_path.lower().endswith(".mcap"): calib = read_calib_from_mcap_attachment(clip_path) if calib is not None: return calib, "mcap_attachment" return None, "not_found" # --------------------------------------------------------------------------- # Clip list parser (same as run_batch_merged_infer.py) # --------------------------------------------------------------------------- def parse_clip_list(clip_list_file: str) -> List[dict]: tasks = [] with open(clip_list_file) as f: for line in f: line = line.strip() if not line: continue parts = line.split() clip_uuid = parts[0] date_name = parts[1] if len(parts) >= 2 else "" tasks.append({"clip_uuid": clip_uuid, "date_name": date_name}) return tasks # --------------------------------------------------------------------------- # Calibration dict → flat record # --------------------------------------------------------------------------- def flatten_calib(calib: dict) -> dict: coeffs = calib.get("distort_coeffs", []) pos = calib.get("pos", []) fx = calib.get("focal_u") fy = calib.get("focal_v") cx = calib.get("cu") cy = calib.get("cv") pitch = calib.get("pitch") # degrees yaw = calib.get("yaw", 0.0) # degrees # Vanishing point (same formula as dataloaders3d.py __getitem__): # vanish_x = cx + fx * tan(yaw_deg * pi/180) # vanish_y = cy - fy * tan(pitch_deg * pi/180) if None not in (fx, fy, cx, cy, pitch): vanish_x = cx + fx * math.tan(yaw * math.pi / 180) vanish_y = cy - fy * math.tan(pitch * math.pi / 180) else: vanish_x = vanish_y = None return { "focal_u": fx, "focal_v": fy, "cu": cx, "cv": cy, "pitch": pitch, "yaw": yaw, "roll": calib.get("roll"), "fov": calib.get("fov"), "pos_x": pos[0] if len(pos) > 0 else None, "pos_y": pos[1] if len(pos) > 1 else None, "pos_z": pos[2] if len(pos) > 2 else None, "vanish_x": vanish_x, "vanish_y": vanish_y, "k1": coeffs[0] if len(coeffs) > 0 else None, "k2": coeffs[1] if len(coeffs) > 1 else None, "k3": coeffs[2] if len(coeffs) > 2 else None, "k4": coeffs[3] if len(coeffs) > 3 else None, "img_width": calib.get("image_width", calib.get("img_width")), "img_height": calib.get("image_height", calib.get("img_height")), } # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser(description="Extract calibration params from PDCL clips to CSV") parser.add_argument("--clip-list-file", type=str, default=str(FILE.parent / "clip_list.txt"), help="Path to clip list text file") parser.add_argument("--output", type=str, default="calib_table.csv", help="Output CSV path") args = parser.parse_args() tasks = parse_clip_list(args.clip_list_file) print(f"Found {len(tasks)} clips in {args.clip_list_file}") rows = [] for idx, task in enumerate(tasks, start=1): clip_uuid = task["clip_uuid"] date_name = task["date_name"] print(f"[{idx}/{len(tasks)}] clip_uuid={clip_uuid} ...", end=" ", flush=True) row = { "clip_uuid": clip_uuid, "date_name": date_name, "clip_path": "", "calib_source": "", "error": "", } try: clip_path = PDCLClipService.get_clip_path(clip_uuid) if clip_path is None: row["error"] = "clip_not_found" print("SKIP (clip not found)") rows.append(row) continue row["clip_path"] = clip_path calib, source = get_calib_for_clip(clip_path) row["calib_source"] = source if calib is None: row["error"] = "calib_not_found" print("SKIP (calib not found)") else: row.update(flatten_calib(calib)) print(f"OK ({source})") except Exception as exc: row["error"] = f"{type(exc).__name__}: {exc}" print(f"ERROR: {exc}") traceback.print_exc() rows.append(row) df = pd.DataFrame(rows) # Reorder columns for readability lead_cols = ["clip_uuid", "date_name", "focal_u", "focal_v", "cu", "cv", "pitch", "yaw", "roll", "fov", "pos_x", "pos_y", "pos_z", "vanish_x", "vanish_y", "k1", "k2", "k3", "k4", "img_width", "img_height", "calib_source", "clip_path", "error"] present = [c for c in lead_cols if c in df.columns] extra = [c for c in df.columns if c not in present] df = df[present + extra] out_path = Path(args.output) out_path.parent.mkdir(parents=True, exist_ok=True) df.to_csv(out_path, index=False) print(f"\nSaved {len(df)} rows → {out_path}") # Also save Excel if openpyxl is available try: xlsx_path = out_path.with_suffix(".xlsx") df.to_excel(xlsx_path, index=False) print(f"Saved Excel → {xlsx_path}") except Exception: pass # Print summary table to console print("\n" + df.to_string(index=False)) if __name__ == "__main__": main()