"""Batch two-ROI Detect3D inference driver.

Runs two-ROI Detect3D inference from raw_id manifests, clip lists, or
video-case camera4.bin inputs.
"""
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import re
|
||
import sys
|
||
import traceback
|
||
from pathlib import Path
|
||
from typing import Any, Optional
|
||
|
||
FILE = Path(__file__).resolve()
|
||
ROOT = FILE.parents[2]
|
||
if str(ROOT) not in sys.path:
|
||
sys.path.append(str(ROOT))
|
||
|
||
try:
|
||
from dotenv import load_dotenv
|
||
except ImportError:
|
||
def load_dotenv(*args, **kwargs):
|
||
return False
|
||
|
||
try:
|
||
import yaml
|
||
except ImportError:
|
||
yaml = None
|
||
from tools.model_inference.adapters.video_dir_inference_utils import (
|
||
build_case_output_rel_dir,
|
||
resolve_video_case_paths,
|
||
)
|
||
from tools.pdcl_inference.pipeline_types import RawIDTask, VideoCaseTask
|
||
|
||
|
||
# Default output root (used by --output-root / --output-dir), next to this script.
DEFAULT_OUTPUT_ROOT = FILE.parent.joinpath("visualization_by_rawid")

# Default YAML file holding the shared two-ROI inference settings.
DEFAULT_INFERENCE_CONFIG_PATH = FILE.parent.joinpath("configs", "two_roi_inference.yaml")

# Every inference config must define a section for each of these ROI names.
REQUIRED_ROI_NAMES = ("roi0", "roi1")

# Fallbacks used when the `shared` section of the config omits a key.
DEFAULT_SHARED_INFERENCE_CONFIG = dict(
    edge_yaw_max_lateral_dist=30.0,
    inference_batch_size=1,
)
|
||
|
||
|
||
def _to_str(value: Any) -> Optional[str]:
|
||
if value is None:
|
||
return None
|
||
text = str(value).strip()
|
||
return text or None
|
||
|
||
|
||
def _normalize_cve_timestamp(value: Optional[str]) -> Optional[str]:
|
||
if not value:
|
||
return None
|
||
digits = re.sub(r"\D", "", value)
|
||
if len(digits) >= 14:
|
||
return digits[:14]
|
||
if len(digits) >= 8:
|
||
return digits[:8]
|
||
return None
|
||
|
||
|
||
def _sanitize_path_component(value: Optional[str]) -> str:
    """Turn ``value`` into a string usable as a single path component.

    Runs of the characters ``\\ / : * ? " < > |`` and runs of whitespace are
    each replaced by one underscore, consecutive underscores are collapsed,
    and leading/trailing dots and underscores are removed.

    Args:
        value: Raw text; ``None`` or blank text becomes ``"unknown"``.

    Returns:
        The sanitized text, or ``"unknown"`` when nothing is left.
    """
    cleaned = _to_str(value) or "unknown"
    # Applied in order: forbidden characters, whitespace, then underscore runs.
    for pattern in (r"[\\/:*?\"<>|]+", r"\s+", r"_+"):
        cleaned = re.sub(pattern, "_", cleaned)
    cleaned = cleaned.strip("._")
    return cleaned if cleaned else "unknown"
|
||
|
||
|
||
def _load_yaml_if_present(path: str) -> dict[str, Any]:
|
||
if not path:
|
||
return {}
|
||
if yaml is None:
|
||
raise ImportError("PyYAML is required when using --roi0-data/--roi1-data. Please install: pip install pyyaml")
|
||
yaml_path = Path(path)
|
||
if not yaml_path.exists():
|
||
return {}
|
||
with yaml_path.open("r", encoding="utf-8") as file:
|
||
return yaml.safe_load(file) or {}
|
||
|
||
|
||
def _load_yaml_required(path: str | Path) -> dict[str, Any]:
    """Load a YAML document that must exist.

    The parsed value is returned as-is (an empty document becomes ``{}``);
    this function does not check that the top level is a mapping.

    Args:
        path: Location of the YAML file.

    Returns:
        The result of ``yaml.safe_load`` or ``{}`` when that result is falsy.

    Raises:
        ImportError: If PyYAML is not installed.
        FileNotFoundError: If ``path`` is not an existing regular file.
    """
    if yaml is None:
        raise ImportError("PyYAML is required for loading the shared two-ROI inference config. Please install: pip install pyyaml")
    config_file = Path(path)
    if config_file.is_file():
        with config_file.open("r", encoding="utf-8") as stream:
            parsed = yaml.safe_load(stream)
        return parsed or {}
    raise FileNotFoundError(f"Inference config file not found: {config_file}")
|
||
|
||
|
||
def _coerce_imgsz_arg(values: Optional[list[int]]) -> Optional[tuple[int, int]]:
|
||
if not values:
|
||
return None
|
||
if len(values) != 2:
|
||
raise ValueError(f"Expected imgsz as two integers, got: {values}")
|
||
return int(values[0]), int(values[1])
|
||
|
||
|
||
def _coerce_imgsz_config(value: Any) -> Optional[tuple[int, int]]:
    """Parse an ``imgsz`` config entry into an ``(int, int)`` tuple.

    Accepts a list/tuple of two values or a comma-separated string such as
    ``"640,384"``.

    Args:
        value: Raw config value; ``None``, ``""`` and ``[]`` mean "unset".

    Returns:
        The parsed pair, or ``None`` when the entry is unset or blank.

    Raises:
        ValueError: If an item is not an integer or the entry does not
            contain exactly two items.
    """
    if value in (None, "", []):
        return None
    if isinstance(value, (list, tuple)):
        numbers = [int(item) for item in value]
    else:
        text = str(value).strip()
        if not text:
            return None
        # Empty fragments (e.g. from a trailing comma) are ignored.
        pieces = (piece.strip() for piece in text.split(","))
        numbers = [int(piece) for piece in pieces if piece]
    return _coerce_imgsz_arg(numbers)
|
||
|
||
|
||
def _resolve_config_path_value(raw_value: Any, config_path: Path) -> str:
    """Resolve a path found in the inference config to an absolute path string.

    Absolute paths are kept. Paths starting with ``./`` or ``../`` are taken
    relative to the directory of ``config_path``. Any other relative path is
    taken relative to ``ROOT``.

    Args:
        raw_value: Raw config value; ``None`` or blank text means "unset".
        config_path: Path of the config file the value came from.

    Returns:
        The resolved absolute path as a string, or ``""`` when unset.
    """
    text = _to_str(raw_value)
    if text is None:
        return ""
    candidate = Path(text)
    if not candidate.is_absolute():
        anchor = config_path.parent if text.startswith(("./", "../")) else ROOT
        candidate = anchor / candidate
    return str(candidate.resolve())
|
||
|
||
|
||
def _require_roi_config_number(roi_payload: dict[str, Any], roi_name: str, key: str, caster, config_path: Path):
    """Return ``caster(roi_payload[key])`` or raise ``ValueError`` naming the offending key."""
    raw = roi_payload.get(key)
    if raw is None:
        raise ValueError(f"`{roi_name}.{key}` is required in inference config: {config_path}")
    try:
        return caster(raw)
    except (TypeError, ValueError) as exc:
        raise ValueError(
            f"`{roi_name}.{key}` must be a valid {caster.__name__} in inference config: {config_path}"
        ) from exc


