# Source: yolov26_3d/tools/pdcl_inference/read_calib_from_clips.py
# Last modified: 2026-06-24 09:35:46 +08:00
# Original listing: 266 lines, 8.5 KiB, Python, executable file

"""
Read calibration parameters for each clip in clip_list.txt and save to a CSV table.
Usage:
python tools/pdcl_inference/read_calib_from_clips.py \
--clip-list-file tools/pdcl_inference/clip_list.txt \
--output calib_table.csv
Calibration source priority (same as run_batch_merged_infer.py):
1. Nearby camera4.json found by resolve_calib_file_for_clip()
2. camera4.json embedded as MCAP attachment
"""
import argparse
import json
import math
import os
import sys
import traceback
from pathlib import Path
from typing import List, Optional
import pandas as pd
FILE = Path(__file__).resolve()
# Directory two levels above this file's own directory; it is put on sys.path so
# that the absolute ``tools.pdcl_inference`` import below resolves when the file
# is executed directly as a script.
ROOT = FILE.parents[2]
if str(ROOT) not in sys.path:
    sys.path.append(str(ROOT))

# Load an optional local .env first. load_dotenv() does not override variables
# that are already set, so the precedence becomes:
#   real environment > .env file > built-in fallback below.
try:
    from dotenv import load_dotenv

    load_dotenv()
except ImportError:
    # python-dotenv is optional; without it only the real environment is used.
    pass

# SECURITY(review): these credentials are committed in source. They are kept only
# as a fallback so existing invocations keep working; they should be rotated and
# supplied through the environment or a .env file instead.
# setdefault() (rather than plain assignment) lets externally supplied
# credentials win instead of being silently clobbered.
os.environ.setdefault("STS_UID", "dis-uploader")
os.environ.setdefault("STS_SECRET_KEY", "277310cc09724d315514a79701fecb0f")
from tools.pdcl_inference.pdcl_clip_service import PDCLClipService
# ---------------------------------------------------------------------------
# Calibration helpers (mirrors run_batch_merged_infer.py logic)
# ---------------------------------------------------------------------------
# Attachment names under which camera4.json is recognised inside an MCAP clip.
# The tuple is used for membership tests only: the first attachment (in the
# reader's iteration order) whose name appears here is the one that is parsed,
# so the order of the entries does not express a priority.
CALIB_ATTACHMENT_NAMES = (
    "sigmastar.1/calibs/camera4.json",
    "test_data/calibs/camera4.json",
    "calibs/camera4.json",
)
def resolve_calib_file_for_clip(clip_path: str) -> Optional[Path]:
    """Locate a ``camera4.json`` in the directories surrounding a clip file.

    The search starts in the clip's own directory and climbs at most three
    ancestors (four levels in total, fewer if the filesystem root is reached).
    At every level these relative locations are probed, in this order:

    * ``calibs/camera4.json``
    * ``test_data/calibs/camera4.json``
    * ``L2_calib/camera4.json``
    * ``calib/L2_calib/camera4.json``

    Args:
        clip_path: Path to the clip file; it is resolved to an absolute path
            before the search begins.

    Returns:
        The first candidate that exists, nearest level first, or ``None`` when
        none of the candidates exists.
    """
    relative_locations = (
        Path("calibs") / "camera4.json",
        Path("test_data") / "calibs" / "camera4.json",
        Path("L2_calib") / "camera4.json",
        Path("calib") / "L2_calib" / "camera4.json",
    )
    clip_dir = Path(clip_path).resolve().parent
    # ``parents`` ends at the filesystem root, so slicing naturally handles
    # clips that live fewer than four levels deep.
    levels = [clip_dir, *clip_dir.parents][:4]
    for level in levels:
        for relative in relative_locations:
            candidate = level / relative
            if candidate.exists():
                return candidate
    return None
def read_calib_from_mcap_attachment(clip_path: str) -> Optional[dict]:
    """Extract the camera4.json calibration embedded in an MCAP clip.

    The clip's attachments are iterated with ``pdcl_pyclip``'s ``ClipReader``
    and the first one whose name is listed in ``CALIB_ATTACHMENT_NAMES`` is
    decoded as UTF-8 and parsed as JSON.

    Args:
        clip_path: Path to the MCAP clip file.

    Returns:
        The parsed calibration, or ``None`` when ``pdcl_pyclip`` is not
        installed, no matching attachment exists, or reading/parsing fails.
        Read and parse failures are reported on stderr; they never raise.
    """
    try:
        from pdcl_pyclip.reader import ClipReader
    except ImportError:
        # Optional dependency: without it attachments cannot be read at all.
        return None
    try:
        reader = ClipReader(clip_path)
        for attachment in reader.iter_attachments():
            if attachment.name in CALIB_ATTACHMENT_NAMES:
                return json.loads(attachment.data.decode("utf-8"))
    except Exception as exc:
        # Still best effort (the caller records "not_found"), but say why so a
        # corrupt clip or attachment is distinguishable from a missing one.
        print(
            f"WARNING: failed to read calibration attachment from {clip_path}: "
            f"{type(exc).__name__}: {exc}",
            file=sys.stderr,
        )
    return None
