Files
yolov26_3d/tools/feishu_project/run_roi1_crop_compensation_experiment.py
2026-06-24 09:35:46 +08:00

584 lines
22 KiB
Python
Executable File

#!/usr/bin/env python3
"""Run ROI1 crop-center compensation experiments and compare detection stability."""
from __future__ import annotations
import argparse
import json
import math
import re
import statistics
import subprocess
import sys
from pathlib import Path
from typing import Any
# Directory two levels above the folder that contains this script (parents[2] of the resolved path).
ROOT = Path(__file__).resolve().parents[2]
# Exported model file used when --exported-model is not supplied.
DEFAULT_MODEL = ROOT.joinpath(
    "runs",
    "export",
    "train_mono3d_two_roi_20260416-raw_no_edge",
    "merged_model.torchscript",
)
# Parent directory of the per-case experiment folders when --output-root is not supplied.
DEFAULT_OUTPUT_ROOT = Path("/data1/dongying/Mono3d/G1Q3/feishu_project/roi1_crop_compensation_experiments")
# Captures the digits between "camera4_" and the next underscore in a prediction file name.
FRAME_FILE_RE = re.compile(r"camera4_(\d+)_")
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse the process arguments.

    Returns:
        The ``argparse.Namespace`` holding every option declared below.
    """
    cli = argparse.ArgumentParser(description=__doc__)

    # One (flag, add_argument keyword arguments) pair per option, registered in this order
    # so the generated --help output keeps the same option sequence.
    option_table: tuple[tuple[str, dict[str, Any]], ...] = (
        (
            "--baseline-case-dir",
            {"required": True, "type": Path, "help": "Baseline inference case output directory."},
        ),
        (
            "--video-case-dir",
            {"required": True, "type": Path, "help": "Input video case path passed to --video-case-dir."},
        ),
        (
            "--tracking-json",
            {
                "type": Path,
                "default": None,
                "help": "Tracking JSON used to derive the reference target trajectory.",
            },
        ),
        ("--track-id", {"type": int, "default": 7}),
        ("--frame-id-start", {"type": int, "required": True}),
        ("--frame-id-end", {"type": int, "required": True}),
        (
            "--alpha",
            {
                "type": float,
                "default": 1.0,
                "help": "Scale factor applied to the bbox-derived ROI1 compensation.",
            },
        ),
        ("--exported-model", {"type": Path, "default": DEFAULT_MODEL}),
        ("--device", {"type": str, "default": "cuda"}),
        ("--output-root", {"type": Path, "default": DEFAULT_OUTPUT_ROOT}),
        (
            "--skip-existing",
            {"action": "store_true", "help": "Reuse existing experiment outputs when present."},
        ),
    )
    for flag, options in option_table:
        cli.add_argument(flag, **options)

    return cli.parse_args()
def load_json(path: Path) -> Any:
    """Read ``path`` as UTF-8 text and return the decoded JSON value."""
    return json.loads(path.read_text(encoding="utf-8"))
def save_json(path: Path, payload: Any) -> None:
    """Write ``payload`` to ``path`` as indented UTF-8 JSON followed by a newline.

    Missing parent directories are created. The payload is serialised to a
    string *before* the destination is opened, so a payload that cannot be
    encoded raises without truncating or partially overwriting an existing file.

    Args:
        path: Destination file.
        payload: Any value accepted by ``json.dumps``.

    Raises:
        TypeError: If ``payload`` contains a value ``json`` cannot serialise.
    """
    text = json.dumps(payload, ensure_ascii=False, indent=2)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text + "\n", encoding="utf-8")
def coerce_bbox(values: Any) -> tuple[float, float, float, float] | None:
    """Convert the first four entries of a list/tuple into a float 4-tuple.

    Returns:
        ``(x1, y1, x2, y2)`` as floats, or ``None`` when ``values`` is not a
        list/tuple, has fewer than four entries, or one of the first four
        entries cannot be converted with ``float``.
    """
    if isinstance(values, (list, tuple)) and len(values) >= 4:
        try:
            left, top, right, bottom = (float(component) for component in values[:4])
        except (TypeError, ValueError):
            return None
        return left, top, right, bottom
    return None
def bbox_iou(box_a: tuple[float, float, float, float], box_b: tuple[float, float, float, float]) -> float:
    """Return the intersection-over-union of two ``(x1, y1, x2, y2)`` boxes.

    The result is ``0.0`` when the boxes do not overlap with positive area or
    when the union area is not positive.
    """
    a_left, a_top, a_right, a_bottom = box_a
    b_left, b_top, b_right, b_bottom = box_b

    overlap_w = max(0.0, min(a_right, b_right) - max(a_left, b_left))
    overlap_h = max(0.0, min(a_bottom, b_bottom) - max(a_top, b_top))
    overlap = overlap_w * overlap_h
    if overlap <= 0.0:
        return 0.0

    # Negative extents are clamped to zero so malformed boxes contribute no area.
    first_area = max(0.0, a_right - a_left) * max(0.0, a_bottom - a_top)
    second_area = max(0.0, b_right - b_left) * max(0.0, b_bottom - b_top)
    union = first_area + second_area - overlap
    if union > 0.0:
        return overlap / union
    return 0.0
def center_distance(box_a: tuple[float, float, float, float], box_b: tuple[float, float, float, float]) -> float:
    """Return the Euclidean distance between the centers of two ``(x1, y1, x2, y2)`` boxes."""
    delta_x = (box_a[0] + box_a[2]) * 0.5 - (box_b[0] + box_b[2]) * 0.5
    delta_y = (box_a[1] + box_a[3]) * 0.5 - (box_b[1] + box_b[3]) * 0.5
    return math.hypot(delta_x, delta_y)
def frame_id_from_tracking_frame(frame_data: dict[str, Any], fallback_idx: int) -> int:
    """Resolve the frame id of one tracking-frame record.

    Lookup order:
      1. The first detection that carries an integer-convertible ``"frameId"``
         or ``"frame_id"`` (keys tried in that order per detection).
      2. ``frame_data["frame_info"]`` under ``"frame_id"``, ``"frameId"`` or
         ``"original_frame_id"`` (in that order).
      3. ``fallback_idx``.

    A null ``"detections"`` value, non-dict detection entries and values that
    ``int`` rejects are skipped instead of raising.

    Args:
        frame_data: One frame record from the tracking JSON.
        fallback_idx: Value returned when no id can be found in the record.

    Returns:
        The resolved frame id.
    """
    # ``or []`` covers both a missing key and an explicit JSON null.
    for det in frame_data.get("detections") or []:
        det_frame_id = _first_int_field(det, ("frameId", "frame_id"))
        if det_frame_id is not None:
            return det_frame_id

    info_frame_id = _first_int_field(
        frame_data.get("frame_info"),
        ("frame_id", "frameId", "original_frame_id"),
    )
    if info_frame_id is not None:
        return info_frame_id
    return fallback_idx


def _first_int_field(record: Any, keys: tuple[str, ...]) -> int | None:
    """Return the first ``record[key]`` that converts with ``int``, or ``None``."""
    if not isinstance(record, dict):
        return None
    for key in keys:
        raw = record.get(key)
        if raw is None:
            continue
        try:
            return int(raw)
        except (TypeError, ValueError, OverflowError):
            # OverflowError: int(float("inf")); ValueError also covers NaN and bad strings.
            continue
    return None
def load_reference_track(tracking_json: Path, track_id: int, frame_id_start: int, frame_id_end: int) -> list[dict[str, Any]]:
    """Collect the per-frame reference boxes of one track from a tracking JSON.

    The JSON may be a list of frame records or a dict whose ``"frames"`` entry
    is such a list. For every frame whose resolved frame id lies in
    ``[frame_id_start, frame_id_end]``, the first detection with a matching
    ``track_id`` and a usable ``bbox`` contributes one record.

    A ``class_id`` that cannot be converted with ``int`` is stored as ``None``
    rather than aborting the load, matching how prediction class ids are parsed
    elsewhere in this file. Detection entries that are not dicts are skipped.

    Args:
        tracking_json: Path of the tracking JSON file.
        track_id: Track to follow; compared with ``!=`` against each detection's ``"track_id"``.
        frame_id_start: First frame id to keep (inclusive).
        frame_id_end: Last frame id to keep (inclusive).

    Returns:
        Records sorted by ``frame_id`` with keys ``frame_id``, ``frame_idx``,
        ``bbox``, ``class_id``, ``type_name`` and ``y2`` (the bbox's fourth value).

    Raises:
        ValueError: If the JSON does not contain a list of frames.
        FileNotFoundError: If no record for the track falls inside the range.
    """
    payload = load_json(tracking_json)
    frames = payload.get("frames", payload) if isinstance(payload, dict) else payload
    if not isinstance(frames, list):
        raise ValueError(f"Unsupported tracking JSON structure in {tracking_json}")

    reference: list[dict[str, Any]] = []
    for frame_idx, frame_data in enumerate(frames):
        if not isinstance(frame_data, dict):
            continue
        frame_id = frame_id_from_tracking_frame(frame_data, frame_idx)
        if not frame_id_start <= frame_id <= frame_id_end:
            continue
        for det in frame_data.get("detections") or []:
            if not isinstance(det, dict) or det.get("track_id") != track_id:
                continue
            bbox = coerce_bbox(det.get("bbox"))
            if bbox is None:
                continue
            raw_class_id = det.get("class_id")
            try:
                class_id = int(raw_class_id) if raw_class_id is not None else None
            except (TypeError, ValueError):
                class_id = None
            reference.append(
                {
                    "frame_id": frame_id,
                    "frame_idx": frame_idx,
                    "bbox": bbox,
                    "class_id": class_id,
                    "type_name": det.get("type_name"),
                    "y2": bbox[3],
                }
            )
            # Only the first usable detection of the track is taken per frame.
            break

    reference.sort(key=lambda item: item["frame_id"])
    if not reference:
        raise FileNotFoundError(
            f"Track {track_id} not found in {tracking_json} within frame_id [{frame_id_start}, {frame_id_end}]"
        )
    return reference
def build_offset_maps(
    reference_track: list[dict[str, Any]],
    alpha: float,
) -> tuple[dict[int, float], dict[int, float], dict[int, float]]:
    """Derive three frame-id -> offset maps from the reference track's ``y2`` values.

    With ``y2[0]`` the first record's ``y2``:
      * oracle: ``alpha * (y2[i] - y2[0])`` for record ``i``.
      * causal: the oracle offset of the previous record (``0.0`` for the first).
      * frame_delta: ``alpha * (y2[i] - y2[i - 1])`` (``0.0`` for the first).

    Args:
        reference_track: Non-empty list of records with ``frame_id`` and ``y2``.
        alpha: Scale factor applied to every offset.

    Returns:
        ``(oracle, causal, frame_delta)``, each keyed by integer frame id.
    """
    anchor_y2 = float(reference_track[0]["y2"])
    oracle: dict[int, float] = {}
    causal: dict[int, float] = {}
    frame_delta: dict[int, float] = {}

    last_offset = 0.0
    last_y2 = anchor_y2
    for position, entry in enumerate(reference_track):
        fid = int(entry["frame_id"])
        y2 = float(entry["y2"])
        offset = alpha * (y2 - anchor_y2)
        is_first = position == 0

        oracle[fid] = offset
        causal[fid] = 0.0 if is_first else last_offset
        frame_delta[fid] = 0.0 if is_first else alpha * (y2 - last_y2)

        last_offset, last_y2 = offset, y2
    return oracle, causal, frame_delta
def write_offset_map(path: Path, offsets: dict[int, float], metadata: dict[str, Any]) -> None:
    """Save an offset map as JSON via ``save_json``.

    The document has three keys: ``default_offset_px`` (always ``0.0``),
    ``frame_id_offsets`` (the offsets re-keyed by ``str(frame_id)``) and
    ``metadata`` (stored as given).
    """
    keyed_offsets: dict[str, float] = {}
    for frame_id, offset in offsets.items():
        keyed_offsets[str(frame_id)] = offset

    document = {
        "default_offset_px": 0.0,
        "frame_id_offsets": keyed_offsets,
        "metadata": metadata,
    }
    save_json(path, document)
def build_prediction_frame_map(predictions_merge_dir: Path) -> dict[int, Path]:
    """Index the ``*.json`` files of a directory by the number in their file name.

    Files are visited in sorted order; names that ``FRAME_FILE_RE`` does not
    match are ignored, and when two names yield the same number the later file
    in sorted order wins.
    """
    scanned = (
        (candidate, FRAME_FILE_RE.search(candidate.name))
        for candidate in sorted(predictions_merge_dir.glob("*.json"))
    )
    return {int(found.group(1)): candidate for candidate, found in scanned if found}
def extract_candidate_records(frame_json_path: Path) -> list[dict[str, Any]]:
    """Load one frame's prediction JSON and return its usable detections.

    Every dict-valued entry of the top-level JSON object is considered. An
    entry is kept when it has a usable ``box2d`` and a longitudinal value taken
    from the first element of ``box_center_xyz_ego`` or, failing that, of
    ``xyzlhwyaw_ego``.

    Malformed optional fields never abort the load: an unparseable ``type``
    becomes ``None`` and a null or unparseable ``score`` becomes ``0.0`` (the
    same value used when the key is missing).

    Args:
        frame_json_path: Path of the per-frame prediction file.

    Returns:
        Records with keys ``bbox``, ``class_id``, ``type_name``, ``score`` and ``x_ego``.

    Raises:
        ValueError: If the JSON top level is not an object.
    """
    payload = load_json(frame_json_path)
    if not isinstance(payload, dict):
        raise ValueError(f"Unexpected frame prediction structure in {frame_json_path}")

    candidates: list[dict[str, Any]] = []
    for value in payload.values():
        if not isinstance(value, dict):
            continue
        bbox = coerce_bbox(value.get("box2d"))
        if bbox is None:
            continue

        raw_class_id = value.get("type")
        try:
            class_id = int(raw_class_id) if raw_class_id is not None else None
        except (TypeError, ValueError):
            class_id = None

        # Prefer the box-center field; fall back to the packed field when it is unusable.
        x_ego = _first_component_as_float(value.get("box_center_xyz_ego"))
        if x_ego is None:
            x_ego = _first_component_as_float(value.get("xyzlhwyaw_ego"))
        if x_ego is None:
            continue

        try:
            score = float(value.get("score", 0.0))
        except (TypeError, ValueError):
            score = 0.0

        candidates.append(
            {
                "bbox": bbox,
                "class_id": class_id,
                "type_name": value.get("type_name"),
                "score": score,
                "x_ego": x_ego,
            }
        )
    return candidates


def _first_component_as_float(components: Any) -> float | None:
    """Return ``float(components[0])`` for a non-empty list, else ``None``."""
    if not isinstance(components, list) or not components:
        return None
    try:
        return float(components[0])
    except (TypeError, ValueError):
        return None
def match_reference_detection(
    frame_json_path: Path,
    reference_bbox: tuple[float, float, float, float],
    reference_class_id: int | None,
) -> dict[str, Any] | None:
    """Pick the prediction in one frame file that best matches a reference box.

    Candidates sharing ``reference_class_id`` are preferred; when there are
    none (or the reference class is ``None``) every candidate competes. The
    winner maximises, in order: IoU with the reference box, closeness of box
    centers, then score.

    Returns:
        A copy of the winning record with an added ``iou_to_reference`` key,
        or ``None`` when the frame has no candidates.
    """
    candidates = extract_candidate_records(frame_json_path)

    pool = candidates
    if reference_class_id is not None:
        class_matches = [entry for entry in candidates if entry["class_id"] == reference_class_id]
        if class_matches:
            pool = class_matches
    if not pool:
        return None

    def ranking(entry: dict[str, Any]) -> tuple[float, float, float]:
        # Negated distance so that a smaller center distance ranks higher under max().
        return (
            bbox_iou(entry["bbox"], reference_bbox),
            -center_distance(entry["bbox"], reference_bbox),
            entry["score"],
        )

    winner = dict(max(pool, key=ranking))
    winner["iou_to_reference"] = bbox_iou(winner["bbox"], reference_bbox)
    return winner
def collect_variant_series(reference_track: list[dict[str, Any]], predictions_merge_dir: Path) -> list[dict[str, Any]]:
    """Build the matched-detection series of one variant along the reference track.

    For each reference record, the prediction file with the same frame id is
    looked up and its best-matching detection is recorded. Frames with no
    prediction file or no candidate detection are left out.

    Returns:
        Samples sorted by ``frame_id`` with keys ``frame_id``, ``x_ego``,
        ``score``, ``iou_to_reference``, ``bbox``, ``y2``, ``cy``, ``w`` and ``h``.
    """
    files_by_frame = build_prediction_frame_map(predictions_merge_dir)

    samples: list[dict[str, Any]] = []
    for reference in reference_track:
        frame_id = int(reference["frame_id"])
        prediction_file = files_by_frame.get(frame_id)
        detection = (
            match_reference_detection(prediction_file, reference["bbox"], reference["class_id"])
            if prediction_file is not None
            else None
        )
        if detection is None:
            continue

        box = detection["bbox"]
        sample = {
            "frame_id": frame_id,
            "x_ego": float(detection["x_ego"]),
            "score": float(detection["score"]),
            "iou_to_reference": float(detection["iou_to_reference"]),
            "bbox": box,
            "y2": box[3],
            "cy": (box[1] + box[3]) * 0.5,
            "w": box[2] - box[0],
            "h": box[3] - box[1],
        }
        samples.append(sample)

    return sorted(samples, key=lambda sample: sample["frame_id"])
def percentile(sorted_values: list[float], q: float) -> float:
    """Return the element at 1-based rank ``ceil(q * n)`` of an ascending list.

    Args:
        sorted_values: Non-empty list, assumed to be sorted ascending by the caller.
        q: Fraction; values ``<= 0`` return the first element and values
            ``>= 1`` return the last.

    Raises:
        ValueError: If ``sorted_values`` is empty.
    """
    if not sorted_values:
        raise ValueError("sorted_values must not be empty")
    if q <= 0:
        return sorted_values[0]
    if q >= 1:
        return sorted_values[-1]
    rank = math.ceil(q * len(sorted_values))
    # A rank below 1 is clamped so the lookup never goes before the first element.
    return sorted_values[max(rank, 1) - 1]
def compute_series_metrics(series: list[dict[str, Any]]) -> dict[str, Any]:
    """Summarise the stability of one matched-detection series.

    Reported values cover: the sample count and frame-id span, the start, end
    and net change of ``x_ego``, mean/p95 (and max for ``x_ego``) of the
    absolute sample-to-sample change in ``x_ego``, ``y2`` and ``cy``, the
    deviation of ``x_ego`` from its centered 11-sample mean, and IoU/score
    aggregates. When the series is too short for any centered window, the
    local-deviation statistics are computed from ``[0.0]``.

    Args:
        series: Samples ordered by frame id, as produced by ``collect_variant_series``.

    Raises:
        ValueError: If fewer than two samples are given.
    """
    if len(series) < 2:
        raise ValueError("Series must contain at least two samples")

    def column(key: str) -> list[Any]:
        return [sample[key] for sample in series]

    def abs_steps(values: list[Any]) -> list[Any]:
        # Absolute difference between each sample and the one before it.
        return [abs(later - earlier) for earlier, later in zip(values, values[1:])]

    x_values = column("x_ego")
    ious = column("iou_to_reference")
    scores = column("score")

    step_x = abs_steps(x_values)
    step_y2 = abs_steps(column("y2"))
    step_cy = abs_steps(column("cy"))

    half_window = 5
    span = 2 * half_window + 1
    local_dev = [
        abs(x_values[center] - sum(x_values[center - half_window : center + half_window + 1]) / span)
        for center in range(half_window, len(x_values) - half_window)
    ]
    ranked_local_dev = sorted(local_dev) if local_dev else [0.0]

    first_sample, last_sample = series[0], series[-1]
    return {
        "samples": len(series),
        "frame_id_start": int(first_sample["frame_id"]),
        "frame_id_end": int(last_sample["frame_id"]),
        "x_start": float(x_values[0]),
        "x_end": float(x_values[-1]),
        "x_change": float(x_values[-1] - x_values[0]),
        "abs_dx_mean": float(statistics.mean(step_x)),
        "abs_dx_p95": float(percentile(sorted(step_x), 0.95)),
        "abs_dx_max": float(max(step_x)),
        "abs_dy2_mean": float(statistics.mean(step_y2)),
        "abs_dy2_p95": float(percentile(sorted(step_y2), 0.95)),
        "abs_dcy_mean": float(statistics.mean(step_cy)),
        "abs_dcy_p95": float(percentile(sorted(step_cy), 0.95)),
        "local_dev_mean": float(statistics.mean(ranked_local_dev)),
        "local_dev_p95": float(percentile(ranked_local_dev, 0.95)),
        "local_dev_max": float(max(ranked_local_dev)),
        "mean_iou_to_reference": float(statistics.mean(ious)),
        "min_iou_to_reference": float(min(ious)),
        "mean_score": float(statistics.mean(scores)),
    }
def run_inference_variant(
    *,
    video_case_dir: Path,
    output_dir: Path,
    frame_id_start: int,
    frame_id_end: int,
    exported_model: Path,
    device: str,
    offset_map_path: Path | None = None,
) -> None:
    """Run the two-ROI inference script as a subprocess for one variant.

    The script under ``ROOT/tools/model_inference/core`` is started with the
    current interpreter and ``ROOT`` as working directory. When
    ``offset_map_path`` is given it is forwarded through
    ``--roi1-crop-center-y-offset-map``.

    Raises:
        subprocess.CalledProcessError: If the script exits with a non-zero status.
    """
    script = ROOT / "tools" / "model_inference" / "core" / "run_two_roi_exported_onnx_infer.py"

    command = [sys.executable, str(script)]
    command += ["--video-case-dir", str(video_case_dir)]
    command += ["--output-dir", str(output_dir)]
    command += ["--frame-id-start", str(frame_id_start)]
    command += ["--frame-id-end", str(frame_id_end)]
    command += ["--video-stride", "1"]
    command += ["--exported-model", str(exported_model)]
    command += ["--device", device]
    command += ["--enable-cross-class-merge-prior", "--save-aggregate-predictions"]
    if offset_map_path is not None:
        command += ["--roi1-crop-center-y-offset-map", str(offset_map_path)]

    subprocess.run(command, check=True, cwd=str(ROOT))
def build_report_payload(
    *,
    args: argparse.Namespace,
    reference_track: list[dict[str, Any]],
    baseline_metrics: dict[str, Any],
    oracle_metrics: dict[str, Any],
    causal_metrics: dict[str, Any],
    frame_delta_metrics: dict[str, Any],
    oracle_offsets_path: Path,
    causal_offsets_path: Path,
    frame_delta_offsets_path: Path,
    baseline_dir: Path,
    oracle_dir: Path,
    causal_dir: Path,
    frame_delta_dir: Path,
) -> dict[str, Any]:
    """Assemble the JSON-serialisable experiment summary.

    The summary echoes the run parameters and resolved paths, then lists each
    variant's output directory and metrics. Every non-baseline variant also
    gets a ``comparison_to_baseline`` section with the delta and the relative
    reduction of ``abs_dx_p95`` and ``local_dev_p95``; a reduction ratio is
    ``0.0`` whenever the baseline value is not positive.
    """
    reference_dx = baseline_metrics["abs_dx_p95"]
    reference_dev = baseline_metrics["local_dev_p95"]

    def reduction_ratio(baseline_value: Any, variant_value: Any) -> float:
        if baseline_value > 0:
            return float((baseline_value - variant_value) / baseline_value)
        return 0.0

    def compare_metrics(metrics: dict[str, Any]) -> dict[str, Any]:
        variant_dx = metrics["abs_dx_p95"]
        variant_dev = metrics["local_dev_p95"]
        return {
            "abs_dx_p95_delta": float(variant_dx - reference_dx),
            "abs_dx_p95_reduction_ratio": reduction_ratio(reference_dx, variant_dx),
            "local_dev_p95_delta": float(variant_dev - reference_dev),
            "local_dev_p95_reduction_ratio": reduction_ratio(reference_dev, variant_dev),
        }

    def resolved(path: Path) -> str:
        return str(path.resolve())

    def compared_variant(directory: Path, metrics: dict[str, Any]) -> dict[str, Any]:
        return {
            "output_dir": resolved(directory),
            "metrics": metrics,
            "comparison_to_baseline": compare_metrics(metrics),
        }

    variants = {
        "baseline": {"output_dir": resolved(baseline_dir), "metrics": baseline_metrics},
        "oracle": compared_variant(oracle_dir, oracle_metrics),
        "causal": compared_variant(causal_dir, causal_metrics),
        "frame_delta": compared_variant(frame_delta_dir, frame_delta_metrics),
    }

    return {
        "track_id": int(args.track_id),
        "frame_id_start": int(args.frame_id_start),
        "frame_id_end": int(args.frame_id_end),
        "alpha": float(args.alpha),
        "exported_model": resolved(args.exported_model),
        "baseline_case_dir": resolved(baseline_dir),
        "video_case_dir": resolved(args.video_case_dir),
        "reference_track_frames": len(reference_track),
        "oracle_offset_map": resolved(oracle_offsets_path),
        "causal_offset_map": resolved(causal_offsets_path),
        "frame_delta_offset_map": resolved(frame_delta_offsets_path),
        "variants": variants,
    }
def main() -> int:
    """Run the full experiment: offset maps, inference variants, metrics, report.

    Steps, in order:
      1. Load the reference track (``--tracking-json`` or ``merge.json`` in the
         baseline case directory) and derive the three offset maps.
      2. Write all three offset maps under ``<output_root>/configs``.
      3. Run inference for the oracle, causal and frame_delta variants, skipping
         a variant when ``--skip-existing`` is set and its ``predictions/merge``
         directory already exists.
      4. Collect the matched series of the baseline and each variant, compute
         metrics, save ``experiment_summary.json`` and print a short summary.

    Returns:
        ``0`` on completion.
    """
    args = parse_args()
    baseline_case_dir = args.baseline_case_dir.resolve()
    if args.tracking_json:
        tracking_json = args.tracking_json.resolve()
    else:
        tracking_json = baseline_case_dir / "merge.json"

    reference_track = load_reference_track(
        tracking_json=tracking_json,
        track_id=args.track_id,
        frame_id_start=args.frame_id_start,
        frame_id_end=args.frame_id_end,
    )
    oracle_offsets, causal_offsets, frame_delta_offsets = build_offset_maps(reference_track, alpha=args.alpha)

    alpha_tag = str(args.alpha).replace(".", "p")
    case_tag = f"{baseline_case_dir.name}_track{args.track_id}_f{args.frame_id_start}_{args.frame_id_end}_a{alpha_tag}"
    output_root = args.output_root.resolve() / case_tag
    configs_dir = output_root / "configs"
    report_path = output_root / "experiment_summary.json"

    # (variant name, offset-map mode label, offsets, offset-map file) in execution order.
    variant_plan = (
        ("oracle", "same_frame_oracle", oracle_offsets, configs_dir / "oracle_roi1_offsets.json"),
        ("causal", "previous_frame_causal", causal_offsets, configs_dir / "causal_prev_frame_roi1_offsets.json"),
        ("frame_delta", "same_frame_prev_delta", frame_delta_offsets, configs_dir / "frame_delta_roi1_offsets.json"),
    )
    variant_dirs = {name: output_root / name for name, _, _, _ in variant_plan}
    offset_paths = {name: offsets_path for name, _, _, offsets_path in variant_plan}

    # All offset maps are written before any inference starts.
    anchor = reference_track[0]
    for _, mode, offsets, offsets_path in variant_plan:
        write_offset_map(
            offsets_path,
            offsets,
            {
                "mode": mode,
                "track_id": args.track_id,
                "alpha": args.alpha,
                "reference_frame_id": anchor["frame_id"],
                "reference_y2": anchor["y2"],
            },
        )

    for name, _, _, offsets_path in variant_plan:
        if args.skip_existing and (variant_dirs[name] / "predictions" / "merge").is_dir():
            continue
        run_inference_variant(
            video_case_dir=args.video_case_dir.resolve(),
            output_dir=variant_dirs[name],
            frame_id_start=args.frame_id_start,
            frame_id_end=args.frame_id_end,
            exported_model=args.exported_model.resolve(),
            device=args.device,
            offset_map_path=offsets_path,
        )

    # Every series is collected before any metric is computed.
    case_dirs = {"baseline": baseline_case_dir, **variant_dirs}
    series_by_variant = {
        name: collect_variant_series(reference_track, directory / "predictions" / "merge")
        for name, directory in case_dirs.items()
    }
    metrics_by_variant = {name: compute_series_metrics(samples) for name, samples in series_by_variant.items()}
    baseline_metrics = metrics_by_variant["baseline"]
    oracle_metrics = metrics_by_variant["oracle"]
    causal_metrics = metrics_by_variant["causal"]
    frame_delta_metrics = metrics_by_variant["frame_delta"]

    report_payload = build_report_payload(
        args=args,
        reference_track=reference_track,
        baseline_metrics=baseline_metrics,
        oracle_metrics=oracle_metrics,
        causal_metrics=causal_metrics,
        frame_delta_metrics=frame_delta_metrics,
        oracle_offsets_path=offset_paths["oracle"],
        causal_offsets_path=offset_paths["causal"],
        frame_delta_offsets_path=offset_paths["frame_delta"],
        baseline_dir=baseline_case_dir,
        oracle_dir=variant_dirs["oracle"],
        causal_dir=variant_dirs["causal"],
        frame_delta_dir=variant_dirs["frame_delta"],
    )
    save_json(report_path, report_payload)

    print("")
    print("ROI1 crop compensation experiment summary")
    print(f"summary_json: {report_path}")
    print(f"baseline abs_dx_p95 : {baseline_metrics['abs_dx_p95']:.4f} m/frame")
    print(f"oracle abs_dx_p95 : {oracle_metrics['abs_dx_p95']:.4f} m/frame")
    print(f"causal abs_dx_p95 : {causal_metrics['abs_dx_p95']:.4f} m/frame")
    print(f"delta abs_dx_p95 : {frame_delta_metrics['abs_dx_p95']:.4f} m/frame")
    print(f"baseline local_dev_p95 : {baseline_metrics['local_dev_p95']:.4f} m")
    print(f"oracle local_dev_p95 : {oracle_metrics['local_dev_p95']:.4f} m")
    print(f"causal local_dev_p95 : {causal_metrics['local_dev_p95']:.4f} m")
    print(f"delta local_dev_p95 : {frame_delta_metrics['local_dev_p95']:.4f} m")
    return 0
# Script entry point: exit the process with main()'s return value as status code.
if __name__ == "__main__":
    raise SystemExit(main())