"""2D metrics calculation module.

Supports optional per-distance-range evaluation for 3D-capable classes
(vehicle, pedestrian, bicycle, rider) that carry z3d / x3d coordinates.

Storage per detection: (confidence, is_tp, z, x)
    z = GT z3d for TPs; detection's own predicted z3d for FPs.
    x = GT x3d for TPs; detection's own predicted x3d for FPs.
    z, x = None when no 3D output (2D-only classes).

Two evaluation views are produced when ``distance_ranges`` is configured:
    per_class_by_distance          - all lateral positions + longitudinal bins
    per_class_by_distance_lat_roi  - lateral pre-filter (``lateral_roi``)
                                     + longitudinal bins; only produced when
                                     ``lateral_roi`` is also set.
"""

from collections import defaultdict

import numpy as np


class Metrics2D:
    """Calculate 2D detection metrics (Precision, Recall, AP, mAP).

    Parameters
    ----------
    num_classes : int
        Number of class ids; metrics iterate over ``range(num_classes)``.
    distance_ranges : list of [z_min, z_max] pairs (metres), optional
        Longitudinal distance bins for per-range evaluation. Each bin is
        half-open: ``z_min <= z < z_max``.
    lateral_roi : [x_min, x_max] pair (metres), optional
        Lateral region-of-interest filter applied on top of
        ``distance_ranges`` to produce a second "lat-filtered" section in
        the summary. e.g. [-15, 15] keeps only targets within 15 m of the
        vehicle centre line.
    coord_system : {'camera', 'ego'}
        Selects which indices of a ``3d_info['center']`` triple are read:
        'camera' -> lateral = index 0, longitudinal = index 2;
        'ego'    -> lateral = index 1, longitudinal = index 0.

    Raises
    ------
    ValueError
        If ``coord_system`` is neither 'camera' nor 'ego'.
    """

    def __init__(self, num_classes=14, distance_ranges=None, lateral_roi=None,
                 coord_system='camera'):
        self.num_classes = num_classes
        self.distance_ranges = distance_ranges  # [[z0,z1], [z1,z2], ...]
        self.lateral_roi = lateral_roi          # [x_min, x_max] or None
        if coord_system not in ('camera', 'ego'):
            raise ValueError(f"Unsupported coord_system: {coord_system}")
        self.coord_system = coord_system
        # Per detection: (confidence, is_tp, z, x)
        self.all_detections = defaultdict(list)
        # Per GT: (z, x) -- both None for 2D-only classes
        self.all_gt_coords = defaultdict(list)

    def _get_lateral_axis(self):
        """Return the index of the lateral coordinate inside ``center``."""
        return 0 if self.coord_system == 'camera' else 1

    def _get_longitudinal_axis(self):
        """Return the index of the longitudinal coordinate inside ``center``."""
        return 2 if self.coord_system == 'camera' else 0

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _in_range(value, bounds):
        """Return True when ``value`` passes the optional half-open filter.

        ``bounds=None`` disables the filter. With a filter active, a value of
        ``None`` (no 3D coordinate) never matches.
        """
        if bounds is None:
            return True
        return value is not None and bounds[0] <= value < bounds[1]

    @staticmethod
    def _class_name(class_id):
        """Map a class id to its display name (``class_<id>`` as fallback)."""
        # Imported at call time, exactly as the original methods did
        # (presumably to avoid an import cycle with the parser -- confirm).
        from .parser import GroundTruthParser
        return GroundTruthParser.CLASS_NAMES.get(class_id, f"class_{class_id}")

    def _extract_zx(self, obj):
        """Return ``(z, x)`` read from ``obj['3d_info']['center']``.

        ``obj`` is a GT or detection dict. Returns ``(None, None)`` when the
        dict has no ``'3d_info'`` entry (or it is None).
        """
        d3d = obj.get('3d_info')
        if d3d is None:
            return None, None
        center = d3d['center']
        return (center[self._get_longitudinal_axis()],
                center[self._get_lateral_axis()])

    def _gt_count(self, class_id, z_range=None, x_range=None):
        """Count GTs matching optional z_range and/or x_range filters."""
        # .get() instead of indexing: a read-only query must not insert an
        # empty entry into the defaultdict.
        return sum(
            1
            for z, x in self.all_gt_coords.get(class_id, ())
            if self._in_range(z, z_range) and self._in_range(x, x_range)
        )

    def _filter_dets(self, class_id, z_range=None, x_range=None):
        """Return detections matching optional z_range and/or x_range filters.

        TPs use matched GT (z, x); FPs use detection's own predicted (z, x).
        Detections without coordinates are excluded from any range query.
        With no filter the stored list itself is returned (not a copy), so
        callers must not mutate it.
        """
        dets = self.all_detections.get(class_id, [])
        if z_range is None and x_range is None:
            return dets
        return [
            det for det in dets
            if self._in_range(det[2], z_range) and self._in_range(det[3], x_range)
        ]

    # ------------------------------------------------------------------
    # Data ingestion
    # ------------------------------------------------------------------

    def add_image_results(self, match_result, all_gts, all_dets, class_id):
        """Add matching results from one image.

        Args:
            match_result : dict from Matcher2D.match(); the keys read here are
                           'gts_filtered', 'dets_sorted' and 'matches'
                           (an iterable of (gt_idx, det_idx, iou) triples).
            all_gts      : unused (kept for API compatibility)
            all_dets     : unused
            class_id     : int
        """
        gts_filtered = match_result['gts_filtered']
        dets_sorted = match_result['dets_sorted']
        matches = match_result['matches']

        # TP: map det_idx -> (z, x) from matched GT
        det_to_gt_coords = {
            det_idx: self._extract_zx(gts_filtered[gt_idx])
            for gt_idx, det_idx, _iou in matches
        }

        # Record all GT coordinates (for GT-count denominator)
        for gt in gts_filtered:
            self.all_gt_coords[class_id].append(self._extract_zx(gt))

        # Record each detection
        for det_idx, det in enumerate(dets_sorted):
            is_tp = det_idx in det_to_gt_coords
            if is_tp:
                z, x = det_to_gt_coords[det_idx]
            else:
                # FP: use detection's own predicted 3D coordinates
                z, x = self._extract_zx(det)
            self.all_detections[class_id].append(
                (det['confidence'], is_tp, z, x))

    # ------------------------------------------------------------------
    # Core metric computation (all accept optional spatial filters)
    # ------------------------------------------------------------------

    def compute_precision_recall(self, class_id, z_range=None, x_range=None):
        """Compute precision-recall curve for a class.

        Returns (precisions, recalls, confidences) as numpy arrays, ordered by
        descending confidence. All three are empty when no detection of the
        class survives the filters.
        """
        detections = self._filter_dets(class_id, z_range=z_range, x_range=x_range)
        if not detections:
            return np.array([]), np.array([]), np.array([])

        gt_count = self._gt_count(class_id, z_range=z_range, x_range=x_range)

        detections = sorted(detections, key=lambda d: d[0], reverse=True)
        confidences = np.array([d[0] for d in detections])
        # Force a boolean dtype: `~` on an integer array is bitwise NOT and
        # would yield negative "false positive" counts.
        is_tp = np.array([d[1] for d in detections], dtype=bool)

        tp_cumsum = np.cumsum(is_tp)
        fp_cumsum = np.cumsum(~is_tp)

        precisions = tp_cumsum / (tp_cumsum + fp_cumsum)
        # max(..., 1) avoids division by zero when the filters leave no GT.
        recalls = tp_cumsum / max(gt_count, 1)
        return precisions, recalls, confidences

    def compute_ap(self, class_id, method='voc2010', z_range=None, x_range=None):
        """Compute Average Precision for a class.

        Methods (names kept for backward compatibility):
            'voc2010' - 11-point interpolation: mean over t = 0, 0.1, ..., 1
                        of the maximum precision at recall >= t.
            'coco'    - area under the monotonically non-increasing precision
                        envelope, accumulated wherever recall changes.
        NOTE(review): these names may not match the schemes the respective
        benchmarks actually use -- confirm before comparing with external
        numbers.

        Returns 0.0 when there are no detections to evaluate.

        Raises:
            ValueError: unknown ``method`` (only reached when there is at
                        least one detection to evaluate).
        """
        precisions, recalls, _ = self.compute_precision_recall(
            class_id, z_range=z_range, x_range=x_range)
        if len(precisions) == 0:
            return 0.0

        if method == 'voc2010':
            ap = 0.0
            for t in np.linspace(0, 1, 11):
                mask = recalls >= t
                ap += (float(np.max(precisions[mask])) if mask.any() else 0.0) / 11.0
            return ap

        if method == 'coco':
            mrec = np.concatenate(([0.], recalls, [1.]))
            mpre = np.concatenate(([0.], precisions, [0.]))
            # Right-to-left running maximum -> precision envelope.
            for i in range(mpre.size - 1, 0, -1):
                mpre[i - 1] = np.maximum(mpre[i - 1], mpre[i])
            # Indices where recall steps to a new value.
            steps = np.where(mrec[1:] != mrec[:-1])[0]
            return float(np.sum((mrec[steps + 1] - mrec[steps]) * mpre[steps + 1]))

        raise ValueError(f"Unknown AP method: {method}")

    def compute_map(self, method='voc2010', z_range=None, x_range=None):
        """Compute mAP across all classes.

        Every class id in ``range(num_classes)`` contributes, including
        classes without detections (AP = 0.0). Returns 0.0 when there are no
        classes at all.
        """
        if self.num_classes <= 0:
            # np.mean([]) would yield NaN plus a RuntimeWarning.
            return 0.0
        return float(np.mean([
            self.compute_ap(c, method, z_range=z_range, x_range=x_range)
            for c in range(self.num_classes)
        ]))

    def get_class_metrics(self, class_id, conf_threshold=0.5,
                          z_range=None, x_range=None):
        """Get Precision/Recall/F1/TP/FP/FN at conf_threshold for one class.

        Detections with ``confidence >= conf_threshold`` are kept. FN is the
        filtered GT count minus TP.
        """
        gt_count = self._gt_count(class_id, z_range=z_range, x_range=x_range)
        dets = self._filter_dets(class_id, z_range=z_range, x_range=x_range)
        kept = [is_tp for conf, is_tp, *_ in dets if conf >= conf_threshold]

        if not kept:
            return dict(precision=0., recall=0., f1_score=0.,
                        tp=0, fp=0, fn=gt_count)

        tp = sum(1 for is_tp in kept if is_tp)
        fp = len(kept) - tp
        fn = gt_count - tp

        p = tp / (tp + fp) if (tp + fp) > 0 else 0.
        r = tp / (tp + fn) if (tp + fn) > 0 else 0.
        f1 = 2 * p * r / (p + r) if (p + r) > 0 else 0.

        return dict(precision=p, recall=r, f1_score=f1, tp=tp, fp=fp, fn=fn)

    def get_overall_metrics(self, conf_threshold=0.5, z_range=None, x_range=None):
        """Aggregate Precision/Recall/F1 across all classes.

        TP/FP/FN are summed over classes first (micro-averaging), then the
        ratios are computed from the totals.
        """
        total_tp = total_fp = total_fn = 0
        for c in range(self.num_classes):
            m = self.get_class_metrics(c, conf_threshold,
                                       z_range=z_range, x_range=x_range)
            total_tp += m['tp']
            total_fp += m['fp']
            total_fn += m['fn']

        p = total_tp / (total_tp + total_fp) if (total_tp + total_fp) > 0 else 0.
        r = total_tp / (total_tp + total_fn) if (total_tp + total_fn) > 0 else 0.
        f1 = 2 * p * r / (p + r) if (p + r) > 0 else 0.

        return dict(precision=p, recall=r, f1_score=f1,
                    tp=total_tp, fp=total_fp, fn=total_fn)

    # ------------------------------------------------------------------
    # Summary builder
    # ------------------------------------------------------------------

    def _build_dist_table(self, conf_threshold, ap_method, x_range=None):
        """Build {class_name: {range_key: metrics_dict}} for given x_range filter.

        Args:
            conf_threshold : confidence cut-off for precision/recall/F1
            ap_method      : forwarded to ``compute_ap``
            x_range        : None  -> all lateral positions (full-lateral view)
                             tuple -> restricted lateral ROI
        """
        table = {}
        for class_id in range(self.num_classes):
            # Skip classes with no 3D coordinates
            if not any(z is not None
                       for z, _ in self.all_gt_coords.get(class_id, [])):
                continue

            rows = {}
            # `or ()` keeps the method safe when no bins are configured.
            for r in self.distance_ranges or ():
                z_range = (r[0], r[1])
                range_key = f"{r[0]}-{r[1]}m"

                gt_count = self._gt_count(class_id, z_range=z_range, x_range=x_range)
                metrics = self.get_class_metrics(
                    class_id, conf_threshold, z_range=z_range, x_range=x_range)
                ap = self.compute_ap(
                    class_id, ap_method, z_range=z_range, x_range=x_range)

                rows[range_key] = dict(
                    precision=metrics['precision'],
                    recall=metrics['recall'],
                    f1_score=metrics['f1_score'],
                    ap=ap,
                    num_gt=gt_count,
                    tp=metrics['tp'],
                    fp=metrics['fp'],
                    fn=metrics['fn'],
                )
            table[self._class_name(class_id)] = rows
        return table

    def get_summary(self, conf_threshold=0.5, ap_method='voc2010'):
        """Return the complete evaluation summary dict.

        Keys always present:
            per_class  - overall per-class metrics (all objects, no spatial filter)
            overall    - micro-aggregated overall metrics

        Keys present when ``distance_ranges`` is configured:
            per_class_by_distance         - longitudinal bins, all lateral positions
            per_class_by_distance_lat_roi - longitudinal bins inside lateral_roi
                                            (only when ``lateral_roi`` is also set)
            lateral_roi                   - the configured [x_min, x_max] value
                                            (only when ``lateral_roi`` is also set)
        """
        summary = {'per_class': {}, 'overall': {}}

        # Per-class overall (no spatial filter)
        for class_id in range(self.num_classes):
            metrics = self.get_class_metrics(class_id, conf_threshold)
            ap = self.compute_ap(class_id, ap_method)

            summary['per_class'][self._class_name(class_id)] = dict(
                precision=metrics['precision'],
                recall=metrics['recall'],
                f1_score=metrics['f1_score'],
                ap=ap,
                num_gt=self._gt_count(class_id),
                num_det=len(self.all_detections.get(class_id, [])),
                tp=metrics['tp'],
                fp=metrics['fp'],
                fn=metrics['fn'],
            )

        # Overall aggregated
        overall = self.get_overall_metrics(conf_threshold)
        summary['overall'] = dict(
            precision=overall['precision'],
            recall=overall['recall'],
            f1_score=overall['f1_score'],
            map=self.compute_map(ap_method),
            num_classes=self.num_classes,
            tp=overall['tp'],
            fp=overall['fp'],
            fn=overall['fn'],
        )

        if self.distance_ranges:
            # View 1: all lateral positions + longitudinal bins
            summary['per_class_by_distance'] = self._build_dist_table(
                conf_threshold, ap_method, x_range=None)

            # View 2: lateral ROI + longitudinal bins
            if self.lateral_roi is not None:
                x_range = (self.lateral_roi[0], self.lateral_roi[1])
                summary['per_class_by_distance_lat_roi'] = self._build_dist_table(
                    conf_threshold, ap_method, x_range=x_range)
                summary['lateral_roi'] = self.lateral_roi

        return summary