"""Batch run two-ROI Detect3D inference.

Inputs can be raw_id manifests, clip lists, or video-case ``camera4.bin``
directories.  Per-ROI hyper-parameters come from a shared YAML config and may
be overridden on the command line.
"""
from __future__ import annotations

import argparse
import json
import re
import sys
import traceback
from pathlib import Path
from typing import Any, Optional

FILE = Path(__file__).resolve()
ROOT = FILE.parents[2]
if str(ROOT) not in sys.path:
    sys.path.append(str(ROOT))

try:
    from dotenv import load_dotenv
except ImportError:

    def load_dotenv(*args, **kwargs):
        """No-op stand-in used when python-dotenv is not installed."""
        return False


try:
    import yaml
except ImportError:
    yaml = None

from tools.model_inference.adapters.video_dir_inference_utils import (
    build_case_output_rel_dir,
    resolve_video_case_paths,
)
from tools.pdcl_inference.pipeline_types import RawIDTask, VideoCaseTask

DEFAULT_OUTPUT_ROOT = FILE.parent / "visualization_by_rawid"
DEFAULT_INFERENCE_CONFIG_PATH = FILE.parent / "configs" / "two_roi_inference.yaml"
REQUIRED_ROI_NAMES = ("roi0", "roi1")
DEFAULT_SHARED_INFERENCE_CONFIG = {
    "edge_yaw_max_lateral_dist": 30.0,
    "inference_batch_size": 1,
}


def _to_str(value: Any) -> Optional[str]:
    """Return ``str(value)`` stripped of whitespace, or None if it is None/blank."""
    if value is None:
        return None
    text = str(value).strip()
    return text or None


def _normalize_cve_timestamp(value: Optional[str]) -> Optional[str]:
    """Reduce a timestamp-like string to its first 14 (or else 8) digits.

    Returns None when the value is empty or contains fewer than 8 digits.
    """
    if not value:
        return None
    digits = re.sub(r"\D", "", value)
    if len(digits) >= 14:
        return digits[:14]
    if len(digits) >= 8:
        return digits[:8]
    return None


def _sanitize_path_component(value: Optional[str]) -> str:
    """Turn an arbitrary value into a single safe path component.

    Reserved path characters and whitespace runs become ``_``; leading and
    trailing dots/underscores are stripped.  Falls back to ``"unknown"``.
    """
    text = _to_str(value) or "unknown"
    text = re.sub(r"[\\/:*?\"<>|]+", "_", text)
    text = re.sub(r"\s+", "_", text)
    text = re.sub(r"_+", "_", text).strip("._")
    return text or "unknown"


def _load_yaml_if_present(path: str) -> dict[str, Any]:
    """Load a YAML file, returning ``{}`` when the path is empty or missing.

    Raises:
        ImportError: if a non-empty path is given but PyYAML is unavailable.
    """
    if not path:
        return {}
    if yaml is None:
        raise ImportError("PyYAML is required when using --roi0-data/--roi1-data. Please install: pip install pyyaml")
    yaml_path = Path(path)
    if not yaml_path.exists():
        return {}
    with yaml_path.open("r", encoding="utf-8") as file:
        return yaml.safe_load(file) or {}


def _load_yaml_required(path: str | Path) -> dict[str, Any]:
    """Load a YAML file that must exist.

    Raises:
        ImportError: if PyYAML is unavailable.
        FileNotFoundError: if ``path`` is not an existing file.
    """
    if yaml is None:
        raise ImportError("PyYAML is required for loading the shared two-ROI inference config. Please install: pip install pyyaml")
    yaml_path = Path(path)
    if not yaml_path.is_file():
        raise FileNotFoundError(f"Inference config file not found: {yaml_path}")
    with yaml_path.open("r", encoding="utf-8") as file:
        return yaml.safe_load(file) or {}


def _coerce_imgsz_arg(values: Optional[list[int]]) -> Optional[tuple[int, int]]:
    """Convert a two-item sequence to an ``(int, int)`` tuple; empty/None -> None.

    Raises:
        ValueError: if a non-empty sequence does not have exactly two items.
    """
    if not values:
        return None
    if len(values) != 2:
        raise ValueError(f"Expected imgsz as two integers, got: {values}")
    return int(values[0]), int(values[1])


def _coerce_imgsz_config(value: Any) -> Optional[tuple[int, int]]:
    """Parse an ``imgsz`` config value given as a list/tuple or a comma-separated string."""
    if value in (None, "", []):
        return None
    if isinstance(value, (list, tuple)):
        return _coerce_imgsz_arg([int(item) for item in value])
    text = str(value).strip()
    if not text:
        return None
    return _coerce_imgsz_arg([int(part.strip()) for part in text.split(",") if part.strip()])


def _resolve_config_path_value(raw_value: Any, config_path: Path) -> str:
    """Resolve a path found in the config file to an absolute path string.

    Absolute paths are kept; paths starting with ``./`` or ``../`` are resolved
    against the config file's directory; all other relative paths are resolved
    against ``ROOT``.  A missing/blank value yields ``""``.
    """
    text = _to_str(raw_value)
    if text is None:
        return ""
    candidate = Path(text)
    if candidate.is_absolute():
        return str(candidate.resolve())
    if text.startswith("./") or text.startswith("../"):
        return str((config_path.parent / candidate).resolve())
    return str((ROOT / candidate).resolve())


def _require_roi_number(roi_payload: dict[str, Any], key: str, cast, roi_name: str, config_path: Path):
    """Read a mandatory numeric ROI field and convert it with ``cast``.

    Raises:
        ValueError: if the field is missing or cannot be converted, naming the
            offending ``<roi>.<key>`` instead of surfacing a bare ``float(None)`` error.
    """
    raw_value = roi_payload.get(key)
    if raw_value is None:
        raise ValueError(f"`{roi_name}.{key}` is required in inference config: {config_path}")
    try:
        return cast(raw_value)
    except (TypeError, ValueError) as exc:
        raise ValueError(
            f"`{roi_name}.{key}` must be a number, got {raw_value!r} in inference config: {config_path}"
        ) from exc


