266 lines
8.5 KiB
Python
Executable File
266 lines
8.5 KiB
Python
Executable File
"""
Read calibration parameters for each clip in clip_list.txt and save to a CSV table.

Usage:
    python tools/pdcl_inference/read_calib_from_clips.py \
        --clip-list-file tools/pdcl_inference/clip_list.txt \
        --output calib_table.csv

Calibration source priority (same as run_batch_merged_infer.py):
    1. Nearby camera4.json found by resolve_calib_file_for_clip()
    2. camera4.json embedded as MCAP attachment
"""
|
|
|
|
import argparse
|
|
import json
|
|
import math
|
|
import os
|
|
import sys
|
|
import traceback
|
|
from pathlib import Path
|
|
from typing import List, Optional
|
|
|
|
import pandas as pd
|
|
|
|
# Absolute location of this script, and the directory two levels above the
# script's own folder. That directory is put on sys.path so the absolute
# `tools.pdcl_inference...` import further down resolves when the file is run
# directly as a script.
FILE = Path(__file__).resolve()
ROOT = FILE.parents[2]
if not any(entry == str(ROOT) for entry in sys.path):
    sys.path.append(str(ROOT))
|
|
|
|
# Load a local .env first (when python-dotenv is installed) so that values
# defined there, or already exported in the shell, take precedence over the
# built-in fallbacks applied below.
try:
    from dotenv import load_dotenv
except ImportError:
    pass  # python-dotenv is optional
else:
    load_dotenv()


def apply_default_sts_credentials(environ=None):
    """Fill in STS_UID / STS_SECRET_KEY only where they are not already set.

    Previously these values were written unconditionally, which clobbered any
    credentials exported by the caller and made the .env file ineffective for
    these two keys.

    Args:
        environ: Mutable mapping to update; defaults to ``os.environ``.

    Returns:
        The mapping that was updated.
    """
    target = os.environ if environ is None else environ
    # SECURITY NOTE(review): this fallback secret is committed to source
    # control. It is kept so existing invocations keep working, but it should
    # be rotated and supplied through the environment or a .env file instead.
    target.setdefault("STS_UID", "dis-uploader")
    target.setdefault("STS_SECRET_KEY", "277310cc09724d315514a79701fecb0f")
    return target


apply_default_sts_credentials()
|
|
|
|
from tools.pdcl_inference.pdcl_clip_service import PDCLClipService
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Calibration helpers (mirrors run_batch_merged_infer.py logic)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Attachment names under which camera4.json may be stored inside an MCAP clip.
# An attachment is accepted when its name equals any one of these entries.
CALIB_ATTACHMENT_NAMES: tuple[str, ...] = (
    "sigmastar.1/calibs/camera4.json",
    "test_data/calibs/camera4.json",
    "calibs/camera4.json",
)
|
|
|
|
|
|
def resolve_calib_file_for_clip(clip_path: str) -> Optional[Path]:
    """Locate a camera4.json stored on disk close to the clip file.

    Starting at the clip's own directory and walking up through at most four
    directory levels (stopping early at the filesystem root), every level is
    probed for the known calibration layouts. The closest level wins; within
    one level the layouts are tried in the order listed in ``layouts``.
    The original note states this matches run_batch_merged_infer.py.

    Args:
        clip_path: Path to the clip file; it is resolved to an absolute path.

    Returns:
        The first candidate path that exists, or None when nothing matches.
    """
    layouts = (
        ("calibs",),
        ("test_data", "calibs"),
        ("L2_calib",),
        ("calib", "L2_calib"),
    )
    clip_dir = Path(clip_path).resolve().parent
    # The clip directory itself plus up to three ancestors.
    levels = [clip_dir, *clip_dir.parents][:4]
    for level in levels:
        for layout in layouts:
            candidate = level.joinpath(*layout, "camera4.json")
            if candidate.exists():
                return candidate
    return None
|
|
|
|
|
|
def read_calib_from_mcap_attachment(clip_path: str) -> Optional[dict]:
    """Extract camera4.json from the attachments of an MCAP clip.

    The lookup is best-effort: it never raises. A failure while opening the
    clip or parsing the attachment is reported on stderr (previously it was
    swallowed silently) and the function returns None.

    Args:
        clip_path: Path to the clip file handed to ``ClipReader``.

    Returns:
        The parsed calibration dict from the first attachment whose name is in
        ``CALIB_ATTACHMENT_NAMES``; None if ``pdcl_pyclip`` is not installed,
        no such attachment exists, or reading failed.
    """
    try:
        from pdcl_pyclip.reader import ClipReader
    except ImportError:
        # Optional dependency: without it there is simply no MCAP source.
        return None

    try:
        reader = ClipReader(clip_path)
        for attachment in reader.iter_attachments():
            if attachment.name in CALIB_ATTACHMENT_NAMES:
                return json.loads(attachment.data.decode("utf-8"))
    except Exception as exc:
        # Keep the best-effort contract, but leave a trace so that corrupt
        # clips or attachments can be told apart from clips without calibration.
        print(
            f"WARNING: failed to read calibration attachment from {clip_path}: "
            f"{type(exc).__name__}: {exc}",
            file=sys.stderr,
        )
    return None