def get_calib_for_clip(clip_path: str) -> tuple[Optional[dict], str]:
    """Load the calibration for a clip, trying each source in priority order.

    1. A ``camera4.json`` found near the clip by
       ``resolve_calib_file_for_clip()``.
    2. A ``camera4.json`` embedded as an MCAP attachment (only attempted when
       the path ends with ``.mcap``, case-insensitively).

    Args:
        clip_path: Path to the clip file.

    Returns:
        ``(calib, source)`` where ``source`` is ``"file:<path>"`` or
        ``"mcap_attachment"``. When no source yields a calibration the result
        is ``(None, "not_found")``.
    """
    # Priority 1: nearby file
    nearby = resolve_calib_file_for_clip(clip_path)
    if nearby is not None:
        try:
            with open(nearby, encoding="utf-8") as f:
                return json.load(f), f"file:{nearby}"
        except Exception as exc:
            # Fall through to the MCAP attachment, but report the unreadable
            # file instead of hiding it behind a later "not_found".
            print(
                f"WARNING: could not read calibration file {nearby}: "
                f"{type(exc).__name__}: {exc}",
                file=sys.stderr,
            )
    # Priority 2: MCAP attachment
    if clip_path.lower().endswith(".mcap"):
        calib = read_calib_from_mcap_attachment(clip_path)
        if calib is not None:
            return calib, "mcap_attachment"
    return None, "not_found"
# ---------------------------------------------------------------------------
# Clip list parser (same as run_batch_merged_infer.py)
# ---------------------------------------------------------------------------
def parse_clip_list(clip_list_file: str) -> List[dict]:
    """Parse a whitespace-separated clip list file into task dicts.

    Each non-blank line contributes one entry: the first field is the clip
    UUID and the optional second field is the date name. Any further fields
    on the line are ignored.

    Args:
        clip_list_file: Path to the clip list text file.

    Returns:
        A list of ``{"clip_uuid": ..., "date_name": ...}`` dicts in file
        order; ``date_name`` is ``""`` when the line has a single field.
    """
    entries: List[dict] = []
    with open(clip_list_file) as handle:
        for raw_line in handle:
            fields = raw_line.split()
            if not fields:
                # Blank or whitespace-only line.
                continue
            uuid, *remainder = fields
            entries.append(
                {
                    "clip_uuid": uuid,
                    "date_name": remainder[0] if remainder else "",
                }
            )
    return entries
# ---------------------------------------------------------------------------
# Calibration dict → flat record
# ---------------------------------------------------------------------------
def flatten_calib(calib: dict) -> dict:
    """Flatten a calibration dict into a single-level record for the table.

    Scalar entries are copied through, ``pos`` is spread into
    ``pos_x``/``pos_y``/``pos_z``, ``distort_coeffs`` into ``k1``..``k4``, and
    the vanishing point is derived from the intrinsics and the pitch/yaw
    angles (both in degrees):

        vanish_x = cu + focal_u * tan(yaw)
        vanish_y = cv - focal_v * tan(pitch)

    A value that is missing or ``None`` (JSON ``null``) is handled the same
    way: ``yaw`` falls back to ``0.0``, ``pos`` and ``distort_coeffs`` to an
    empty list, and every other field is reported as ``None``.

    Args:
        calib: Parsed contents of a camera calibration JSON.

    Returns:
        A flat dict with a fixed set of keys. ``vanish_x`` and ``vanish_y``
        are both ``None`` unless ``focal_u``, ``focal_v``, ``cu``, ``cv`` and
        ``pitch`` are all available. Image size is read from
        ``image_width``/``image_height`` when those keys exist, otherwise from
        ``img_width``/``img_height``.
    """
    # ``or []`` maps an explicit null to an empty list so len() below is safe.
    coeffs = calib.get("distort_coeffs") or []
    pos = calib.get("pos") or []
    fx = calib.get("focal_u")
    fy = calib.get("focal_v")
    cx = calib.get("cu")
    cy = calib.get("cv")
    pitch = calib.get("pitch")  # degrees
    yaw = calib.get("yaw")  # degrees
    if yaw is None:
        # Same fallback for a null yaw as for an absent one; otherwise the
        # tan() below would fail on None.
        yaw = 0.0

    # Vanishing point (same formula as dataloaders3d.py __getitem__):
    #   vanish_x = cx + fx * tan(yaw_deg * pi/180)
    #   vanish_y = cy - fy * tan(pitch_deg * pi/180)
    if all(value is not None for value in (fx, fy, cx, cy, pitch)):
        vanish_x = cx + fx * math.tan(yaw * math.pi / 180)
        vanish_y = cy - fy * math.tan(pitch * math.pi / 180)
    else:
        vanish_x = vanish_y = None

    return {
        "focal_u": fx,
        "focal_v": fy,
        "cu": cx,
        "cv": cy,
        "pitch": pitch,
        "yaw": yaw,
        "roll": calib.get("roll"),
        "fov": calib.get("fov"),
        "pos_x": pos[0] if len(pos) > 0 else None,
        "pos_y": pos[1] if len(pos) > 1 else None,
        "pos_z": pos[2] if len(pos) > 2 else None,
        "vanish_x": vanish_x,
        "vanish_y": vanish_y,
        "k1": coeffs[0] if len(coeffs) > 0 else None,
        "k2": coeffs[1] if len(coeffs) > 1 else None,
        "k3": coeffs[2] if len(coeffs) > 2 else None,
        "k4": coeffs[3] if len(coeffs) > 3 else None,
        "img_width": calib.get("image_width", calib.get("img_width")),
        "img_height": calib.get("image_height", calib.get("img_height")),
    }
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _collect_clip_row(clip_uuid: str, date_name: str) -> dict:
    """Build the output row for one clip and print its status to stdout.

    Any exception raised while resolving the clip or its calibration is
    recorded in the row's ``error`` field (and its traceback printed) so that
    one bad clip does not abort the whole run.
    """
    row = {
        "clip_uuid": clip_uuid,
        "date_name": date_name,
        "clip_path": "",
        "calib_source": "",
        "error": "",
    }
    try:
        clip_path = PDCLClipService.get_clip_path(clip_uuid)
        if clip_path is None:
            row["error"] = "clip_not_found"
            print("SKIP (clip not found)")
            return row
        row["clip_path"] = clip_path
        calib, source = get_calib_for_clip(clip_path)
        row["calib_source"] = source
        if calib is None:
            row["error"] = "calib_not_found"
            print("SKIP (calib not found)")
        else:
            row.update(flatten_calib(calib))
            print(f"OK ({source})")
    except Exception as exc:
        row["error"] = f"{type(exc).__name__}: {exc}"
        print(f"ERROR: {exc}")
        traceback.print_exc()
    return row


