Files
yolov26_3d/tools/pdcl_inference/file_pred_bundle.py
2026-06-24 09:35:46 +08:00

958 lines
33 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""File-based prediction bundle for analyze_val_two_roi_badcases.py.
Reads precomputed detection JSON files and GT JSON files (eval_tools format)
and converts them to the same internal format used by run_roi_analysis().
Key differences from live-inference mode:
- Predictions are loaded from JSON rather than run through the model.
- GT is parsed from JSON (absolute pixel coords) and converted to lb_2d/lb_3d.
- PreparedROI is built from calibration + roi_config; depth_scale is forced to
1.0 because JSON z3d values are already metric (de-normalized).
- EdgeYaw artifacts (edge_selection, edge_box, edge_heading_decoded) are not
available from JSON, so they are set to None/False/nan.
"""
from __future__ import annotations
import math
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterator, Optional
import cv2
import numpy as np
import yaml
FILE = Path(__file__).resolve()
ROOT = FILE.parents[2]
if str(ROOT) not in sys.path:
sys.path.append(str(ROOT))
from ultralytics.data.ground3d_augment import (
adjust_calib_for_roi_crop,
build_final_resized_calib,
compute_centered_roi_bounds,
)
from tools.pdcl_inference.two_roi_inference import (
PreparedROI,
_compute_vanishing_point_xy,
_resize_ground3d_image_in_steps,
load_camera4_calib,
)
from eval_tools.evaluator.parser import DetectionParser, GroundTruthParser
from eval_tools.class_config import (
CLASS_NAMES,
COMPLETE_3D_CLASSES as _EVAL_COMPLETE_3D,
FACE_3D_CLASSES as _EVAL_FACE_3D,
)
# lb_3d face offsets: front=10, back (rear)=18, left=26, right=34
_FACE_NAME_TO_OFFSET: dict[str, int] = {"front": 10, "back": 18, "left": 26, "right": 34}
# ---------------------------------------------------------------------------
# ROI spec for file-based mode
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class FilePredROISpec:
    """Minimal ROI spec for file-based mode.

    Has the same attribute interface as ROIModelSpec so that visualization
    helpers (_prepare_roi_image etc.) can accept it directly.

    Instances are immutable (``frozen=True``).
    """

    # ROI label; build_file_pred_bundle() stores the upper-cased roi_name here.
    name: str
    roi_size: tuple[int, int]  # (width, height) full ROI before bottom/right trim
    crop_center_mode: str  # "cxvy" (roi0) or "vxvy" (roi1)
    virtual_fx: float = 1.0  # placeholder; depth_scale is overridden to 1.0
    # (width, height) of the model input; None when not specified.
    imgsz: Optional[tuple[int, int]] = None
    # Confidence threshold carried for interface parity with ROIModelSpec.
    conf: float = 0.25
    # Not read anywhere in this module; presumably a max-detections cap kept
    # for interface parity -- TODO confirm against ROIModelSpec.
    max_det: int = 300
    model_path: str = ""  # unused in file mode
# ---------------------------------------------------------------------------
# Bundle dataclass
# ---------------------------------------------------------------------------
@dataclass
class FilePredBundle:
    """Holds configuration for one ROI in file-based mode.

    The attributes mirror the ones that run_roi_analysis() accesses on a
    LoadedROIModel so the analysis loop can treat both interchangeably.

    The only mutable state is ``calib_cache``, which load_frame() fills in
    as cases are visited.
    """

    spec: FilePredROISpec
    # Class id -> class name; annotations/detections whose label is not a key
    # here are skipped by the loaders in this module.
    names: dict[int, str]
    # Class ids handled with face-based 3D data.
    face_3d_classes: set[int]
    # Class ids handled with whole-box 3D data.
    complete_3d_classes: set[int]
    # (width, height) of the model input image.
    imgsz: tuple[int, int]
    # File I/O config
    det_path: Path  # root directory of precomputed detections
    gt_path: Path  # root directory of ground-truth labels
    # 1: det_path/<case>; 2: det_path/<level1>/<case> (see iter_frames()).
    path_depth: int
    # Optional detection sub-directory (absolute, or relative to the case dir).
    det_subdir: Optional[str]
    # Optional ROI id; selects "roi<id>" sub-directories and filters detections
    # by their roi_id field.
    det_roi_filter: Optional[str]
    det_format: str  # "json", "txt" or "auto"
    gt_format: str  # "json", "txt" or "auto"
    # Calibration / ROI
    calib_root: Path  # root under which per-case calibration files are searched
    roi_bottom_offset: int  # pixels trimmed from the bottom of the ROI crop
    roi_right_offset: int  # pixels trimmed from the right of the ROI crop
    # Evaluation thresholds
    conf_threshold: float  # detections with lower confidence are dropped
    img_width: int  # original image width in pixels
    img_height: int  # original image height in pixels
    # Minimum box side length, expressed at original-image (ROI crop) scale.
    min_box_size: float
    # Optional image root for visualization (images not required for metrics)
    image_root: Optional[Path] = None
    # Per-case calibration cache keyed by (level1_name or "", case_name).
    # Populated lazily by load_frame(); eliminates repeated disk reads for
    # frames that belong to the same case (same camera4.json).
    calib_cache: dict[tuple[str, str], Optional[dict]] = field(default_factory=dict)
# ---------------------------------------------------------------------------
# Factory
# ---------------------------------------------------------------------------
def build_file_pred_bundle(eval_cfg: dict, roi_name: str) -> FilePredBundle:
    """Build a FilePredBundle from a loaded eval YAML config.

    Args:
        eval_cfg: Dict loaded from an eval_tools config YAML. Sections that
            are missing or explicitly null are treated as empty.
        roi_name: "roi0" or "roi1". "roi1" (case-insensitive) selects the
            "vxvy" crop-center mode; anything else selects "cxvy".

    Returns:
        FilePredBundle ready for use in run_roi_analysis().

    Raises:
        KeyError: If dataset.det_path or dataset.gt_path is missing.
        ValueError: If roi_gt.roi_config is missing or malformed, if
            model.input_size is not positive, or if the ROI size left after
            applying the right/bottom offsets is not positive.
    """
    # A YAML section written as "key:" with no value loads as None, so use
    # "or {}" to cover both the missing and the null case.
    dataset_cfg = eval_cfg.get("dataset") or {}
    roi_gt_cfg = eval_cfg.get("roi_gt") or {}
    model_cfg = eval_cfg.get("model") or {}
    image_cfg = eval_cfg.get("image") or {}
    classes_cfg = eval_cfg.get("classes") or {}
    metrics_2d = eval_cfg.get("metrics_2d") or {}

    det_path = Path(str(dataset_cfg["det_path"]))
    gt_path = Path(str(dataset_cfg["gt_path"]))
    path_depth = int(dataset_cfg.get("path_depth", 1))
    det_subdir = dataset_cfg.get("det_subdir") or None
    roi_filter_raw = dataset_cfg.get("det_roi_filter")
    det_roi_filter = str(roi_filter_raw) if roi_filter_raw is not None else None
    det_format = str(dataset_cfg.get("det_format", "auto"))
    gt_format = str(dataset_cfg.get("gt_format", "auto"))

    img_width = int(image_cfg.get("width", 1920))
    img_height = int(image_cfg.get("height", 1080))
    input_size = int(model_cfg.get("input_size", 768))
    if input_size <= 0:
        raise ValueError(f"model.input_size must be positive: {input_size}")
    min_box_at_input = float(model_cfg.get("min_box_size_at_input_scale", 8))

    roi_config = roi_gt_cfg.get("roi_config")
    if roi_config is None:
        raise ValueError("eval config missing roi_gt.roi_config")
    roi_bottom_offset = int(roi_gt_cfg.get("roi_bottom_offset", 0))
    roi_right_offset = int(roi_gt_cfg.get("roi_right_offset", 0))
    calib_root = Path(roi_gt_cfg.get("calib_root") or str(gt_path))

    # --- Derive ROI crop size (before bottom/right trim) ---
    if not isinstance(roi_config, (list, tuple)):
        raise ValueError(f"Unsupported roi_config type: {type(roi_config)}")
    if len(roi_config) == 2:
        # [width, height]
        full_roi_w = int(roi_config[0])
        full_roi_h = int(roi_config[1])
    elif len(roi_config) == 4:
        # [x1, y1, x2, y2]
        full_roi_w = int(roi_config[2]) - int(roi_config[0])
        full_roi_h = int(roi_config[3]) - int(roi_config[1])
    else:
        raise ValueError(f"roi_config must have 2 or 4 elements: {roi_config}")

    # Effective ROI size after offsets (what the model sees)
    eff_roi_w = full_roi_w - roi_right_offset
    eff_roi_h = full_roi_h - roi_bottom_offset
    # Both values are used as divisors below; fail with a config error rather
    # than a ZeroDivisionError or a negative model input size.
    if eff_roi_w <= 0 or eff_roi_h <= 0:
        raise ValueError(
            "Effective ROI size must be positive after applying offsets: "
            f"roi=({full_roi_w}, {full_roi_h}), "
            f"right_offset={roi_right_offset}, bottom_offset={roi_bottom_offset}"
        )

    # Model input size (width from config; height derived from ROI aspect ratio)
    imgsz = (input_size, round(input_size * eff_roi_h / eff_roi_w))

    # Crop center mode
    crop_center_mode = "vxvy" if roi_name.lower() == "roi1" else "cxvy"

    # Min box size at original image scale
    min_box_size = min_box_at_input * eff_roi_w / input_size

    # Class names
    class_names_cfg = classes_cfg.get("class_names") or {}
    names: dict[int, str] = (
        {int(k): str(v) for k, v in class_names_cfg.items()}
        if class_names_cfg
        else dict(CLASS_NAMES)
    )

    # face / complete 3D classes from eval config (or defaults from
    # class_config.py). A null entry falls back to the defaults; an explicit
    # empty list is kept as "no classes".
    face_cfg = classes_cfg.get("3d_classes")
    complete_cfg = classes_cfg.get("complete_3d_classes")
    face_3d_cls = {int(x) for x in (_EVAL_FACE_3D if face_cfg is None else face_cfg)}
    # eval configs typically only list vehicle-like 3D classes; pedestrian etc.
    # might be absent. Keep complete_3d_cls as whatever the config says.
    complete_3d_cls = {
        int(x) for x in (_EVAL_COMPLETE_3D if complete_cfg is None else complete_cfg)
    }

    conf = float(metrics_2d.get("conf_threshold", 0.25))
    roi_spec = FilePredROISpec(
        name=roi_name.upper(),
        roi_size=(full_roi_w, full_roi_h),
        crop_center_mode=crop_center_mode,
        virtual_fx=1.0,
        imgsz=imgsz,
        conf=conf,
        max_det=300,
    )

    # Optional image root (users may add an 'image_root' key to the eval config)
    image_root_str = eval_cfg.get("image_root") or dataset_cfg.get("image_root")
    image_root = Path(image_root_str) if image_root_str else None

    return FilePredBundle(
        spec=roi_spec,
        names=names,
        face_3d_classes=face_3d_cls,
        complete_3d_classes=complete_3d_cls,
        imgsz=imgsz,
        det_path=det_path,
        gt_path=gt_path,
        path_depth=path_depth,
        det_subdir=det_subdir,
        det_roi_filter=det_roi_filter,
        det_format=det_format,
        gt_format=gt_format,
        calib_root=calib_root,
        roi_bottom_offset=roi_bottom_offset,
        roi_right_offset=roi_right_offset,
        conf_threshold=conf,
        img_width=img_width,
        img_height=img_height,
        min_box_size=min_box_size,
        image_root=image_root,
    )
# ---------------------------------------------------------------------------
# Frame scanning
# ---------------------------------------------------------------------------
def iter_frames(bundle: FilePredBundle) -> Iterator[dict]:
    """Yield one dict per frame that has both a detection and a GT file.

    Case directories are discovered under ``bundle.det_path`` (one or two
    directory levels deep, per ``bundle.path_depth``) and visited in sorted
    order. Within a case, frames are paired by file stem and yielded in
    sorted stem order. Cases without a usable detection or GT directory are
    skipped silently.

    Returns:
        dict with keys: case_name, frame_stem, det_file, gt_file, level1_name.

    Raises:
        ValueError: If ``bundle.path_depth`` is neither 1 nor 2 (raised when
            the generator is first advanced).
    """
    det_root = bundle.det_path
    gt_root = bundle.gt_path
    depth = bundle.path_depth

    # Entries are (detection case dir, case name, level-1 dir name or None).
    case_entries: list[tuple[Path, str, Optional[str]]] = []
    if depth == 1:
        for entry in sorted(det_root.iterdir()):
            if entry.is_dir():
                case_entries.append((entry, entry.name, None))
    elif depth == 2:
        for group_dir in sorted(det_root.iterdir()):
            if not group_dir.is_dir():
                continue
            case_entries.extend(
                (sub_dir, sub_dir.name, group_dir.name)
                for sub_dir in sorted(group_dir.iterdir())
                if sub_dir.is_dir()
            )
    else:
        raise ValueError(f"Unsupported path_depth: {bundle.path_depth}")

    for det_case_dir, case_name, level1_name in case_entries:
        # Detection side
        det_dir, det_pattern = _resolve_det_dir(
            det_case_dir, bundle.det_subdir, bundle.det_roi_filter, bundle.det_format
        )
        if det_dir is None:
            continue

        # GT side mirrors the detection directory layout
        if depth == 1:
            gt_case_dir = gt_root / case_name
        else:
            gt_case_dir = gt_root / level1_name / case_name
        if not gt_case_dir.exists():
            continue
        gt_dir, gt_pattern = _resolve_gt_dir(gt_case_dir, bundle.gt_format)
        if gt_dir is None:
            continue

        # Pair files that share a stem
        det_by_stem = {path.stem: path for path in sorted(det_dir.glob(det_pattern))}
        gt_by_stem = {path.stem: path for path in sorted(gt_dir.glob(gt_pattern))}
        for stem in sorted(set(det_by_stem) & set(gt_by_stem)):
            yield {
                "case_name": case_name,
                "frame_stem": stem,
                "det_file": det_by_stem[stem],
                "gt_file": gt_by_stem[stem],
                "level1_name": level1_name,
            }
def _resolve_det_dir(
det_case_dir: Path,
det_subdir: Optional[str],
det_roi_filter: Optional[str],
det_format: str,
) -> tuple[Optional[Path], str]:
"""Return (det_results_dir, glob_pattern) or (None, '') if not found."""
candidates: list[Path] = []
if det_subdir:
cand = (
Path(det_subdir) if Path(det_subdir).is_absolute()
else det_case_dir / det_subdir
)
candidates.append(cand)
if det_roi_filter is not None:
candidates += [
det_case_dir / "json_results" / f"roi{det_roi_filter}",
det_case_dir / "predictions" / f"roi{det_roi_filter}",
]
candidates += [
det_case_dir / "json_results",
det_case_dir / "predictions",
]
if det_format in ("json", "auto"):
for cand in dict.fromkeys(str(c) for c in candidates):
cand_path = Path(cand)
if cand_path.exists() and list(cand_path.glob("*.json")):
return cand_path, "*.json"
if det_format in ("txt", "auto"):
txt_dir = det_case_dir / "txt_results"
if txt_dir.exists() and list(txt_dir.glob("*.txt")):
return txt_dir, "*.txt"
return None, ""
def _resolve_gt_dir(gt_case_dir: Path, gt_format: str) -> tuple[Optional[Path], str]:
"""Return (gt_results_dir, glob_pattern) or (None, '') if not found."""
if gt_format in ("json", "auto"):
for subdir in ("labels_json", "json_results"):
d = gt_case_dir / subdir
if d.exists() and list(d.glob("*.json")):
return d, "*.json"
if gt_format in ("txt", "auto"):
for subdir in ("labels", "txt_results"):
d = gt_case_dir / subdir
if d.exists() and list(d.glob("*.txt")):
return d, "*.txt"
return None, ""
# ---------------------------------------------------------------------------
# Per-frame loader
# ---------------------------------------------------------------------------
def load_frame(
    bundle: FilePredBundle,
    frame_info: dict,
) -> dict:
    """Load and convert one frame's GT + predictions.

    Args:
        bundle: ROI configuration. ``bundle.calib_cache`` is updated in place
            the first time a case is seen.
        frame_info: Frame descriptor with keys case_name, frame_stem,
            det_file, gt_file and (optionally) level1_name, as yielded by
            iter_frames().

    Returns a dict with keys matching what run_roi_analysis() expects::

        {
            "image_bgr": np.ndarray | None,
            "image_path": Path,
            "label_path": Path,
            "gt": dict[str, Any],
            "predictions": list[dict],
            "prepared": PreparedROI,
        }

    When the original image is not found, image_bgr is None and prepared.image
    is a zero-filled placeholder (metrics still work; visualization is skipped).
    """
    case_name = frame_info["case_name"]
    frame_stem = frame_info["frame_stem"]
    det_file = Path(frame_info["det_file"])
    gt_file = Path(frame_info["gt_file"])
    level1_name = frame_info.get("level1_name")

    # ----- 1. Load calibration (cached per case to avoid repeated disk reads) -----
    cache_key = (level1_name or "", case_name)
    if cache_key not in bundle.calib_cache:
        bundle.calib_cache[cache_key] = _load_calib_for_case(
            bundle.calib_root, case_name, level1_name
        )
    raw_calib = bundle.calib_cache[cache_key]

    # ----- 2. Compute ROI crop bounds -----
    if raw_calib is not None:
        vp_x, vp_y = _compute_vanishing_point_xy(
            raw_calib, bundle.img_width, bundle.img_height
        )
    else:
        # Fall back to image center when calibration is missing
        vp_x = bundle.img_width / 2.0
        vp_y = bundle.img_height / 2.0

    roi_full_w, roi_full_h = bundle.spec.roi_size
    # "vxvy" centers the crop horizontally on the vanishing point; any other
    # mode centers it on the image. The vertical center is always vp_y.
    crop_center_x = (
        vp_x if bundle.spec.crop_center_mode == "vxvy"
        else bundle.img_width / 2.0
    )
    crop_bounds = compute_centered_roi_bounds(
        bundle.img_width, bundle.img_height,
        min(roi_full_w, bundle.img_width),
        min(roi_full_h, bundle.img_height),
        crop_center_x, vp_y,
    )
    # Apply bottom / right offsets
    cx1, cy1, cx2, cy2 = crop_bounds
    cx2 = cx2 - bundle.roi_right_offset
    cy2 = cy2 - bundle.roi_bottom_offset
    eff_crop_bounds = (cx1, cy1, cx2, cy2)

    # ----- 3. Build file-mode PreparedROI -----
    model_input_w, model_input_h = bundle.imgsz
    # NOTE(review): raw_calib may be None here (see the fallback in step 2);
    # this relies on adjust_calib_for_roi_crop() accepting None -- confirm.
    calib_after_crop = adjust_calib_for_roi_crop(
        raw_calib, bundle.img_width, bundle.img_height, eff_crop_bounds
    )
    # Set virtual_fx = fx_final so that depth_scale = 1.0 (z3d stored as metric)
    fx_final = calib_after_crop["focal_u"] * model_input_w / (cx2 - cx1)
    roi_calib = build_final_resized_calib(
        calib_after_crop["focal_u"],
        calib_after_crop["focal_v"],
        calib_after_crop["cu"],
        calib_after_crop["cv"],
        calib_after_crop["src_w"],
        calib_after_crop["src_h"],
        model_input_w,
        model_input_h,
        virtual_fx=fx_final,  # depth_scale = fx_final / fx_final = 1.0
        distort_coeffs=calib_after_crop["distort_coeffs"],
    )
    # Sanity: depth_scale should be 1.0 (allow tiny float error)
    assert abs(roi_calib.get("depth_scale", 1.0) - 1.0) < 1e-4, roi_calib["depth_scale"]

    # ----- 4. Try to load original image -----
    image_bgr: Optional[np.ndarray] = None
    image_path_guess = _guess_image_path(bundle, case_name, frame_stem, level1_name)
    if image_path_guess is not None and image_path_guess.exists():
        # cv2.imread returns None for unreadable files; handled just below.
        image_bgr = cv2.imread(str(image_path_guess), cv2.IMREAD_COLOR)
    if image_bgr is not None:
        # Produce the ROI-cropped and resized image
        cropped = image_bgr[cy1:cy2, cx1:cx2]
        roi_image = _resize_ground3d_image_in_steps(
            cropped, (model_input_w, model_input_h)
        )
    else:
        roi_image = np.zeros((model_input_h, model_input_w, 3), dtype=np.uint8)

    prepared = PreparedROI(
        name=bundle.spec.name,
        image=roi_image,
        crop_bounds=eff_crop_bounds,
        calib=roi_calib,
        vp_x=float(vp_x),
        vp_y=float(vp_y),
        crop_center_x=float(crop_center_x),
        crop_center_y=float(vp_y),
    )

    # ----- 5. Load and convert GT -----
    gt = _load_gt(
        gt_file=gt_file,
        bundle=bundle,
        raw_calib=raw_calib,
        eff_crop_bounds=eff_crop_bounds,
        model_input_w=model_input_w,
        model_input_h=model_input_h,
    )

    # ----- 6. Load and convert predictions -----
    predictions = _load_predictions(
        det_file=det_file,
        bundle=bundle,
        eff_crop_bounds=eff_crop_bounds,
        model_input_w=model_input_w,
        model_input_h=model_input_h,
        roi_calib=roi_calib,
    )

    # Determine canonical image_path and label_path for record keeping. When no
    # image was found, a nominal path under gt_path is recorded instead.
    if image_path_guess is not None:
        image_path = image_path_guess
    else:
        image_path = (
            bundle.gt_path / (level1_name or "") / case_name / f"{frame_stem}.png"
        )

    return {
        "image_bgr": image_bgr,
        "image_path": image_path,
        "label_path": gt_file,
        "gt": gt,
        "predictions": predictions,
        "prepared": prepared,
    }
# ---------------------------------------------------------------------------
# GT conversion
# ---------------------------------------------------------------------------
def _load_gt(
    gt_file: Path,
    bundle: FilePredBundle,
    raw_calib: Optional[dict],
    eff_crop_bounds: tuple,
    model_input_w: int,
    model_input_h: int,
) -> dict:
    """Parse a GT file into the lb_2d / lb_3d / boxes_xyxy / classes dict.

    Boxes are clipped to the effective ROI crop, mapped into model-input
    pixels, filtered by class and minimum size, and stored as normalized
    xywh. Every kept box gets a 42-dim lb_3d row (all-NaN when no 3D data
    applies).

    Returns:
        dict with "lb_2d" (label dict with normalized xywh boxes), "lb_3d"
        ((n, 42) float32), "boxes_xyxy" ((n, 4) float32, model-input pixels)
        and "classes" ((n,) int32).
    """
    parser = GroundTruthParser(min_box_size=0, coord_system="camera")
    parsed = parser.parse_file(str(gt_file), bundle.img_width, bundle.img_height)

    # The same bounds drive the PreparedROI calibration, which keeps the
    # coordinate conversion consistent between GT and predictions.
    left, top, right, bottom = eff_crop_bounds
    roi_w = right - left
    roi_h = bottom - top
    scale_x = model_input_w / roi_w
    scale_y = model_input_h / roi_h
    # Min box size threshold in model input pixels (applied to both sides)
    min_side_px = bundle.min_box_size * scale_x

    xywh_rows: list[list[float]] = []
    labels: list[float] = []
    rows_3d: list[np.ndarray] = []
    for raw in parsed:
        # Clip to the ROI; drop boxes that fall entirely outside it.
        bx1, by1, bx2, by2 = raw["bbox_2d"]
        bx1, by1 = max(bx1, left), max(by1, top)
        bx2, by2 = min(bx2, right), min(by2, bottom)
        if bx2 <= bx1 or by2 <= by1:
            continue
        ann = dict(raw)
        ann["bbox_2d"] = [bx1, by1, bx2, by2]

        label = ann["label"]
        # Filter to known classes
        if label not in bundle.names:
            continue

        # Original image coords -> model input coords
        mx1 = (bx1 - left) * scale_x
        my1 = (by1 - top) * scale_y
        mx2 = (bx2 - left) * scale_x
        my2 = (by2 - top) * scale_y
        box_w = mx2 - mx1
        box_h = my2 - my1
        if box_w < min_side_px or box_h < min_side_px:
            continue

        # Normalized xywh w.r.t. model input size
        xywh_rows.append([
            (mx1 + mx2) * 0.5 / model_input_w,
            (my1 + my2) * 0.5 / model_input_h,
            box_w / model_input_w,
            box_h / model_input_h,
        ])
        labels.append(float(label))
        # Always append a row (NaN row if 3D not available)
        rows_3d.append(
            _build_lb3d_row(
                ann=ann,
                label=label,
                raw_calib=raw_calib,
                cx1=left, cy1=top, roi_w=roi_w, roi_h=roi_h,
                face_3d_classes=bundle.face_3d_classes,
                complete_3d_classes=bundle.complete_3d_classes,
            )
        )

    count = len(xywh_rows)
    # The reshape calls give the right 2-D shape even when nothing was kept.
    bboxes_arr = np.array(xywh_rows, dtype=np.float32).reshape(-1, 4)
    cls_arr = np.array(labels, dtype=np.float32).reshape(-1, 1)
    diff_arr = np.ones((count, 1), dtype=np.float32)  # difficulty weight default
    if rows_3d:
        lb3d_arr = np.stack(rows_3d, axis=0)
    else:
        lb3d_arr = np.full((0, 42), np.nan, dtype=np.float32)

    lb_2d = {
        "cls": cls_arr,
        "bboxes": bboxes_arr,
        "difficulties": diff_arr,
        "segments": [], "keypoints": None,
        "normalized": True, "bbox_format": "xywh",
    }

    # boxes_xyxy in model input pixel space
    center_x = bboxes_arr[:, 0] * model_input_w
    center_y = bboxes_arr[:, 1] * model_input_h
    width_px = bboxes_arr[:, 2] * model_input_w
    height_px = bboxes_arr[:, 3] * model_input_h
    boxes_xyxy = np.stack(
        [
            center_x - width_px / 2,
            center_y - height_px / 2,
            center_x + width_px / 2,
            center_y + height_px / 2,
        ],
        axis=1,
    ).astype(np.float32)

    return {
        "lb_2d": lb_2d, "lb_3d": lb3d_arr,
        "boxes_xyxy": boxes_xyxy,
        "classes": cls_arr.reshape(-1).astype(np.int32),
    }
def _build_lb3d_row(
ann: dict,
label: int,
raw_calib: Optional[dict],
cx1: int, cy1: int, roi_w: int, roi_h: int,
face_3d_classes: set[int],
complete_3d_classes: set[int],
) -> np.ndarray:
"""Build a 42-dim lb_3d row from a JSON GT annotation dict.
z3d values are stored as metric (depth_scale = 1.0 in PreparedROI calib).
xc/yc values are ROI-relative normalized (01 within the crop).
"""
row = np.full(42, np.nan, dtype=np.float32)
d3info = ann.get("3d_info")
if d3info is None:
return row
if label not in face_3d_classes and label not in complete_3d_classes:
return row
center = d3info["center"] # [x3d, y3d, z3d] metric
dims = d3info["dimensions"] # [l, h, w]
rot_y = d3info["rotation"] # radians
x3d, y3d, z3d = center
if not (math.isfinite(z3d) and z3d > 0):
return row
row[0:3] = center
row[3:6] = dims
row[6] = rot_y
# Project 3D center into original image → ROI-relative normalized
if raw_calib is not None:
fx0 = float(raw_calib.get("focal_u", 1.0))
fy0 = float(raw_calib.get("focal_v", 1.0))
cx0 = float(raw_calib.get("cu", 0.0))
cy0 = float(raw_calib.get("cv", 0.0))
u_orig = fx0 * x3d / z3d + cx0
v_orig = fy0 * y3d / z3d + cy0
else:
u_orig = 0.0
v_orig = 0.0
row[7] = (u_orig - cx1) / roi_w
row[8] = (v_orig - cy1) / roi_h
row[9] = 0.0 # alpha approximation
# Face data for face_3d_classes
if label in face_3d_classes and d3info.get("faces"):
for face_name, foffset in _FACE_NAME_TO_OFFSET.items():
face_data = d3info["faces"].get(face_name)
if face_data is None or len(face_data) < 8:
continue
# JSON face data: [x3d, y3d, z3d, alpha, xc_abs_px, yc_abs_px, score, is_visible]
fz3d = float(face_data[2])
if not (math.isfinite(fz3d) and fz3d > 0):
continue
row[foffset + 0] = float(face_data[0]) # face x3d
row[foffset + 1] = float(face_data[1]) # face y3d
row[foffset + 2] = fz3d # face z3d (metric)
row[foffset + 3] = float(face_data[3]) # alpha
# xc/yc in JSON are absolute pixel coords → ROI-relative normalized
xc_abs = float(face_data[4])
yc_abs = float(face_data[5])
row[foffset + 4] = (xc_abs - cx1) / roi_w
row[foffset + 5] = (yc_abs - cy1) / roi_h
row[foffset + 6] = float(face_data[6]) # visibility score
row[foffset + 7] = float(face_data[7]) # is_visible
return row
# ---------------------------------------------------------------------------
# Prediction conversion
# ---------------------------------------------------------------------------
def _load_predictions(
    det_file: Path,
    bundle: FilePredBundle,
    eff_crop_bounds: tuple,
    model_input_w: int,
    model_input_h: int,
    roi_calib: dict,
) -> list[dict]:
    """Parse a detection file and convert it to the prediction list format.

    Detections are dropped when their confidence is below
    ``bundle.conf_threshold``, when an ROI filter is configured and their
    ``roi_id`` is missing or different, or when their label is not in
    ``bundle.names``. Kept boxes are mapped from original image pixels into
    model-input pixels.

    Args:
        det_file: Detection file to parse.
        bundle: ROI configuration (thresholds, class sets, ROI filter).
        eff_crop_bounds: (x1, y1, x2, y2) of the effective ROI crop in
            original image pixels.
        model_input_w: Model input width in pixels.
        model_input_h: Model input height in pixels.
        roi_calib: Calibration used to rebuild 3D boxes for detections that
            carry ``3d_info``.

    Returns:
        One dict per kept detection with keys bbox_xyxy, confidence, cls_id,
        attrs, decoded and the edge_* placeholders.
    """
    det_parser = DetectionParser(min_box_size=0, coord_system="camera")
    raw_dets = det_parser.parse_file(str(det_file))

    cx1, cy1, cx2, cy2 = eff_crop_bounds
    scale_x = model_input_w / (cx2 - cx1)
    scale_y = model_input_h / (cy2 - cy1)

    # The configured filter is the same for every detection: normalize it once.
    wanted_roi = (
        None if bundle.det_roi_filter is None
        else _norm_roi_id(bundle.det_roi_filter)
    )

    predictions: list[dict] = []
    for det in raw_dets:
        label = det["label"]
        conf = float(det.get("confidence", 0.0))
        roi_id = det.get("roi_id")

        # Skip by confidence threshold
        if conf < bundle.conf_threshold:
            continue
        # Skip by roi_id filter (detections without a roi_id never match)
        if wanted_roi is not None:
            if roi_id is None or _norm_roi_id(roi_id) != wanted_roi:
                continue
        # Skip classes not in the configured class_names
        if label not in bundle.names:
            continue

        # Convert bbox from original image coords to model input coords
        x1o, y1o, x2o, y2o = det["bbox_2d"]
        bbox_xyxy = np.array(
            [
                (x1o - cx1) * scale_x,
                (y1o - cy1) * scale_y,
                (x2o - cx1) * scale_x,
                (y2o - cy1) * scale_y,
            ],
            dtype=np.float32,
        )

        d3info = det.get("3d_info")
        decoded = None
        attrs = None
        if d3info is not None:
            xyzlhwyaw = [
                d3info["center"][0], d3info["center"][1], d3info["center"][2],
                d3info["dimensions"][0], d3info["dimensions"][1], d3info["dimensions"][2],
                d3info["rotation"],
            ]
            decoded, attrs = _build_decoded_and_attrs_from_xyzlhwyaw(
                xyzlhwyaw=xyzlhwyaw,
                face_type_name=d3info.get("face_type"),
                cls_id=label,
                calib=roi_calib,
                img_w=model_input_w,
                img_h=model_input_h,
                face_3d_classes=bundle.face_3d_classes,
                complete_3d_classes=bundle.complete_3d_classes,
            )

        predictions.append({
            "bbox_xyxy": bbox_xyxy,
            "confidence": conf,
            "cls_id": label,
            # 3D fields
            "attrs": attrs,
            "decoded": decoded,
            # Edge artifacts not available from precomputed JSON
            "edge_selection": None,
            "edge_box": None,
            "edge_heading_decoded": None,
            "edge_yaw": float("nan"),
            "edge_confident": False,
        })
    return predictions
def _build_decoded_and_attrs_from_xyzlhwyaw(
    xyzlhwyaw: list,
    face_type_name: Optional[str],
    cls_id: int,
    calib: dict,
    img_w: int,
    img_h: int,
    face_3d_classes: set[int],
    complete_3d_classes: set[int],
) -> tuple[Optional[dict], Optional[dict]]:
    """Turn a stored [x, y, z, l, h, w, yaw] record into 'decoded' and 'attrs'.

    The two dicts stand in for the outputs of decode_3d_prediction() and
    extract_3d_attrs_from_prediction(); they are filled from the stored
    metric values instead of being decoded from raw prediction tensors.

    Returns:
        (decoded, attrs). Both are None when fewer than 7 values are given or
        the depth is not a positive finite number. ``decoded`` is also None
        when no 3D box could be reconstructed; ``attrs`` is still returned in
        that case.
    """
    from ultralytics.utils.plotting_3d import (
        FACE_COLORS,
        reconstruct_3d_box_from_face,
        reconstruct_3d_box_from_whole,
        collect_face_bottom_edges,
    )

    if len(xyzlhwyaw) < 7:
        return None, None

    x3d, y3d, z3d = (float(value) for value in xyzlhwyaw[:3])
    length, height, width = (float(value) for value in xyzlhwyaw[3:6])
    yaw = float(xyzlhwyaw[6])
    dims = np.array([length, height, width], dtype=np.float32)
    if not math.isfinite(z3d) or z3d <= 0:
        return None, None
    center = np.array([x3d, y3d, z3d], dtype=np.float32)

    # Pinhole projection of the 3D center
    focal_x = calib["fx"]
    focal_y = calib["fy"]
    principal_x = calib["cx"]
    principal_y = calib["cy"]
    u = focal_x * (x3d / z3d) + principal_x
    v = focal_y * (y3d / z3d) + principal_y

    # Face name -> integer code ("rear" and "back" share a code)
    face_codes = {"front": 0, "rear": 1, "back": 1, "left": 2, "right": 3}
    face_type_int: Optional[int] = (
        face_codes.get(str(face_type_name).lower())
        if face_type_name is not None
        else None
    )
    is_face_class = cls_id in face_3d_classes
    use_face_box = is_face_class and face_type_int is not None

    # Reconstruct the 3D box corners
    corners_3d = None
    face_center_2d = None
    face_color = None
    visible_face_type = face_type_int
    visible_face_types: tuple[int, ...] = ()
    if use_face_box:
        corners_3d = reconstruct_3d_box_from_face(
            (u, v), z3d, dims, yaw, face_type_int, calib
        )
        if corners_3d is not None:
            face_center_2d = (u, v)
            face_color = FACE_COLORS[face_type_int]
            visible_face_types = (face_type_int,)
    elif cls_id in complete_3d_classes or is_face_class:
        corners_3d = reconstruct_3d_box_from_whole((u, v), z3d, dims, yaw, calib)
        visible_face_type = None

    # Bottom-edge samples exist only when a visible face was reconstructed
    edge_points_3d = None
    edge_points_2d = None
    if corners_3d is not None and visible_face_types:
        edge_points_3d, edge_points_2d = collect_face_bottom_edges(
            corners_3d, list(visible_face_types), calib, num_samples=5
        )

    decoded: Optional[dict] = None
    if corners_3d is not None:
        decoded = {
            "corners_3d": corners_3d,
            "face_center_2d": face_center_2d,
            "face_color": face_color,
            "visible_face_type": visible_face_type,
            "visible_face_types": visible_face_types,
            "edge_points_2d": edge_points_2d,
            "edge_points_3d": edge_points_3d,
        }

    attrs: Optional[dict] = {
        "center": center,
        "depth": float(z3d),
        "dims": dims,
        "yaw": yaw,
        "yaw_deg": math.degrees(yaw),
        "uv": np.array([u, v], dtype=np.float32),
        "visible_face_type": visible_face_type,
        "face_center": center if use_face_box else None,
    }
    return decoded, attrs
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _load_calib_for_case(
calib_root: Path,
case_name: str,
level1_name: Optional[str],
) -> Optional[dict]:
"""Look for a camera4.json calibration file under the case directory."""
case_dir = calib_root / level1_name / case_name if level1_name else calib_root / case_name
candidates = [
case_dir / "calib" / "L2_calib" / "camera4.json",
case_dir / "calib" / "camera4.json",
case_dir / "calibration.json",
]
for path in candidates:
if path.exists():
try:
return load_camera4_calib(path)
except Exception as exc:
print(f"Warning: could not load calib {path}: {exc}")
return None
def _guess_image_path(
bundle: FilePredBundle,
case_name: str,
frame_stem: str,
level1_name: Optional[str],
) -> Optional[Path]:
"""Try to find the original image for a given frame."""
roots_to_try: list[Path] = []
if bundle.image_root is not None:
roots_to_try.append(bundle.image_root)
roots_to_try.append(bundle.gt_path)
for root in roots_to_try:
case_dir = root / level1_name / case_name if level1_name else root / case_name
for subdir in ("images", ""):
base = case_dir / subdir if subdir else case_dir
for ext in (".png", ".jpg", ".jpeg"):
candidate = base / f"{frame_stem}{ext}"
if candidate.exists():
return candidate
return None
def _norm_roi_id(roi_id) -> str:
"""Normalize 'roi0'/'0'/0 → '0'."""
s = str(roi_id).strip().lower()
if s.startswith("roi"):
s = s[3:]
return s or "0"