def load_two_roi_inference_config(config_path: str | Path) -> dict[str, Any]:
    """Load and validate the shared two-ROI inference YAML config.

    ROI sections are read from a top-level ``rois`` mapping, or from top-level
    ``roi0``/``roi1`` keys when ``rois`` is absent.

    Returns:
        A dict with ``config_path`` (resolved), ``shared`` (edge-yaw distance and
        batch size, with defaults applied) and ``rois`` (one normalized dict per
        name in ``REQUIRED_ROI_NAMES``).

    Raises:
        FileNotFoundError / ImportError: see ``_load_yaml_required``.
        ValueError: if the structure or any required field is invalid.
    """
    resolved_config_path = Path(config_path).resolve()
    payload = _load_yaml_required(resolved_config_path)
    if not isinstance(payload, dict):
        raise ValueError(
            f"Two-ROI inference config must be a YAML mapping, got {type(payload).__name__}: {resolved_config_path}"
        )

    rois_payload = payload.get("rois")
    if rois_payload is None:
        rois_payload = {
            roi_name: payload.get(roi_name)
            for roi_name in REQUIRED_ROI_NAMES
            if payload.get(roi_name) is not None
        }
    if not isinstance(rois_payload, dict):
        raise ValueError(f"`rois` in inference config must be a mapping: {resolved_config_path}")

    shared_payload = payload.get("shared") or {}
    if not isinstance(shared_payload, dict):
        raise ValueError(f"`shared` in inference config must be a mapping: {resolved_config_path}")

    normalized_rois = {}
    for roi_name in REQUIRED_ROI_NAMES:
        roi_payload = rois_payload.get(roi_name)
        if not isinstance(roi_payload, dict):
            raise ValueError(f"Missing or invalid `{roi_name}` section in inference config: {resolved_config_path}")

        roi_value = roi_payload.get("roi")
        if not isinstance(roi_value, (list, tuple)) or len(roi_value) != 2:
            raise ValueError(f"`{roi_name}.roi` must be a 2-item list/tuple in inference config: {resolved_config_path}")

        crop_center_mode = _to_str(roi_payload.get("crop_center_mode"))
        if crop_center_mode not in {"cxvy", "vxvy"}:
            raise ValueError(
                f"`{roi_name}.crop_center_mode` must be one of ['cxvy', 'vxvy'] in inference config: {resolved_config_path}"
            )

        model_path = _resolve_config_path_value(roi_payload.get("model"), resolved_config_path)
        if not model_path:
            raise ValueError(f"`{roi_name}.model` is required in inference config: {resolved_config_path}")

        normalized_rois[roi_name] = {
            "model": model_path,
            "data": _resolve_config_path_value(roi_payload.get("data"), resolved_config_path),
            "roi": (int(roi_value[0]), int(roi_value[1])),
            "crop_center_mode": crop_center_mode,
            "virtual_fx": _require_roi_number(roi_payload, "virtual_fx", float, roi_name, resolved_config_path),
            "imgsz": _coerce_imgsz_config(roi_payload.get("imgsz")),
            "conf": _require_roi_number(roi_payload, "conf", float, roi_name, resolved_config_path),
            "max_det": _require_roi_number(roi_payload, "max_det", int, roi_name, resolved_config_path),
        }

    return {
        "config_path": str(resolved_config_path),
        "shared": {
            "edge_yaw_max_lateral_dist": float(
                shared_payload.get("edge_yaw_max_lateral_dist", DEFAULT_SHARED_INFERENCE_CONFIG["edge_yaw_max_lateral_dist"])
            ),
            # Batch size is clamped so it is never below 1.
            "inference_batch_size": max(
                1,
                int(shared_payload.get("inference_batch_size", DEFAULT_SHARED_INFERENCE_CONFIG["inference_batch_size"])),
            ),
        },
        "rois": normalized_rois,
    }


def populate_two_roi_inference_args(args: argparse.Namespace) -> dict[str, Any]:
    """Fill every unset inference attribute on ``args`` from the YAML config.

    Attributes that are already set (non-None; non-blank for model/data paths)
    are left untouched, so explicit command-line values win.  ``args`` is
    mutated in place and ``args.inference_config`` becomes the resolved path.

    Returns:
        The normalized config payload from ``load_two_roi_inference_config``.
    """
    # A missing, None or empty attribute falls back to the default config path
    # rather than crashing inside Path().
    config_source = getattr(args, "inference_config", None) or DEFAULT_INFERENCE_CONFIG_PATH
    config_payload = load_two_roi_inference_config(config_source)
    args.inference_config = config_payload["config_path"]

    if getattr(args, "edge_yaw_max_lateral_dist", None) is None:
        args.edge_yaw_max_lateral_dist = float(config_payload["shared"]["edge_yaw_max_lateral_dist"])
    if getattr(args, "inference_batch_size", None) is None:
        args.inference_batch_size = int(config_payload["shared"]["inference_batch_size"])

    for roi_name in REQUIRED_ROI_NAMES:
        roi_config = config_payload["rois"][roi_name]
        if _to_str(getattr(args, f"{roi_name}_model", None)) is None:
            setattr(args, f"{roi_name}_model", str(roi_config["model"]))
        if _to_str(getattr(args, f"{roi_name}_data", None)) is None:
            setattr(args, f"{roi_name}_data", str(roi_config["data"]))
        if getattr(args, f"{roi_name}_roi", None) is None:
            setattr(args, f"{roi_name}_roi", tuple(int(value) for value in roi_config["roi"]))
        if getattr(args, f"{roi_name}_crop_center_mode", None) is None:
            setattr(args, f"{roi_name}_crop_center_mode", str(roi_config["crop_center_mode"]))
        if getattr(args, f"{roi_name}_virtual_fx", None) is None:
            setattr(args, f"{roi_name}_virtual_fx", float(roi_config["virtual_fx"]))
        if getattr(args, f"{roi_name}_imgsz", None) is None:
            setattr(args, f"{roi_name}_imgsz", roi_config["imgsz"])
        if getattr(args, f"{roi_name}_conf", None) is None:
            setattr(args, f"{roi_name}_conf", float(roi_config["conf"]))
        if getattr(args, f"{roi_name}_max_det", None) is None:
            setattr(args, f"{roi_name}_max_det", int(roi_config["max_det"]))
    return config_payload