def main():
    """Read calibration for every clip in the clip list and export a table.

    Command-line options:
        --clip-list-file: clip list text file (default: ``clip_list.txt`` next
            to this script).
        --output: CSV output path (default: ``calib_table.csv``).

    One row is written per clip, including clips that could not be resolved
    (their ``error`` column says why). The table is saved as CSV, additionally
    as ``.xlsx`` next to it when Excel export works, and finally printed to
    stdout.
    """
    parser = argparse.ArgumentParser(description="Extract calibration params from PDCL clips to CSV")
    parser.add_argument("--clip-list-file", type=str,
                        default=str(FILE.parent / "clip_list.txt"),
                        help="Path to clip list text file")
    parser.add_argument("--output", type=str, default="calib_table.csv",
                        help="Output CSV path")
    args = parser.parse_args()

    tasks = parse_clip_list(args.clip_list_file)
    print(f"Found {len(tasks)} clips in {args.clip_list_file}")

    rows = []
    for idx, task in enumerate(tasks, start=1):
        clip_uuid = task["clip_uuid"]
        # The status (OK / SKIP / ERROR) is appended to this same line by the helper.
        print(f"[{idx}/{len(tasks)}] clip_uuid={clip_uuid} ...", end=" ", flush=True)
        rows.append(_collect_clip_row(clip_uuid, task["date_name"]))

    df = pd.DataFrame(rows)

    # Reorder columns for readability
    lead_cols = ["clip_uuid", "date_name",
                 "focal_u", "focal_v", "cu", "cv",
                 "pitch", "yaw", "roll", "fov",
                 "pos_x", "pos_y", "pos_z",
                 "vanish_x", "vanish_y",
                 "k1", "k2", "k3", "k4",
                 "img_width", "img_height", "calib_source", "clip_path", "error"]
    present = [c for c in lead_cols if c in df.columns]
    extra = [c for c in df.columns if c not in present]
    df = df[present + extra]

    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(out_path, index=False)
    print(f"\nSaved {len(df)} rows → {out_path}")

    # Also save Excel if an Excel writer (e.g. openpyxl) is available
    xlsx_path = out_path.with_suffix(".xlsx")
    if xlsx_path == out_path:
        # --output already ends in .xlsx: exporting would overwrite the CSV just written.
        print(
            f"WARNING: skipped Excel export because it would overwrite {out_path}",
            file=sys.stderr,
        )
    else:
        try:
            df.to_excel(xlsx_path, index=False)
        except ImportError:
            # Optional dependency missing: Excel output is a convenience only.
            print("Skipped Excel export (no Excel writer such as openpyxl is installed)")
        except Exception as exc:
            # Keep the run successful, but do not hide a failed export.
            print(
                f"WARNING: Excel export to {xlsx_path} failed: {type(exc).__name__}: {exc}",
                file=sys.stderr,
            )
        else:
            print(f"Saved Excel → {xlsx_path}")

    # Print summary table to console
    print("\n" + df.to_string(index=False))


if __name__ == "__main__":
    main()