def _normalize_roi_config_section(roi_name: str, roi_payload: Any, config_path: Path) -> dict[str, Any]:
    """Validate one ROI section of the inference config and return its normalized form."""
    if not isinstance(roi_payload, dict):
        raise ValueError(f"Missing or invalid `{roi_name}` section in inference config: {config_path}")

    roi_value = roi_payload.get("roi")
    if not isinstance(roi_value, (list, tuple)) or len(roi_value) != 2:
        raise ValueError(f"`{roi_name}.roi` must be a 2-item list/tuple in inference config: {config_path}")
    try:
        roi_size = (int(roi_value[0]), int(roi_value[1]))
    except (TypeError, ValueError) as exc:
        raise ValueError(f"`{roi_name}.roi` must contain two integers in inference config: {config_path}") from exc

    crop_center_mode = _to_str(roi_payload.get("crop_center_mode"))
    if crop_center_mode not in {"cxvy", "vxvy"}:
        raise ValueError(
            f"`{roi_name}.crop_center_mode` must be one of ['cxvy', 'vxvy'] in inference config: {config_path}"
        )

    model_path = _resolve_config_path_value(roi_payload.get("model"), config_path)
    if not model_path:
        raise ValueError(f"`{roi_name}.model` is required in inference config: {config_path}")

    return {
        "model": model_path,
        "data": _resolve_config_path_value(roi_payload.get("data"), config_path),
        "roi": roi_size,
        "crop_center_mode": crop_center_mode,
        "virtual_fx": _require_roi_config_number(roi_payload, roi_name, "virtual_fx", float, config_path),
        "imgsz": _coerce_imgsz_config(roi_payload.get("imgsz")),
        "conf": _require_roi_config_number(roi_payload, roi_name, "conf", float, config_path),
        "max_det": _require_roi_config_number(roi_payload, roi_name, "max_det", int, config_path),
    }


def load_two_roi_inference_config(config_path: str | Path) -> dict[str, Any]:
    """Load and validate the shared two-ROI inference YAML config.

    ROI sections are read from the ``rois`` mapping. When ``rois`` is absent,
    they are read from top-level keys named after ``REQUIRED_ROI_NAMES``.

    Args:
        config_path: Path to the YAML config file.

    Returns:
        A dict with three keys: ``config_path`` (resolved path string),
        ``shared`` (``edge_yaw_max_lateral_dist`` as float and
        ``inference_batch_size`` as int, at least 1) and ``rois`` (one
        normalized section per required ROI name).

    Raises:
        ImportError: If PyYAML is not installed.
        FileNotFoundError: If the config file does not exist.
        ValueError: If the document, a section or a required value is missing
            or malformed. Missing or non-numeric ``virtual_fx``, ``conf`` and
            ``max_det`` are reported here with the key name rather than as a
            bare ``TypeError`` from ``float()``/``int()``.
    """
    resolved_config_path = Path(config_path).resolve()
    payload = _load_yaml_required(resolved_config_path)
    if not isinstance(payload, dict):
        raise ValueError(
            f"Two-ROI inference config must be a YAML mapping, got {type(payload).__name__}: {resolved_config_path}"
        )

    rois_payload = payload.get("rois")
    if rois_payload is None:
        # No `rois` mapping: fall back to ROI sections at the top level of the document.
        rois_payload = {
            roi_name: payload.get(roi_name)
            for roi_name in REQUIRED_ROI_NAMES
            if payload.get(roi_name) is not None
        }
    if not isinstance(rois_payload, dict):
        raise ValueError(f"`rois` in inference config must be a mapping: {resolved_config_path}")

    shared_payload = payload.get("shared") or {}
    if not isinstance(shared_payload, dict):
        raise ValueError(f"`shared` in inference config must be a mapping: {resolved_config_path}")

    def _shared_number(key: str, caster):
        # A key that is present but null falls back to the default instead of crashing.
        raw = shared_payload.get(key)
        if raw is None:
            raw = DEFAULT_SHARED_INFERENCE_CONFIG[key]
        try:
            return caster(raw)
        except (TypeError, ValueError) as exc:
            raise ValueError(
                f"`shared.{key}` must be a valid {caster.__name__} in inference config: {resolved_config_path}"
            ) from exc

    normalized_rois = {
        roi_name: _normalize_roi_config_section(roi_name, rois_payload.get(roi_name), resolved_config_path)
        for roi_name in REQUIRED_ROI_NAMES
    }

    return {
        "config_path": str(resolved_config_path),
        "shared": {
            "edge_yaw_max_lateral_dist": _shared_number("edge_yaw_max_lateral_dist", float),
            # The batch size is clamped so it can never drop below 1.
            "inference_batch_size": max(1, _shared_number("inference_batch_size", int)),
        },
        "rois": normalized_rois,
    }
|
||
|
||
|
||
def populate_two_roi_inference_args(args: argparse.Namespace) -> dict[str, Any]:
    """Fill unset inference options on ``args`` from the inference config.

    ``args.inference_config`` is replaced by the resolved config path. Every
    other option is only written when it is currently unset, so values that
    are already present on ``args`` are kept.

    Args:
        args: Namespace to update in place.

    Returns:
        The normalized config payload from ``load_two_roi_inference_config``.
    """
    config_payload = load_two_roi_inference_config(getattr(args, "inference_config", DEFAULT_INFERENCE_CONFIG_PATH))
    args.inference_config = config_payload["config_path"]

    shared = config_payload["shared"]
    for attr, caster in (("edge_yaw_max_lateral_dist", float), ("inference_batch_size", int)):
        if getattr(args, attr, None) is None:
            setattr(args, attr, caster(shared[attr]))

    # Converters for the per-ROI options where only `None` counts as unset.
    plain_fields = (
        ("roi", lambda raw: tuple(int(value) for value in raw)),
        ("crop_center_mode", str),
        ("virtual_fx", float),
        ("imgsz", lambda raw: raw),
        ("conf", float),
        ("max_det", int),
    )

    for roi_name in REQUIRED_ROI_NAMES:
        roi_config = config_payload["rois"][roi_name]

        # For model/data a blank string counts as unset as well.
        for key in ("model", "data"):
            attr = f"{roi_name}_{key}"
            if _to_str(getattr(args, attr, None)) is None:
                setattr(args, attr, str(roi_config[key]))

        for key, convert in plain_fields:
            attr = f"{roi_name}_{key}"
            if getattr(args, attr, None) is None:
                setattr(args, attr, convert(roi_config[key]))

    return config_payload