def build_roi_specs_from_args(args: argparse.Namespace):
    """Build one ``ROIModelSpec`` per required ROI from ``args``.

    ``args`` is populated from the inference config first.  If the per-ROI data
    YAML exists, its ``roi``, ``crop_center_mode`` and ``virtual_fx`` keys take
    precedence over the corresponding values on ``args``.
    """
    from tools.pdcl_inference.two_roi_inference import ROIModelSpec

    populate_two_roi_inference_args(args)
    roi_specs = []
    for roi_name in REQUIRED_ROI_NAMES:
        data_cfg = _load_yaml_if_present(getattr(args, f"{roi_name}_data"))
        roi_arg = tuple(int(value) for value in getattr(args, f"{roi_name}_roi"))
        crop_mode_arg = str(getattr(args, f"{roi_name}_crop_center_mode"))
        virtual_fx_arg = float(getattr(args, f"{roi_name}_virtual_fx"))
        imgsz_arg = _coerce_imgsz_arg(getattr(args, f"{roi_name}_imgsz"))

        resolved_roi = tuple(int(value) for value in data_cfg.get("roi", roi_arg))
        resolved_crop_mode = str(data_cfg.get("crop_center_mode", crop_mode_arg))
        resolved_virtual_fx = float(data_cfg.get("virtual_fx", virtual_fx_arg))

        roi_specs.append(
            ROIModelSpec(
                name=roi_name.upper(),
                model_path=str(getattr(args, f"{roi_name}_model")),
                roi_size=resolved_roi,
                crop_center_mode=resolved_crop_mode,
                virtual_fx=resolved_virtual_fx,
                imgsz=imgsz_arg,
                conf=float(getattr(args, f"{roi_name}_conf")),
                max_det=int(getattr(args, f"{roi_name}_max_det")),
            )
        )
    return roi_specs


def build_two_roi_inference_context_from_args(args: argparse.Namespace, requested_rois: Optional[set[str]] = None):
    """Create the inference context for the ROI models described by ``args``.

    Args:
        args: Parsed arguments; populated from the inference config in place.
        requested_rois: Optional ROI names (case-insensitive) to keep; None keeps all.
    """
    from tools.pdcl_inference.two_roi_inference import build_inference_context

    # build_roi_specs_from_args() populates ``args`` from the config itself, so
    # no separate populate call is needed before reading the shared values below.
    roi_specs = build_roi_specs_from_args(args)
    if requested_rois is not None:
        requested = {str(roi_name).lower() for roi_name in requested_rois}
        roi_specs = [spec for spec in roi_specs if spec.name.lower() in requested]
    return build_inference_context(
        roi_specs=roi_specs,
        device=args.device,
        half=args.half,
        classes=args.classes,
        edge_yaw_max_lateral_dist_m=args.edge_yaw_max_lateral_dist,
        inference_batch_size=getattr(args, "inference_batch_size", 1),
    )


def add_inference_args(parser: argparse.ArgumentParser) -> None:
    """Register the config path, per-ROI overrides and runtime options on ``parser``.

    Override options default to None so that unset values can later be filled
    from the inference config.
    """
    parser.add_argument(
        "--inference-config",
        type=str,
        default=str(DEFAULT_INFERENCE_CONFIG_PATH),
        help="YAML file containing shared two-ROI inference defaults",
    )
    for roi_name in REQUIRED_ROI_NAMES:
        parser.add_argument(
            f"--{roi_name}-model",
            type=str,
            default=None,
            help=f"{roi_name.upper()} checkpoint path (*.pt); overrides inference config",
        )
        parser.add_argument(
            f"--{roi_name}-data",
            type=str,
            default=None,
            help=f"Optional data YAML used to infer {roi_name.upper()} defaults; overrides inference config",
        )
        parser.add_argument(
            f"--{roi_name}-roi",
            nargs=2,
            type=int,
            default=None,
            metavar=("W", "H"),
            help=f"{roi_name.upper()} crop size before resize; overrides inference config",
        )
        parser.add_argument(
            f"--{roi_name}-crop-center-mode",
            type=str,
            default=None,
            choices=("cxvy", "vxvy"),
            help=f"{roi_name.upper()} crop center mode; overrides inference config",
        )
        parser.add_argument(
            f"--{roi_name}-virtual-fx",
            type=float,
            default=None,
            help=f"{roi_name.upper()} virtual focal length; overrides inference config",
        )
        parser.add_argument(
            f"--{roi_name}-imgsz",
            nargs=2,
            type=int,
            default=None,
            metavar=("W", "H"),
            help=f"{roi_name.upper()} model input size override; overrides inference config",
        )
        parser.add_argument(
            f"--{roi_name}-conf",
            type=float,
            default=None,
            help=f"{roi_name.upper()} confidence threshold; overrides inference config",
        )
        parser.add_argument(
            f"--{roi_name}-max-det",
            type=int,
            default=None,
            help=f"{roi_name.upper()} max detections per frame; overrides inference config",
        )
    parser.add_argument("--device", type=str, default="0", help="Inference device, e.g. '0' or 'cpu'")
    parser.add_argument("--half", action="store_true", help="Run inference in FP16 on CUDA")
    parser.add_argument("--classes", nargs="*", type=int, default=None, help="Optional class-id filter")
    parser.add_argument(
        "--inference-batch-size",
        type=int,
        default=None,
        help="Number of frames/images to run together per ROI model forward pass; overrides inference config",
    )
    parser.add_argument(
        "--edge-yaw-max-lateral-dist",
        type=float,
        default=None,
        help="Use edge-based yaw only for 2+ visible-face cases whose predicted absolute lateral distance is below this value in meters; overrides inference config",
    )


def add_two_roi_inference_args(parser: argparse.ArgumentParser, include_output_dir: bool = True) -> None:
    """Compatibility wrapper for analysis scripts that share the same ROI hyper-parameter choices."""
    add_inference_args(parser)
    parser.add_argument("--glob", type=str, default="*.png", help="Image glob inside exported image-case directories")
    parser.add_argument("--max-images", type=int, default=0, help="Maximum number of images/frames to process; 0 means all")
    if include_output_dir:
        parser.add_argument(
            "--output-dir",
            type=str,
            default=str(DEFAULT_OUTPUT_ROOT),
            help="Visualization output directory or root",
        )


