#!/usr/bin/env python3
"""Run ROI1 crop-center compensation experiments and compare detection stability."""

from __future__ import annotations

import argparse
import json
import math
import re
import statistics
import subprocess
import sys
from pathlib import Path
from typing import Any

_SCRIPT_PATH = Path(__file__).resolve()
# The repository root is taken to be ``parents[2]`` of this file.  When the file sits too shallow
# for that (fewer than three ancestors), fall back to its own folder so that importing the module
# does not raise IndexError.
ROOT = _SCRIPT_PATH.parents[2] if len(_SCRIPT_PATH.parents) > 2 else _SCRIPT_PATH.parent
DEFAULT_MODEL = ROOT / "runs" / "export" / "train_mono3d_two_roi_20260416-raw_no_edge" / "merged_model.torchscript"
DEFAULT_OUTPUT_ROOT = Path("/data1/dongying/Mono3d/G1Q3/feishu_project/roi1_crop_compensation_experiments")
# Extracts the numeric frame id from per-frame prediction file names.
FRAME_FILE_RE = re.compile(r"camera4_(\d+)_")


def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the experiment driver."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--baseline-case-dir", required=True, type=Path, help="Baseline inference case output directory.")
    parser.add_argument("--video-case-dir", required=True, type=Path, help="Input video case path passed to --video-case-dir.")
    parser.add_argument("--tracking-json", type=Path, default=None, help="Tracking JSON used to derive the reference target trajectory.")
    parser.add_argument("--track-id", type=int, default=7)
    parser.add_argument("--frame-id-start", type=int, required=True)
    parser.add_argument("--frame-id-end", type=int, required=True)
    parser.add_argument("--alpha", type=float, default=1.0, help="Scale factor applied to the bbox-derived ROI1 compensation.")
    parser.add_argument("--exported-model", type=Path, default=DEFAULT_MODEL)
    parser.add_argument("--device", type=str, default="cuda")
    parser.add_argument("--output-root", type=Path, default=DEFAULT_OUTPUT_ROOT)
    parser.add_argument("--skip-existing", action="store_true", help="Reuse existing experiment outputs when present.")
    return parser.parse_args()


def load_json(path: Path) -> Any:
    """Read and decode the UTF-8 JSON document stored at *path*."""
    with path.open("r", encoding="utf-8") as file:
        return json.load(file)