|
||
|
||
|
||
def build_roi_specs_from_args(args: argparse.Namespace):
    """Build one ``ROIModelSpec`` per required ROI from ``args``.

    ``args`` is first completed from the inference config. For each ROI, the
    keys ``roi``, ``crop_center_mode`` and ``virtual_fx`` found in the optional
    data YAML replace the corresponding values from ``args``.
    NOTE(review): this means the data YAML wins over an explicit CLI value,
    although the CLI help calls it a source of defaults -- confirm that this
    precedence is intended.

    Args:
        args: Parsed command-line namespace; updated in place.

    Returns:
        A list of ``ROIModelSpec`` objects in ``REQUIRED_ROI_NAMES`` order.
    """
    from tools.pdcl_inference.two_roi_inference import ROIModelSpec

    populate_two_roi_inference_args(args)

    def _spec_for(roi_name: str):
        def option(suffix: str):
            return getattr(args, f"{roi_name}_{suffix}")

        data_cfg = _load_yaml_if_present(option("data"))

        # Values from args, used when the data YAML has no override.
        fallback_roi = tuple(int(value) for value in option("roi"))
        fallback_crop_mode = str(option("crop_center_mode"))
        fallback_virtual_fx = float(option("virtual_fx"))
        imgsz = _coerce_imgsz_arg(option("imgsz"))

        roi_size = tuple(int(value) for value in data_cfg.get("roi", fallback_roi))
        crop_center_mode = str(data_cfg.get("crop_center_mode", fallback_crop_mode))
        virtual_fx = float(data_cfg.get("virtual_fx", fallback_virtual_fx))

        return ROIModelSpec(
            name=roi_name.upper(),
            model_path=str(option("model")),
            roi_size=roi_size,
            crop_center_mode=crop_center_mode,
            virtual_fx=virtual_fx,
            imgsz=imgsz,
            conf=float(option("conf")),
            max_det=int(option("max_det")),
        )

    return [_spec_for(roi_name) for roi_name in REQUIRED_ROI_NAMES]
|
||
|
||
|
||
def build_two_roi_inference_context_from_args(args: argparse.Namespace, requested_rois: Optional[set[str]] = None):
    """Create the two-ROI inference context described by ``args``.

    Args:
        args: Parsed command-line namespace; completed from the inference
            config in place.
        requested_rois: Optional ROI names (matched case-insensitively). When
            given, only specs whose name is in this set are kept.

    Returns:
        Whatever ``build_inference_context`` returns for the selected specs.
    """
    from tools.pdcl_inference.two_roi_inference import build_inference_context

    # NOTE(review): build_roi_specs_from_args() populates args again, so the
    # config file is read twice per call.
    populate_two_roi_inference_args(args)
    roi_specs = build_roi_specs_from_args(args)

    if requested_rois is not None:
        wanted = {str(roi_name).lower() for roi_name in requested_rois}
        roi_specs = list(filter(lambda spec: spec.name.lower() in wanted, roi_specs))

    context_kwargs = dict(
        roi_specs=roi_specs,
        device=args.device,
        half=args.half,
        classes=args.classes,
        edge_yaw_max_lateral_dist_m=args.edge_yaw_max_lateral_dist,
        inference_batch_size=getattr(args, "inference_batch_size", 1),
    )
    return build_inference_context(**context_kwargs)
|
||
|
||
|
||
def add_inference_args(parser: argparse.ArgumentParser) -> None:
    """Register the shared two-ROI inference options on ``parser``.

    Adds ``--inference-config``, one group of ``--<roi>-*`` options per entry
    of ``REQUIRED_ROI_NAMES``, and the device/precision/class-filter/batch
    options. Options documented as overriding the inference config default to
    ``None``.

    Args:
        parser: Parser to extend in place.
    """
    add = parser.add_argument

    add(
        "--inference-config",
        type=str,
        default=str(DEFAULT_INFERENCE_CONFIG_PATH),
        help="YAML file containing shared two-ROI inference defaults",
    )

    for roi_name in REQUIRED_ROI_NAMES:
        label = roi_name.upper()
        flag = f"--{roi_name}"
        add(f"{flag}-model", type=str, default=None, help=f"{label} checkpoint path (*.pt); overrides inference config")
        add(
            f"{flag}-data",
            type=str,
            default=None,
            help=f"Optional data YAML used to infer {label} defaults; overrides inference config",
        )
        add(
            f"{flag}-roi",
            nargs=2,
            type=int,
            default=None,
            metavar=("W", "H"),
            help=f"{label} crop size before resize; overrides inference config",
        )
        add(
            f"{flag}-crop-center-mode",
            type=str,
            default=None,
            choices=("cxvy", "vxvy"),
            help=f"{label} crop center mode; overrides inference config",
        )
        add(f"{flag}-virtual-fx", type=float, default=None, help=f"{label} virtual focal length; overrides inference config")
        add(
            f"{flag}-imgsz",
            nargs=2,
            type=int,
            default=None,
            metavar=("W", "H"),
            help=f"{label} model input size override; overrides inference config",
        )
        add(f"{flag}-conf", type=float, default=None, help=f"{label} confidence threshold; overrides inference config")
        add(f"{flag}-max-det", type=int, default=None, help=f"{label} max detections per frame; overrides inference config")

    add("--device", type=str, default="0", help="Inference device, e.g. '0' or 'cpu'")
    add("--half", action="store_true", help="Run inference in FP16 on CUDA")
    add("--classes", nargs="*", type=int, default=None, help="Optional class-id filter")
    add(
        "--inference-batch-size",
        type=int,
        default=None,
        help="Number of frames/images to run together per ROI model forward pass; overrides inference config",
    )
    add(
        "--edge-yaw-max-lateral-dist",
        type=float,
        default=None,
        help="Use edge-based yaw only for 2+ visible-face cases whose predicted absolute lateral distance is below this value in meters; overrides inference config",
    )
|
||
|
||
|
||
def add_two_roi_inference_args(parser: argparse.ArgumentParser, include_output_dir: bool = True) -> None:
    """Compatibility wrapper for analysis scripts that share the same ROI hyper-parameter choices.

    Registers the shared inference options plus ``--glob`` and
    ``--max-images``, and optionally ``--output-dir``.

    Args:
        parser: Parser to extend in place.
        include_output_dir: When true, also register ``--output-dir``.
    """
    add_inference_args(parser)
    parser.add_argument("--glob", type=str, default="*.png", help="Image glob inside exported image-case directories")
    parser.add_argument("--max-images", type=int, default=0, help="Maximum number of images/frames to process; 0 means all")
    if not include_output_dir:
        return
    parser.add_argument(
        "--output-dir",
        type=str,
        default=str(DEFAULT_OUTPUT_ROOT),
        help="Visualization output directory or root",
    )
