Files
yolov26_3d/tools/pdcl_inference/run_batch_two_roi_infer.py
2026-06-24 09:35:46 +08:00

1088 lines
42 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import argparse
import json
import re
import sys
import traceback
from pathlib import Path
from typing import Any, Optional
# Absolute path of this script; anchors the default paths defined below.
FILE = Path(__file__).resolve()
# Two directory levels above this script's folder; appended to sys.path so the
# absolute `tools.*` imports below resolve when the file is executed directly.
ROOT = FILE.parents[2]
if str(ROOT) not in sys.path:
    sys.path.append(str(ROOT))
# python-dotenv is optional: without it `load_dotenv()` becomes a no-op returning False.
try:
    from dotenv import load_dotenv
except ImportError:
    def load_dotenv(*args, **kwargs):
        """Stand-in used when python-dotenv is not installed; loads nothing."""
        return False
# PyYAML is optional at import time; the YAML loaders raise ImportError when it is
# actually needed and `yaml` is None.
try:
    import yaml
except ImportError:
    yaml = None
from tools.model_inference.adapters.video_dir_inference_utils import (
build_case_output_rel_dir,
resolve_video_case_paths,
)
from tools.pdcl_inference.pipeline_types import RawIDTask, VideoCaseTask
# Default root for per-raw_id result folders (also the default `--output-dir`).
DEFAULT_OUTPUT_ROOT = FILE.parent.joinpath("visualization_by_rawid")
# Shared two-ROI inference YAML that supplies defaults for every unset ROI option.
DEFAULT_INFERENCE_CONFIG_PATH = FILE.parent.joinpath("configs", "two_roi_inference.yaml")
# ROI sections that must exist in the inference config, in processing order.
REQUIRED_ROI_NAMES = ("roi0", "roi1")
# Fallbacks for the optional `shared` section of the inference config.
DEFAULT_SHARED_INFERENCE_CONFIG = dict(
    edge_yaw_max_lateral_dist=30.0,
    inference_batch_size=1,
)
def _to_str(value: Any) -> Optional[str]:
if value is None:
return None
text = str(value).strip()
return text or None
def _normalize_cve_timestamp(value: Optional[str]) -> Optional[str]:
if not value:
return None
digits = re.sub(r"\D", "", value)
if len(digits) >= 14:
return digits[:14]
if len(digits) >= 8:
return digits[:8]
return None
def _sanitize_path_component(value: Optional[str]) -> str:
    """Turn *value* into a single filesystem-safe path component.

    Runs of the characters ``\\ / : * ? " < > |`` and runs of whitespace become ``_``,
    repeated underscores collapse to one, and leading/trailing ``.``/``_`` are trimmed.
    Missing or empty results fall back to ``"unknown"``.
    """
    cleaned = _to_str(value) or "unknown"
    for pattern in (r"[\\/:*?\"<>|]+", r"\s+", r"_+"):
        cleaned = re.sub(pattern, "_", cleaned)
    cleaned = cleaned.strip("._")
    return cleaned or "unknown"
def _load_yaml_if_present(path: str) -> dict[str, Any]:
if not path:
return {}
if yaml is None:
raise ImportError("PyYAML is required when using --roi0-data/--roi1-data. Please install: pip install pyyaml")
yaml_path = Path(path)
if not yaml_path.exists():
return {}
with yaml_path.open("r", encoding="utf-8") as file:
return yaml.safe_load(file) or {}
def _load_yaml_required(path: str | Path) -> dict[str, Any]:
    """Parse a YAML file that must exist; an empty document yields ``{}``.

    Raises:
        ImportError: PyYAML is not installed.
        FileNotFoundError: *path* is not an existing regular file.
    """
    if yaml is None:
        raise ImportError("PyYAML is required for loading the shared two-ROI inference config. Please install: pip install pyyaml")
    yaml_path = Path(path)
    if yaml_path.is_file():
        with yaml_path.open("r", encoding="utf-8") as file:
            loaded = yaml.safe_load(file)
        return loaded or {}
    raise FileNotFoundError(f"Inference config file not found: {yaml_path}")
def _coerce_imgsz_arg(values: Optional[list[int]]) -> Optional[tuple[int, int]]:
if not values:
return None
if len(values) != 2:
raise ValueError(f"Expected imgsz as two integers, got: {values}")
return int(values[0]), int(values[1])
def _coerce_imgsz_config(value: Any) -> Optional[tuple[int, int]]:
    """Normalize an ``imgsz`` value read from YAML into ``(W, H)`` or None.

    Accepts a list/tuple of two items or a comma-separated string such as ``"640, 384"``.
    None, empty strings and empty lists give None; other shapes raise ``ValueError``
    (from ``_coerce_imgsz_arg`` or ``int()``).
    """
    if value in (None, "", []):
        return None
    if isinstance(value, (list, tuple)):
        parts = list(value)
    else:
        text = str(value).strip()
        if not text:
            return None
        parts = [piece.strip() for piece in text.split(",") if piece.strip()]
    return _coerce_imgsz_arg([int(part) for part in parts])
def _resolve_config_path_value(raw_value: Any, config_path: Path) -> str:
    """Resolve a path string from the inference config into an absolute path string.

    * empty/None -> ``""``
    * absolute path -> resolved as-is
    * ``./...`` or ``../...`` -> relative to the directory holding the config file
    * any other relative path -> relative to the repository ``ROOT``
    """
    text = _to_str(raw_value)
    if text is None:
        return ""
    candidate = Path(text)
    if candidate.is_absolute():
        anchored = candidate
    elif text.startswith(("./", "../")):
        anchored = config_path.parent / candidate
    else:
        anchored = ROOT / candidate
    return str(anchored.resolve())
def _require_roi_number(roi_payload: dict[str, Any], key: str, roi_name: str, cast: Any, config_path: Path) -> Any:
    """Return ``cast(roi_payload[key])``, raising a ValueError that names the ROI key and config file."""
    raw_value = roi_payload.get(key)
    if raw_value is None:
        raise ValueError(f"`{roi_name}.{key}` is required in inference config: {config_path}")
    try:
        return cast(raw_value)
    except (TypeError, ValueError) as exc:
        raise ValueError(
            f"`{roi_name}.{key}` must be convertible to {cast.__name__} in inference config, "
            f"got {raw_value!r}: {config_path}"
        ) from exc