def save_json(path: Path, payload: Any) -> None:
    """Write *payload* as indented UTF-8 JSON to *path*, creating parent directories."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as file:
        json.dump(payload, file, ensure_ascii=False, indent=2)
        file.write("\n")


def _coerce_float(value: Any, default: float | None = None) -> float | None:
    """Return ``float(value)``, or *default* when the conversion is not possible."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _coerce_int(value: Any) -> int | None:
    """Return ``int(value)``, or ``None`` when the conversion is not possible."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def _leading_float(values: Any) -> float | None:
    """Return the first element of a non-empty list as a float, else ``None``."""
    if isinstance(values, list) and values:
        return _coerce_float(values[0])
    return None


def coerce_bbox(values: Any) -> tuple[float, float, float, float] | None:
    """Convert the first four entries of *values* into an ``(x1, y1, x2, y2)`` float tuple.

    Returns ``None`` when *values* is not a list/tuple, has fewer than four
    entries, or any of the first four entries cannot be converted to float.
    """
    if not isinstance(values, (list, tuple)) or len(values) < 4:
        return None
    try:
        x1, y1, x2, y2 = (float(values[0]), float(values[1]), float(values[2]), float(values[3]))
    except (TypeError, ValueError):
        return None
    return x1, y1, x2, y2


def bbox_iou(box_a: tuple[float, float, float, float], box_b: tuple[float, float, float, float]) -> float:
    """Return the intersection-over-union of two ``(x1, y1, x2, y2)`` boxes.

    Boxes that do not overlap (or whose union has no area) yield ``0.0``.
    """
    ax1, ay1, ax2, ay2 = box_a
    bx1, by1, bx2, by2 = box_b
    inter_w = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    inter_h = max(0.0, min(ay2, by2) - max(ay1, by1))
    inter_area = inter_w * inter_h
    if inter_area <= 0.0:
        return 0.0
    area_a = max(0.0, ax2 - ax1) * max(0.0, ay2 - ay1)
    area_b = max(0.0, bx2 - bx1) * max(0.0, by2 - by1)
    denom = area_a + area_b - inter_area
    return inter_area / denom if denom > 0.0 else 0.0


def center_distance(box_a: tuple[float, float, float, float], box_b: tuple[float, float, float, float]) -> float:
    """Return the Euclidean distance between the centers of two ``(x1, y1, x2, y2)`` boxes."""
    acx = (box_a[0] + box_a[2]) * 0.5
    acy = (box_a[1] + box_a[3]) * 0.5
    bcx = (box_b[0] + box_b[2]) * 0.5
    bcy = (box_b[1] + box_b[3]) * 0.5
    return math.hypot(acx - bcx, acy - bcy)


def frame_id_from_tracking_frame(frame_data: dict[str, Any], fallback_idx: int) -> int:
    """Determine the frame id of one tracking-JSON frame entry.

    The first integer-convertible ``frameId`` / ``frame_id`` found on a
    detection wins; otherwise ``frame_info`` is consulted
    (``frame_id``, ``frameId``, ``original_frame_id``); otherwise
    *fallback_idx* is returned.  Detection entries that are not dicts and a
    missing or null ``detections`` list are ignored rather than raising.
    """
    for det in frame_data.get("detections") or []:
        if not isinstance(det, dict):
            continue
        for key in ("frameId", "frame_id"):
            frame_id = _coerce_int(det.get(key))
            if frame_id is not None:
                return frame_id
    frame_info = frame_data.get("frame_info")
    if isinstance(frame_info, dict):
        for key in ("frame_id", "frameId", "original_frame_id"):
            frame_id = _coerce_int(frame_info.get(key))
            if frame_id is not None:
                return frame_id
    return fallback_idx


def load_reference_track(tracking_json: Path, track_id: int, frame_id_start: int, frame_id_end: int) -> list[dict[str, Any]]:
    """Load the reference trajectory of *track_id* from a tracking JSON file.

    The JSON is either a list of frame dicts or a dict carrying that list
    under ``"frames"``.  For every frame whose id lies within
    ``[frame_id_start, frame_id_end]`` the first detection of *track_id* with
    a usable ``bbox`` is recorded.  A ``class_id`` that cannot be converted to
    int is stored as ``None``.

    Returns:
        Records sorted by ``frame_id`` with keys ``frame_id``, ``frame_idx``,
        ``bbox``, ``class_id``, ``type_name`` and ``y2``.

    Raises:
        ValueError: the JSON does not contain a list of frames.
        FileNotFoundError: no detection of the track lies in the frame range.
    """
    payload = load_json(tracking_json)
    frames = payload.get("frames", payload) if isinstance(payload, dict) else payload
    if not isinstance(frames, list):
        raise ValueError(f"Unsupported tracking JSON structure in {tracking_json}")

    reference = []
    for frame_idx, frame_data in enumerate(frames):
        if not isinstance(frame_data, dict):
            continue
        frame_id = frame_id_from_tracking_frame(frame_data, frame_idx)
        if frame_id < frame_id_start or frame_id > frame_id_end:
            continue
        for det in frame_data.get("detections") or []:
            if not isinstance(det, dict) or det.get("track_id") != track_id:
                continue
            bbox = coerce_bbox(det.get("bbox"))
            if bbox is None:
                continue
            reference.append(
                {
                    "frame_id": frame_id,
                    "frame_idx": frame_idx,
                    "bbox": bbox,
                    "class_id": _coerce_int(det.get("class_id")),
                    "type_name": det.get("type_name"),
                    "y2": bbox[3],
                }
            )
            # Only one reference box per frame.
            break

    reference.sort(key=lambda item: item["frame_id"])
    if not reference:
        raise FileNotFoundError(
            f"Track {track_id} not found in {tracking_json} within frame_id [{frame_id_start}, {frame_id_end}]"
        )
    return reference


def build_offset_maps(
    reference_track: list[dict[str, Any]],
    alpha: float,
) -> tuple[dict[int, float], dict[int, float], dict[int, float]]:
    """Derive the three per-frame offset maps from the reference track.

    All offsets are built from the ``y2`` value of each record, scaled by
    *alpha*:

    * oracle: ``alpha * (y2 - y2 of the first record)`` for the same frame;
    * causal: the oracle offset of the previous record (``0.0`` for the first);
    * frame_delta: ``alpha * (y2 - previous y2)`` (``0.0`` for the first).

    Returns:
        ``(oracle, causal, frame_delta)`` dicts keyed by frame id.  An empty
        track yields three empty dicts.
    """
    oracle: dict[int, float] = {}
    causal: dict[int, float] = {}
    frame_delta: dict[int, float] = {}
    if not reference_track:
        return oracle, causal, frame_delta

    ref_y2 = float(reference_track[0]["y2"])
    prev_oracle_offset = 0.0
    prev_y2 = ref_y2
    for idx, item in enumerate(reference_track):
        frame_id = int(item["frame_id"])
        y2 = float(item["y2"])
        current_offset = alpha * (y2 - ref_y2)
        oracle[frame_id] = current_offset
        causal[frame_id] = prev_oracle_offset if idx > 0 else 0.0
        frame_delta[frame_id] = alpha * (y2 - prev_y2) if idx > 0 else 0.0
        prev_oracle_offset = current_offset
        prev_y2 = y2
    return oracle, causal, frame_delta


def write_offset_map(path: Path, offsets: dict[int, float], metadata: dict[str, Any]) -> None:
    """Save an offset map to *path* with string frame-id keys and a zero default offset."""
    save_json(
        path,
        {
            "default_offset_px": 0.0,
            "frame_id_offsets": {str(frame_id): offset for frame_id, offset in offsets.items()},
            "metadata": metadata,
        },
    )


def build_prediction_frame_map(predictions_merge_dir: Path) -> dict[int, Path]:
    """Map frame ids to the ``*.json`` prediction files found in *predictions_merge_dir*.

    Files whose name does not match ``FRAME_FILE_RE`` are ignored.  Files are
    visited in sorted order, so the last file wins when ids collide.
    """
    frame_map: dict[int, Path] = {}
    for json_path in sorted(predictions_merge_dir.glob("*.json")):
        match = FRAME_FILE_RE.search(json_path.name)
        if not match:
            continue
        frame_map[int(match.group(1))] = json_path
    return frame_map


def extract_candidate_records(frame_json_path: Path) -> list[dict[str, Any]]:
    """Collect the usable detections of one per-frame prediction file.

    An entry is kept when it is a dict with a valid ``box2d`` and a leading
    ego coordinate readable from ``box_center_xyz_ego`` or, failing that,
    ``xyzlhwyaw_ego``.  ``type`` becomes ``class_id`` (``None`` when it is not
    an integer) and a missing, null or non-numeric ``score`` becomes ``0.0``.

    Raises:
        ValueError: the file's top-level JSON value is not a dict.
    """
    payload = load_json(frame_json_path)
    if not isinstance(payload, dict):
        raise ValueError(f"Unexpected frame prediction structure in {frame_json_path}")

    candidates = []
    for value in payload.values():
        if not isinstance(value, dict):
            continue
        bbox = coerce_bbox(value.get("box2d"))
        if bbox is None:
            continue
        x_ego = _leading_float(value.get("box_center_xyz_ego"))
        if x_ego is None:
            x_ego = _leading_float(value.get("xyzlhwyaw_ego"))
        if x_ego is None:
            continue
        candidates.append(
            {
                "bbox": bbox,
                "class_id": _coerce_int(value.get("type")),
                "type_name": value.get("type_name"),
                "score": _coerce_float(value.get("score"), 0.0),
                "x_ego": x_ego,
            }
        )
    return candidates


def match_reference_detection(
    frame_json_path: Path,
    reference_bbox: tuple[float, float, float, float],
    reference_class_id: int | None,
) -> dict[str, Any] | None:
    """Pick the detection in *frame_json_path* that best matches the reference box.

    Candidates of the reference class are preferred; when there are none (or
    the reference class is unknown) every candidate is considered.  The winner
    maximises IoU, then minimises center distance, then maximises score.

    Returns:
        A copy of the winning record with ``iou_to_reference`` added, or
        ``None`` when the frame has no usable candidate.
    """
    candidates = extract_candidate_records(frame_json_path)
    same_class = [
        candidate
        for candidate in candidates
        if reference_class_id is not None and candidate["class_id"] == reference_class_id
    ]
    candidate_pool = same_class or candidates
    if not candidate_pool:
        return None
    best = max(
        candidate_pool,
        key=lambda candidate: (
            bbox_iou(candidate["bbox"], reference_bbox),
            -center_distance(candidate["bbox"], reference_bbox),
            candidate["score"],
        ),
    )
    best = dict(best)
    best["iou_to_reference"] = bbox_iou(best["bbox"], reference_bbox)
    return best


def collect_variant_series(reference_track: list[dict[str, Any]], predictions_merge_dir: Path) -> list[dict[str, Any]]:
    """Build the matched-detection time series of one inference variant.

    Reference frames without a prediction file or without any usable
    candidate are skipped, so the series may be shorter than the track.
    The result is sorted by ``frame_id``.
    """
    frame_map = build_prediction_frame_map(predictions_merge_dir)
    series: list[dict[str, Any]] = []
    for item in reference_track:
        frame_id = int(item["frame_id"])
        frame_json_path = frame_map.get(frame_id)
        if frame_json_path is None:
            continue
        matched = match_reference_detection(frame_json_path, item["bbox"], item["class_id"])
        if matched is None:
            continue
        bbox = matched["bbox"]
        series.append(
            {
                "frame_id": frame_id,
                "x_ego": float(matched["x_ego"]),
                "score": float(matched["score"]),
                "iou_to_reference": float(matched["iou_to_reference"]),
                "bbox": bbox,
                "y2": bbox[3],
                "cy": (bbox[1] + bbox[3]) * 0.5,
                "w": bbox[2] - bbox[0],
                "h": bbox[3] - bbox[1],
            }
        )
    series.sort(key=lambda item: item["frame_id"])
    return series


def percentile(sorted_values: list[float], q: float) -> float:
    """Return the nearest-rank *q* quantile (``0 <= q <= 1``) of an ascending list.

    Raises:
        ValueError: *sorted_values* is empty.
    """
    if not sorted_values:
        raise ValueError("sorted_values must not be empty")
    if q <= 0:
        return sorted_values[0]
    if q >= 1:
        return sorted_values[-1]
    index = max(0, math.ceil(q * len(sorted_values)) - 1)
    return sorted_values[index]


def compute_series_metrics(series: list[dict[str, Any]], *, window: int = 5) -> dict[str, Any]:
    """Summarise the stability of a matched-detection series.

    Step metrics (``abs_dx_*``, ``abs_dy2_*``, ``abs_dcy_*``) are differences
    between consecutive samples of *series*; samples are whatever frames were
    matched, so a step can span more than one frame id when frames are
    missing.  ``local_dev_*`` is the absolute deviation of ``x_ego`` from the
    mean of the ``2 * window + 1`` samples centred on it; when the series is
    too short for any full window these values are reported as ``0.0``.

    Args:
        series: Records as produced by ``collect_variant_series``.
        window: Half-width, in samples, of the local-mean window.

    Raises:
        ValueError: fewer than two samples, or a negative *window*.
    """
    if len(series) < 2:
        raise ValueError("Series must contain at least two samples")
    if window < 0:
        raise ValueError("window must not be negative")

    x_values = [item["x_ego"] for item in series]
    y2_values = [item["y2"] for item in series]
    cy_values = [item["cy"] for item in series]
    ious = [item["iou_to_reference"] for item in series]
    scores = [item["score"] for item in series]

    abs_dx = sorted(abs(cur - prev) for prev, cur in zip(x_values, x_values[1:]))
    abs_dy2 = sorted(abs(cur - prev) for prev, cur in zip(y2_values, y2_values[1:]))
    abs_dcy = sorted(abs(cur - prev) for prev, cur in zip(cy_values, cy_values[1:]))

    span = 2 * window + 1
    local_dev = [
        abs(x_values[idx] - sum(x_values[idx - window : idx + window + 1]) / span)
        for idx in range(window, len(x_values) - window)
    ]
    local_dev_sorted = sorted(local_dev) if local_dev else [0.0]

    return {
        "samples": len(series),
        "frame_id_start": int(series[0]["frame_id"]),
        "frame_id_end": int(series[-1]["frame_id"]),
        "x_start": float(x_values[0]),
        "x_end": float(x_values[-1]),
        "x_change": float(x_values[-1] - x_values[0]),
        "abs_dx_mean": float(statistics.mean(abs_dx)),
        "abs_dx_p95": float(percentile(abs_dx, 0.95)),
        "abs_dx_max": float(max(abs_dx)),
        "abs_dy2_mean": float(statistics.mean(abs_dy2)),
        "abs_dy2_p95": float(percentile(abs_dy2, 0.95)),
        "abs_dcy_mean": float(statistics.mean(abs_dcy)),
        "abs_dcy_p95": float(percentile(abs_dcy, 0.95)),
        "local_dev_mean": float(statistics.mean(local_dev_sorted)),
        "local_dev_p95": float(percentile(local_dev_sorted, 0.95)),
        "local_dev_max": float(max(local_dev_sorted)),
        "mean_iou_to_reference": float(statistics.mean(ious)),
        "min_iou_to_reference": float(min(ious)),
        "mean_score": float(statistics.mean(scores)),
    }


def run_inference_variant(
    *,
    video_case_dir: Path,
    output_dir: Path,
    frame_id_start: int,
    frame_id_end: int,
    exported_model: Path,
    device: str,
    offset_map_path: Path | None = None,
) -> None:
    """Run the two-ROI inference script as a subprocess for one variant.

    The script is launched with the current interpreter from ``ROOT``; the
    ROI1 offset-map option is added only when *offset_map_path* is given.

    Raises:
        subprocess.CalledProcessError: the inference script exits non-zero.
    """
    cmd = [
        sys.executable,
        str(ROOT / "tools" / "model_inference" / "core" / "run_two_roi_exported_onnx_infer.py"),
        "--video-case-dir",
        str(video_case_dir),
        "--output-dir",
        str(output_dir),
        "--frame-id-start",
        str(frame_id_start),
        "--frame-id-end",
        str(frame_id_end),
        "--video-stride",
        "1",
        "--exported-model",
        str(exported_model),
        "--device",
        device,
        "--enable-cross-class-merge-prior",
        "--save-aggregate-predictions",
    ]
    if offset_map_path is not None:
        cmd.extend(["--roi1-crop-center-y-offset-map", str(offset_map_path)])
    subprocess.run(cmd, check=True, cwd=str(ROOT))


def build_report_payload(
    *,
    args: argparse.Namespace,
    reference_track: list[dict[str, Any]],
    baseline_metrics: dict[str, Any],
    oracle_metrics: dict[str, Any],
    causal_metrics: dict[str, Any],
    frame_delta_metrics: dict[str, Any],
    oracle_offsets_path: Path,
    causal_offsets_path: Path,
    frame_delta_offsets_path: Path,
    baseline_dir: Path,
    oracle_dir: Path,
    causal_dir: Path,
    frame_delta_dir: Path,
) -> dict[str, Any]:
    """Assemble the JSON-serialisable experiment summary.

    Each compensated variant carries its metrics plus a comparison of its
    ``abs_dx_p95`` and ``local_dev_p95`` against the baseline (absolute delta
    and relative reduction; the reduction is ``0.0`` when the baseline value
    is not positive).
    """
    baseline_abs_dx_p95 = baseline_metrics["abs_dx_p95"]
    baseline_local_dev_p95 = baseline_metrics["local_dev_p95"]

    def compare_metrics(metrics: dict[str, Any]) -> dict[str, Any]:
        return {
            "abs_dx_p95_delta": float(metrics["abs_dx_p95"] - baseline_abs_dx_p95),
            "abs_dx_p95_reduction_ratio": float((baseline_abs_dx_p95 - metrics["abs_dx_p95"]) / baseline_abs_dx_p95)
            if baseline_abs_dx_p95 > 0
            else 0.0,
            "local_dev_p95_delta": float(metrics["local_dev_p95"] - baseline_local_dev_p95),
            "local_dev_p95_reduction_ratio": float(
                (baseline_local_dev_p95 - metrics["local_dev_p95"]) / baseline_local_dev_p95
            )
            if baseline_local_dev_p95 > 0
            else 0.0,
        }

    return {
        "track_id": int(args.track_id),
        "frame_id_start": int(args.frame_id_start),
        "frame_id_end": int(args.frame_id_end),
        "alpha": float(args.alpha),
        "exported_model": str(args.exported_model.resolve()),
        "baseline_case_dir": str(baseline_dir.resolve()),
        "video_case_dir": str(args.video_case_dir.resolve()),
        "reference_track_frames": len(reference_track),
        "oracle_offset_map": str(oracle_offsets_path.resolve()),
        "causal_offset_map": str(causal_offsets_path.resolve()),
        "frame_delta_offset_map": str(frame_delta_offsets_path.resolve()),
        "variants": {
            "baseline": {
                "output_dir": str(baseline_dir.resolve()),
                "metrics": baseline_metrics,
            },
            "oracle": {
                "output_dir": str(oracle_dir.resolve()),
                "metrics": oracle_metrics,
                "comparison_to_baseline": compare_metrics(oracle_metrics),
            },
            "causal": {
                "output_dir": str(causal_dir.resolve()),
                "metrics": causal_metrics,
                "comparison_to_baseline": compare_metrics(causal_metrics),
            },
            "frame_delta": {
                "output_dir": str(frame_delta_dir.resolve()),
                "metrics": frame_delta_metrics,
                "comparison_to_baseline": compare_metrics(frame_delta_metrics),
            },
        },
    }


def _variant_metrics(name: str, reference_track: list[dict[str, Any]], predictions_merge_dir: Path) -> dict[str, Any]:
    """Collect the series of one variant and compute its metrics, naming the variant on failure."""
    series = collect_variant_series(reference_track, predictions_merge_dir)
    try:
        return compute_series_metrics(series)
    except ValueError as err:
        raise ValueError(
            f"Variant '{name}' yielded {len(series)} matched sample(s) from {predictions_merge_dir}: {err}"
        ) from err


def main() -> int:
    """Write the offset maps, run the three compensated variants and report the comparison.

    Returns:
        ``0`` on success (failures propagate as exceptions).
    """
    args = parse_args()
    baseline_case_dir = args.baseline_case_dir.resolve()
    tracking_json = args.tracking_json.resolve() if args.tracking_json else baseline_case_dir / "merge.json"
    reference_track = load_reference_track(
        tracking_json=tracking_json,
        track_id=args.track_id,
        frame_id_start=args.frame_id_start,
        frame_id_end=args.frame_id_end,
    )
    oracle_offsets, causal_offsets, frame_delta_offsets = build_offset_maps(reference_track, alpha=args.alpha)

    case_tag = (
        f"{baseline_case_dir.name}_track{args.track_id}_f{args.frame_id_start}_{args.frame_id_end}"
        f"_a{str(args.alpha).replace('.', 'p')}"
    )
    output_root = args.output_root.resolve() / case_tag
    configs_dir = output_root / "configs"
    oracle_offsets_path = configs_dir / "oracle_roi1_offsets.json"
    causal_offsets_path = configs_dir / "causal_prev_frame_roi1_offsets.json"
    frame_delta_offsets_path = configs_dir / "frame_delta_roi1_offsets.json"
    oracle_dir = output_root / "oracle"
    causal_dir = output_root / "causal"
    frame_delta_dir = output_root / "frame_delta"
    report_path = output_root / "experiment_summary.json"

    # (name, offset-map mode label, offsets, offset-map path, inference output dir)
    variants = [
        ("oracle", "same_frame_oracle", oracle_offsets, oracle_offsets_path, oracle_dir),
        ("causal", "previous_frame_causal", causal_offsets, causal_offsets_path, causal_dir),
        ("frame_delta", "same_frame_prev_delta", frame_delta_offsets, frame_delta_offsets_path, frame_delta_dir),
    ]

    for _, mode, offsets, offsets_path, _ in variants:
        write_offset_map(
            offsets_path,
            offsets,
            {
                "mode": mode,
                "track_id": args.track_id,
                "alpha": args.alpha,
                "reference_frame_id": reference_track[0]["frame_id"],
                "reference_y2": reference_track[0]["y2"],
            },
        )

    video_case_dir = args.video_case_dir.resolve()
    exported_model = args.exported_model.resolve()
    for _, _, _, offsets_path, variant_dir in variants:
        # With --skip-existing, a variant whose merge directory already exists is reused.
        if args.skip_existing and (variant_dir / "predictions" / "merge").is_dir():
            continue
        run_inference_variant(
            video_case_dir=video_case_dir,
            output_dir=variant_dir,
            frame_id_start=args.frame_id_start,
            frame_id_end=args.frame_id_end,
            exported_model=exported_model,
            device=args.device,
            offset_map_path=offsets_path,
        )

    baseline_metrics = _variant_metrics("baseline", reference_track, baseline_case_dir / "predictions" / "merge")
    oracle_metrics = _variant_metrics("oracle", reference_track, oracle_dir / "predictions" / "merge")
    causal_metrics = _variant_metrics("causal", reference_track, causal_dir / "predictions" / "merge")
    frame_delta_metrics = _variant_metrics("frame_delta", reference_track, frame_delta_dir / "predictions" / "merge")

    report_payload = build_report_payload(
        args=args,
        reference_track=reference_track,
        baseline_metrics=baseline_metrics,
        oracle_metrics=oracle_metrics,
        causal_metrics=causal_metrics,
        frame_delta_metrics=frame_delta_metrics,
        oracle_offsets_path=oracle_offsets_path,
        causal_offsets_path=causal_offsets_path,
        frame_delta_offsets_path=frame_delta_offsets_path,
        baseline_dir=baseline_case_dir,
        oracle_dir=oracle_dir,
        causal_dir=causal_dir,
        frame_delta_dir=frame_delta_dir,
    )
    save_json(report_path, report_payload)

    print("")
    print("ROI1 crop compensation experiment summary")
    print(f"summary_json: {report_path}")
    print(f"baseline abs_dx_p95 : {baseline_metrics['abs_dx_p95']:.4f} m/frame")
    print(f"oracle abs_dx_p95 : {oracle_metrics['abs_dx_p95']:.4f} m/frame")
    print(f"causal abs_dx_p95 : {causal_metrics['abs_dx_p95']:.4f} m/frame")
    print(f"delta abs_dx_p95 : {frame_delta_metrics['abs_dx_p95']:.4f} m/frame")
    print(f"baseline local_dev_p95 : {baseline_metrics['local_dev_p95']:.4f} m")
    print(f"oracle local_dev_p95 : {oracle_metrics['local_dev_p95']:.4f} m")
    print(f"causal local_dev_p95 : {causal_metrics['local_dev_p95']:.4f} m")
    print(f"delta local_dev_p95 : {frame_delta_metrics['local_dev_p95']:.4f} m")
    return 0


if __name__ == "__main__":
    sys.exit(main())