#!/usr/bin/env python3 """Analyze Ground3D GT targets for Detect3D range limits and init priors. This script reuses the same Ground3D dataset preprocessing path as training so the reported distributions reflect the targets seen by the 3D head after ROI/virtual camera processing and depth normalization. """ from __future__ import annotations import argparse import json import math import random from pathlib import Path from typing import Any import numpy as np FACE_SPECS = ( ("front", 10, (4, 5)), ("rear", 18, (4, 5)), ("left", 26, (3, 4)), ("right", 34, (3, 4)), ) BRANCH_KEYS = ("whole", "visible_faces", *(name for name, _, _ in FACE_SPECS)) UV_LIMIT_CELLS = 8.0 DEFAULT_FACE_VISIBILITY_SCORE_THRESH = 0.05 def parse_args() -> argparse.Namespace: """Parse command-line arguments.""" parser = argparse.ArgumentParser(description="Analyze GT target ranges and init priors for the Ground3D Detect3D head.") parser.add_argument("--data", type=str, required=True, help="Path to Ground3D dataset YAML.") parser.add_argument("--split", type=str, default="train", help="Comma-separated split(s) to analyze, e.g. train or train,val.") parser.add_argument("--roi", type=str, default=None, help="Optional ROI preset name from dataset YAML.") parser.add_argument("--imgsz", type=str, default="704,352", help="Model image size, e.g. 704,352 or 640.") parser.add_argument("--strides", type=str, default="8,16,32", help="Comma-separated detection strides.") parser.add_argument("--fraction", type=float, default=1.0, help="Dataset fraction passed into YOLOGround3DDataset.") parser.add_argument("--max-samples", type=int, default=None, help="Optional max number of dataset samples to analyze.") parser.add_argument("--augment", action="store_true", help="Use train-time stochastic ROI/virtual-camera preprocessing.") parser.add_argument( "--repeats", type=int, default=1, help="Number of passes over the dataset. Useful when --augment is enabled or when virtual_camera_prob introduces stochastic camera-mode sampling.", ) parser.add_argument("--seed", type=int, default=0, help="Random seed used for stochastic preprocessing.") parser.add_argument( "--face-visibility-score-thresh", type=float, default=DEFAULT_FACE_VISIBILITY_SCORE_THRESH, help="Face visibility score threshold used by the loss.", ) parser.add_argument( "--json-output", type=str, default=None, help="Optional path to save the full report as JSON.", ) return parser.parse_args() def parse_imgsz(imgsz: str) -> list[int]: """Parse image size string into [width, height].""" text = str(imgsz).strip() if "," in text: parts = [int(x.strip()) for x in text.split(",")] if len(parts) != 2: raise ValueError(f"Expected imgsz in 'w,h' format, got: {imgsz}") return parts side = int(text) return [side, side] def parse_int_list(text: str) -> list[int]: """Parse a comma-separated list of integers.""" return [int(x.strip()) for x in str(text).split(",") if x.strip()] def resolve_yaml_relative(value: Any, base_dir: Path) -> Any: """Resolve YAML paths relative to the original dataset YAML directory.""" if isinstance(value, str) and value: path = Path(value).expanduser() return str(path if path.is_absolute() else (base_dir / path).resolve()) if isinstance(value, (list, tuple)): return [resolve_yaml_relative(x, base_dir) for x in value] return value def load_data_cfg(data_path: str, roi_name: str | None) -> tuple[dict[str, Any], str | None]: """Load dataset YAML with the requested ROI preset applied.""" try: from train_mono3d import resolve_data_yaml_for_roi from ultralytics.utils import YAML except ImportError as exc: raise ImportError( "This script needs the training environment to load the dataset YAML. " "Please run it from the same environment you use for mono3d training." ) from exc source_path = Path(data_path).expanduser().resolve() resolved_path, selected_roi = resolve_data_yaml_for_roi(str(source_path), roi_name) resolved_file = Path(resolved_path).resolve() data_cfg = YAML.load(resolved_file) base_dir = source_path.parent for key in ("path", "train", "val", "test"): if key in data_cfg and data_cfg[key] is not None: data_cfg[key] = resolve_yaml_relative(data_cfg[key], base_dir) if resolved_file != source_path and resolved_file.exists(): resolved_file.unlink() return data_cfg, selected_roi def flatten_split_entries(entry: Any) -> list[str]: """Flatten a split entry into a list of GT list files.""" if entry is None: return [] if isinstance(entry, (list, tuple)): items: list[str] = [] for value in entry: items.extend(flatten_split_entries(value)) return items return [str(entry)] def make_dataset( data_cfg: dict[str, Any], split_names: list[str], imgsz: list[int], fraction: float, augment: bool, face_visibility_score_thresh: float, ) -> Any: """Build a Ground3D dataset matching the requested analysis configuration.""" try: from ultralytics.data.dataset import YOLOGround3DDataset from ultralytics.utils import DEFAULT_CFG except ImportError as exc: raise ImportError( "This script needs the full training environment, including torch and ultralytics dependencies." ) from exc split_files: list[str] = [] for split_name in split_names: if split_name not in data_cfg: raise KeyError(f"Dataset YAML does not define split '{split_name}'.") split_files.extend(flatten_split_entries(data_cfg[split_name])) if not split_files: raise ValueError(f"No GT list files found for split(s): {', '.join(split_names)}") hyp = type("HeadStatsHyp", (), {})() hyp.face_visibility_score_thresh = float(face_visibility_score_thresh) hyp.edge_aux_loss_gain = 0.0 hyp.batch_timing = False hyp.hsv_h = getattr(DEFAULT_CFG, "hsv_h", 0.0) hyp.hsv_s = getattr(DEFAULT_CFG, "hsv_s", 0.0) hyp.hsv_v = getattr(DEFAULT_CFG, "hsv_v", 0.0) hyp.bgr = getattr(DEFAULT_CFG, "bgr", 0.0) return YOLOGround3DDataset( img_path=split_files if len(split_files) > 1 else split_files[0], imgsz=imgsz, batch_size=1, augment=augment, hyp=hyp, rect=False, stride=32, pad=0.5, single_cls=False, classes=None, fraction=fraction, data=data_cfg, task="detect", ) def xywhn_to_xyxy(bbox_xywh: np.ndarray, img_w: int, img_h: int) -> np.ndarray: """Convert normalized xywh to pixel xyxy.""" x_c, y_c, w, h = bbox_xywh.astype(np.float64) return np.array( [ (x_c - w * 0.5) * img_w, (y_c - h * 0.5) * img_h, (x_c + w * 0.5) * img_w, (y_c + h * 0.5) * img_h, ], dtype=np.float64, ) def bbox_to_normalized_xywh( bboxes: np.ndarray, bbox_format: str, normalized: bool, img_w: int, img_h: int, ) -> np.ndarray: """Convert bounding boxes into normalized xywh.""" boxes = np.asarray(bboxes, dtype=np.float64).copy() if boxes.size == 0: return boxes.reshape(0, 4) if bbox_format == "xywh": xywh = boxes elif bbox_format == "xyxy": xywh = np.empty_like(boxes, dtype=np.float64) xywh[:, 0] = 0.5 * (boxes[:, 0] + boxes[:, 2]) xywh[:, 1] = 0.5 * (boxes[:, 1] + boxes[:, 3]) xywh[:, 2] = boxes[:, 2] - boxes[:, 0] xywh[:, 3] = boxes[:, 3] - boxes[:, 1] elif bbox_format == "ltwh": xywh = np.empty_like(boxes, dtype=np.float64) xywh[:, 0] = boxes[:, 0] + 0.5 * boxes[:, 2] xywh[:, 1] = boxes[:, 1] + 0.5 * boxes[:, 3] xywh[:, 2] = boxes[:, 2] xywh[:, 3] = boxes[:, 3] else: raise ValueError(f"Unsupported bbox format: {bbox_format}") if normalized: return xywh xywh[:, [0, 2]] /= float(img_w) xywh[:, [1, 3]] /= float(img_h) return xywh def extract_normalized_xywh_bboxes(sample: dict[str, Any], img_w: int, img_h: int) -> np.ndarray: """Extract normalized xywh boxes from either raw `bboxes` or `instances` samples.""" if "bboxes" in sample: return bbox_to_normalized_xywh( np.asarray(sample["bboxes"], dtype=np.float64), bbox_format="xywh", normalized=True, img_w=img_w, img_h=img_h, ) instances = sample.get("instances") if instances is None: raise KeyError("Sample has neither 'bboxes' nor 'instances'.") bbox_format = getattr(getattr(instances, "_bboxes", None), "format", "xywh") normalized = bool(getattr(instances, "normalized", True)) return bbox_to_normalized_xywh( np.asarray(instances.bboxes, dtype=np.float64), bbox_format=bbox_format, normalized=normalized, img_w=img_w, img_h=img_h, ) def expand_bbox_for_assigner(bbox_xyxy: np.ndarray, min_side_px: float) -> np.ndarray: """Mirror select_candidates_in_gts() min-size expansion before anchor-in-box tests.""" x1, y1, x2, y2 = bbox_xyxy.astype(np.float64) cx = 0.5 * (x1 + x2) cy = 0.5 * (y1 + y2) w = max(x2 - x1, float(min_side_px)) h = max(y2 - y1, float(min_side_px)) return np.array([cx - 0.5 * w, cy - 0.5 * h, cx + 0.5 * w, cy + 0.5 * h], dtype=np.float64) def best_in_box_offset_cells( target_uv_px: np.ndarray, bbox_xyxy: np.ndarray, img_w: int, img_h: int, stride: int, eps: float = 1e-9, ) -> np.ndarray | None: """Return the best-case in-box anchor offset in grid cells for one target UV.""" result = best_in_box_anchor_and_offset(target_uv_px, bbox_xyxy, img_w, img_h, stride, eps=eps) return None if result is None else result[1] def best_in_box_anchor_and_offset( target_uv_px: np.ndarray, bbox_xyxy: np.ndarray, img_w: int, img_h: int, stride: int, eps: float = 1e-9, ) -> tuple[np.ndarray, np.ndarray] | None: """Return the best in-box anchor pixel center and offset in grid cells.""" grid_w = img_w // stride grid_h = img_h // stride if grid_w <= 0 or grid_h <= 0: return None x1, y1, x2, y2 = bbox_xyxy.astype(np.float64) ix_lo = max(0, int(math.floor(x1 / stride - 0.5)) - 1) ix_hi = min(grid_w - 1, int(math.ceil(x2 / stride - 0.5)) + 1) iy_lo = max(0, int(math.floor(y1 / stride - 0.5)) - 1) iy_hi = min(grid_h - 1, int(math.ceil(y2 / stride - 0.5)) + 1) if ix_lo > ix_hi or iy_lo > iy_hi: return None x_centers = (np.arange(ix_lo, ix_hi + 1, dtype=np.float64) + 0.5) * stride y_centers = (np.arange(iy_lo, iy_hi + 1, dtype=np.float64) + 0.5) * stride valid_x = x_centers[(x_centers - x1 > eps) & (x2 - x_centers > eps)] valid_y = y_centers[(y_centers - y1 > eps) & (y2 - y_centers > eps)] if valid_x.size == 0 or valid_y.size == 0: return None target_u, target_v = target_uv_px.astype(np.float64) best_x = valid_x[np.argmin(np.abs(valid_x - target_u))] best_y = valid_y[np.argmin(np.abs(valid_y - target_v))] anchor_uv = np.array([best_x, best_y], dtype=np.float64) offset = np.array([(target_u - best_x) / stride, (target_v - best_y) / stride], dtype=np.float64) return anchor_uv, offset def infer_cut_label(target_42: np.ndarray) -> int: """Reproduce the cut-label mapping used by the training loss.""" def _is_face_cut(face_offset: int) -> bool: return bool(np.all(target_42[face_offset : face_offset + 6] == -1) and target_42[face_offset + 7] <= 0) rear_cut = _is_face_cut(18) left_cut = _is_face_cut(26) right_cut = _is_face_cut(34) front_cut = _is_face_cut(10) if rear_cut and left_cut and right_cut: return 1 if front_cut and left_cut and right_cut: return 2 return 0 def remove_fisheye_distortion_np(xd: float, yd: float, distort_coeffs: list[float] | tuple[float, ...] | np.ndarray, max_iter: int = 20) -> tuple[float, float]: """Remove Kannala-Brandt fisheye distortion from normalized camera coordinates.""" if distort_coeffs is None or len(distort_coeffs) < 4: return float(xd), float(yd) k1, k2, k3, k4 = [float(x) for x in distort_coeffs[:4]] r_d = math.sqrt(float(xd) * float(xd) + float(yd) * float(yd)) if r_d < 1e-8: return float(xd), float(yd) theta_d = r_d theta = theta_d / (1 + k1 * theta_d * theta_d) for _ in range(max_iter): theta2 = theta * theta theta4 = theta2 * theta2 theta6 = theta4 * theta2 theta8 = theta4 * theta4 f = theta * (1 + k1 * theta2 + k2 * theta4 + k3 * theta6 + k4 * theta8) - theta_d f_prime = 1 + 3 * k1 * theta2 + 5 * k2 * theta4 + 7 * k3 * theta6 + 9 * k4 * theta8 theta_new = theta - f / f_prime if abs(theta_new - theta) < 1e-8: theta = theta_new break theta = theta_new scale = math.tan(theta) / r_d return float(xd * scale), float(yd * scale) def back_project_2d_to_3d_np(uv: tuple[float, float] | np.ndarray, depth: float, calib: dict[str, Any] | None) -> np.ndarray | None: """Back-project a pixel point to camera coordinates.""" if calib is None or not np.isfinite(depth) or float(depth) <= 0: return None fx = float(calib["fx"]) fy = float(calib["fy"]) cx = float(calib["cx"]) cy = float(calib["cy"]) u, v = float(uv[0]), float(uv[1]) xd = (u - cx) / fx yd = (v - cy) / fy xn, yn = remove_fisheye_distortion_np(xd, yd, calib.get("distort_coeffs", [])) return np.array([xn * depth, yn * depth, depth], dtype=np.float64) def activation_lateral_half_span_m(anchor_uv_px: np.ndarray, target_v_px: float, stride: int, calib: dict[str, Any] | None, depth_metric: float) -> float | None: """Estimate the metric lateral half-span allowed by the UV activation around one anchor.""" if calib is None or not np.isfinite(depth_metric) or float(depth_metric) <= 0: return None anchor_u = float(anchor_uv_px[0]) u_left = anchor_u - UV_LIMIT_CELLS * float(stride) u_right = anchor_u + UV_LIMIT_CELLS * float(stride) left_3d = back_project_2d_to_3d_np((u_left, float(target_v_px)), float(depth_metric), calib) right_3d = back_project_2d_to_3d_np((u_right, float(target_v_px)), float(depth_metric), calib) if left_3d is None or right_3d is None: return None return float(0.5 * abs(right_3d[0] - left_3d[0])) def compute_yaw_targets(rot_y: float) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Reproduce the yaw classification and residual targets used by the loss.""" delta_0 = rot_y delta_1 = rot_y - math.pi / 2 delta_2 = rot_y + math.pi / 2 delta_3 = rot_y - math.pi if abs(rot_y - math.pi) < abs(rot_y + math.pi) else rot_y + math.pi angles = np.array([delta_0, delta_1, delta_2, delta_3], dtype=np.float64) ang_cls = np.clip((math.pi * 0.5 - np.abs(angles)) / (math.pi * 0.5), 0.0, 1.0) angle_valid = np.abs(angles) <= (math.pi / 2) bin_active = ang_cls > 0.1 valid_yaw = angle_valid & bin_active target_sin = np.sin(angles) return ang_cls, target_sin, valid_yaw def summarize_array(values: list[float] | np.ndarray) -> dict[str, float | int | None]: """Summarize an array with robust quantiles.""" arr = np.asarray(values, dtype=np.float64) if arr.size == 0: return { "count": 0, "min": None, "p01": None, "p10": None, "p16": None, "p50": None, "mean": None, "p84": None, "p90": None, "p99": None, "max": None, "std": None, } percentiles = np.percentile(arr, [1, 10, 16, 50, 84, 90, 99]) return { "count": int(arr.size), "min": float(arr.min()), "p01": float(percentiles[0]), "p10": float(percentiles[1]), "p16": float(percentiles[2]), "p50": float(percentiles[3]), "mean": float(arr.mean()), "p84": float(percentiles[4]), "p90": float(percentiles[5]), "p99": float(percentiles[6]), "max": float(arr.max()), "std": float(arr.std(ddof=0)), } def recommend_l1_norm(values: list[float] | np.ndarray) -> dict[str, float | None]: """Recommend offset/scale for an L1-style loss using robust location and spread.""" arr = np.asarray(values, dtype=np.float64) if arr.size == 0: return {"offset_median": None, "scale_p84_p16_half": None, "offset_mean": None, "scale_std": None} p16, p50, p84 = np.percentile(arr, [16, 50, 84]) robust_scale = max(0.5 * (p84 - p16), 1e-6) std_scale = max(float(arr.std(ddof=0)), 1e-6) return { "offset_median": float(p50), "scale_p84_p16_half": float(robust_scale), "offset_mean": float(arr.mean()), "scale_std": float(std_scale), } def normalized_abs_stats(values: list[float] | np.ndarray, offset: float | None, scale: float | None) -> dict[str, float | int | None]: """Report normalized target spread under a given offset/scale pair.""" arr = np.asarray(values, dtype=np.float64) if arr.size == 0 or offset is None or scale is None or scale <= 0: return {"count": 0, "abs_p50": None, "abs_p90": None, "abs_p99": None, "max_abs": None} norm = (arr - offset) / scale abs_norm = np.abs(norm) percentiles = np.percentile(abs_norm, [50, 90, 99]) return { "count": int(arr.size), "abs_p50": float(percentiles[0]), "abs_p90": float(percentiles[1]), "abs_p99": float(percentiles[2]), "max_abs": float(abs_norm.max()), } def logit(prob: float, eps: float = 1e-6) -> float: """Stable logit.""" p = min(max(float(prob), eps), 1.0 - eps) return float(math.log(p / (1.0 - p))) def atanh_clamped(value: float, eps: float = 1e-6) -> float: """Stable inverse tanh for target means.""" v = min(max(float(value), -1.0 + eps), 1.0 - eps) return float(np.arctanh(v)) def summarize_uv_offsets(offsets_xy: list[np.ndarray]) -> dict[str, float | int | None]: """Summarize best-case UV offsets in cells.""" if not offsets_xy: return { "count": 0, "abs_dx_p90": None, "abs_dx_p99": None, "abs_dy_p90": None, "abs_dy_p99": None, "max_abs_p90": None, "max_abs_p99": None, "over_8_rate": None, } arr = np.asarray(offsets_xy, dtype=np.float64).reshape(-1, 2) abs_dx = np.abs(arr[:, 0]) abs_dy = np.abs(arr[:, 1]) max_abs = np.maximum(abs_dx, abs_dy) p_dx = np.percentile(abs_dx, [90, 99]) p_dy = np.percentile(abs_dy, [90, 99]) p_max = np.percentile(max_abs, [90, 99]) return { "count": int(arr.shape[0]), "abs_dx_p90": float(p_dx[0]), "abs_dx_p99": float(p_dx[1]), "abs_dy_p90": float(p_dy[0]), "abs_dy_p99": float(p_dy[1]), "max_abs_p90": float(p_max[0]), "max_abs_p99": float(p_max[1]), "over_8_rate": float(np.mean(max_abs >= UV_LIMIT_CELLS)), } def init_branch_value_store(strides: list[int]) -> dict[str, dict[str, list[Any]]]: """Initialize per-branch per-stride storage.""" return { branch_name: {stride_key: [] for stride_key in [*(str(s) for s in strides), "best_any_level"]} for branch_name in BRANCH_KEYS } def accumulate_uv_offsets( uv_store: dict[str, dict[str, list[np.ndarray]]], lateral_store_m: dict[str, dict[str, list[float]]], branch_name: str, target_uv_px: np.ndarray, bbox_xyxy: np.ndarray, img_w: int, img_h: int, strides: list[int], assigner_min_box: float, calib: dict[str, Any] | None = None, depth_metric: float | None = None, ) -> None: """Accumulate best-case in-box UV offsets for one branch target.""" expanded_bbox = expand_bbox_for_assigner(bbox_xyxy, assigner_min_box) per_level: list[tuple[np.ndarray, float | None]] = [] for stride in strides: result = best_in_box_anchor_and_offset(target_uv_px, expanded_bbox, img_w, img_h, stride) if result is None: continue anchor_uv_px, offset = result uv_store[branch_name][str(stride)].append(offset) lateral_half_span_m = activation_lateral_half_span_m(anchor_uv_px, float(target_uv_px[1]), stride, calib, float(depth_metric)) if depth_metric is not None else None if lateral_half_span_m is not None: lateral_store_m[branch_name][str(stride)].append(lateral_half_span_m) per_level.append((offset, lateral_half_span_m)) if per_level: best_idx = int(np.argmin([max(abs(v[0][0]), abs(v[0][1])) for v in per_level])) best_offset, best_half_span = per_level[best_idx] uv_store[branch_name]["best_any_level"].append(best_offset) if best_half_span is not None: lateral_store_m[branch_name]["best_any_level"].append(best_half_span) def collect_report( dataset: Any, split_names: list[str], imgsz: list[int], strides: list[int], repeats: int, max_samples: int | None, face_visibility_score_thresh: float, ) -> dict[str, Any]: """Collect GT statistics and init recommendations.""" face_3d_classes = {int(x) for x in dataset.face_3d_classes} complete_3d_classes = {int(x) for x in dataset.complete_3d_classes} valid_3d_classes = face_3d_classes | complete_3d_classes assigner_min_box = float(strides[1] if len(strides) > 1 else strides[0]) z_model_whole: list[float] = [] z_model_face: dict[str, list[float]] = {name: [] for name, _, _ in FACE_SPECS} z_model_shared: list[float] = [] z_metric_whole: list[float] = [] z_metric_face: dict[str, list[float]] = {name: [] for name, _, _ in FACE_SPECS} z_metric_shared: list[float] = [] size_whole: list[float] = [] size_face: dict[str, list[float]] = {name: [] for name, _, _ in FACE_SPECS} size_shared: list[float] = [] visible_score: dict[str, list[float]] = {name: [] for name, _, _ in FACE_SPECS} cut_counts = np.zeros(3, dtype=np.int64) yaw_cls_targets: list[np.ndarray] = [] yaw_reg_targets: dict[int, list[float]] = {idx: [] for idx in range(4)} uv_store: dict[str, dict[str, list[np.ndarray]]] = init_branch_value_store(strides) lateral_store_m: dict[str, dict[str, list[float]]] = init_branch_value_store(strides) depth_scale_values: list[float] = [] fx_values: list[float] = [] z_model_shared_by_mode: dict[str, list[float]] = {} z_metric_shared_by_mode: dict[str, list[float]] = {} depth_scale_by_mode: dict[str, list[float]] = {} fx_by_mode: dict[str, list[float]] = {} uv_store_by_mode: dict[str, dict[str, dict[str, list[np.ndarray]]]] = {} lateral_store_m_by_mode: dict[str, dict[str, dict[str, list[float]]]] = {} samples_seen = 0 objects_seen = 0 valid_3d_objects_seen = 0 camera_mode_counts = {"roi": 0, "virtual": 0} total_slots = len(dataset) * max(repeats, 1) if max_samples is not None: total_slots = min(total_slots, max_samples) for repeat_idx in range(max(repeats, 1)): for sample_idx in range(len(dataset)): if max_samples is not None and samples_seen >= max_samples: break sample = dataset.get_image_and_label(sample_idx) samples_seen += 1 camera_mode = str(sample.get("camera_mode", "roi")) camera_mode_counts[camera_mode] = camera_mode_counts.get(camera_mode, 0) + 1 z_model_shared_by_mode.setdefault(camera_mode, []) z_metric_shared_by_mode.setdefault(camera_mode, []) depth_scale_by_mode.setdefault(camera_mode, []) fx_by_mode.setdefault(camera_mode, []) uv_store_by_mode.setdefault(camera_mode, init_branch_value_store(strides)) lateral_store_m_by_mode.setdefault(camera_mode, init_branch_value_store(strides)) calib = sample.get("calib") or {} depth_scale = float(calib.get("depth_scale", 1.0)) fx = float(calib.get("fx", float("nan"))) if calib is not None else float("nan") depth_scale_values.append(depth_scale) depth_scale_by_mode[camera_mode].append(depth_scale) if np.isfinite(fx): fx_values.append(fx) fx_by_mode[camera_mode].append(fx) img_h, img_w = int(sample["resized_shape"][0]), int(sample["resized_shape"][1]) bboxes = extract_normalized_xywh_bboxes(sample, img_w=img_w, img_h=img_h) cls_ids = np.asarray(sample["cls"], dtype=np.float64).reshape(-1) if sample.get("labels_3d") is None: continue labels_3d = np.asarray(sample["labels_3d"], dtype=np.float64) if labels_3d.size == 0: continue for obj_idx in range(len(labels_3d)): objects_seen += 1 cls_id = int(cls_ids[obj_idx]) target = labels_3d[obj_idx] valid_3d = cls_id in valid_3d_classes and np.isfinite(target[2]) and target[2] > 0 if not valid_3d: continue valid_3d_objects_seen += 1 bbox_xyxy = xywhn_to_xyxy(bboxes[obj_idx], img_w, img_h) is_face = cls_id in face_3d_classes is_cut = bool(is_face and infer_cut_label(target) != 0) if not is_cut: z_model_value = float(target[2]) z_metric_value = float(target[2] * depth_scale) z_model_whole.append(z_model_value) z_model_shared.append(z_model_value) z_metric_whole.append(z_metric_value) z_metric_shared.append(z_metric_value) z_model_shared_by_mode[camera_mode].append(z_model_value) z_metric_shared_by_mode[camera_mode].append(z_metric_value) whole_uv = target[7:9] if np.isfinite(whole_uv).all(): accumulate_uv_offsets( uv_store, lateral_store_m, "whole", whole_uv * np.array([img_w, img_h], dtype=np.float64), bbox_xyxy, img_w, img_h, strides, assigner_min_box, calib=calib, depth_metric=z_metric_value, ) accumulate_uv_offsets( uv_store_by_mode[camera_mode], lateral_store_m_by_mode[camera_mode], "whole", whole_uv * np.array([img_w, img_h], dtype=np.float64), bbox_xyxy, img_w, img_h, strides, assigner_min_box, calib=calib, depth_metric=z_metric_value, ) whole_sizes = target[3:6] if np.isfinite(whole_sizes).all(): size_whole.extend(whole_sizes.tolist()) size_shared.extend(whole_sizes.tolist()) rot_y = float(target[6]) if np.isfinite(rot_y): ang_cls, target_sin, valid_yaw = compute_yaw_targets(rot_y) yaw_cls_targets.append(ang_cls) for bin_idx in range(4): if valid_yaw[bin_idx]: yaw_reg_targets[bin_idx].append(float(target_sin[bin_idx])) if is_face: cut_counts[infer_cut_label(target)] += 1 for face_name, face_offset, size_indices in FACE_SPECS: face_score = float(target[face_offset + 6]) face_visible = float(target[face_offset + 7]) if face_visible == 1.0 and face_score >= 0.0: visible_score[face_name].append(face_score) if face_visible == 1.0 and face_score >= face_visibility_score_thresh: face_z_model = float(target[face_offset + 2]) face_z_metric = float(target[face_offset + 2] * depth_scale) if np.isfinite(face_z_model) and face_z_model > 0: z_model_face[face_name].append(face_z_model) z_model_shared.append(face_z_model) z_metric_face[face_name].append(face_z_metric) z_metric_shared.append(face_z_metric) z_model_shared_by_mode[camera_mode].append(face_z_model) z_metric_shared_by_mode[camera_mode].append(face_z_metric) face_size_values = target[list(size_indices)] if np.isfinite(face_size_values).all(): size_face[face_name].extend(face_size_values.tolist()) size_shared.extend(face_size_values.tolist()) face_uv = target[face_offset + 4 : face_offset + 6] if np.isfinite(face_uv).all(): uv_px = face_uv * np.array([img_w, img_h], dtype=np.float64) accumulate_uv_offsets( uv_store, lateral_store_m, face_name, uv_px, bbox_xyxy, img_w, img_h, strides, assigner_min_box, calib=calib, depth_metric=face_z_metric, ) accumulate_uv_offsets( uv_store, lateral_store_m, "visible_faces", uv_px, bbox_xyxy, img_w, img_h, strides, assigner_min_box, calib=calib, depth_metric=face_z_metric, ) accumulate_uv_offsets( uv_store_by_mode[camera_mode], lateral_store_m_by_mode[camera_mode], face_name, uv_px, bbox_xyxy, img_w, img_h, strides, assigner_min_box, calib=calib, depth_metric=face_z_metric, ) accumulate_uv_offsets( uv_store_by_mode[camera_mode], lateral_store_m_by_mode[camera_mode], "visible_faces", uv_px, bbox_xyxy, img_w, img_h, strides, assigner_min_box, calib=calib, depth_metric=face_z_metric, ) if max_samples is not None and samples_seen >= max_samples: break yaw_cls_mean = np.mean(np.stack(yaw_cls_targets, axis=0), axis=0) if yaw_cls_targets else np.full(4, np.nan, dtype=np.float64) yaw_reg_mean = np.array( [np.mean(np.asarray(yaw_reg_targets[idx], dtype=np.float64)) if yaw_reg_targets[idx] else np.nan for idx in range(4)], dtype=np.float64, ) cut_priors = cut_counts / cut_counts.sum() if cut_counts.sum() > 0 else np.full(3, np.nan, dtype=np.float64) current_norm = dataset.norm_scales_3d or {} z_norm_rec = recommend_l1_norm(z_model_shared) size_norm_rec = recommend_l1_norm(size_shared) recommended_norm = { "z3d_offset": z_norm_rec["offset_median"], "z3d_scale": z_norm_rec["scale_p84_p16_half"], "size_offset": size_norm_rec["offset_median"], "size_scale": size_norm_rec["scale_p84_p16_half"], "yaw_scale": current_norm.get("yaw_scale", float(math.pi / 2)), } report = { "dataset": { "split": split_names, "imgsz": imgsz, "strides": strides, "fraction": float(dataset.fraction), "selected_roi": None, "samples_seen": samples_seen, "max_possible_samples": total_slots, "repeats": repeats, "camera_modes": camera_mode_counts, }, "counts": { "objects_seen": objects_seen, "valid_3d_objects_seen": valid_3d_objects_seen, "face_3d_classes": sorted(face_3d_classes), "complete_3d_classes": sorted(complete_3d_classes), }, "current_norm_scales_3d": { "z3d_scale": current_norm.get("z3d_scale"), "z3d_offset": current_norm.get("z3d_offset"), "size_scale": current_norm.get("size_scale"), "size_offset": current_norm.get("size_offset"), "yaw_scale": current_norm.get("yaw_scale"), }, "recommended_norm_scales_3d": recommended_norm, "recommended_norm_scales_3d_mean_std": { "z3d_offset": z_norm_rec["offset_mean"], "z3d_scale": z_norm_rec["scale_std"], "size_offset": size_norm_rec["offset_mean"], "size_scale": size_norm_rec["scale_std"], "yaw_scale": current_norm.get("yaw_scale", float(math.pi / 2)), }, "camera_geometry": { "depth_scale": summarize_array(depth_scale_values), "fx": summarize_array(fx_values), "by_camera_mode": { mode: { "depth_scale": summarize_array(depth_scale_by_mode.get(mode, [])), "fx": summarize_array(fx_by_mode.get(mode, [])), } for mode in sorted(depth_scale_by_mode) }, }, "target_stats": { "z_model_whole_supervised": summarize_array(z_model_whole), "z_model_face_visible_supervised": { face_name: summarize_array(values) for face_name, values in z_model_face.items() }, "z_model_shared_supervised": summarize_array(z_model_shared), "z_metric_whole_restored": summarize_array(z_metric_whole), "z_metric_face_visible_restored": { face_name: summarize_array(values) for face_name, values in z_metric_face.items() }, "z_metric_shared_restored": summarize_array(z_metric_shared), "size_whole_supervised": summarize_array(size_whole), "size_face_visible_supervised": { face_name: summarize_array(values) for face_name, values in size_face.items() }, "size_shared_supervised": summarize_array(size_shared), "visible_score_supervised": { face_name: summarize_array(values) for face_name, values in visible_score.items() }, "visible_score_supervised_all": summarize_array([x for values in visible_score.values() for x in values]), "cut_priors": { "normal": float(cut_priors[0]) if np.isfinite(cut_priors[0]) else None, "cut_in": float(cut_priors[1]) if np.isfinite(cut_priors[1]) else None, "cut_out": float(cut_priors[2]) if np.isfinite(cut_priors[2]) else None, }, "yaw_cls_mean_soft_target": [float(x) if np.isfinite(x) else None for x in yaw_cls_mean.tolist()], "yaw_reg_mean_sin_target": [float(x) if np.isfinite(x) else None for x in yaw_reg_mean.tolist()], "by_camera_mode": { mode: { "z_model_shared_supervised": summarize_array(z_model_shared_by_mode.get(mode, [])), "z_metric_shared_restored": summarize_array(z_metric_shared_by_mode.get(mode, [])), } for mode in sorted(z_model_shared_by_mode) }, }, "normalization_diagnostics": { "z_current": normalized_abs_stats(z_model_shared, current_norm.get("z3d_offset"), current_norm.get("z3d_scale")), "z_recommended": normalized_abs_stats(z_model_shared, recommended_norm["z3d_offset"], recommended_norm["z3d_scale"]), "size_current": normalized_abs_stats(size_shared, current_norm.get("size_offset"), current_norm.get("size_scale")), "size_recommended": normalized_abs_stats(size_shared, recommended_norm["size_offset"], recommended_norm["size_scale"]), }, "uv_limits": { "activation_limit_cells": UV_LIMIT_CELLS, "activation_limit_pixels": {str(stride): float(UV_LIMIT_CELLS * stride) for stride in strides}, "whole": { "best_any_level": summarize_uv_offsets(uv_store["whole"]["best_any_level"]), "by_stride": {str(stride): summarize_uv_offsets(uv_store["whole"][str(stride)]) for stride in strides}, }, "visible_faces": { "best_any_level": summarize_uv_offsets(uv_store["visible_faces"]["best_any_level"]), "by_stride": {str(stride): summarize_uv_offsets(uv_store["visible_faces"][str(stride)]) for stride in strides}, }, "per_face_best_any_level": { face_name: summarize_uv_offsets(uv_store[face_name]["best_any_level"]) for face_name, _, _ in FACE_SPECS }, "lateral_half_span_m": { "whole": { "best_any_level": summarize_array(lateral_store_m["whole"]["best_any_level"]), "by_stride": {str(stride): summarize_array(lateral_store_m["whole"][str(stride)]) for stride in strides}, }, "visible_faces": { "best_any_level": summarize_array(lateral_store_m["visible_faces"]["best_any_level"]), "by_stride": {str(stride): summarize_array(lateral_store_m["visible_faces"][str(stride)]) for stride in strides}, }, }, "by_camera_mode": { mode: { "whole": { "best_any_level": summarize_uv_offsets(uv_store_by_mode[mode]["whole"]["best_any_level"]), "lateral_half_span_m": summarize_array(lateral_store_m_by_mode[mode]["whole"]["best_any_level"]), }, "visible_faces": { "best_any_level": summarize_uv_offsets(uv_store_by_mode[mode]["visible_faces"]["best_any_level"]), "lateral_half_span_m": summarize_array(lateral_store_m_by_mode[mode]["visible_faces"]["best_any_level"]), }, } for mode in sorted(uv_store_by_mode) }, }, "bias_init_advice": { "visible_score_bias_median": { face_name: summarize_array(values)["p50"] for face_name, values in visible_score.items() }, "cut_logit_bias_log_prior": { "normal": float(np.log(cut_priors[0])) if cut_priors.sum() > 0 and cut_priors[0] > 0 else None, "cut_in": float(np.log(cut_priors[1])) if cut_priors.sum() > 0 and cut_priors[1] > 0 else None, "cut_out": float(np.log(cut_priors[2])) if cut_priors.sum() > 0 and cut_priors[2] > 0 else None, }, "yaw_cls_bias_logit_mean_target": [ logit(prob) if np.isfinite(prob) else None for prob in yaw_cls_mean.tolist() ], "yaw_reg_bias_atanh_mean_target": [ atanh_clamped(value) if np.isfinite(value) else None for value in yaw_reg_mean.tolist() ], }, "notes": build_notes( current_norm=current_norm, recommended_norm=recommended_norm, uv_whole_best=summarize_uv_offsets(uv_store["whole"]["best_any_level"]), uv_face_best=summarize_uv_offsets(uv_store["visible_faces"]["best_any_level"]), visible_score_stats={face_name: summarize_array(values) for face_name, values in visible_score.items()}, cut_priors=cut_priors, depth_scale_stats=summarize_array(depth_scale_values), ), } return report def build_notes( current_norm: dict[str, Any], recommended_norm: dict[str, Any], uv_whole_best: dict[str, Any], uv_face_best: dict[str, Any], visible_score_stats: dict[str, dict[str, Any]], cut_priors: np.ndarray, depth_scale_stats: dict[str, Any], ) -> list[str]: """Generate concise recommendations from the measured stats.""" notes = [ "The Detect3D head does not regress x3d/y3d directly; lateral position is recovered from bounded UV offsets plus depth.", "The current UV decoder is sigmoid(raw) * 16 - 8, so each branch is hard-limited to about (-8, 8) grid cells.", "For L1 losses, median-based offsets are usually better zero-bias priors than mean-based offsets.", "The recommended z3d_offset/z3d_scale values are computed in model space after ROI or virtual-camera depth normalization.", "Use the restored metric-depth and lateral-half-span stats only to interpret real-world range, not to set norm_scales_3d directly.", ] current_z_offset = current_norm.get("z3d_offset") rec_z_offset = recommended_norm.get("z3d_offset") rec_z_scale = recommended_norm.get("z3d_scale") if current_z_offset is not None and rec_z_offset is not None and rec_z_scale is not None: if abs(float(current_z_offset) - float(rec_z_offset)) > 0.25 * float(rec_z_scale): notes.append( f"Current z3d_offset={float(current_z_offset):.3f} is noticeably away from the L1-centered median {float(rec_z_offset):.3f}." ) current_size_offset = current_norm.get("size_offset") rec_size_offset = recommended_norm.get("size_offset") rec_size_scale = recommended_norm.get("size_scale") if current_size_offset is not None and rec_size_offset is not None and rec_size_scale is not None: if abs(float(current_size_offset) - float(rec_size_offset)) > 0.25 * float(rec_size_scale): notes.append( f"Current size_offset={float(current_size_offset):.3f} is noticeably away from the L1-centered median {float(rec_size_offset):.3f}." ) uv_whole_rate = uv_whole_best.get("over_8_rate") if uv_whole_rate is not None and uv_whole_rate > 0.01: notes.append( f"Even with the best in-box anchor across P3/P4/P5, {100.0 * float(uv_whole_rate):.2f}% of whole-box UV targets exceed the +/-8-cell range." ) uv_face_rate = uv_face_best.get("over_8_rate") if uv_face_rate is not None and uv_face_rate > 0.01: notes.append( f"Visible-face UV targets still exceed the +/-8-cell range for {100.0 * float(uv_face_rate):.2f}% of best-case assignments." ) vis_medians = {face: stats.get("p50") for face, stats in visible_score_stats.items() if stats.get("p50") is not None} if vis_medians and max(vis_medians.values()) > 0.1: notes.append( "Visible-score channels only get positive-face regression supervision, so zero bias may start far below the typical target." ) if depth_scale_stats.get("count", 0) > 0 and depth_scale_stats.get("std") is not None and float(depth_scale_stats["std"]) > 0.05: notes.append( f"depth_scale varies materially across samples (std={float(depth_scale_stats['std']):.3f}); metric depth ranges should be read from the restored stats, not the model-space z stats." ) if np.isfinite(cut_priors).all() and cut_priors[0] > 0.7: notes.append( "Cut-state labels are strongly imbalanced toward 'normal'; initializing cut logits from log priors should reduce the initial CE loss." ) return notes def print_report(report: dict[str, Any]) -> None: """Print a concise human-readable report.""" dataset = report["dataset"] current_norm = report["current_norm_scales_3d"] recommended_norm = report["recommended_norm_scales_3d"] current_mean_std = report["recommended_norm_scales_3d_mean_std"] camera_geometry = report["camera_geometry"] target_stats = report["target_stats"] norm_diag = report["normalization_diagnostics"] uv_limits = report["uv_limits"] bias_advice = report["bias_init_advice"] print("== Ground3D Detect3D Head Target Analysis ==") print( f"split={','.join(dataset['split'])} imgsz={tuple(dataset['imgsz'])} strides={dataset['strides']} " f"samples_seen={dataset['samples_seen']} repeats={dataset['repeats']} camera_modes={dataset['camera_modes']}" ) print() print("Current norm_scales_3d:") print(json.dumps(current_norm, indent=2, ensure_ascii=True)) print() print("Recommended norm_scales_3d (median + 0.5*(p84-p16)):") print(json.dumps(recommended_norm, indent=2, ensure_ascii=True)) print() print("Alternative norm_scales_3d (mean + std):") print(json.dumps(current_mean_std, indent=2, ensure_ascii=True)) print() print("Camera geometry / depth scaling:") print(" depth_scale:", json.dumps(camera_geometry["depth_scale"], ensure_ascii=True)) print(" fx:", json.dumps(camera_geometry["fx"], ensure_ascii=True)) print(" by_camera_mode:", json.dumps(camera_geometry["by_camera_mode"], ensure_ascii=True)) print() print("Shared target stats:") print(" z_model_shared_supervised:", json.dumps(target_stats["z_model_shared_supervised"], ensure_ascii=True)) print(" z_metric_shared_restored:", json.dumps(target_stats["z_metric_shared_restored"], ensure_ascii=True)) print(" size_shared_supervised:", json.dumps(target_stats["size_shared_supervised"], ensure_ascii=True)) print(" by_camera_mode:", json.dumps(target_stats["by_camera_mode"], ensure_ascii=True)) print() print("Normalized spread diagnostics:") print(" z_current:", json.dumps(norm_diag["z_current"], ensure_ascii=True)) print(" z_recommended:", json.dumps(norm_diag["z_recommended"], ensure_ascii=True)) print(" size_current:", json.dumps(norm_diag["size_current"], ensure_ascii=True)) print(" size_recommended:", json.dumps(norm_diag["size_recommended"], ensure_ascii=True)) print() print("UV activation limits:") print( f" limit_cells=+/-{uv_limits['activation_limit_cells']} " f"limit_pixels={uv_limits['activation_limit_pixels']}" ) print(" whole.best_any_level:", json.dumps(uv_limits["whole"]["best_any_level"], ensure_ascii=True)) print(" visible_faces.best_any_level:", json.dumps(uv_limits["visible_faces"]["best_any_level"], ensure_ascii=True)) print(" whole.lateral_half_span_m.best_any_level:", json.dumps(uv_limits["lateral_half_span_m"]["whole"]["best_any_level"], ensure_ascii=True)) print(" visible_faces.lateral_half_span_m.best_any_level:", json.dumps(uv_limits["lateral_half_span_m"]["visible_faces"]["best_any_level"], ensure_ascii=True)) print(" whole.by_stride:", json.dumps(uv_limits["whole"]["by_stride"], ensure_ascii=True)) print(" visible_faces.by_stride:", json.dumps(uv_limits["visible_faces"]["by_stride"], ensure_ascii=True)) print(" by_camera_mode:", json.dumps(uv_limits["by_camera_mode"], ensure_ascii=True)) print() print("Bias-init advice:") print(" visible_score_bias_median:", json.dumps(bias_advice["visible_score_bias_median"], ensure_ascii=True)) print(" cut_logit_bias_log_prior:", json.dumps(bias_advice["cut_logit_bias_log_prior"], ensure_ascii=True)) print(" yaw_cls_bias_logit_mean_target:", json.dumps(bias_advice["yaw_cls_bias_logit_mean_target"], ensure_ascii=True)) print(" yaw_reg_bias_atanh_mean_target:", json.dumps(bias_advice["yaw_reg_bias_atanh_mean_target"], ensure_ascii=True)) print() print("Notes:") for note in report["notes"]: print(f"- {note}") def main() -> None: """Run the analysis.""" args = parse_args() random.seed(args.seed) np.random.seed(args.seed) split_names = [x.strip() for x in args.split.split(",") if x.strip()] if not split_names: raise ValueError("At least one split name is required.") imgsz = parse_imgsz(args.imgsz) strides = parse_int_list(args.strides) if not strides: raise ValueError("At least one stride is required.") data_cfg, selected_roi = load_data_cfg(args.data, args.roi) dataset = make_dataset( data_cfg=data_cfg, split_names=split_names, imgsz=imgsz, fraction=args.fraction, augment=bool(args.augment), face_visibility_score_thresh=float(args.face_visibility_score_thresh), ) report = collect_report( dataset=dataset, split_names=split_names, imgsz=imgsz, strides=strides, repeats=max(int(args.repeats), 1), max_samples=args.max_samples, face_visibility_score_thresh=float(args.face_visibility_score_thresh), ) report["dataset"]["selected_roi"] = selected_roi print_report(report) if args.json_output: output_path = Path(args.json_output).expanduser() output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(json.dumps(report, indent=2, ensure_ascii=True), encoding="utf-8") if __name__ == "__main__": main()