def load_two_roi_inference_config(config_path: str | Path) -> dict[str, Any]:
    """Load and validate the shared two-ROI inference YAML.

    Expected layout (the ``rois`` wrapper is optional; ``roi0``/``roi1`` may also sit
    at the top level)::

        shared:                       # optional
          edge_yaw_max_lateral_dist: <float>
          inference_batch_size: <int>
        rois:
          roi0: {model, data, roi: [W, H], crop_center_mode: cxvy|vxvy,
                 virtual_fx, imgsz, conf, max_det}
          roi1: {...}

    ``model``/``data`` paths go through ``_resolve_config_path_value``. Missing or null
    ``shared`` values fall back to ``DEFAULT_SHARED_INFERENCE_CONFIG``, and
    ``inference_batch_size`` is clamped to at least 1.

    Returns:
        ``{"config_path": str, "shared": {...}, "rois": {"roi0": {...}, "roi1": {...}}}``

    Raises:
        ValueError: the document, ``rois`` or ``shared`` is not a mapping, or an ROI
            section is missing/invalid (including missing ``virtual_fx``/``conf``/``max_det``).
    """
    resolved_config_path = Path(config_path).resolve()
    payload = _load_yaml_required(resolved_config_path)
    if not isinstance(payload, dict):
        raise ValueError(
            f"Two-ROI inference config must be a YAML mapping, got {type(payload).__name__}: {resolved_config_path}"
        )
    rois_payload = payload.get("rois")
    if rois_payload is None:
        # Flat layout: ROI sections live at the top level instead of under `rois`.
        rois_payload = {roi_name: payload.get(roi_name) for roi_name in REQUIRED_ROI_NAMES if payload.get(roi_name) is not None}
    if not isinstance(rois_payload, dict):
        raise ValueError(f"`rois` in inference config must be a mapping: {resolved_config_path}")
    shared_payload = payload.get("shared") or {}
    if not isinstance(shared_payload, dict):
        raise ValueError(f"`shared` in inference config must be a mapping: {resolved_config_path}")

    normalized_rois = {}
    for roi_name in REQUIRED_ROI_NAMES:
        roi_payload = rois_payload.get(roi_name)
        if not isinstance(roi_payload, dict):
            raise ValueError(f"Missing or invalid `{roi_name}` section in inference config: {resolved_config_path}")
        roi_value = roi_payload.get("roi")
        if not isinstance(roi_value, (list, tuple)) or len(roi_value) != 2:
            raise ValueError(f"`{roi_name}.roi` must be a 2-item list/tuple in inference config: {resolved_config_path}")
        crop_center_mode = _to_str(roi_payload.get("crop_center_mode"))
        if crop_center_mode not in {"cxvy", "vxvy"}:
            raise ValueError(
                f"`{roi_name}.crop_center_mode` must be one of ['cxvy', 'vxvy'] in inference config: {resolved_config_path}"
            )
        model_path = _resolve_config_path_value(roi_payload.get("model"), resolved_config_path)
        if not model_path:
            raise ValueError(f"`{roi_name}.model` is required in inference config: {resolved_config_path}")
        normalized_rois[roi_name] = {
            "model": model_path,
            "data": _resolve_config_path_value(roi_payload.get("data"), resolved_config_path),
            "roi": (int(roi_value[0]), int(roi_value[1])),
            "crop_center_mode": crop_center_mode,
            "virtual_fx": _require_roi_number(roi_payload, "virtual_fx", roi_name, float, resolved_config_path),
            "imgsz": _coerce_imgsz_config(roi_payload.get("imgsz")),
            "conf": _require_roi_number(roi_payload, "conf", roi_name, float, resolved_config_path),
            "max_det": _require_roi_number(roi_payload, "max_det", roi_name, int, resolved_config_path),
        }

    def shared_value(key: str) -> Any:
        # A key written without a value (YAML null) means "use the default" too.
        value = shared_payload.get(key)
        return DEFAULT_SHARED_INFERENCE_CONFIG[key] if value is None else value

    return {
        "config_path": str(resolved_config_path),
        "shared": {
            "edge_yaw_max_lateral_dist": float(shared_value("edge_yaw_max_lateral_dist")),
            "inference_batch_size": max(1, int(shared_value("inference_batch_size"))),
        },
        "rois": normalized_rois,
    }
def populate_two_roi_inference_args(args: argparse.Namespace) -> dict[str, Any]:
    """Fill every unset two-ROI inference option on *args* from the inference config.

    Explicit CLI values win. An option counts as unset when it is missing or None.
    For ``*_model``/``*_data``, a blank string also counts as unset.
    ``args.inference_config`` is replaced by the resolved absolute config path.
    When it is missing, None or blank, ``DEFAULT_INFERENCE_CONFIG_PATH`` is used.

    Returns:
        The normalized payload from ``load_two_roi_inference_config``.
    """
    # Treat None/blank like "not given" instead of crashing in Path(None) or resolving "" to cwd.
    config_source = _to_str(getattr(args, "inference_config", None)) or DEFAULT_INFERENCE_CONFIG_PATH
    config_payload = load_two_roi_inference_config(config_source)
    args.inference_config = config_payload["config_path"]

    shared = config_payload["shared"]
    if getattr(args, "edge_yaw_max_lateral_dist", None) is None:
        args.edge_yaw_max_lateral_dist = float(shared["edge_yaw_max_lateral_dist"])
    if getattr(args, "inference_batch_size", None) is None:
        args.inference_batch_size = int(shared["inference_batch_size"])

    # Non-path options and the conversion applied when copying them from the config.
    typed_options = (
        ("roi", lambda value: tuple(int(item) for item in value)),
        ("crop_center_mode", str),
        ("virtual_fx", float),
        ("imgsz", lambda value: value),
        ("conf", float),
        ("max_det", int),
    )
    for roi_name in REQUIRED_ROI_NAMES:
        roi_config = config_payload["rois"][roi_name]
        for key in ("model", "data"):
            attr_name = f"{roi_name}_{key}"
            if _to_str(getattr(args, attr_name, None)) is None:
                setattr(args, attr_name, str(roi_config[key]))
        for key, convert in typed_options:
            attr_name = f"{roi_name}_{key}"
            if getattr(args, attr_name, None) is None:
                setattr(args, attr_name, convert(roi_config[key]))
    return config_payload