def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """Parse the batch-inference command line (``argv`` or ``sys.argv`` when None)."""
    parser = argparse.ArgumentParser(
        description="Batch run two-ROI Detect3D inference from raw_id manifests, clip lists, or video-case camera4.bin inputs."
    )
    parser.add_argument(
        "--rawid-json",
        type=str,
        default="",
        help="Path to AEB raw_id manifest JSON produced by get_clips_of_aeb.py",
    )
    parser.add_argument(
        "--clip-list-file",
        type=str,
        default=str(FILE.parent / "clips_6284.txt"),
        help="Path to text file: '' per line. Used when --rawid-json and --video-case-list-file are unset.",
    )
    parser.add_argument(
        "--video-case-list-file",
        type=str,
        default="",
        help="Path to text file: '' per line.",
    )
    parser.add_argument(
        "--output-root",
        type=str,
        default=str(DEFAULT_OUTPUT_ROOT),
        help="Root directory containing one result folder per raw_id",
    )
    parser.add_argument(
        "--visualization-root",
        type=str,
        default=str(FILE.parent / "visualization_20260407"),
        help="Root directory for clip-list and video-case visualization outputs",
    )
    parser.add_argument(
        "--export-root",
        type=str,
        default=str(FILE.parent / "clip_exports"),
        help="Root directory used to save decoded frames and calibration in clip-list export mode.",
    )
    parser.add_argument(
        "--output-prefix",
        type=str,
        default="clip_export",
        help="Prefix used in each clip export directory name in clip-list export mode.",
    )
    parser.add_argument(
        "--camera-topic",
        type=str,
        default="camera4",
        help="Camera topic name in mcap",
    )
    parser.add_argument(
        "--max-frames-per-clip",
        type=int,
        default=0,
        help="Max decoded frames per clip; 0 means all",
    )
    parser.add_argument(
        "--calib-file",
        type=str,
        default="",
        help="Optional override path to camera4.json for all clips",
    )
    parser.add_argument("--skip-done", action="store_true", help="Skip raw_ids and clips already marked done")
    parser.add_argument("--limit-rawids", type=int, default=0, help="Limit number of raw_ids; 0 means all")
    parser.add_argument("--limit-clips", type=int, default=0, help="Limit clip-list/video-case tasks; 0 means all")
    parser.add_argument(
        "--limit-clips-per-rawid",
        type=int,
        default=0,
        help="Limit number of clips per raw_id; 0 means all",
    )
    parser.add_argument("--video-stride", type=int, default=1, help="Read every Nth frame from camera4.bin inputs in video-case mode")
    parser.add_argument("--glob", type=str, default="*.png", help="Image glob inside exported image-case directories")
    parser.add_argument("--max-images", type=int, default=0, help="Maximum number of images/frames to process; 0 means all")
    add_inference_args(parser)
    return parser.parse_args(argv)


def parse_rawid_tasks(rawid_json_path: str) -> list[RawIDTask]:
    """Read a raw_id manifest JSON and return one ``RawIDTask`` per raw_id.

    Records are grouped under ``payload["scenarios"]`` (or the top-level dict
    when that key is absent).  Records without a raw_id or without clips are
    skipped.  When the same raw_id appears more than once, clips are merged in
    first-seen order, the largest normalized CVE timestamp is kept and the
    remaining fields come from the first occurrence.

    Raises:
        ValueError: if the JSON structure is not as described above.
    """
    with open(rawid_json_path, "r", encoding="utf-8") as file:
        payload = json.load(file)
    if not isinstance(payload, dict):
        raise ValueError(f"输入 JSON 顶层必须是 dict,实际: {type(payload).__name__}")
    grouped = payload.get("scenarios", payload)
    if not isinstance(grouped, dict):
        raise ValueError("输入 JSON 的 scenarios 字段必须是 dict")

    task_map: dict[str, RawIDTask] = {}
    for scenario_key, records in grouped.items():
        if not isinstance(records, list):
            raise ValueError(
                f"scenario={scenario_key} must map to a list of records, got {type(records).__name__}"
            )
        for record in records:
            if not isinstance(record, dict):
                raise ValueError(
                    f"scenario={scenario_key} contains a non-dict record: {type(record).__name__}"
                )
            raw_id = _to_str(record.get("rawid"))
            if not raw_id:
                continue
            raw_clips = record.get("clips", [])
            if not isinstance(raw_clips, list):
                raise ValueError(f"raw_id={raw_id} 的 clips 字段必须是 list,实际: {type(raw_clips).__name__}")
            # dict.fromkeys de-duplicates while preserving manifest order.
            clips = tuple(
                clip
                for clip in dict.fromkeys(_to_str(item) for item in raw_clips).keys()
                if clip
            )
            if not clips:
                continue
            scenario_name = _to_str(record.get("场景")) or _to_str(scenario_key.split("-")[0]) or scenario_key
            cve_data = _normalize_cve_timestamp(_to_str(record.get("CVE数据")))
            candidate = RawIDTask(
                scenario_key=scenario_key,
                scenario_name=scenario_name,
                raw_id=raw_id,
                cve_data=cve_data,
                offset=_to_str(record.get("偏置")),
                target_speed=_to_str(record.get("目标速度")),
                ego_speed=_to_str(record.get("自车速度")),
                clips=clips,
            )
            existing = task_map.get(raw_id)
            if existing is None:
                task_map[raw_id] = candidate
                continue
            merged_clips = tuple(dict.fromkeys([*existing.clips, *candidate.clips]))
            merged_cve = max(filter(None, [existing.cve_data, candidate.cve_data]), default=None)
            task_map[raw_id] = RawIDTask(
                scenario_key=existing.scenario_key,
                scenario_name=existing.scenario_name,
                raw_id=raw_id,
                cve_data=merged_cve,
                offset=existing.offset,
                target_speed=existing.target_speed,
                ego_speed=existing.ego_speed,
                clips=merged_clips,
            )
    return list(task_map.values())


