349 lines
14 KiB
Python
Executable File
349 lines
14 KiB
Python
Executable File
"""2D metrics calculation module.
|
|
|
|
Supports optional per-distance-range evaluation for 3D-capable classes
|
|
(vehicle, pedestrian, bicycle, rider) that carry z3d / x3d coordinates.
|
|
|
|
Storage per detection: (confidence, is_tp, z, x)
|
|
z = GT z3d for TPs; detection's own predicted z3d for FPs.
|
|
x = GT x3d for TPs; detection's own predicted x3d for FPs.
|
|
z, x = None when no 3D output (2D-only classes).
|
|
|
|
Two evaluation views are produced when ``distance_ranges`` is configured:
|
|
per_class_by_distance - all lateral positions + longitudinal bins
|
|
per_class_by_distance_lat_roi - lateral pre-filter (``lateral_roi``) + longitudinal bins
|
|
only produced when ``lateral_roi`` is also set.
|
|
"""
|
|
import numpy as np
|
|
from collections import defaultdict
|
|
|
|
|
|
class Metrics2D:
    """Calculate 2D detection metrics (Precision, Recall, AP, mAP).

    Parameters
    ----------
    num_classes : int
        Class ids ``0 .. num_classes - 1`` are evaluated.
    distance_ranges : list of [z_min, z_max] pairs (metres), optional
        Longitudinal distance bins for per-range evaluation.
        Bins are half-open: ``z_min <= z < z_max``.
    lateral_roi : [x_min, x_max] pair (metres), optional
        Lateral region-of-interest filter applied on top of ``distance_ranges``
        to produce a second "lat-filtered" section in the summary.
        e.g. [-15, 15] keeps only targets within 15 m of the vehicle centre line.
    coord_system : {'camera', 'ego'}
        Selects which components of ``3d_info['center']`` are read:
        'camera' -> longitudinal = index 2, lateral = index 0;
        'ego'    -> longitudinal = index 0, lateral = index 1.

    Raises
    ------
    ValueError
        If ``coord_system`` is neither 'camera' nor 'ego'.
    """

    def __init__(self, num_classes=14, distance_ranges=None, lateral_roi=None, coord_system='camera'):
        if coord_system not in ('camera', 'ego'):
            raise ValueError(f"Unsupported coord_system: {coord_system}")

        self.num_classes = num_classes
        self.distance_ranges = distance_ranges  # [[z0,z1], [z1,z2], ...]
        self.lateral_roi = lateral_roi          # [x_min, x_max] or None
        self.coord_system = coord_system

        # Per detection: (confidence, is_tp, z, x)
        self.all_detections = defaultdict(list)

        # Per GT: (z, x) -- both None for 2D-only classes
        self.all_gt_coords = defaultdict(list)

    def _get_lateral_axis(self):
        """Return the index of the lateral component inside ``center``."""
        return 0 if self.coord_system == 'camera' else 1

    def _get_longitudinal_axis(self):
        """Return the index of the longitudinal component inside ``center``."""
        return 2 if self.coord_system == 'camera' else 0

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _in_range(value, bounds):
        """Return True when ``value`` passes the half-open ``bounds`` filter.

        ``bounds=None`` disables the filter. With a filter active, a missing
        coordinate (``value is None``) never passes.
        """
        if bounds is None:
            return True
        return value is not None and bounds[0] <= value < bounds[1]

    def _extract_coords(self, obj):
        """Return ``(z, x)`` read from ``obj['3d_info']['center']``.

        Returns ``(None, None)`` when the object has no ``'3d_info'`` entry
        or the entry is None (2D-only object).
        """
        d3d = obj.get('3d_info')
        if d3d is None:
            return None, None
        center = d3d['center']
        return center[self._get_longitudinal_axis()], center[self._get_lateral_axis()]

    def _gt_count(self, class_id, z_range=None, x_range=None):
        """Count GTs matching optional z_range and/or x_range filters."""
        # .get() instead of [] so a pure query never inserts a key into the
        # defaultdict.
        return sum(
            1
            for z, x in self.all_gt_coords.get(class_id, ())
            if self._in_range(z, z_range) and self._in_range(x, x_range)
        )

    def _filter_dets(self, class_id, z_range=None, x_range=None):
        """Return detections matching optional z_range and/or x_range filters.

        TPs use matched GT (z, x); FPs use detection's own predicted (z, x).
        Detections without coordinates are excluded from any range query.
        With no filter at all, the stored list for the class is returned as-is
        (callers must not mutate it).
        """
        dets = self.all_detections.get(class_id, [])
        if z_range is None and x_range is None:
            return dets
        return [
            det for det in dets
            if self._in_range(det[2], z_range) and self._in_range(det[3], x_range)
        ]

    # ------------------------------------------------------------------
    # Data ingestion
    # ------------------------------------------------------------------

    def add_image_results(self, match_result, all_gts, all_dets, class_id):
        """Add matching results from one image.

        Args:
            match_result : dict from Matcher2D.match(); the keys read here are
                           'gts_filtered', 'dets_sorted' and 'matches'
                           (an iterable of (gt_idx, det_idx, iou) triples).
            all_gts      : unused (kept for API compatibility)
            all_dets     : unused
            class_id     : int
        """
        gts_filtered = match_result['gts_filtered']
        dets_sorted = match_result['dets_sorted']
        matches = match_result['matches']

        # TP: map det_idx -> (z, x) from matched GT
        det_to_gt_coords = {
            det_idx: self._extract_coords(gts_filtered[gt_idx])
            for gt_idx, det_idx, _iou in matches
        }

        # Record all GT coordinates (for GT-count denominator)
        for gt in gts_filtered:
            self.all_gt_coords[class_id].append(self._extract_coords(gt))

        # Record each detection
        for det_idx, det in enumerate(dets_sorted):
            is_tp = det_idx in det_to_gt_coords
            if is_tp:
                z, x = det_to_gt_coords[det_idx]
            else:
                # FP: use detection's own predicted 3D coordinates
                z, x = self._extract_coords(det)
            self.all_detections[class_id].append((det['confidence'], is_tp, z, x))

    # ------------------------------------------------------------------
    # Core metric computation (all accept optional spatial filters)
    # ------------------------------------------------------------------

    def compute_precision_recall(self, class_id, z_range=None, x_range=None):
        """Compute precision-recall curve for a class.

        Detections are ranked by descending confidence; entry ``i`` of each
        returned array describes the top ``i + 1`` detections.

        Returns (precisions, recalls, confidences) as numpy arrays; all three
        are empty when no detection passes the filters.
        """
        detections = self._filter_dets(class_id, z_range=z_range, x_range=x_range)
        if not detections:
            return np.array([]), np.array([]), np.array([])

        gt_count = self._gt_count(class_id, z_range=z_range, x_range=x_range)

        ranked = sorted(detections, key=lambda d: d[0], reverse=True)
        confidences = np.array([d[0] for d in ranked])
        # dtype=bool guarantees that ``~`` below is a logical NOT.
        is_tp = np.array([d[1] for d in ranked], dtype=bool)

        tp_cumsum = np.cumsum(is_tp)
        fp_cumsum = np.cumsum(~is_tp)
        precisions = tp_cumsum / (tp_cumsum + fp_cumsum)
        # max(..., 1) avoids division by zero when there is no GT in range.
        recalls = tp_cumsum / max(gt_count, 1)

        return precisions, recalls, confidences

    def compute_ap(self, class_id, method='voc2010', z_range=None, x_range=None):
        """Compute Average Precision for a class.

        Args:
            method : 'voc2010' -> mean over the 11 recall thresholds
                                  0.0, 0.1, ..., 1.0 of the maximum precision
                                  reached at recall >= threshold.
                     'coco'    -> area under the monotonically non-increasing
                                  precision envelope, summed at the points
                                  where recall changes.

        Returns:
            float AP; 0.0 when no detection passes the filters.

        Raises:
            ValueError: unknown ``method`` (checked before any data is read,
                        so a bad name fails even for empty classes).
        """
        if method not in ('voc2010', 'coco'):
            raise ValueError(f"Unknown AP method: {method}")

        precisions, recalls, _ = self.compute_precision_recall(
            class_id, z_range=z_range, x_range=x_range)

        if len(precisions) == 0:
            return 0.0

        if method == 'voc2010':
            ap = 0.0
            for t in np.linspace(0, 1, 11):
                mask = recalls >= t
                ap += (float(np.max(precisions[mask])) if mask.any() else 0.0) / 11.0
            return ap

        # method == 'coco'
        mrec = np.concatenate(([0.], recalls, [1.]))
        mpre = np.concatenate(([0.], precisions, [0.]))
        # Right-to-left running maximum: each precision becomes the best
        # precision achievable at that recall or higher.
        mpre = np.maximum.accumulate(mpre[::-1])[::-1]
        idx = np.where(mrec[1:] != mrec[:-1])[0]
        return float(np.sum((mrec[idx + 1] - mrec[idx]) * mpre[idx + 1]))

    def compute_map(self, method='voc2010', z_range=None, x_range=None):
        """Compute mAP across all classes.

        Every class id in ``range(num_classes)`` contributes, including
        classes without data (their AP is 0.0). Returns 0.0 when there are
        no classes at all.
        """
        aps = [
            self.compute_ap(c, method, z_range=z_range, x_range=x_range)
            for c in range(self.num_classes)
        ]
        if not aps:
            return 0.0
        return float(np.mean(aps))

    def get_class_metrics(self, class_id, conf_threshold=0.5, z_range=None, x_range=None):
        """Get Precision/Recall/F1/TP/FP/FN at conf_threshold for one class.

        Only detections with ``confidence >= conf_threshold`` are counted.
        FN is the number of GTs in range minus the TPs kept.
        """
        gt_count = self._gt_count(class_id, z_range=z_range, x_range=x_range)
        dets = self._filter_dets(class_id, z_range=z_range, x_range=x_range)
        kept_flags = [is_tp for conf, is_tp, *_ in dets if conf >= conf_threshold]

        if not kept_flags:
            return dict(precision=0., recall=0., f1_score=0., tp=0, fp=0, fn=gt_count)

        tp = sum(1 for flag in kept_flags if flag)
        fp = len(kept_flags) - tp
        fn = gt_count - tp

        p = tp / (tp + fp) if (tp + fp) > 0 else 0.
        r = tp / (tp + fn) if (tp + fn) > 0 else 0.
        f1 = 2 * p * r / (p + r) if (p + r) > 0 else 0.
        return dict(precision=p, recall=r, f1_score=f1, tp=tp, fp=fp, fn=fn)

    def get_overall_metrics(self, conf_threshold=0.5, z_range=None, x_range=None):
        """Aggregate Precision/Recall/F1 across all classes.

        TP/FP/FN are summed over classes first (micro-average), then the
        ratios are computed from the totals.
        """
        total_tp = total_fp = total_fn = 0
        for c in range(self.num_classes):
            m = self.get_class_metrics(c, conf_threshold, z_range=z_range, x_range=x_range)
            total_tp += m['tp']
            total_fp += m['fp']
            total_fn += m['fn']

        p = total_tp / (total_tp + total_fp) if (total_tp + total_fp) > 0 else 0.
        r = total_tp / (total_tp + total_fn) if (total_tp + total_fn) > 0 else 0.
        f1 = 2 * p * r / (p + r) if (p + r) > 0 else 0.
        return dict(precision=p, recall=r, f1_score=f1,
                    tp=total_tp, fp=total_fp, fn=total_fn)

    # ------------------------------------------------------------------
    # Summary builder
    # ------------------------------------------------------------------

    def _build_dist_table(self, conf_threshold, ap_method, x_range=None):
        """Build {class_name: {range_key: metrics_dict}} for given x_range filter.

        Classes whose recorded GTs carry no longitudinal coordinate at all are
        skipped. ``range_key`` has the form ``"<z_min>-<z_max>m"``.

        Args:
            x_range : None  -> all lateral positions (full-lateral view)
                      tuple -> restricted lateral ROI
        """
        # Imported here, as in the original code, rather than at module level.
        from .parser import GroundTruthParser

        table = {}
        for class_id in range(self.num_classes):
            class_name = GroundTruthParser.CLASS_NAMES.get(class_id, f"class_{class_id}")

            # Skip classes with no 3D coordinates
            if not any(z is not None for z, _ in self.all_gt_coords.get(class_id, [])):
                continue

            rows = {}
            # ``or ()`` keeps a direct call safe when no bins are configured.
            for r in self.distance_ranges or ():
                z_range = (r[0], r[1])
                range_key = f"{r[0]}-{r[1]}m"

                gt_count = self._gt_count(class_id, z_range=z_range, x_range=x_range)
                metrics = self.get_class_metrics(
                    class_id, conf_threshold, z_range=z_range, x_range=x_range)
                ap = self.compute_ap(
                    class_id, ap_method, z_range=z_range, x_range=x_range)

                rows[range_key] = dict(
                    precision=metrics['precision'],
                    recall=metrics['recall'],
                    f1_score=metrics['f1_score'],
                    ap=ap,
                    num_gt=gt_count,
                    tp=metrics['tp'],
                    fp=metrics['fp'],
                    fn=metrics['fn'],
                )
            table[class_name] = rows
        return table

    def get_summary(self, conf_threshold=0.5, ap_method='voc2010'):
        """Return the complete evaluation summary dict.

        Keys always present:
            per_class   - overall per-class metrics (all objects, no spatial filter)
            overall     - micro-aggregated overall metrics

        Keys present when ``distance_ranges`` is configured:
            per_class_by_distance         - longitudinal bins, all lateral positions
            per_class_by_distance_lat_roi - longitudinal bins inside lateral_roi
                                            (only when ``lateral_roi`` is also set)
            lateral_roi                   - the configured [x_min, x_max] value
                                            (only when ``lateral_roi`` is also set)
        """
        from .parser import GroundTruthParser

        summary = {'per_class': {}, 'overall': {}}

        # Per-class overall (no spatial filter)
        for class_id in range(self.num_classes):
            class_name = GroundTruthParser.CLASS_NAMES.get(class_id, f"class_{class_id}")
            metrics = self.get_class_metrics(class_id, conf_threshold)
            ap = self.compute_ap(class_id, ap_method)
            summary['per_class'][class_name] = dict(
                precision=metrics['precision'],
                recall=metrics['recall'],
                f1_score=metrics['f1_score'],
                ap=ap,
                num_gt=self._gt_count(class_id),
                num_det=len(self.all_detections.get(class_id, [])),
                tp=metrics['tp'],
                fp=metrics['fp'],
                fn=metrics['fn'],
            )

        # Overall aggregated
        overall = self.get_overall_metrics(conf_threshold)
        summary['overall'] = dict(
            precision=overall['precision'],
            recall=overall['recall'],
            f1_score=overall['f1_score'],
            map=self.compute_map(ap_method),
            num_classes=self.num_classes,
            tp=overall['tp'],
            fp=overall['fp'],
            fn=overall['fn'],
        )

        if self.distance_ranges:
            # View 1: all lateral positions + longitudinal bins
            summary['per_class_by_distance'] = self._build_dist_table(
                conf_threshold, ap_method, x_range=None)

            # View 2: lateral ROI + longitudinal bins
            if self.lateral_roi is not None:
                x_range = (self.lateral_roi[0], self.lateral_roi[1])
                summary['per_class_by_distance_lat_roi'] = self._build_dist_table(
                    conf_threshold, ap_method, x_range=x_range)
                summary['lateral_roi'] = self.lateral_roi

        return summary
|