def build_roi_specs_from_args(args: argparse.Namespace):
    """Build one ``ROIModelSpec`` per required ROI (named ``ROI0``/``ROI1``).

    Unset options are first filled from the inference config.
    ``roi`` size, ``crop_center_mode`` and ``virtual_fx`` come from the ROI's data YAML
    when it defines them; otherwise they come from *args*.
    The model path, ``imgsz``, ``conf`` and ``max_det`` always come from *args*.
    """
    from tools.pdcl_inference.two_roi_inference import ROIModelSpec

    populate_two_roi_inference_args(args)

    def read(roi_name: str, field: str):
        return getattr(args, f"{roi_name}_{field}")

    specs = []
    for roi_name in REQUIRED_ROI_NAMES:
        data_cfg = _load_yaml_if_present(read(roi_name, "data"))
        fallback_roi = tuple(int(item) for item in read(roi_name, "roi"))
        fallback_crop_mode = str(read(roi_name, "crop_center_mode"))
        fallback_virtual_fx = float(read(roi_name, "virtual_fx"))
        imgsz = _coerce_imgsz_arg(read(roi_name, "imgsz"))
        roi_size = tuple(int(item) for item in data_cfg.get("roi", fallback_roi))
        crop_mode = str(data_cfg.get("crop_center_mode", fallback_crop_mode))
        virtual_fx = float(data_cfg.get("virtual_fx", fallback_virtual_fx))
        specs.append(
            ROIModelSpec(
                name=roi_name.upper(),
                model_path=str(read(roi_name, "model")),
                roi_size=roi_size,
                crop_center_mode=crop_mode,
                virtual_fx=virtual_fx,
                imgsz=imgsz,
                conf=float(read(roi_name, "conf")),
                max_det=int(read(roi_name, "max_det")),
            )
        )
    return specs
def build_two_roi_inference_context_from_args(args: argparse.Namespace, requested_rois: Optional[set[str]] = None):
    """Create the two-ROI inference context from parsed CLI args.

    Args:
        args: parsed namespace. Unset inference options are filled from the config,
            and ``device``, ``half`` and ``classes`` are read directly.
        requested_rois: optional ROI names to keep, compared case-insensitively with
            the spec names. None keeps every required ROI.
    """
    from tools.pdcl_inference.two_roi_inference import build_inference_context

    populate_two_roi_inference_args(args)
    specs = build_roi_specs_from_args(args)
    if requested_rois is not None:
        wanted = {str(name).lower() for name in requested_rois}
        specs = [spec for spec in specs if spec.name.lower() in wanted]
    context_kwargs = dict(
        roi_specs=specs,
        device=args.device,
        half=args.half,
        classes=args.classes,
        edge_yaw_max_lateral_dist_m=args.edge_yaw_max_lateral_dist,
        inference_batch_size=getattr(args, "inference_batch_size", 1),
    )
    return build_inference_context(**context_kwargs)
def add_inference_args(parser: argparse.ArgumentParser) -> None:
    """Register the two-ROI inference options shared by the batch runner and analysis scripts.

    Per-ROI options (``--roi0-*``/``--roi1-*``), ``--inference-batch-size`` and
    ``--edge-yaw-max-lateral-dist`` default to None.
    ``populate_two_roi_inference_args`` treats None as "not given on the CLI" and
    fills those options from ``--inference-config``.
    """
    parser.add_argument(
        "--inference-config",
        type=str,
        default=str(DEFAULT_INFERENCE_CONFIG_PATH),
        help="YAML file containing shared two-ROI inference defaults",
    )
    for roi_name in REQUIRED_ROI_NAMES:
        label = roi_name.upper()
        roi_options = (
            ("model", {"type": str, "help": f"{label} checkpoint path (*.pt); overrides inference config"}),
            ("data", {"type": str, "help": f"Optional data YAML used to infer {label} defaults; overrides inference config"}),
            (
                "roi",
                {
                    "nargs": 2,
                    "type": int,
                    "metavar": ("W", "H"),
                    "help": f"{label} crop size before resize; overrides inference config",
                },
            ),
            (
                "crop-center-mode",
                {
                    "type": str,
                    "choices": ("cxvy", "vxvy"),
                    "help": f"{label} crop center mode; overrides inference config",
                },
            ),
            ("virtual-fx", {"type": float, "help": f"{label} virtual focal length; overrides inference config"}),
            (
                "imgsz",
                {
                    "nargs": 2,
                    "type": int,
                    "metavar": ("W", "H"),
                    "help": f"{label} model input size override; overrides inference config",
                },
            ),
            ("conf", {"type": float, "help": f"{label} confidence threshold; overrides inference config"}),
            ("max-det", {"type": int, "help": f"{label} max detections per frame; overrides inference config"}),
        )
        for suffix, options in roi_options:
            parser.add_argument(f"--{roi_name}-{suffix}", default=None, **options)
    parser.add_argument("--device", type=str, default="0", help="Inference device, e.g. '0' or 'cpu'")
    parser.add_argument("--half", action="store_true", help="Run inference in FP16 on CUDA")
    parser.add_argument("--classes", nargs="*", type=int, default=None, help="Optional class-id filter")
    parser.add_argument(
        "--inference-batch-size",
        type=int,
        default=None,
        help="Number of frames/images to run together per ROI model forward pass; overrides inference config",
    )
    parser.add_argument(
        "--edge-yaw-max-lateral-dist",
        type=float,
        default=None,
        help="Use edge-based yaw only for 2+ visible-face cases whose predicted absolute lateral distance is below this value in meters; overrides inference config",
    )
def add_two_roi_inference_args(parser: argparse.ArgumentParser, include_output_dir: bool = True) -> None:
    """Add the inference options plus ``--glob``/``--max-images`` (and optionally ``--output-dir``).

    Kept for analysis scripts that reuse the batch runner's ROI hyper-parameter options.
    """
    add_inference_args(parser)
    extra_arguments = [
        ("--glob", {"type": str, "default": "*.png", "help": "Image glob inside exported image-case directories"}),
        ("--max-images", {"type": int, "default": 0, "help": "Maximum number of images/frames to process; 0 means all"}),
    ]
    if include_output_dir:
        extra_arguments.append(
            (
                "--output-dir",
                {
                    "type": str,
                    "default": str(DEFAULT_OUTPUT_ROOT),
                    "help": "Visualization output directory or root",
                },
            )
        )
    for flag, options in extra_arguments:
        parser.add_argument(flag, **options)