def save_json(path: Path, payload: dict[str, Any]) -> None:
    """Write ``payload`` as indented UTF-8 JSON, creating parent directories."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as file:
        json.dump(payload, file, indent=2, ensure_ascii=False)


def append_error_log(error_log: Path, task_id: str, message: str) -> None:
    """Append ``message`` plus the current traceback to ``error_log``.

    The traceback comes from ``traceback.format_exc()``, so this is meant to be
    called while an exception is being handled.
    """
    error_log.parent.mkdir(parents=True, exist_ok=True)
    with error_log.open("a", encoding="utf-8") as file:
        file.write(f"[{task_id}] {message}\n{traceback.format_exc()}\n")


def build_rawid_output_dir(output_root: Path, rawid_task: RawIDTask) -> Path:
    """Return ``output_root/<scenario>__<raw_id>`` using sanitized components."""
    scenario_part = _sanitize_path_component(rawid_task.scenario_key or rawid_task.scenario_name)
    rawid_part = _sanitize_path_component(rawid_task.raw_id)
    return output_root / f"{scenario_part}__{rawid_part}"


def infer_input_mode(args: argparse.Namespace) -> str:
    """Pick the input mode: ``video_case`` wins over ``rawid``, else ``clip``."""
    if args.video_case_list_file:
        return "video_case"
    if args.rawid_json:
        return "rawid"
    return "clip"


def _load_clip_export_tools():
    """Lazily import and return the clip-export helpers as a 5-tuple."""
    from tools.pdcl_inference.export_mcap_frames_by_clip_id import (
        build_case_dir_name,
        export_one_clip,
        parse_clip_list,
        save_run_manifest,
        validate_pdcl_auth_env,
    )

    return build_case_dir_name, export_one_clip, parse_clip_list, save_run_manifest, validate_pdcl_auth_env


def parse_video_case_list(video_case_list_file: str) -> list[VideoCaseTask]:
    """Read a video-case list file into ``VideoCaseTask`` objects.

    Blank lines and lines starting with ``#`` are ignored; only the first
    whitespace-separated token of each line is used.  Entries that cannot be
    resolved, or that resolve to an already-seen case directory, are skipped
    with a printed notice.
    """
    tasks: list[VideoCaseTask] = []
    seen_case_dirs: set[str] = set()
    with open(video_case_list_file, "r", encoding="utf-8") as file:
        for line in file:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            input_path = line.split()[0]
            try:
                case_dir, video_path, calib_path = resolve_video_case_paths(input_path)
            except Exception as exc:
                # Best effort: one bad entry must not abort the whole batch.
                print(f"Skip video_case={input_path}: {type(exc).__name__}: {exc}")
                continue
            case_dir_str = str(case_dir)
            if case_dir_str in seen_case_dirs:
                print(f"Skip duplicate video_case={input_path}: resolved case {case_dir_str}")
                continue
            seen_case_dirs.add(case_dir_str)
            tasks.append(
                VideoCaseTask(
                    input_path=input_path,
                    case_dir=case_dir_str,
                    video_path=str(video_path),
                    calib_path=str(calib_path),
                    output_rel_dir=str(build_case_output_rel_dir(case_dir)),
                )
            )
    return tasks


def has_reusable_exported_case(case_dir: Path) -> bool:
    """Return True if ``case_dir`` has a calibration file and at least one image file."""
    images_dir = case_dir / "images"
    calib_path = case_dir / "calib" / "L2_calib" / "camera4.json"
    if not images_dir.is_dir() or not calib_path.is_file():
        return False
    return any(path.is_file() for path in images_dir.iterdir())


def save_task_batch_manifest(args: argparse.Namespace, tasks, input_mode: str) -> None:
    """Write the run manifest for clip-list or video-case mode under ``visualization_root``."""
    manifest_path = Path(args.visualization_root) / "_status" / "run_manifest.json"
    payload = {
        "input_mode": input_mode,
        "visualization_root": args.visualization_root,
        "inference_config": args.inference_config,
        "inference_batch_size": args.inference_batch_size,
        "edge_yaw_max_lateral_dist_m": args.edge_yaw_max_lateral_dist,
        "roi_models": {
            "roi0": args.roi0_model,
            "roi1": args.roi1_model,
        },
        "num_tasks": len(tasks),
    }
    if input_mode == "clip":
        payload.update(
            {
                "clip_list_file": args.clip_list_file,
                "export_root": args.export_root,
                "camera_topic": args.camera_topic,
                "max_frames_per_clip": args.max_frames_per_clip,
                "tasks": [
                    {
                        "task_id": task.task_id,
                        "clip_uuid": task.clip_uuid,
                        "date_name": task.date_name,
                        "vehicle_name": task.vehicle_name,
                        "clip_path": task.clip_path,
                    }
                    for task in tasks
                ],
            }
        )
    else:
        payload.update(
            {
                "video_case_list_file": args.video_case_list_file,
                "video_stride": args.video_stride,
                "max_images": args.max_images,
                "tasks": [
                    {
                        "task_id": task.task_id,
                        "input_path": task.input_path,
                        "case_dir": task.case_dir,
                        "video_path": task.video_path,
                        "calib_path": task.calib_path,
                        "output_rel_dir": task.output_rel_dir,
                    }
                    for task in tasks
                ],
            }
        )
    save_json(manifest_path, payload)


def record_task_failure(infer_status, visualization_root: str, task_id: str, exc: Exception) -> None:
    """Mark ``task_id`` failed in the status store and append it to ``errors.log``."""
    message = f"{type(exc).__name__}: {exc}"
    infer_status.mark_failed(task_id, message)
    append_error_log(Path(visualization_root) / "_status" / "errors.log", task_id, message)


def save_batch_manifest(args: argparse.Namespace, rawid_tasks: list[RawIDTask]) -> None:
    """Write the run manifest for raw_id mode under ``output_root``."""
    manifest_path = Path(args.output_root) / "_status" / "run_manifest.json"
    payload = {
        "rawid_json": args.rawid_json,
        "output_root": args.output_root,
        "inference_config": args.inference_config,
        "rawids_ordered_by": "cve_data_ascending",
        "clips_ordered_by": "natural_clip_list_order_from_raw_id_manifest",
        "camera_topic": args.camera_topic,
        "max_frames_per_clip": args.max_frames_per_clip,
        "limit_rawids": args.limit_rawids,
        "limit_clips_per_rawid": args.limit_clips_per_rawid,
        "inference_batch_size": args.inference_batch_size,
        "edge_yaw_max_lateral_dist_m": args.edge_yaw_max_lateral_dist,
        "roi_models": {
            "roi0": args.roi0_model,
            "roi1": args.roi1_model,
        },
        "num_rawids": len(rawid_tasks),
        "rawids": [
            {
                "raw_id": task.raw_id,
                "scenario_key": task.scenario_key,
                "cve_data": task.cve_data,
                "num_clips": len(task.clips),
                "clips": list(task.clips),
            }
            for task in rawid_tasks
        ],
    }
    save_json(manifest_path, payload)


def save_rawid_manifest(rawid_dir: Path, rawid_task: RawIDTask, clip_results: list[dict[str, Any]]) -> None:
    """Write ``rawid_manifest.json`` summarising per-clip results for one raw_id."""
    success_clips = sum(1 for item in clip_results if item.get("status") in {"done", "skipped_done"})
    failed_clips = sum(1 for item in clip_results if item.get("status") == "failed")
    payload = {
        "raw_id": rawid_task.raw_id,
        "scenario_key": rawid_task.scenario_key,
        "scenario_name": rawid_task.scenario_name,
        "cve_data": rawid_task.cve_data,
        "偏置": rawid_task.offset,
        "目标速度": rawid_task.target_speed,
        "自车速度": rawid_task.ego_speed,
        "num_clips": len(rawid_task.clips),
        "clips_ordered_by": "natural_clip_list_order_from_raw_id_manifest",
        "ordered_clips": list(rawid_task.clips),
        "predictions_path": str(rawid_dir / "predictions.json"),
        "success_clips": success_clips,
        "failed_clips": failed_clips,
        "clips": clip_results,
    }
    save_json(rawid_dir / "rawid_manifest.json", payload)


def build_pdcl_rawid_diagnostic(rawid_task: RawIDTask, clip_results: list[dict[str, Any]]) -> Optional[str]:
    """Explain why every clip of a raw_id failed with "clip id not exist".

    Returns None unless all clip results carry that detail.  Otherwise the
    raw_id is looked up in PDCL and a message describing the lookup failure or
    the mismatch with the manifest is returned; None if the clip sets match.
    """
    if not clip_results:
        return None
    if not all("clip id not exist" in str(item.get("detail", "")) for item in clip_results):
        return None
    try:
        from pdcl_dss import Raw

        with Raw(rawid_task.raw_id) as raw:
            current_clips = raw.list_clip_ukeys()
    except Exception as exc:
        return (
            f"All manifest clips are missing from PDCL, and raw_id lookup also failed: "
            f"{type(exc).__name__}: {exc}. The AEB manifest is likely stale or was generated "
            "against a different PDCL environment/namespace."
        )
    current_clip_set = set(current_clips)
    manifest_clip_set = set(rawid_task.clips)
    if current_clip_set == manifest_clip_set:
        return None
    return (
        f"All manifest clips are missing from PDCL. Current raw_id lookup returns "
        f"{len(current_clips)} clips and overlaps {len(current_clip_set & manifest_clip_set)}/"
        f"{len(manifest_clip_set)} manifest clips; regenerate the AEB clips manifest before rerunning."
    )


def run_one_clip_inference(
    rawid_task: RawIDTask,
    clip_uuid: str,
    clip_index: int,
    args: argparse.Namespace,
    context,
    rawid_dir: Path,
    predictions_payload: dict[str, Any],
    frame_index_offset: int,
    clip_status,
    error_log: Path,
) -> dict[str, Any]:
    """Run inference on one clip and append its frames to ``predictions_payload``.

    The frames added for this clip are tagged with ``clip_uuid`` and
    ``clip_index_within_rawid`` and ``predictions.json`` is rewritten.  Any
    exception is recorded in ``clip_status`` and ``error_log`` and reported
    through the returned dict rather than raised.

    Returns:
        A result dict whose ``status`` is ``"done"`` or ``"failed"``.
    """
    from tools.pdcl_inference.export_mcap_frames_by_clip_id import (
        build_calib_summary,
        iter_decoded_clip_frames,
        load_calib_payload_for_clip,
    )
    from tools.pdcl_inference.pdcl_clip_service import PDCLClipService
    from tools.pdcl_inference.two_roi_inference import (
        append_image_stream_inference,
        camera4_payload_to_raw_calib,
    )

    clip_task_id = f"{rawid_task.raw_id}:{clip_uuid}"
    predictions_path = rawid_dir / "predictions.json"
    clip_status.mark_running(clip_task_id, f"resolving clip path for raw_id={rawid_task.raw_id}")
    try:
        clip_path = PDCLClipService.get_clip_path(clip_uuid)
        if clip_path is None:
            raise FileNotFoundError(f"clip_uuid={clip_uuid} 无法解析到本地 mcap 路径")
        calib_payload, calib_source = load_calib_payload_for_clip(
            clip_path=clip_path,
            calib_file_override=args.calib_file,
        )
        raw_calib = camera4_payload_to_raw_calib(calib_payload)
        calib_summary = build_calib_summary(calib_payload)
        num_frames = append_image_stream_inference(
            context=context,
            frames=iter_decoded_clip_frames(
                clip_uuid=clip_uuid,
                clip_path=clip_path,
                camera_topic=args.camera_topic,
                max_frames=args.max_frames_per_clip,
            ),
            raw_calib=raw_calib,
            output_dir=rawid_dir,
            predictions_payload=predictions_payload,
            frame_index_offset=frame_index_offset,
            frame_name_prefix=f"{clip_index:03d}",
        )
        for frame_payload in predictions_payload["frames"][frame_index_offset : frame_index_offset + num_frames]:
            frame_payload["clip_uuid"] = clip_uuid
            frame_payload["clip_index_within_rawid"] = clip_index
        save_json(predictions_path, predictions_payload)
        clip_status.mark_done(clip_task_id, f"frames={num_frames} output={rawid_dir}")
        return {
            "clip_uuid": clip_uuid,
            "status": "done",
            # Stringified because this dict is later dumped to JSON; a no-op
            # when the service already returns a str.
            "clip_path": str(clip_path),
            "clip_index_within_rawid": clip_index,
            "num_frames": num_frames,
            "frame_index_start": frame_index_offset,
            "frame_index_end": frame_index_offset + max(0, num_frames - 1),
            "output_dir": str(rawid_dir),
            "predictions_path": str(predictions_path),
            "calib_source": calib_source,
            "calib_summary": calib_summary,
        }
    except Exception as exc:
        message = f"{type(exc).__name__}: {exc}"
        clip_status.mark_failed(clip_task_id, message)
        append_error_log(error_log, clip_task_id, message)
        return {
            "clip_uuid": clip_uuid,
            "status": "failed",
            "clip_index_within_rawid": clip_index,
            "detail": message,
            "output_dir": str(rawid_dir),
        }


def run_one_rawid(
    rawid_task: RawIDTask,
    args: argparse.Namespace,
    context,
    rawid_status,
    clip_status,
    error_log: Path,
) -> tuple[int, int]:
    """Process every clip of one raw_id into a single output directory.

    Returns:
        ``(success, fail)`` clip counts; ``(0, 0)`` when skipped as already done
        and ``(0, 1)`` when there are no clips to process.
    """
    rawid_dir = build_rawid_output_dir(Path(args.output_root), rawid_task)
    if args.skip_done and rawid_status.is_done(rawid_task.task_id):
        info = rawid_status.get(rawid_task.task_id) or {}
        print(f" -> skip done: {info.get('detail', '')}")
        return 0, 0

    from tools.pdcl_inference.two_roi_inference import create_predictions_payload

    rawid_dir.mkdir(parents=True, exist_ok=True)
    rawid_status.mark_running(rawid_task.task_id, f"running {len(rawid_task.clips)} clips")
    clip_results = []
    success = 0
    fail = 0
    total_frames = 0
    predictions_payload = create_predictions_payload(
        context=context,
        case_name=rawid_task.raw_id,
        source_info={
            "raw_id": rawid_task.raw_id,
            "scenario_key": rawid_task.scenario_key,
            "scenario_name": rawid_task.scenario_name,
            "cve_data": rawid_task.cve_data,
            "camera_topic": args.camera_topic,
            "clips_ordered_by": "natural_clip_list_order_from_raw_id_manifest",
            "ordered_clips": list(rawid_task.clips),
        },
    )
    clips = list(rawid_task.clips)
    if args.limit_clips_per_rawid > 0:
        clips = clips[: args.limit_clips_per_rawid]
    if not clips:
        rawid_status.mark_failed(rawid_task.task_id, "no clips to process")
        save_rawid_manifest(rawid_dir, rawid_task, clip_results)
        return 0, 1

    for clip_index, clip_uuid in enumerate(clips, start=1):
        print(f" [{clip_index}/{len(clips)}] clip_uuid={clip_uuid}")
        clip_result = run_one_clip_inference(
            rawid_task=rawid_task,
            clip_uuid=clip_uuid,
            clip_index=clip_index,
            args=args,
            context=context,
            rawid_dir=rawid_dir,
            predictions_payload=predictions_payload,
            # Frames of all clips share one predictions list, so each clip
            # starts where the previous successful clips ended.
            frame_index_offset=total_frames,
            clip_status=clip_status,
            error_log=error_log,
        )
        clip_results.append(clip_result)
        if clip_result["status"] in {"done", "skipped_done"}:
            success += 1
            total_frames += int(clip_result.get("num_frames", 0))
            detail = clip_result.get("detail") or clip_result.get("output_dir", "")
            print(f" -> {clip_result['status']}: {detail}")
        else:
            fail += 1
            print(f" -> failed: {clip_result.get('detail', '')}")
        # Rewritten after every clip so the manifest on disk always reflects
        # the clips processed so far.
        save_rawid_manifest(rawid_dir, rawid_task, clip_results)

    diagnostic = build_pdcl_rawid_diagnostic(rawid_task, clip_results)
    if diagnostic:
        print(f" -> diagnostic: {diagnostic}")
        rawid_status.mark_failed(rawid_task.task_id, diagnostic)
        return success, fail
    if fail > 0:
        rawid_status.mark_failed(rawid_task.task_id, f"{fail}/{len(clips)} clips failed")
    else:
        rawid_status.mark_done(rawid_task.task_id, str(rawid_dir))
    return success, fail


def run_clip_batch(args: argparse.Namespace, context) -> None:
    """Export (or reuse) each clip in the clip list and run case inference on it.

    Failures are recorded per task and do not stop the batch.
    """
    build_case_dir_name, export_one_clip, parse_clip_list, save_run_manifest, validate_pdcl_auth_env = _load_clip_export_tools()
    from tools.pdcl_inference.status_store import StatusStore
    from tools.pdcl_inference.two_roi_inference import run_case_inference

    validate_pdcl_auth_env()
    # NOTE(review): presumably the export helpers read ``args.output_root`` as
    # their destination -- confirm against export_mcap_frames_by_clip_id.
    args.output_root = args.export_root
    Path(args.export_root).mkdir(parents=True, exist_ok=True)
    Path(args.visualization_root).mkdir(parents=True, exist_ok=True)

    clip_tasks = parse_clip_list(args.clip_list_file)
    if args.limit_clips > 0:
        clip_tasks = clip_tasks[: args.limit_clips]
    if not clip_tasks:
        print("No clip tasks discovered. Exit.")
        return

    export_status = StatusStore(Path(args.export_root) / "_status" / "task_status.json")
    infer_status = StatusStore(Path(args.visualization_root) / "_status" / "task_status.json")
    save_run_manifest(args, clip_tasks)
    save_task_batch_manifest(args, clip_tasks, input_mode="clip")
    print(f"Discovered {len(clip_tasks)} clip tasks.")
    print(f"Export root: {args.export_root}")
    print(f"Visualization root: {args.visualization_root}")

    success = 0
    fail = 0
    for index, clip_task in enumerate(clip_tasks, start=1):
        task_id = clip_task.task_id
        if args.skip_done and infer_status.is_done(task_id):
            info = infer_status.get(task_id) or {}
            print(f"[{index}/{len(clip_tasks)}] clip_id={task_id} -> skip done: {info.get('detail', '')}")
            continue
        print(f"[{index}/{len(clip_tasks)}] clip_id={task_id} source={clip_task.clip_path}")
        infer_status.mark_running(task_id, "exporting clip and running two-roi inference")
        try:
            case_dir = Path(args.export_root) / build_case_dir_name(args.output_prefix, clip_task)
            if has_reusable_exported_case(case_dir):
                export_status.mark_done(task_id, f"reuse existing export: {case_dir}")
                print(f" -> reuse exported clip data from {case_dir}")
            else:
                export_result = export_one_clip(clip_task, args, export_status)
                if not export_result.success or not export_result.output_dir:
                    raise RuntimeError(export_result.message)
                case_dir = Path(export_result.output_dir)
            vis_dir = Path(args.visualization_root) / case_dir.name
            infer_result = run_case_inference(
                context=context,
                case_dir=str(case_dir),
                output_dir=vis_dir,
                glob_pattern=args.glob,
                max_images=args.max_images,
            )
            infer_status.mark_done(task_id, infer_result["output_dir"])
            success += 1
            print(f" -> saved {infer_result['num_frames']} frames to {infer_result['output_dir']}")
        except Exception as exc:
            fail += 1
            record_task_failure(infer_status, args.visualization_root, task_id, exc)
            print(f" -> failed: {type(exc).__name__}: {exc}")

    print("\nBatch inference done.")
    print(f"success={success}, fail={fail}")
    print(f"infer_status_summary={infer_status.summary()}")


def run_video_case_batch(args: argparse.Namespace, context) -> None:
    """Run video-case inference for every entry of the video-case list file.

    Failures are recorded per task and do not stop the batch.
    """
    from tools.pdcl_inference.status_store import StatusStore
    from tools.pdcl_inference.two_roi_inference import run_video_case_inference

    Path(args.visualization_root).mkdir(parents=True, exist_ok=True)
    video_case_tasks = parse_video_case_list(args.video_case_list_file)
    if args.limit_clips > 0:
        video_case_tasks = video_case_tasks[: args.limit_clips]
    if not video_case_tasks:
        print("No video-case tasks discovered. Exit.")
        return

    infer_status = StatusStore(Path(args.visualization_root) / "_status" / "task_status.json")
    save_task_batch_manifest(args, video_case_tasks, input_mode="video_case")
    print(f"Discovered {len(video_case_tasks)} video-case tasks.")
    print(f"Visualization root: {args.visualization_root}")

    success = 0
    fail = 0
    for index, task in enumerate(video_case_tasks, start=1):
        task_id = task.task_id
        if args.skip_done and infer_status.is_done(task_id):
            info = infer_status.get(task_id) or {}
            print(f"[{index}/{len(video_case_tasks)}] case={task.case_name} -> skip done: {info.get('detail', '')}")
            continue
        print(f"[{index}/{len(video_case_tasks)}] case={task.case_name} source={task.input_path}")
        infer_status.mark_running(task_id, f"running video-case inference source={task.video_path}")
        try:
            vis_dir = Path(args.visualization_root) / Path(task.output_rel_dir)
            infer_result = run_video_case_inference(
                context=context,
                video_case_dir=task.case_dir,
                output_dir=vis_dir,
                max_images=args.max_images,
                video_stride=args.video_stride,
            )
            infer_status.mark_done(task_id, infer_result["output_dir"])
            success += 1
            print(f" -> saved {infer_result['num_frames']} frames to {infer_result['output_dir']}")
        except Exception as exc:
            fail += 1
            record_task_failure(infer_status, args.visualization_root, task_id, exc)
            print(f" -> failed: {type(exc).__name__}: {exc}")

    print("\nBatch inference done.")
    print(f"success={success}, fail={fail}")
    print(f"infer_status_summary={infer_status.summary()}")


def run_rawid_batch(args: argparse.Namespace, context) -> None:
    """Run inference for every raw_id in the manifest, ordered by CVE timestamp.

    Args:
        args: Parsed and populated arguments.
        context: Inference context to use.  When None, one is built from ``args``.
    """
    from tools.pdcl_inference.status_store import StatusStore

    output_root = Path(args.output_root)
    output_root.mkdir(parents=True, exist_ok=True)
    rawid_tasks = parse_rawid_tasks(args.rawid_json)
    # Tasks without a CVE timestamp sort first; raw_id breaks ties.
    rawid_tasks = sorted(
        rawid_tasks,
        key=lambda task: (task.cve_data or "", task.raw_id),
    )
    if args.limit_rawids > 0:
        rawid_tasks = rawid_tasks[: args.limit_rawids]
    if not rawid_tasks:
        print("No raw_id tasks discovered. Exit.")
        return

    rawid_status = StatusStore(output_root / "_status" / "rawid_status.json")
    clip_status = StatusStore(output_root / "_status" / "clip_status.json")
    error_log = output_root / "_status" / "errors.log"
    save_batch_manifest(args, rawid_tasks)
    # Reuse the caller's context instead of rebuilding it a second time.
    if context is None:
        context = build_two_roi_inference_context_from_args(args)
    print(f"Discovered {len(rawid_tasks)} raw_ids.")
    print(f"Output root: {output_root}")

    total_success = 0
    total_fail = 0
    for index, rawid_task in enumerate(rawid_tasks, start=1):
        print(
            f"[{index}/{len(rawid_tasks)}] raw_id={rawid_task.raw_id} "
            f"scenario={rawid_task.scenario_key} clips={len(rawid_task.clips)} cve={rawid_task.cve_data or 'n/a'}"
        )
        success, fail = run_one_rawid(
            rawid_task=rawid_task,
            args=args,
            context=context,
            rawid_status=rawid_status,
            clip_status=clip_status,
            error_log=error_log,
        )
        total_success += success
        total_fail += fail

    print("\nBatch inference done.")
    print(f"clip_success={total_success}, clip_fail={total_fail}")
    print(f"rawid_status_summary={rawid_status.summary()}")
    print(f"clip_status_summary={clip_status.summary()}")


def main(argv: Optional[list[str]] = None) -> None:
    """Command-line entry point: parse arguments, build the context, dispatch by input mode."""
    args = parse_args(argv)
    populate_two_roi_inference_args(args)
    load_dotenv()
    mode = infer_input_mode(args)
    context = build_two_roi_inference_context_from_args(args)
    if mode == "rawid":
        from tools.pdcl_inference.export_mcap_frames_by_clip_id import validate_pdcl_auth_env

        validate_pdcl_auth_env()
        run_rawid_batch(args, context)
    elif mode == "video_case":
        run_video_case_batch(args, context)
    else:
        run_clip_batch(args, context)


if __name__ == "__main__":
    main()