|
|
|
|
|
|
def get_calib_for_clip(clip_path: str) -> tuple[Optional[dict], str]:
    """Load the calibration for a clip, trying each source in priority order.

    Priority 1 is a camera4.json found near the clip on disk; priority 2 is a
    camera4.json attachment inside the clip, tried only for ``.mcap`` paths.

    Args:
        clip_path: Path to the clip file.

    Returns:
        ``(calib_dict, source)`` where source is ``"file:<path>"`` or
        ``"mcap_attachment"``; ``(None, "not_found")`` when no source yields a
        calibration.
    """
    # Priority 1: nearby file
    nearby = resolve_calib_file_for_clip(clip_path)
    if nearby is not None:
        try:
            # JSON is read as UTF-8 rather than the platform's locale encoding.
            with open(nearby, encoding="utf-8") as f:
                return json.load(f), f"file:{nearby}"
        except Exception as exc:
            # Unreadable or malformed file: report it, then fall through to
            # the MCAP attachment instead of failing the clip outright.
            print(
                f"WARNING: could not load calibration file {nearby}: "
                f"{type(exc).__name__}: {exc}",
                file=sys.stderr,
            )

    # Priority 2: MCAP attachment
    if clip_path.lower().endswith(".mcap"):
        calib = read_calib_from_mcap_attachment(clip_path)
        if calib is not None:
            return calib, "mcap_attachment"

    return None, "not_found"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Clip list parser (same as run_batch_merged_infer.py)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def parse_clip_list(clip_list_file: str) -> List[dict]:
    """Read the clip list file into a list of task dicts.

    Each non-blank line is split on whitespace: the first token is the clip
    UUID and the second, when present, is the date name (otherwise an empty
    string). Any further tokens on the line are ignored.

    Args:
        clip_list_file: Path to the text file listing the clips.

    Returns:
        One ``{"clip_uuid": ..., "date_name": ...}`` dict per non-blank line,
        in file order.
    """
    with open(clip_list_file) as handle:
        # split() without arguments drops surrounding whitespace and yields an
        # empty list for blank lines, which the filter below removes.
        token_rows = [raw.split() for raw in handle]
    return [
        {
            "clip_uuid": tokens[0],
            "date_name": tokens[1] if len(tokens) >= 2 else "",
        }
        for tokens in token_rows
        if tokens
    ]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Calibration dict → flat record
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def flatten_calib(calib: dict) -> dict:
    """Flatten a calibration dict into a single flat record for the table.

    Besides copying the scalar fields, this unpacks ``pos`` into
    ``pos_x/y/z``, ``distort_coeffs`` into ``k1..k4``, and derives the
    vanishing point from the intrinsics and the pitch/yaw angles (degrees).

    Keys that are missing yield None, except ``yaw`` which defaults to 0.0.
    Keys that are present but null are handled the same way as missing ones
    for ``yaw``, ``pos`` and ``distort_coeffs``; previously those raised
    ``TypeError``.

    Args:
        calib: Parsed camera4.json content.

    Returns:
        A dict with the flat calibration columns.
    """
    # `or []` also covers keys that are present but null, which would
    # otherwise break the length checks below.
    coeffs = calib.get("distort_coeffs") or []
    pos = calib.get("pos") or []

    fx = calib.get("focal_u")
    fy = calib.get("focal_v")
    cx = calib.get("cu")
    cy = calib.get("cv")
    pitch = calib.get("pitch")  # degrees
    yaw = calib.get("yaw")  # degrees
    if yaw is None:
        # A null yaw is treated like a missing one: straight ahead.
        yaw = 0.0

    def item(seq, index):
        """Return seq[index], or None when the sequence is too short."""
        return seq[index] if len(seq) > index else None

    # Vanishing point (original note: same formula as dataloaders3d.py __getitem__):
    #   vanish_x = cx + fx * tan(yaw_deg   * pi/180)
    #   vanish_y = cy - fy * tan(pitch_deg * pi/180)
    if None not in (fx, fy, cx, cy, pitch):
        vanish_x = cx + fx * math.tan(yaw * math.pi / 180)
        vanish_y = cy - fy * math.tan(pitch * math.pi / 180)
    else:
        vanish_x = vanish_y = None

    return {
        "focal_u": fx,
        "focal_v": fy,
        "cu": cx,
        "cv": cy,
        "pitch": pitch,
        "yaw": yaw,
        "roll": calib.get("roll"),
        "fov": calib.get("fov"),
        "pos_x": item(pos, 0),
        "pos_y": item(pos, 1),
        "pos_z": item(pos, 2),
        "vanish_x": vanish_x,
        "vanish_y": vanish_y,
        "k1": item(coeffs, 0),
        "k2": item(coeffs, 1),
        "k3": item(coeffs, 2),
        "k4": item(coeffs, 3),
        "img_width": calib.get("image_width", calib.get("img_width")),
        "img_height": calib.get("image_height", calib.get("img_height")),
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main():
    """Command-line entry point: build the calibration table for a clip list.

    For every clip in the list file the clip path is looked up through
    ``PDCLClipService.get_clip_path``, its calibration is loaded and flattened
    into one row. Clips that cannot be found, have no calibration, or raise
    are still written as rows with the ``error`` column filled in. The table
    is saved as CSV, additionally as ``.xlsx`` next to it when an Excel
    writer is available, and finally printed to the console.
    """
    parser = argparse.ArgumentParser(description="Extract calibration params from PDCL clips to CSV")
    parser.add_argument("--clip-list-file", type=str,
                        default=str(FILE.parent / "clip_list.txt"),
                        help="Path to clip list text file")
    parser.add_argument("--output", type=str, default="calib_table.csv",
                        help="Output CSV path")
    args = parser.parse_args()

    tasks = parse_clip_list(args.clip_list_file)
    total = len(tasks)
    print(f"Found {total} clips in {args.clip_list_file}")

    rows = []
    for idx, task in enumerate(tasks, start=1):
        clip_uuid = task["clip_uuid"]
        date_name = task["date_name"]
        # Progress prefix; the outcome is appended to the same console line.
        print(f"[{idx}/{total}] clip_uuid={clip_uuid} ...", end=" ", flush=True)

        row = {
            "clip_uuid": clip_uuid,
            "date_name": date_name,
            "clip_path": "",
            "calib_source": "",
            "error": "",
        }

        try:
            clip_path = PDCLClipService.get_clip_path(clip_uuid)
            if clip_path is None:
                row["error"] = "clip_not_found"
                print("SKIP (clip not found)")
            else:
                row["clip_path"] = clip_path
                calib, source = get_calib_for_clip(clip_path)
                row["calib_source"] = source
                if calib is None:
                    row["error"] = "calib_not_found"
                    print("SKIP (calib not found)")
                else:
                    row.update(flatten_calib(calib))
                    print(f"OK ({source})")
        except Exception as exc:
            # One bad clip must not abort the batch: record the failure in
            # the row and continue with the next clip.
            row["error"] = f"{type(exc).__name__}: {exc}"
            print(f"ERROR: {exc}")
            traceback.print_exc()

        rows.append(row)

    df = pd.DataFrame(rows)

    # Reorder columns for readability; unknown columns go after the known ones.
    lead_cols = ["clip_uuid", "date_name",
                 "focal_u", "focal_v", "cu", "cv",
                 "pitch", "yaw", "roll", "fov",
                 "pos_x", "pos_y", "pos_z",
                 "vanish_x", "vanish_y",
                 "k1", "k2", "k3", "k4",
                 "img_width", "img_height", "calib_source", "clip_path", "error"]
    present = [c for c in lead_cols if c in df.columns]
    extra = [c for c in df.columns if c not in present]
    df = df[present + extra]

    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(out_path, index=False)
    print(f"\nSaved {len(df)} rows → {out_path}")

    # Also save Excel if a writer such as openpyxl is available. The export
    # stays optional, but a failure is now reported instead of being hidden.
    xlsx_path = out_path.with_suffix(".xlsx")
    try:
        df.to_excel(xlsx_path, index=False)
    except ImportError:
        print("Skipped Excel export (no Excel writer such as openpyxl is installed)")
    except Exception as exc:
        print(
            f"WARNING: Excel export to {xlsx_path} failed: {type(exc).__name__}: {exc}",
            file=sys.stderr,
        )
    else:
        print(f"Saved Excel → {xlsx_path}")

    # Print summary table to console
    print("\n" + df.to_string(index=False))
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported as a module.
if "__main__" == __name__:
    main()
|