def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """Parse the batch-runner command line (``sys.argv[1:]`` when *argv* is None).

    The input mode is chosen later by ``infer_input_mode`` from ``--video-case-list-file``,
    ``--rawid-json`` or (fallback) ``--clip-list-file``. Inference options are added by
    ``add_inference_args``.
    """
    script_dir = FILE.parent
    parser = argparse.ArgumentParser(
        description="Batch run two-ROI Detect3D inference from raw_id manifests, clip lists, or video-case camera4.bin inputs."
    )
    add = parser.add_argument

    # Input sources.
    add("--rawid-json", type=str, default="", help="Path to AEB raw_id manifest JSON produced by get_clips_of_aeb.py")
    add(
        "--clip-list-file",
        type=str,
        default=str(script_dir / "clips_6284.txt"),
        help="Path to text file: '<clip_id>' per line. Used when --rawid-json and --video-case-list-file are unset.",
    )
    add(
        "--video-case-list-file",
        type=str,
        default="",
        help="Path to text file: '<camera4.bin|sigmastar.1|case_dir>' per line.",
    )

    # Output locations.
    add("--output-root", type=str, default=str(DEFAULT_OUTPUT_ROOT), help="Root directory containing one result folder per raw_id")
    add(
        "--visualization-root",
        type=str,
        default=str(script_dir / "visualization_20260407"),
        help="Root directory for clip-list and video-case visualization outputs",
    )
    add(
        "--export-root",
        type=str,
        default=str(script_dir / "clip_exports"),
        help="Root directory used to save decoded frames and calibration in clip-list export mode.",
    )
    add(
        "--output-prefix",
        type=str,
        default="clip_export",
        help="Prefix used in each clip export directory name in clip-list export mode.",
    )

    # Decoding and calibration.
    add("--camera-topic", type=str, default="camera4", help="Camera topic name in mcap")
    add("--max-frames-per-clip", type=int, default=0, help="Max decoded frames per clip; 0 means all")
    add("--calib-file", type=str, default="", help="Optional override path to camera4.json for all clips")

    # Scheduling and limits.
    add("--skip-done", action="store_true", help="Skip raw_ids and clips already marked done")
    add("--limit-rawids", type=int, default=0, help="Limit number of raw_ids; 0 means all")
    add("--limit-clips", type=int, default=0, help="Limit clip-list/video-case tasks; 0 means all")
    add("--limit-clips-per-rawid", type=int, default=0, help="Limit number of clips per raw_id; 0 means all")
    add("--video-stride", type=int, default=1, help="Read every Nth frame from camera4.bin inputs in video-case mode")
    add("--glob", type=str, default="*.png", help="Image glob inside exported image-case directories")
    add("--max-images", type=int, default=0, help="Maximum number of images/frames to process; 0 means all")

    add_inference_args(parser)
    return parser.parse_args(argv)
def parse_rawid_tasks(rawid_json_path: str) -> list[RawIDTask]:
    """Load an AEB raw_id manifest and return one merged ``RawIDTask`` per raw_id.

    Layout (the ``scenarios`` wrapper is optional)::

        {"scenarios": {"<scenario_key>": [{"rawid": ..., "clips": [...], "场景": ...,
                                           "CVE数据": ..., "偏置": ..., "目标速度": ...,
                                           "自车速度": ...}, ...]}}

    The Chinese record keys are scenario / CVE data / offset / target speed / ego speed.
    Records without a raw_id, or without any non-blank clip id, are skipped. Clip ids are
    deduplicated in first-seen order. If a raw_id appears in several records, the clip
    lists are concatenated (still deduplicated) and the lexicographically largest
    normalized CVE timestamp is kept. All other metadata comes from the first occurrence.

    Raises:
        ValueError: the top level or ``scenarios`` is not a dict, a scenario's records are
            not a list, a record is not a dict, or a record's ``clips`` is not a list.
    """
    with open(rawid_json_path, "r", encoding="utf-8") as file:
        payload = json.load(file)
    if not isinstance(payload, dict):
        raise ValueError(f"输入 JSON 顶层必须是 dict实际: {type(payload).__name__}")
    grouped = payload.get("scenarios", payload)
    if not isinstance(grouped, dict):
        raise ValueError("输入 JSON 的 scenarios 字段必须是 dict")
    task_map: dict[str, RawIDTask] = {}
    for scenario_key, records in grouped.items():
        # Validate shapes up front; otherwise a string/dict value would be iterated
        # item-by-item and fail later with an opaque AttributeError on `.get`.
        if not isinstance(records, list):
            raise ValueError(
                f"scenario {scenario_key!r}: records must be a list, got {type(records).__name__}"
            )
        for record in records:
            if not isinstance(record, dict):
                raise ValueError(
                    f"scenario {scenario_key!r}: each record must be a dict, got {type(record).__name__}"
                )
            raw_id = _to_str(record.get("rawid"))
            if not raw_id:
                continue
            raw_clips = record.get("clips", [])
            if not isinstance(raw_clips, list):
                raise ValueError(f"raw_id={raw_id} 的 clips 字段必须是 list实际: {type(raw_clips).__name__}")
            # Order-preserving dedupe; blank/None clip ids are dropped.
            clips = tuple(clip for clip in dict.fromkeys(_to_str(item) for item in raw_clips) if clip)
            if not clips:
                continue
            scenario_name = _to_str(record.get("场景")) or _to_str(scenario_key.split("-")[0]) or scenario_key
            cve_data = _normalize_cve_timestamp(_to_str(record.get("CVE数据")))
            candidate = RawIDTask(
                scenario_key=scenario_key,
                scenario_name=scenario_name,
                raw_id=raw_id,
                cve_data=cve_data,
                offset=_to_str(record.get("偏置")),
                target_speed=_to_str(record.get("目标速度")),
                ego_speed=_to_str(record.get("自车速度")),
                clips=clips,
            )
            existing = task_map.get(raw_id)
            if existing is None:
                task_map[raw_id] = candidate
                continue
            merged_clips = tuple(dict.fromkeys([*existing.clips, *candidate.clips]))
            merged_cve = max(filter(None, [existing.cve_data, candidate.cve_data]), default=None)
            task_map[raw_id] = RawIDTask(
                scenario_key=existing.scenario_key,
                scenario_name=existing.scenario_name,
                raw_id=raw_id,
                cve_data=merged_cve,
                offset=existing.offset,
                target_speed=existing.target_speed,
                ego_speed=existing.ego_speed,
                clips=merged_clips,
            )
    return list(task_map.values())