|
||
|
||
|
||
def parse_args(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """Parse the command line of the batch inference script.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        The parsed namespace, including the shared inference options.
    """
    parser = argparse.ArgumentParser(
        description="Batch run two-ROI Detect3D inference from raw_id manifests, clip lists, or video-case camera4.bin inputs."
    )
    add = parser.add_argument
    script_dir = FILE.parent

    # Input sources.
    add("--rawid-json", type=str, default="", help="Path to AEB raw_id manifest JSON produced by get_clips_of_aeb.py")
    add(
        "--clip-list-file",
        type=str,
        default=str(script_dir / "clips_6284.txt"),
        help="Path to text file: '<clip_id>' per line. Used when --rawid-json and --video-case-list-file are unset.",
    )
    add(
        "--video-case-list-file",
        type=str,
        default="",
        help="Path to text file: '<camera4.bin|sigmastar.1|case_dir>' per line.",
    )

    # Output locations.
    add(
        "--output-root",
        type=str,
        default=str(DEFAULT_OUTPUT_ROOT),
        help="Root directory containing one result folder per raw_id",
    )
    add(
        "--visualization-root",
        type=str,
        default=str(script_dir / "visualization_20260407"),
        help="Root directory for clip-list and video-case visualization outputs",
    )
    add(
        "--export-root",
        type=str,
        default=str(script_dir / "clip_exports"),
        help="Root directory used to save decoded frames and calibration in clip-list export mode.",
    )
    add(
        "--output-prefix",
        type=str,
        default="clip_export",
        help="Prefix used in each clip export directory name in clip-list export mode.",
    )

    # Decoding and calibration.
    add("--camera-topic", type=str, default="camera4", help="Camera topic name in mcap")
    add("--max-frames-per-clip", type=int, default=0, help="Max decoded frames per clip; 0 means all")
    add("--calib-file", type=str, default="", help="Optional override path to camera4.json for all clips")

    # Batch control.
    add("--skip-done", action="store_true", help="Skip raw_ids and clips already marked done")
    add("--limit-rawids", type=int, default=0, help="Limit number of raw_ids; 0 means all")
    add("--limit-clips", type=int, default=0, help="Limit clip-list/video-case tasks; 0 means all")
    add("--limit-clips-per-rawid", type=int, default=0, help="Limit number of clips per raw_id; 0 means all")
    add("--video-stride", type=int, default=1, help="Read every Nth frame from camera4.bin inputs in video-case mode")
    add("--glob", type=str, default="*.png", help="Image glob inside exported image-case directories")
    add("--max-images", type=int, default=0, help="Maximum number of images/frames to process; 0 means all")

    add_inference_args(parser)
    return parser.parse_args(argv)
|
||
|
||
|
||
def parse_rawid_tasks(rawid_json_path: str) -> list[RawIDTask]:
    """Read a raw_id manifest JSON file and build one task per raw_id.

    The manifest's top level must be a dict. Its ``scenarios`` entry (or the
    top-level dict itself when that key is absent) maps a scenario key to a
    list of records. Records without a ``rawid`` or without any usable clip
    are skipped. When a raw_id occurs more than once, the clip lists are
    merged with order-preserving de-duplication, the larger ``cve_data``
    string is kept, and all other fields come from the first occurrence.

    Args:
        rawid_json_path: Path to the manifest JSON file.

    Returns:
        Tasks in order of first appearance of each raw_id.

    Raises:
        ValueError: If the top level, the scenarios mapping or a record's
            ``clips`` field has the wrong type.
    """
    with open(rawid_json_path, "r", encoding="utf-8") as file:
        payload = json.load(file)

    if not isinstance(payload, dict):
        raise ValueError(f"输入 JSON 顶层必须是 dict,实际: {type(payload).__name__}")

    grouped = payload.get("scenarios", payload)
    if not isinstance(grouped, dict):
        raise ValueError("输入 JSON 的 scenarios 字段必须是 dict")

    tasks_by_rawid: dict[str, RawIDTask] = {}

    for scenario_key, records in grouped.items():
        for record in records:
            raw_id = _to_str(record.get("rawid"))
            if not raw_id:
                continue

            raw_clips = record.get("clips", [])
            if not isinstance(raw_clips, list):
                raise ValueError(f"raw_id={raw_id} 的 clips 字段必须是 list,实际: {type(raw_clips).__name__}")
            # dict.fromkeys() de-duplicates while keeping first-seen order.
            clips = tuple(dict.fromkeys(filter(None, map(_to_str, raw_clips))))
            if not clips:
                continue

            scenario_name = _to_str(record.get("场景")) or _to_str(scenario_key.split("-")[0]) or scenario_key
            candidate = RawIDTask(
                scenario_key=scenario_key,
                scenario_name=scenario_name,
                raw_id=raw_id,
                cve_data=_normalize_cve_timestamp(_to_str(record.get("CVE数据"))),
                offset=_to_str(record.get("偏置")),
                target_speed=_to_str(record.get("目标速度")),
                ego_speed=_to_str(record.get("自车速度")),
                clips=clips,
            )

            previous = tasks_by_rawid.get(raw_id)
            if previous is not None:
                # Same raw_id seen before: keep the first record's metadata and
                # merge only the clips and the timestamp.
                candidate = RawIDTask(
                    scenario_key=previous.scenario_key,
                    scenario_name=previous.scenario_name,
                    raw_id=raw_id,
                    cve_data=max(filter(None, [previous.cve_data, candidate.cve_data]), default=None),
                    offset=previous.offset,
                    target_speed=previous.target_speed,
                    ego_speed=previous.ego_speed,
                    clips=tuple(dict.fromkeys((*previous.clips, *candidate.clips))),
                )
            tasks_by_rawid[raw_id] = candidate

    return list(tasks_by_rawid.values())
|
||
|
||
|
||
def save_json(path: Path, payload: dict[str, Any]) -> None:
    """Write ``payload`` to ``path`` as indented UTF-8 JSON.

    Missing parent directories are created. The payload is serialized before
    the file is opened, so a payload that cannot be serialized leaves an
    existing file untouched instead of truncating it.

    Args:
        path: Destination file.
        payload: JSON-serializable mapping.

    Raises:
        TypeError: If ``payload`` contains a value ``json`` cannot serialize.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, indent=2, ensure_ascii=False)
    path.write_text(serialized, encoding="utf-8")
|
||
|
||
|
||
def append_error_log(error_log: Path, task_id: str, message: str) -> None:
    """Append a failure entry to ``error_log``.

    The entry consists of ``[task_id] message`` on one line followed by the
    output of ``traceback.format_exc()``. Missing parent directories are
    created.

    Args:
        error_log: Log file to append to.
        task_id: Identifier of the failed task.
        message: Short failure description.
    """
    error_log.parent.mkdir(parents=True, exist_ok=True)
    entry = f"[{task_id}] {message}\n{traceback.format_exc()}\n"
    with error_log.open("a", encoding="utf-8") as handle:
        handle.write(entry)
|
||
|
||
|
||
def build_rawid_output_dir(output_root: Path, rawid_task: RawIDTask) -> Path:
    """Return the result directory for one raw_id task.

    The directory name is ``<scenario>__<raw_id>`` with both parts sanitized.
    The scenario part uses ``scenario_key`` and falls back to
    ``scenario_name`` when the key is empty.

    Args:
        output_root: Root directory for all raw_id results.
        rawid_task: Task whose directory is wanted.

    Returns:
        The directory path; it is not created here.
    """
    scenario_source = rawid_task.scenario_key or rawid_task.scenario_name
    folder_name = "__".join(
        (
            _sanitize_path_component(scenario_source),
            _sanitize_path_component(rawid_task.raw_id),
        )
    )
    return output_root / folder_name
|
||
|
||
|
||
def infer_input_mode(args: argparse.Namespace) -> str:
    """Pick the input mode from the parsed options.

    Args:
        args: Namespace with ``video_case_list_file`` and ``rawid_json``.

    Returns:
        ``"video_case"`` when a video-case list is given, otherwise
        ``"rawid"`` when a raw_id manifest is given, otherwise ``"clip"``.
    """
    # Checked in priority order; the first non-empty option wins.
    for attr, mode in (("video_case_list_file", "video_case"), ("rawid_json", "rawid")):
        if getattr(args, attr):
            return mode
    return "clip"
|
||
|
||
|
||
def _load_clip_export_tools():
    """Import the clip-export helpers on demand.

    The import happens inside the function, so the export module is only
    loaded when this function is called.

    Returns:
        The tuple ``(build_case_dir_name, export_one_clip, parse_clip_list,
        save_run_manifest, validate_pdcl_auth_env)``.
    """
    from tools.pdcl_inference.export_mcap_frames_by_clip_id import (
        build_case_dir_name,
        export_one_clip,
        parse_clip_list,
        save_run_manifest,
        validate_pdcl_auth_env,
    )

    export_tools = (
        build_case_dir_name,
        export_one_clip,
        parse_clip_list,
        save_run_manifest,
        validate_pdcl_auth_env,
    )
    return export_tools
|
||
|
||
|
||
def parse_video_case_list(video_case_list_file: str) -> list[VideoCaseTask]:
    """Read a video-case list file and build one task per resolved case.

    Blank lines and lines starting with ``#`` are ignored. Only the first
    whitespace-separated token of a line is used as the input path. Entries
    that cannot be resolved and entries resolving to an already seen case
    directory are skipped with a printed message.

    Args:
        video_case_list_file: Path to the list file.

    Returns:
        The tasks in file order.
    """
    tasks: list[VideoCaseTask] = []
    known_case_dirs: set[str] = set()

    with open(video_case_list_file, "r", encoding="utf-8") as listing:
        for raw_line in listing:
            entry = raw_line.strip()
            if entry == "" or entry.startswith("#"):
                continue

            input_path = entry.split()[0]
            try:
                case_dir, video_path, calib_path = resolve_video_case_paths(input_path)
            except Exception as exc:
                # Best effort: a bad entry must not abort the whole batch.
                print(f"Skip video_case={input_path}: {type(exc).__name__}: {exc}")
                continue

            case_dir_str = str(case_dir)
            if case_dir_str in known_case_dirs:
                print(f"Skip duplicate video_case={input_path}: resolved case {case_dir_str}")
                continue
            known_case_dirs.add(case_dir_str)

            task = VideoCaseTask(
                input_path=input_path,
                case_dir=case_dir_str,
                video_path=str(video_path),
                calib_path=str(calib_path),
                output_rel_dir=str(build_case_output_rel_dir(case_dir)),
            )
            tasks.append(task)

    return tasks
|
||
|
||
|
||
def has_reusable_exported_case(case_dir: Path) -> bool:
    """Tell whether ``case_dir`` already holds an export that can be reused.

    An export counts as reusable when ``images/`` is a directory containing at
    least one regular file and ``calib/L2_calib/camera4.json`` is a file.

    Args:
        case_dir: Candidate export directory.

    Returns:
        ``True`` when both conditions hold, otherwise ``False``.
    """
    images_dir = case_dir / "images"
    calib_path = case_dir / "calib" / "L2_calib" / "camera4.json"
    if images_dir.is_dir() and calib_path.is_file():
        for entry in images_dir.iterdir():
            if entry.is_file():
                return True
    return False
|
||
|
||
|
||
def save_task_batch_manifest(args: argparse.Namespace, tasks, input_mode: str) -> None:
    """Write ``run_manifest.json`` for a clip-list or video-case batch.

    The manifest is stored under ``<visualization_root>/_status/``. It holds
    the shared inference settings plus mode-specific options and one entry
    per task. Any ``input_mode`` other than ``"clip"`` is written in the
    video-case layout.

    Args:
        args: Parsed (and populated) command-line namespace.
        tasks: Clip tasks or video-case tasks, matching ``input_mode``.
        input_mode: ``"clip"`` selects the clip layout.
    """

    def _clip_entry(task) -> dict[str, Any]:
        return {
            "task_id": task.task_id,
            "clip_uuid": task.clip_uuid,
            "date_name": task.date_name,
            "vehicle_name": task.vehicle_name,
            "clip_path": task.clip_path,
        }

    def _video_case_entry(task) -> dict[str, Any]:
        return {
            "task_id": task.task_id,
            "input_path": task.input_path,
            "case_dir": task.case_dir,
            "video_path": task.video_path,
            "calib_path": task.calib_path,
            "output_rel_dir": task.output_rel_dir,
        }

    payload: dict[str, Any] = {
        "input_mode": input_mode,
        "visualization_root": args.visualization_root,
        "inference_config": args.inference_config,
        "inference_batch_size": args.inference_batch_size,
        "edge_yaw_max_lateral_dist_m": args.edge_yaw_max_lateral_dist,
        "roi_models": {"roi0": args.roi0_model, "roi1": args.roi1_model},
        "num_tasks": len(tasks),
    }

    if input_mode == "clip":
        payload["clip_list_file"] = args.clip_list_file
        payload["export_root"] = args.export_root
        payload["camera_topic"] = args.camera_topic
        payload["max_frames_per_clip"] = args.max_frames_per_clip
        payload["tasks"] = [_clip_entry(task) for task in tasks]
    else:
        payload["video_case_list_file"] = args.video_case_list_file
        payload["video_stride"] = args.video_stride
        payload["max_images"] = args.max_images
        payload["tasks"] = [_video_case_entry(task) for task in tasks]

    save_json(Path(args.visualization_root) / "_status" / "run_manifest.json", payload)
|
||
|
||
|
||
def record_task_failure(infer_status, visualization_root: str, task_id: str, exc: Exception) -> None:
    """Mark a task as failed and log the failure.

    The status store is updated first; then an entry is appended to
    ``<visualization_root>/_status/errors.log``.

    Args:
        infer_status: Status store exposing ``mark_failed(task_id, message)``.
        visualization_root: Root directory that holds the ``_status`` folder.
        task_id: Identifier of the failed task.
        exc: The exception that caused the failure.
    """
    message = "{}: {}".format(type(exc).__name__, exc)
    infer_status.mark_failed(task_id, message)
    log_path = Path(visualization_root) / "_status" / "errors.log"
    append_error_log(log_path, task_id, message)
|
||
|
||
|
||
def save_batch_manifest(args: argparse.Namespace, rawid_tasks: list[RawIDTask]) -> None:
    """Write ``run_manifest.json`` for a raw_id batch.

    The manifest is stored under ``<output_root>/_status/`` and records the
    run options together with one summary entry per raw_id task.

    Args:
        args: Parsed (and populated) command-line namespace.
        rawid_tasks: Tasks of this run, in processing order.
    """

    def _rawid_entry(task: RawIDTask) -> dict[str, Any]:
        return {
            "raw_id": task.raw_id,
            "scenario_key": task.scenario_key,
            "cve_data": task.cve_data,
            "num_clips": len(task.clips),
            "clips": list(task.clips),
        }

    payload = {
        "rawid_json": args.rawid_json,
        "output_root": args.output_root,
        "inference_config": args.inference_config,
        "rawids_ordered_by": "cve_data_ascending",
        "clips_ordered_by": "natural_clip_list_order_from_raw_id_manifest",
        "camera_topic": args.camera_topic,
        "max_frames_per_clip": args.max_frames_per_clip,
        "limit_rawids": args.limit_rawids,
        "limit_clips_per_rawid": args.limit_clips_per_rawid,
        "inference_batch_size": args.inference_batch_size,
        "edge_yaw_max_lateral_dist_m": args.edge_yaw_max_lateral_dist,
        "roi_models": {"roi0": args.roi0_model, "roi1": args.roi1_model},
        "num_rawids": len(rawid_tasks),
        "rawids": [_rawid_entry(task) for task in rawid_tasks],
    }
    save_json(Path(args.output_root) / "_status" / "run_manifest.json", payload)
|
||
|
||
|
||
def save_rawid_manifest(rawid_dir: Path, rawid_task: RawIDTask, clip_results: list[dict[str, Any]]) -> None:
    """Write ``rawid_manifest.json`` summarising one raw_id run.

    Clips with status ``done`` or ``skipped_done`` count as successful; clips
    with status ``failed`` count as failed.

    Args:
        rawid_dir: Result directory of the raw_id.
        rawid_task: The processed task.
        clip_results: Per-clip result dicts collected so far.
    """
    success_clips = 0
    failed_clips = 0
    for item in clip_results:
        status = item.get("status")
        if status in {"done", "skipped_done"}:
            success_clips += 1
        elif status == "failed":
            failed_clips += 1

    ordered_clips = list(rawid_task.clips)
    payload = {
        "raw_id": rawid_task.raw_id,
        "scenario_key": rawid_task.scenario_key,
        "scenario_name": rawid_task.scenario_name,
        "cve_data": rawid_task.cve_data,
        "偏置": rawid_task.offset,
        "目标速度": rawid_task.target_speed,
        "自车速度": rawid_task.ego_speed,
        "num_clips": len(rawid_task.clips),
        "clips_ordered_by": "natural_clip_list_order_from_raw_id_manifest",
        "ordered_clips": ordered_clips,
        "predictions_path": str(rawid_dir / "predictions.json"),
        "success_clips": success_clips,
        "failed_clips": failed_clips,
        "clips": clip_results,
    }
    save_json(rawid_dir / "rawid_manifest.json", payload)
|
||
|
||
|
||
def build_pdcl_rawid_diagnostic(rawid_task: RawIDTask, clip_results: list[dict[str, Any]]) -> Optional[str]:
    """Explain why every clip of a raw_id was reported as missing.

    A diagnostic is only produced when there is at least one clip result and
    every result's ``detail`` contains ``"clip id not exist"``. In that case
    the raw_id's current clip list is looked up through ``pdcl_dss.Raw`` and
    compared with the manifest.

    Args:
        rawid_task: The processed task.
        clip_results: Per-clip result dicts of that task.

    Returns:
        A human-readable diagnostic, or ``None`` when the precondition is not
        met or the looked-up clip set equals the manifest's clip set.
    """
    if not clip_results:
        return None
    for item in clip_results:
        if "clip id not exist" not in str(item.get("detail", "")):
            return None

    try:
        from pdcl_dss import Raw

        with Raw(rawid_task.raw_id) as raw:
            current_clips = raw.list_clip_ukeys()
    except Exception as exc:
        # The lookup itself failed (including a missing pdcl_dss package).
        lookup_error = f"{type(exc).__name__}: {exc}"
        return (
            "All manifest clips are missing from PDCL, and raw_id lookup also failed: "
            f"{lookup_error}. The AEB manifest is likely stale or was generated "
            "against a different PDCL environment/namespace."
        )

    current_clip_set = set(current_clips)
    manifest_clip_set = set(rawid_task.clips)
    if current_clip_set == manifest_clip_set:
        return None

    overlap = len(current_clip_set & manifest_clip_set)
    return (
        "All manifest clips are missing from PDCL. Current raw_id lookup returns "
        f"{len(current_clips)} clips and overlaps {overlap}/"
        f"{len(manifest_clip_set)} manifest clips; regenerate the AEB clips manifest before rerunning."
    )
|
||
|
||
|
||
def run_one_clip_inference(
    rawid_task: RawIDTask,
    clip_uuid: str,
    clip_index: int,
    args: argparse.Namespace,
    context,
    rawid_dir: Path,
    predictions_payload: dict[str, Any],
    frame_index_offset: int,
    clip_status,
    error_log: Path,
) -> dict[str, Any]:
    """Run two-ROI inference on one clip and append it to the raw_id predictions.

    The clip is resolved to a local path, its calibration is loaded, and its
    decoded frames are streamed through ``append_image_stream_inference``. The
    frames added for this clip are tagged with the clip UUID and index, and
    ``predictions.json`` is rewritten.

    Failures never propagate: they are recorded in ``clip_status`` and
    ``error_log`` and reported through the returned dict. On failure, any
    frames this clip already appended to ``predictions_payload["frames"]`` are
    removed again. The caller only advances its frame offset on success, so
    leftover frames would be tagged as belonging to the next clip.

    Args:
        rawid_task: Task the clip belongs to.
        clip_uuid: Identifier of the clip.
        clip_index: Index of the clip within the raw_id (as passed by the caller).
        args: Parsed command-line namespace (``calib_file``, ``camera_topic``,
            ``max_frames_per_clip`` are read).
        context: Inference context passed through to the inference helper.
        rawid_dir: Result directory of the raw_id.
        predictions_payload: Shared predictions dict; mutated in place.
        frame_index_offset: Number of frames already stored for this raw_id.
        clip_status: Status store with ``mark_running``/``mark_done``/``mark_failed``.
        error_log: File that receives failure entries.

    Returns:
        A result dict whose ``status`` is ``"done"`` or ``"failed"``.
    """
    from tools.pdcl_inference.export_mcap_frames_by_clip_id import (
        build_calib_summary,
        iter_decoded_clip_frames,
        load_calib_payload_for_clip,
    )
    from tools.pdcl_inference.pdcl_clip_service import PDCLClipService
    from tools.pdcl_inference.two_roi_inference import (
        append_image_stream_inference,
        camera4_payload_to_raw_calib,
    )

    clip_task_id = f"{rawid_task.raw_id}:{clip_uuid}"
    predictions_path = rawid_dir / "predictions.json"

    clip_status.mark_running(clip_task_id, f"resolving clip path for raw_id={rawid_task.raw_id}")
    try:
        clip_path = PDCLClipService.get_clip_path(clip_uuid)
        if clip_path is None:
            raise FileNotFoundError(f"clip_uuid={clip_uuid} 无法解析到本地 mcap 路径")

        calib_payload, calib_source = load_calib_payload_for_clip(
            clip_path=clip_path,
            calib_file_override=args.calib_file,
        )
        raw_calib = camera4_payload_to_raw_calib(calib_payload)
        calib_summary = build_calib_summary(calib_payload)
        num_frames = append_image_stream_inference(
            context=context,
            frames=iter_decoded_clip_frames(
                clip_uuid=clip_uuid,
                clip_path=clip_path,
                camera_topic=args.camera_topic,
                max_frames=args.max_frames_per_clip,
            ),
            raw_calib=raw_calib,
            output_dir=rawid_dir,
            predictions_payload=predictions_payload,
            frame_index_offset=frame_index_offset,
            frame_name_prefix=f"{clip_index:03d}",
        )
        # Tag exactly the frames this clip contributed.
        for frame_payload in predictions_payload["frames"][frame_index_offset : frame_index_offset + num_frames]:
            frame_payload["clip_uuid"] = clip_uuid
            frame_payload["clip_index_within_rawid"] = clip_index

        save_json(predictions_path, predictions_payload)
        clip_status.mark_done(clip_task_id, f"frames={num_frames} output={rawid_dir}")
        return {
            "clip_uuid": clip_uuid,
            "status": "done",
            "clip_path": clip_path,
            "clip_index_within_rawid": clip_index,
            "num_frames": num_frames,
            "frame_index_start": frame_index_offset,
            "frame_index_end": frame_index_offset + max(0, num_frames - 1),
            "output_dir": str(rawid_dir),
            "predictions_path": str(predictions_path),
            "calib_source": calib_source,
            "calib_summary": calib_summary,
        }
    except Exception as exc:
        message = f"{type(exc).__name__}: {exc}"
        # Restore the invariant len(frames) == frame_index_offset for the next clip.
        frames = predictions_payload.get("frames")
        if isinstance(frames, list):
            del frames[frame_index_offset:]
        clip_status.mark_failed(clip_task_id, message)
        append_error_log(error_log, clip_task_id, message)
        return {
            "clip_uuid": clip_uuid,
            "status": "failed",
            "clip_index_within_rawid": clip_index,
            "detail": message,
            "output_dir": str(rawid_dir),
        }
|
||
|
||
|
||
def run_one_rawid(
    rawid_task: RawIDTask,
    args: argparse.Namespace,
    context,
    rawid_status,
    clip_status,
    error_log: Path,
) -> tuple[int, int]:
    """Process all clips of one raw_id and record the outcome.

    Clips are processed in manifest order (optionally limited by
    ``args.limit_clips_per_rawid``). A ``rawid_manifest.json`` is written and
    the raw_id is marked done only when no clip failed and no PDCL
    diagnostic was produced.

    Args:
        rawid_task: Task to process.
        args: Parsed command-line namespace.
        context: Inference context shared by all clips.
        rawid_status: Status store for raw_id tasks.
        clip_status: Status store for clip tasks.
        error_log: File that receives failure entries.

    Returns:
        ``(success, fail)`` clip counts; ``(0, 0)`` when the raw_id was
        skipped and ``(0, 1)`` when it has no clips to process.
    """
    rawid_dir = build_rawid_output_dir(Path(args.output_root), rawid_task)
    task_id = rawid_task.task_id

    if args.skip_done and rawid_status.is_done(task_id):
        info = rawid_status.get(task_id) or {}
        print(f"  -> skip done: {info.get('detail', '')}")
        return 0, 0

    from tools.pdcl_inference.two_roi_inference import create_predictions_payload

    rawid_dir.mkdir(parents=True, exist_ok=True)
    rawid_status.mark_running(task_id, f"running {len(rawid_task.clips)} clips")

    clip_results = []
    success = fail = 0
    # Number of frames stored so far; doubles as the next clip's frame offset.
    total_frames = 0
    source_info = {
        "raw_id": rawid_task.raw_id,
        "scenario_key": rawid_task.scenario_key,
        "scenario_name": rawid_task.scenario_name,
        "cve_data": rawid_task.cve_data,
        "camera_topic": args.camera_topic,
        "clips_ordered_by": "natural_clip_list_order_from_raw_id_manifest",
        "ordered_clips": list(rawid_task.clips),
    }
    predictions_payload = create_predictions_payload(
        context=context,
        case_name=rawid_task.raw_id,
        source_info=source_info,
    )

    clips = list(rawid_task.clips)
    if args.limit_clips_per_rawid > 0:
        clips = clips[: args.limit_clips_per_rawid]
    if not clips:
        rawid_status.mark_failed(task_id, "no clips to process")
        save_rawid_manifest(rawid_dir, rawid_task, clip_results)
        return 0, 1

    ok_statuses = {"done", "skipped_done"}
    for clip_index, clip_uuid in enumerate(clips, start=1):
        print(f"  [{clip_index}/{len(clips)}] clip_uuid={clip_uuid}")
        clip_result = run_one_clip_inference(
            rawid_task=rawid_task,
            clip_uuid=clip_uuid,
            clip_index=clip_index,
            args=args,
            context=context,
            rawid_dir=rawid_dir,
            predictions_payload=predictions_payload,
            frame_index_offset=total_frames,
            clip_status=clip_status,
            error_log=error_log,
        )
        clip_results.append(clip_result)

        if clip_result["status"] not in ok_statuses:
            fail += 1
            print(f"    -> failed: {clip_result.get('detail', '')}")
            continue

        success += 1
        total_frames += int(clip_result.get("num_frames", 0))
        detail = clip_result.get("detail") or clip_result.get("output_dir", "")
        print(f"    -> {clip_result['status']}: {detail}")

    save_rawid_manifest(rawid_dir, rawid_task, clip_results)

    diagnostic = build_pdcl_rawid_diagnostic(rawid_task, clip_results)
    if diagnostic:
        print(f"  -> diagnostic: {diagnostic}")
        rawid_status.mark_failed(task_id, diagnostic)
        return success, fail

    if fail > 0:
        rawid_status.mark_failed(task_id, f"{fail}/{len(clips)} clips failed")
    else:
        rawid_status.mark_done(task_id, str(rawid_dir))
    return success, fail
|
||
|
||
|
||
def run_clip_batch(args: argparse.Namespace, context) -> None:
    """Export every clip in the clip list and run two-ROI inference on it.

    For each clip task the exported case directory is reused when
    ``has_reusable_exported_case`` accepts it; otherwise the clip is exported
    first. Inference results are written under ``args.visualization_root``
    in a directory named after the case directory. A failure in one task is
    recorded and counted, and the loop moves on to the next task.

    Args:
        args: Parsed CLI namespace. ``args.output_root`` is overwritten with
            ``args.export_root`` by this function.
        context: Inference context forwarded to ``run_case_inference``.
    """
    (
        build_case_dir_name,
        export_one_clip,
        parse_clip_list,
        save_run_manifest,
        validate_pdcl_auth_env,
    ) = _load_clip_export_tools()
    from tools.pdcl_inference.status_store import StatusStore

    validate_pdcl_auth_env()
    # NOTE(review): presumably the export helpers read `args.output_root`;
    # confirm against the export tooling.
    args.output_root = args.export_root

    for root_dir in (args.export_root, args.visualization_root):
        Path(root_dir).mkdir(parents=True, exist_ok=True)

    clip_tasks = parse_clip_list(args.clip_list_file)
    if args.limit_clips > 0:
        clip_tasks = clip_tasks[: args.limit_clips]
    if not clip_tasks:
        print("No clip tasks discovered. Exit.")
        return

    # Export and inference progress are tracked in separate status files.
    export_status = StatusStore(Path(args.export_root, "_status", "task_status.json"))
    infer_status = StatusStore(Path(args.visualization_root, "_status", "task_status.json"))
    save_run_manifest(args, clip_tasks)
    save_task_batch_manifest(args, clip_tasks, input_mode="clip")

    total = len(clip_tasks)
    print(f"Discovered {total} clip tasks.")
    print(f"Export root: {args.export_root}")
    print(f"Visualization root: {args.visualization_root}")

    ok_count = 0
    failed_count = 0
    for position, clip_task in enumerate(clip_tasks, start=1):
        task_id = clip_task.task_id
        progress = f"[{position}/{total}]"

        if args.skip_done and infer_status.is_done(task_id):
            previous = infer_status.get(task_id) or {}
            print(f"{progress} clip_id={task_id} -> skip done: {previous.get('detail', '')}")
            continue

        print(f"{progress} clip_id={task_id} source={clip_task.clip_path}")
        infer_status.mark_running(task_id, "exporting clip and running two-roi inference")
        try:
            case_dir = Path(args.export_root) / build_case_dir_name(args.output_prefix, clip_task)
            if not has_reusable_exported_case(case_dir):
                export_result = export_one_clip(clip_task, args, export_status)
                if not (export_result.success and export_result.output_dir):
                    raise RuntimeError(export_result.message)
                case_dir = Path(export_result.output_dir)
            else:
                export_status.mark_done(task_id, f"reuse existing export: {case_dir}")
                print(f" -> reuse exported clip data from {case_dir}")

            # Imported inside the try so an import failure is recorded as a
            # task failure instead of aborting the whole batch.
            from tools.pdcl_inference.two_roi_inference import run_case_inference

            infer_result = run_case_inference(
                context=context,
                case_dir=str(case_dir),
                output_dir=Path(args.visualization_root) / case_dir.name,
                glob_pattern=args.glob,
                max_images=args.max_images,
            )
            infer_status.mark_done(task_id, infer_result["output_dir"])
            ok_count += 1
            print(f" -> saved {infer_result['num_frames']} frames to {infer_result['output_dir']}")
        except Exception as exc:
            failed_count += 1
            record_task_failure(infer_status, args.visualization_root, task_id, exc)
            print(f" -> failed: {type(exc).__name__}: {exc}")

    print("\nBatch inference done.")
    print(f"success={ok_count}, fail={failed_count}")
    print(f"infer_status_summary={infer_status.summary()}")
|
||
|
||
|
||
def run_video_case_batch(args: argparse.Namespace, context) -> None:
    """Run two-ROI inference over every video case in the video-case list.

    Results for each task are written to
    ``args.visualization_root / task.output_rel_dir``. A failure in one task
    is recorded and counted, and the loop moves on to the next task.

    Args:
        args: Parsed CLI namespace. ``args.limit_clips`` also caps the number
            of video-case tasks processed.
        context: Inference context forwarded to ``run_video_case_inference``.
    """
    from tools.pdcl_inference.status_store import StatusStore
    from tools.pdcl_inference.two_roi_inference import run_video_case_inference

    Path(args.visualization_root).mkdir(parents=True, exist_ok=True)

    video_case_tasks = parse_video_case_list(args.video_case_list_file)
    if args.limit_clips > 0:
        video_case_tasks = video_case_tasks[: args.limit_clips]
    if not video_case_tasks:
        print("No video-case tasks discovered. Exit.")
        return

    infer_status = StatusStore(Path(args.visualization_root, "_status", "task_status.json"))
    save_task_batch_manifest(args, video_case_tasks, input_mode="video_case")

    total = len(video_case_tasks)
    print(f"Discovered {total} video-case tasks.")
    print(f"Visualization root: {args.visualization_root}")

    ok_count = 0
    failed_count = 0
    for position, task in enumerate(video_case_tasks, start=1):
        task_id = task.task_id
        progress = f"[{position}/{total}]"

        if args.skip_done and infer_status.is_done(task_id):
            previous = infer_status.get(task_id) or {}
            print(f"{progress} case={task.case_name} -> skip done: {previous.get('detail', '')}")
            continue

        print(f"{progress} case={task.case_name} source={task.input_path}")
        infer_status.mark_running(task_id, f"running video-case inference source={task.video_path}")
        try:
            infer_result = run_video_case_inference(
                context=context,
                video_case_dir=task.case_dir,
                output_dir=Path(args.visualization_root) / Path(task.output_rel_dir),
                max_images=args.max_images,
                video_stride=args.video_stride,
            )
            infer_status.mark_done(task_id, infer_result["output_dir"])
            ok_count += 1
            print(f" -> saved {infer_result['num_frames']} frames to {infer_result['output_dir']}")
        except Exception as exc:
            failed_count += 1
            record_task_failure(infer_status, args.visualization_root, task_id, exc)
            print(f" -> failed: {type(exc).__name__}: {exc}")

    print("\nBatch inference done.")
    print(f"success={ok_count}, fail={failed_count}")
    print(f"infer_status_summary={infer_status.summary()}")
|
||
|
||
|
||
def run_rawid_batch(args: argparse.Namespace, context) -> None:
    """Run two-ROI inference for every raw_id listed in ``args.rawid_json``.

    Tasks are processed in ascending ``(cve_data, raw_id)`` order; a task
    without ``cve_data`` sorts as the empty string and therefore comes first.
    Per-raw_id status, per-clip status and the error log all live under
    ``<output_root>/_status``.

    Args:
        args: Parsed CLI namespace. Reads ``output_root``, ``rawid_json`` and
            ``limit_rawids`` here and is forwarded to the helpers.
        context: Inference context supplied by the caller. It is reused as
            given; a new one is built from ``args`` only when ``None`` is
            passed, so the context is not constructed a second time.
    """
    from tools.pdcl_inference.status_store import StatusStore

    output_root = Path(args.output_root)
    output_root.mkdir(parents=True, exist_ok=True)

    rawid_tasks = sorted(
        parse_rawid_tasks(args.rawid_json),
        key=lambda task: (task.cve_data or "", task.raw_id),
    )
    if args.limit_rawids > 0:
        rawid_tasks = rawid_tasks[: args.limit_rawids]
    if not rawid_tasks:
        print("No raw_id tasks discovered. Exit.")
        return

    status_dir = output_root / "_status"
    rawid_status = StatusStore(status_dir / "rawid_status.json")
    clip_status = StatusStore(status_dir / "clip_status.json")
    error_log = status_dir / "errors.log"
    save_batch_manifest(args, rawid_tasks)

    # Previously the context was rebuilt here unconditionally, discarding the
    # one passed in by the caller. Fall back to building only when none is given.
    if context is None:
        context = build_two_roi_inference_context_from_args(args)

    print(f"Discovered {len(rawid_tasks)} raw_ids.")
    print(f"Output root: {output_root}")

    total_success = 0
    total_fail = 0
    for index, rawid_task in enumerate(rawid_tasks, start=1):
        print(
            f"[{index}/{len(rawid_tasks)}] raw_id={rawid_task.raw_id} "
            f"scenario={rawid_task.scenario_key} clips={len(rawid_task.clips)} cve={rawid_task.cve_data or 'n/a'}"
        )
        # run_one_rawid returns a (success, fail) pair of clip counts.
        success, fail = run_one_rawid(
            rawid_task=rawid_task,
            args=args,
            context=context,
            rawid_status=rawid_status,
            clip_status=clip_status,
            error_log=error_log,
        )
        total_success += success
        total_fail += fail

    print("\nBatch inference done.")
    print(f"clip_success={total_success}, clip_fail={total_fail}")
    print(f"rawid_status_summary={rawid_status.summary()}")
    print(f"clip_status_summary={clip_status.summary()}")
|
||
|
||
|
||
def main(argv: Optional[list[str]] = None) -> None:
    """CLI entry point: parse arguments and dispatch to the matching batch runner.

    The input mode reported by ``infer_input_mode`` selects the runner:
    ``"rawid"`` -> ``run_rawid_batch`` (after validating PDCL auth env),
    ``"video_case"`` -> ``run_video_case_batch``, anything else ->
    ``run_clip_batch``.

    Args:
        argv: Argument list handed to ``parse_args``; defaults to ``None``.
    """
    args = parse_args(argv)
    populate_two_roi_inference_args(args)

    # load_dotenv is a no-op stub when python-dotenv is not installed.
    load_dotenv()
    input_mode = infer_input_mode(args)
    inference_context = build_two_roi_inference_context_from_args(args)

    if input_mode == "rawid":
        from tools.pdcl_inference.export_mcap_frames_by_clip_id import validate_pdcl_auth_env

        validate_pdcl_auth_env()
        run_rawid_batch(args, inference_context)
        return

    if input_mode == "video_case":
        run_video_case_batch(args, inference_context)
        return

    run_clip_batch(args, inference_context)
|
||
|
||
|
||
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|