def save_json(path: Path, payload: dict[str, Any]) -> None:
    """Write *payload* to *path* as indented UTF-8 JSON, creating parent directories.

    The JSON is first written to a hidden sibling ``.<name>.tmp`` file and then moved
    over *path* with ``Path.replace``. Files such as ``predictions.json`` are rewritten
    repeatedly during a run, so an interruption or a serialization error now leaves the
    previous version intact instead of a truncated file.

    Args:
        path: destination file (``str`` is accepted as well).
        payload: JSON-serializable data; non-ASCII text is written as-is.
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(f".{path.name}.tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as file:
            json.dump(payload, file, indent=2, ensure_ascii=False)
        tmp_path.replace(path)
    except BaseException:
        # Best-effort cleanup; the original error is what matters.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise
def append_error_log(error_log: Path, task_id: str, message: str) -> None:
    """Append ``[task_id] message`` followed by the current traceback to *error_log*.

    Intended to be called inside an ``except`` block, since ``traceback.format_exc()``
    renders the exception currently being handled. Parent directories are created.
    """
    entry = f"[{task_id}] {message}\n{traceback.format_exc()}\n"
    error_log.parent.mkdir(parents=True, exist_ok=True)
    with error_log.open("a", encoding="utf-8") as log_file:
        log_file.write(entry)
def build_rawid_output_dir(output_root: Path, rawid_task: RawIDTask) -> Path:
    """Return ``<output_root>/<scenario>__<raw_id>`` with both name parts made path-safe.

    The scenario part uses ``scenario_key``, falling back to ``scenario_name`` when the key is falsy.
    """
    name_parts = (rawid_task.scenario_key or rawid_task.scenario_name, rawid_task.raw_id)
    folder_name = "__".join(_sanitize_path_component(part) for part in name_parts)
    return output_root / folder_name
def infer_input_mode(args: argparse.Namespace) -> str:
    """Pick the input mode by precedence: video-case list, then raw_id JSON, else clip list."""
    for attr_name, mode in (("video_case_list_file", "video_case"), ("rawid_json", "rawid")):
        if getattr(args, attr_name):
            return mode
    return "clip"
def _load_clip_export_tools():
    """Import the clip-export helpers only when clip mode needs them.

    Returns:
        ``(build_case_dir_name, export_one_clip, parse_clip_list, save_run_manifest,
        validate_pdcl_auth_env)`` from ``tools.pdcl_inference.export_mcap_frames_by_clip_id``.
    """
    from tools.pdcl_inference.export_mcap_frames_by_clip_id import (
        build_case_dir_name,
        export_one_clip,
        parse_clip_list,
        save_run_manifest,
        validate_pdcl_auth_env,
    )

    helpers = (
        build_case_dir_name,
        export_one_clip,
        parse_clip_list,
        save_run_manifest,
        validate_pdcl_auth_env,
    )
    return helpers
def parse_video_case_list(video_case_list_file: str) -> list[VideoCaseTask]:
    """Read a video-case list file into ``VideoCaseTask`` objects.

    For each non-blank line not starting with ``#``, the first whitespace-separated token
    is resolved with ``resolve_video_case_paths``. Entries that fail to resolve, or that
    resolve to an already-seen case directory, are reported on stdout and skipped.
    """

    def iter_entries():
        with open(video_case_list_file, "r", encoding="utf-8") as handle:
            for raw_line in handle:
                entry = raw_line.strip()
                if entry and not entry.startswith("#"):
                    yield entry.split()[0]

    tasks: list[VideoCaseTask] = []
    known_case_dirs: set[str] = set()
    for input_path in iter_entries():
        try:
            case_dir, video_path, calib_path = resolve_video_case_paths(input_path)
        except Exception as exc:
            print(f"Skip video_case={input_path}: {type(exc).__name__}: {exc}")
            continue
        case_key = str(case_dir)
        if case_key in known_case_dirs:
            print(f"Skip duplicate video_case={input_path}: resolved case {case_key}")
            continue
        known_case_dirs.add(case_key)
        task = VideoCaseTask(
            input_path=input_path,
            case_dir=case_key,
            video_path=str(video_path),
            calib_path=str(calib_path),
            output_rel_dir=str(build_case_output_rel_dir(case_dir)),
        )
        tasks.append(task)
    return tasks
def has_reusable_exported_case(case_dir: Path) -> bool:
    """Return True when *case_dir* already holds a usable clip export.

    Usable means ``calib/L2_calib/camera4.json`` is a file and ``images/`` is a directory
    containing at least one regular file.
    """
    images_dir = case_dir / "images"
    calib_file = case_dir.joinpath("calib", "L2_calib", "camera4.json")
    if images_dir.is_dir() and calib_file.is_file():
        return any(entry.is_file() for entry in images_dir.iterdir())
    return False
def save_task_batch_manifest(args: argparse.Namespace, tasks, input_mode: str) -> None:
    """Write ``<visualization_root>/_status/run_manifest.json`` for a clip or video-case batch.

    Shared run settings are always recorded. ``input_mode == "clip"`` adds the
    clip-export settings and per-clip task fields. Any other mode adds the video-case
    settings and per-case task fields.
    """
    manifest = {
        "input_mode": input_mode,
        "visualization_root": args.visualization_root,
        "inference_config": args.inference_config,
        "inference_batch_size": args.inference_batch_size,
        "edge_yaw_max_lateral_dist_m": args.edge_yaw_max_lateral_dist,
        "roi_models": {"roi0": args.roi0_model, "roi1": args.roi1_model},
        "num_tasks": len(tasks),
    }
    if input_mode == "clip":
        manifest["clip_list_file"] = args.clip_list_file
        manifest["export_root"] = args.export_root
        manifest["camera_topic"] = args.camera_topic
        manifest["max_frames_per_clip"] = args.max_frames_per_clip
        task_fields = ("task_id", "clip_uuid", "date_name", "vehicle_name", "clip_path")
    else:
        manifest["video_case_list_file"] = args.video_case_list_file
        manifest["video_stride"] = args.video_stride
        manifest["max_images"] = args.max_images
        task_fields = ("task_id", "input_path", "case_dir", "video_path", "calib_path", "output_rel_dir")
    manifest["tasks"] = [{field: getattr(task, field) for field in task_fields} for task in tasks]
    save_json(Path(args.visualization_root) / "_status" / "run_manifest.json", manifest)
def record_task_failure(infer_status, visualization_root: str, task_id: str, exc: Exception) -> None:
    """Mark *task_id* failed in *infer_status* and log it to ``<visualization_root>/_status/errors.log``."""
    summary = f"{type(exc).__name__}: {exc}"
    infer_status.mark_failed(task_id, summary)
    log_path = Path(visualization_root).joinpath("_status", "errors.log")
    append_error_log(log_path, task_id, summary)
def save_batch_manifest(args: argparse.Namespace, rawid_tasks: list[RawIDTask]) -> None:
    """Write ``<output_root>/_status/run_manifest.json`` describing a raw_id batch run.

    Records the run settings, ROI model paths and, for each raw_id, its scenario key,
    CVE timestamp and ordered clip list.
    """
    manifest = {
        "rawid_json": args.rawid_json,
        "output_root": args.output_root,
        "inference_config": args.inference_config,
        "rawids_ordered_by": "cve_data_ascending",
        "clips_ordered_by": "natural_clip_list_order_from_raw_id_manifest",
        "camera_topic": args.camera_topic,
        "max_frames_per_clip": args.max_frames_per_clip,
        "limit_rawids": args.limit_rawids,
        "limit_clips_per_rawid": args.limit_clips_per_rawid,
        "inference_batch_size": args.inference_batch_size,
        "edge_yaw_max_lateral_dist_m": args.edge_yaw_max_lateral_dist,
        "roi_models": {"roi0": args.roi0_model, "roi1": args.roi1_model},
        "num_rawids": len(rawid_tasks),
    }
    manifest["rawids"] = [
        {
            "raw_id": task.raw_id,
            "scenario_key": task.scenario_key,
            "cve_data": task.cve_data,
            "num_clips": len(task.clips),
            "clips": list(task.clips),
        }
        for task in rawid_tasks
    ]
    save_json(Path(args.output_root) / "_status" / "run_manifest.json", manifest)
def save_rawid_manifest(rawid_dir: Path, rawid_task: RawIDTask, clip_results: list[dict[str, Any]]) -> None:
    """Write ``<rawid_dir>/rawid_manifest.json`` summarizing one raw_id run.

    Clips with status ``done`` or ``skipped_done`` count as successes and ``failed`` as
    failures. The per-clip result dicts are embedded unchanged.
    """
    statuses = [result.get("status") for result in clip_results]
    manifest = {
        "raw_id": rawid_task.raw_id,
        "scenario_key": rawid_task.scenario_key,
        "scenario_name": rawid_task.scenario_name,
        "cve_data": rawid_task.cve_data,
        # Offset / target speed / ego speed, under the same keys as in the raw_id JSON.
        "偏置": rawid_task.offset,
        "目标速度": rawid_task.target_speed,
        "自车速度": rawid_task.ego_speed,
        "num_clips": len(rawid_task.clips),
        "clips_ordered_by": "natural_clip_list_order_from_raw_id_manifest",
        "ordered_clips": list(rawid_task.clips),
        "predictions_path": str(rawid_dir / "predictions.json"),
        "success_clips": sum(status in {"done", "skipped_done"} for status in statuses),
        "failed_clips": statuses.count("failed"),
        "clips": clip_results,
    }
    save_json(rawid_dir / "rawid_manifest.json", manifest)
def build_pdcl_rawid_diagnostic(rawid_task: RawIDTask, clip_results: list[dict[str, Any]]) -> Optional[str]:
    """Explain a raw_id whose processed clips *all* failed with "clip id not exist".

    Returns None in three cases: there are no results, some clip has a different detail,
    or a fresh PDCL lookup of the raw_id returns exactly the manifest's clip set.
    Otherwise returns a hint that the AEB manifest looks stale. That hint also covers the
    case where the PDCL lookup itself raised.
    """
    every_clip_missing = bool(clip_results) and all(
        "clip id not exist" in str(result.get("detail", "")) for result in clip_results
    )
    if not every_clip_missing:
        return None
    try:
        from pdcl_dss import Raw

        with Raw(rawid_task.raw_id) as raw:
            live_clips = raw.list_clip_ukeys()
    except Exception as exc:
        return (
            f"All manifest clips are missing from PDCL, and raw_id lookup also failed: "
            f"{type(exc).__name__}: {exc}. The AEB manifest is likely stale or was generated "
            "against a different PDCL environment/namespace."
        )
    live_set, manifest_set = set(live_clips), set(rawid_task.clips)
    if live_set == manifest_set:
        return None
    overlap = len(live_set & manifest_set)
    return (
        f"All manifest clips are missing from PDCL. Current raw_id lookup returns "
        f"{len(live_clips)} clips and overlaps {overlap}/"
        f"{len(manifest_set)} manifest clips; regenerate the AEB clips manifest before rerunning."
    )
def run_one_clip_inference(
    rawid_task: RawIDTask,
    clip_uuid: str,
    clip_index: int,
    args: argparse.Namespace,
    context,
    rawid_dir: Path,
    predictions_payload: dict[str, Any],
    frame_index_offset: int,
    clip_status,
    error_log: Path,
) -> dict[str, Any]:
    """Run two-ROI inference on one clip of a raw_id and add its frames to the shared payload.

    The clip's mcap path and calibration are resolved, and decoded frames are streamed
    through ``append_image_stream_inference`` starting at *frame_index_offset*. The new
    entries in ``predictions_payload["frames"]`` are tagged with ``clip_uuid`` and
    ``clip_index_within_rawid``, and the whole payload is saved to
    ``<rawid_dir>/predictions.json``.

    Errors are not raised. They are recorded in *clip_status* and *error_log* and
    returned as a ``{"status": "failed", ...}`` result. On failure, any frames this clip
    had already appended are removed from ``predictions_payload["frames"]``. The caller
    advances its frame offset only for successful clips, so without the rollback the
    next clip's frames would overlap the leftovers and be tagged and indexed wrongly.
    Visualization files already written for the failed clip stay on disk.

    Returns:
        A per-clip result dict (``status`` ``"done"`` with frame range and calibration
        info, or ``"failed"`` with ``detail``).
    """
    from tools.pdcl_inference.export_mcap_frames_by_clip_id import (
        build_calib_summary,
        iter_decoded_clip_frames,
        load_calib_payload_for_clip,
    )
    from tools.pdcl_inference.pdcl_clip_service import PDCLClipService
    from tools.pdcl_inference.two_roi_inference import (
        append_image_stream_inference,
        camera4_payload_to_raw_calib,
    )

    clip_task_id = f"{rawid_task.raw_id}:{clip_uuid}"
    predictions_path = rawid_dir / "predictions.json"
    clip_status.mark_running(clip_task_id, f"resolving clip path for raw_id={rawid_task.raw_id}")
    try:
        clip_path = PDCLClipService.get_clip_path(clip_uuid)
        if clip_path is None:
            # Message: "cannot resolve clip_uuid to a local mcap path".
            raise FileNotFoundError(f"clip_uuid={clip_uuid} 无法解析到本地 mcap 路径")
        calib_payload, calib_source = load_calib_payload_for_clip(
            clip_path=clip_path,
            calib_file_override=args.calib_file,
        )
        raw_calib = camera4_payload_to_raw_calib(calib_payload)
        calib_summary = build_calib_summary(calib_payload)
        num_frames = append_image_stream_inference(
            context=context,
            frames=iter_decoded_clip_frames(
                clip_uuid=clip_uuid,
                clip_path=clip_path,
                camera_topic=args.camera_topic,
                max_frames=args.max_frames_per_clip,
            ),
            raw_calib=raw_calib,
            output_dir=rawid_dir,
            predictions_payload=predictions_payload,
            frame_index_offset=frame_index_offset,
            frame_name_prefix=f"{clip_index:03d}",
        )
        for frame_payload in predictions_payload["frames"][frame_index_offset : frame_index_offset + num_frames]:
            frame_payload["clip_uuid"] = clip_uuid
            frame_payload["clip_index_within_rawid"] = clip_index
        save_json(predictions_path, predictions_payload)
        clip_status.mark_done(clip_task_id, f"frames={num_frames} output={rawid_dir}")
        return {
            "clip_uuid": clip_uuid,
            "status": "done",
            "clip_path": clip_path,
            "clip_index_within_rawid": clip_index,
            "num_frames": num_frames,
            "frame_index_start": frame_index_offset,
            "frame_index_end": frame_index_offset + max(0, num_frames - 1),
            "output_dir": str(rawid_dir),
            "predictions_path": str(predictions_path),
            "calib_source": calib_source,
            "calib_summary": calib_summary,
        }
    except Exception as exc:
        # Roll back this clip's partially appended frames (see docstring).
        frames = predictions_payload.get("frames")
        if isinstance(frames, list):
            del frames[frame_index_offset:]
        message = f"{type(exc).__name__}: {exc}"
        clip_status.mark_failed(clip_task_id, message)
        append_error_log(error_log, clip_task_id, message)
        return {
            "clip_uuid": clip_uuid,
            "status": "failed",
            "clip_index_within_rawid": clip_index,
            "detail": message,
            "output_dir": str(rawid_dir),
        }
def run_one_rawid(
    rawid_task: RawIDTask,
    args: argparse.Namespace,
    context,
    rawid_status,
    clip_status,
    error_log: Path,
) -> tuple[int, int]:
    """Run every clip of one raw_id (up to ``--limit-clips-per-rawid``) into one output folder.

    All clips share a single predictions payload (``case_name`` = raw_id) whose frame
    indices continue across clips. A ``rawid_manifest.json`` is written at the end. The
    raw_id is marked failed when a PDCL diagnostic applies or any clip failed, and done
    otherwise.

    Returns:
        ``(success, fail)`` clip counts. ``(0, 0)`` means skipped as already done, and
        ``(0, 1)`` means there were no clips to process.
    """
    rawid_dir = build_rawid_output_dir(Path(args.output_root), rawid_task)
    task_id = rawid_task.task_id
    if args.skip_done and rawid_status.is_done(task_id):
        previous = rawid_status.get(task_id) or {}
        print(f" -> skip done: {previous.get('detail', '')}")
        return 0, 0

    from tools.pdcl_inference.two_roi_inference import create_predictions_payload

    rawid_dir.mkdir(parents=True, exist_ok=True)
    rawid_status.mark_running(task_id, f"running {len(rawid_task.clips)} clips")
    clip_results: list[dict[str, Any]] = []
    source_info = {
        "raw_id": rawid_task.raw_id,
        "scenario_key": rawid_task.scenario_key,
        "scenario_name": rawid_task.scenario_name,
        "cve_data": rawid_task.cve_data,
        "camera_topic": args.camera_topic,
        "clips_ordered_by": "natural_clip_list_order_from_raw_id_manifest",
        "ordered_clips": list(rawid_task.clips),
    }
    predictions_payload = create_predictions_payload(
        context=context,
        case_name=rawid_task.raw_id,
        source_info=source_info,
    )
    selected = list(rawid_task.clips)
    if args.limit_clips_per_rawid > 0:
        selected = selected[: args.limit_clips_per_rawid]
    if not selected:
        rawid_status.mark_failed(task_id, "no clips to process")
        save_rawid_manifest(rawid_dir, rawid_task, clip_results)
        return 0, 1

    success = fail = total_frames = 0
    for clip_index, clip_uuid in enumerate(selected, start=1):
        print(f" [{clip_index}/{len(selected)}] clip_uuid={clip_uuid}")
        result = run_one_clip_inference(
            rawid_task=rawid_task,
            clip_uuid=clip_uuid,
            clip_index=clip_index,
            args=args,
            context=context,
            rawid_dir=rawid_dir,
            predictions_payload=predictions_payload,
            frame_index_offset=total_frames,
            clip_status=clip_status,
            error_log=error_log,
        )
        clip_results.append(result)
        if result["status"] not in {"done", "skipped_done"}:
            fail += 1
            print(f" -> failed: {result.get('detail', '')}")
            continue
        success += 1
        total_frames += int(result.get("num_frames", 0))
        detail = result.get("detail") or result.get("output_dir", "")
        print(f" -> {result['status']}: {detail}")

    save_rawid_manifest(rawid_dir, rawid_task, clip_results)
    diagnostic = build_pdcl_rawid_diagnostic(rawid_task, clip_results)
    if diagnostic:
        print(f" -> diagnostic: {diagnostic}")
        rawid_status.mark_failed(task_id, diagnostic)
    elif fail > 0:
        rawid_status.mark_failed(task_id, f"{fail}/{len(selected)} clips failed")
    else:
        rawid_status.mark_done(task_id, str(rawid_dir))
    return success, fail
def run_clip_batch(args: argparse.Namespace, context) -> None:
    """Export each listed clip (reusing finished exports) and run two-ROI inference on it.

    ``args.output_root`` is overwritten with ``args.export_root``. Export and inference
    status live in ``_status/task_status.json`` under the export and visualization roots
    respectively. A failing task is recorded and the batch moves on.
    """
    (
        build_case_dir_name,
        export_one_clip,
        parse_clip_list,
        save_run_manifest,
        validate_pdcl_auth_env,
    ) = _load_clip_export_tools()
    from tools.pdcl_inference.status_store import StatusStore

    validate_pdcl_auth_env()
    args.output_root = args.export_root
    export_root = Path(args.export_root)
    visualization_root = Path(args.visualization_root)
    export_root.mkdir(parents=True, exist_ok=True)
    visualization_root.mkdir(parents=True, exist_ok=True)

    clip_tasks = parse_clip_list(args.clip_list_file)
    if args.limit_clips > 0:
        clip_tasks = clip_tasks[: args.limit_clips]
    if not clip_tasks:
        print("No clip tasks discovered. Exit.")
        return

    export_status = StatusStore(export_root / "_status" / "task_status.json")
    infer_status = StatusStore(visualization_root / "_status" / "task_status.json")
    save_run_manifest(args, clip_tasks)
    save_task_batch_manifest(args, clip_tasks, input_mode="clip")
    total = len(clip_tasks)
    print(f"Discovered {total} clip tasks.")
    print(f"Export root: {args.export_root}")
    print(f"Visualization root: {args.visualization_root}")

    success = fail = 0
    for index, clip_task in enumerate(clip_tasks, start=1):
        task_id = clip_task.task_id
        progress = f"[{index}/{total}] clip_id={task_id}"
        if args.skip_done and infer_status.is_done(task_id):
            detail = (infer_status.get(task_id) or {}).get("detail", "")
            print(f"{progress} -> skip done: {detail}")
            continue
        print(f"{progress} source={clip_task.clip_path}")
        infer_status.mark_running(task_id, "exporting clip and running two-roi inference")
        try:
            case_dir = export_root / build_case_dir_name(args.output_prefix, clip_task)
            if has_reusable_exported_case(case_dir):
                export_status.mark_done(task_id, f"reuse existing export: {case_dir}")
                print(f" -> reuse exported clip data from {case_dir}")
            else:
                export_result = export_one_clip(clip_task, args, export_status)
                if not (export_result.success and export_result.output_dir):
                    raise RuntimeError(export_result.message)
                case_dir = Path(export_result.output_dir)
            from tools.pdcl_inference.two_roi_inference import run_case_inference

            infer_result = run_case_inference(
                context=context,
                case_dir=str(case_dir),
                output_dir=visualization_root / case_dir.name,
                glob_pattern=args.glob,
                max_images=args.max_images,
            )
            infer_status.mark_done(task_id, infer_result["output_dir"])
            success += 1
            print(f" -> saved {infer_result['num_frames']} frames to {infer_result['output_dir']}")
        except Exception as exc:
            fail += 1
            record_task_failure(infer_status, args.visualization_root, task_id, exc)
            print(f" -> failed: {type(exc).__name__}: {exc}")

    print("\nBatch inference done.")
    print(f"success={success}, fail={fail}")
    print(f"infer_status_summary={infer_status.summary()}")
def run_video_case_batch(args: argparse.Namespace, context) -> None:
    """Run two-ROI inference on each case listed in ``args.video_case_list_file``.

    Each case writes to ``<visualization_root>/<task.output_rel_dir>``. Per-task status
    lives in ``<visualization_root>/_status/task_status.json``. A failing case is
    recorded and the batch moves on.
    """
    from tools.pdcl_inference.status_store import StatusStore
    from tools.pdcl_inference.two_roi_inference import run_video_case_inference

    visualization_root = Path(args.visualization_root)
    visualization_root.mkdir(parents=True, exist_ok=True)
    video_case_tasks = parse_video_case_list(args.video_case_list_file)
    if args.limit_clips > 0:
        video_case_tasks = video_case_tasks[: args.limit_clips]
    if not video_case_tasks:
        print("No video-case tasks discovered. Exit.")
        return

    infer_status = StatusStore(visualization_root / "_status" / "task_status.json")
    save_task_batch_manifest(args, video_case_tasks, input_mode="video_case")
    total = len(video_case_tasks)
    print(f"Discovered {total} video-case tasks.")
    print(f"Visualization root: {args.visualization_root}")

    success = fail = 0
    for index, task in enumerate(video_case_tasks, start=1):
        task_id = task.task_id
        progress = f"[{index}/{total}] case={task.case_name}"
        if args.skip_done and infer_status.is_done(task_id):
            detail = (infer_status.get(task_id) or {}).get("detail", "")
            print(f"{progress} -> skip done: {detail}")
            continue
        print(f"{progress} source={task.input_path}")
        infer_status.mark_running(task_id, f"running video-case inference source={task.video_path}")
        try:
            infer_result = run_video_case_inference(
                context=context,
                video_case_dir=task.case_dir,
                output_dir=visualization_root / Path(task.output_rel_dir),
                max_images=args.max_images,
                video_stride=args.video_stride,
            )
            infer_status.mark_done(task_id, infer_result["output_dir"])
            success += 1
            print(f" -> saved {infer_result['num_frames']} frames to {infer_result['output_dir']}")
        except Exception as exc:
            fail += 1
            record_task_failure(infer_status, args.visualization_root, task_id, exc)
            print(f" -> failed: {type(exc).__name__}: {exc}")

    print("\nBatch inference done.")
    print(f"success={success}, fail={fail}")
    print(f"infer_status_summary={infer_status.summary()}")
def run_rawid_batch(args: argparse.Namespace, context) -> None:
    """Run two-ROI inference for every raw_id in ``args.rawid_json``.

    raw_ids are processed in ascending ``(cve_data or "", raw_id)`` order, optionally
    truncated by ``--limit-rawids``. Status stores and ``errors.log`` live under
    ``<output_root>/_status``.

    Args:
        args: parsed CLI namespace.
        context: inference context with the ROI models already built. It is reused as-is.
            A new context is built from *args* only when it is None.
    """
    from tools.pdcl_inference.status_store import StatusStore

    output_root = Path(args.output_root)
    output_root.mkdir(parents=True, exist_ok=True)
    rawid_tasks = sorted(
        parse_rawid_tasks(args.rawid_json),
        key=lambda task: (task.cve_data or "", task.raw_id),
    )
    if args.limit_rawids > 0:
        rawid_tasks = rawid_tasks[: args.limit_rawids]
    if not rawid_tasks:
        print("No raw_id tasks discovered. Exit.")
        return
    status_dir = output_root / "_status"
    rawid_status = StatusStore(status_dir / "rawid_status.json")
    clip_status = StatusStore(status_dir / "clip_status.json")
    error_log = status_dir / "errors.log"
    save_batch_manifest(args, rawid_tasks)
    if context is None:
        # Previously the context was always rebuilt here, discarding the one passed in
        # by main() and loading every ROI model a second time.
        context = build_two_roi_inference_context_from_args(args)
    print(f"Discovered {len(rawid_tasks)} raw_ids.")
    print(f"Output root: {output_root}")
    total_success = 0
    total_fail = 0
    for index, rawid_task in enumerate(rawid_tasks, start=1):
        print(
            f"[{index}/{len(rawid_tasks)}] raw_id={rawid_task.raw_id} "
            f"scenario={rawid_task.scenario_key} clips={len(rawid_task.clips)} cve={rawid_task.cve_data or 'n/a'}"
        )
        success, fail = run_one_rawid(
            rawid_task=rawid_task,
            args=args,
            context=context,
            rawid_status=rawid_status,
            clip_status=clip_status,
            error_log=error_log,
        )
        total_success += success
        total_fail += fail
    print("\nBatch inference done.")
    print(f"clip_success={total_success}, clip_fail={total_fail}")
    print(f"rawid_status_summary={rawid_status.summary()}")
    print(f"clip_status_summary={clip_status.summary()}")
def main(argv: Optional[list[str]] = None) -> None:
    """Command-line entry point for the two-ROI batch runner.

    Steps: parse arguments, fill unset inference options from the inference config,
    call ``load_dotenv()``, pick the input mode, build the inference context once, and
    dispatch to the raw_id / video-case / clip-list batch runner.
    """
    args = parse_args(argv)
    populate_two_roi_inference_args(args)
    load_dotenv()
    mode = infer_input_mode(args)
    if mode in ("rawid", "clip"):
        # Both PDCL-backed modes need valid credentials. Check them (after `.env` is
        # loaded) before building the context, so a misconfigured environment fails
        # before any model is loaded. run_clip_batch repeats this check on its own.
        from tools.pdcl_inference.export_mcap_frames_by_clip_id import validate_pdcl_auth_env

        validate_pdcl_auth_env()
    context = build_two_roi_inference_context_from_args(args)
    if mode == "rawid":
        run_rawid_batch(args, context)
    elif mode == "video_case":
        run_video_case_batch(args, context)
    else:
        run_clip_batch(args, context)
# Run the batch CLI when executed as a script (not when imported).
if __name__ == "__main__":
    main()