Files
yolov26_3d/tools/temporal_analysis/evaluate_temporal_stability.py
2026-06-24 09:35:46 +08:00

1357 lines
54 KiB
Python
Executable File

"""
Evaluate temporal stability of 3D predictions across tracked objects.
Usage:
python evaluate_temporal_stability.py --input runs/val_viz/exp/tracking.json --output runs/val_viz/exp/stability_report.json
"""
import argparse
import csv
import json
from pathlib import Path
import numpy as np
from collections import defaultdict
import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('Agg')
def compute_angle_diff(angle1, angle2):
    """Compute the smallest signed difference ``angle1 - angle2`` in radians.

    Args:
        angle1: First angle in radians.
        angle2: Second angle in radians.

    Returns:
        The difference wrapped into ``[-pi, pi]``. A difference that already
        lies inside that interval is returned unchanged. ``nan`` is returned
        when the difference is not finite.
    """
    import math  # local import so the helper does not depend on file-level imports

    diff = angle1 - angle2
    if -math.pi <= diff <= math.pi:
        # Already wrapped: hand the value back untouched (no rounding introduced).
        return diff
    if not math.isfinite(diff):
        # inf/nan cannot be wrapped; repeatedly subtracting 2*pi from inf
        # would never terminate.
        return float('nan')
    # IEEE remainder wraps in a single exact step regardless of magnitude,
    # whereas a subtract-2*pi loop stalls once 2*pi is below the float spacing.
    return math.remainder(diff, 2 * math.pi)
def safe_int(value):
    """Convert ``value`` to ``int`` on a best-effort basis.

    Args:
        value: Any object. ``None`` is passed through as ``None``.

    Returns:
        ``int(value)`` when the conversion succeeds, otherwise ``None``.
    """
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers int(float('inf')), e.g. a JSON ``Infinity`` token.
        return None
def safe_float(value):
    """Convert ``value`` to ``float`` on a best-effort basis.

    Args:
        value: Any object. ``None`` is passed through as ``None``.

    Returns:
        ``float(value)`` when the conversion succeeds, otherwise ``None``.
    """
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: integers too large to be represented as a float.
        return None
def _extract_list_xyz(values):
    """Return the first three entries of a list/tuple as a float triple.

    Returns ``None`` when ``values`` is not a list/tuple, holds fewer than
    three entries, or any of the first three entries cannot be converted by
    ``safe_float``.
    """
    if not isinstance(values, (list, tuple)):
        return None
    if len(values) < 3:
        return None
    converted = []
    for raw in values[:3]:
        number = safe_float(raw)
        if number is None:
            return None
        converted.append(float(number))
    return tuple(converted)
def _extract_object_rotation(value, *, assume_degrees=False):
    """Read the yaw / heading value out of a detection object payload.

    Args:
        value: Either a dict (its ``'rotation_y'`` entry is used) or a
            list/tuple with at least seven entries (index 6 is used).
        assume_degrees: When true, the numeric value is converted with
            ``np.radians`` before being returned.

    Returns:
        The heading as a float, or ``None`` when the payload has another
        shape or the entry is not numeric.
    """
    if isinstance(value, dict):
        raw_yaw = value.get('rotation_y')
    elif isinstance(value, (list, tuple)) and len(value) >= 7:
        raw_yaw = value[6]
    else:
        # Covers None as well as any unsupported payload shape.
        return None
    yaw = safe_float(raw_yaw)
    if yaw is None:
        return None
    return float(np.radians(yaw)) if assume_degrees else yaw
def _get_dimensions(det, prefer_ego=False):
    """Extract object dimensions [l, h, w] from a detection dict.

    The keys ``'object_3d'`` and ``'object_3d_ego'`` are probed in an order
    chosen by ``prefer_ego``. A dict payload contributes the first three
    entries of its ``'dimensions'`` list; a list/tuple payload with at least
    six entries contributes entries 3..5. The first payload whose three
    values all convert to float wins.

    Returns:
        A tuple of three floats, or ``None`` when no payload qualifies.
    """
    if prefer_ego:
        lookup_order = ('object_3d_ego', 'object_3d')
    else:
        lookup_order = ('object_3d', 'object_3d_ego')

    for key in lookup_order:
        payload = det.get(key)
        if isinstance(payload, dict):
            raw_dims = payload.get('dimensions')
            if not isinstance(raw_dims, (list, tuple)) or len(raw_dims) < 3:
                continue
            candidates = raw_dims[:3]
        elif isinstance(payload, (list, tuple)) and len(payload) >= 6:
            candidates = payload[3:6]
        else:
            continue
        parsed = [safe_float(entry) for entry in candidates]
        if any(entry is None for entry in parsed):
            continue
        return tuple(float(entry) for entry in parsed)
    return None
def _get_position_observation(det, prefer_ego=True):
    """Pick the preferred observation position and name the field it came from.

    Two groups of fields are probed: the ego group (``'box_center_3d_ego'``
    then ``'object_3d_ego'``) and the non-ego group (``'box_center_3d'`` then
    ``'object_3d'``, where a dict payload is read through ``'location'``).
    ``prefer_ego`` decides which group is tried first.

    Returns:
        ``(xyz, source_name)`` for the first usable field, or ``(None, None)``.
    """
    def _probe_ego():
        raw_center = det.get('box_center_3d_ego')
        if raw_center is not None:
            xyz = _extract_list_xyz(raw_center)
            if xyz is not None:
                return xyz, 'box_center_3d_ego'
        xyz = _extract_list_xyz(det.get('object_3d_ego'))
        if xyz is not None:
            return xyz, 'object_3d_ego'
        return None

    def _probe_non_ego():
        raw_center = det.get('box_center_3d')
        if raw_center is not None:
            xyz = _extract_list_xyz(raw_center)
            if xyz is not None:
                return xyz, 'box_center_3d'
        payload = det.get('object_3d')
        if isinstance(payload, dict):
            xyz = _extract_list_xyz(payload.get('location'))
            label = 'object_3d.location'
        else:
            xyz = _extract_list_xyz(payload)
            label = 'object_3d'
        if xyz is not None:
            return xyz, label
        return None

    if prefer_ego:
        probes = (_probe_ego, _probe_non_ego)
    else:
        probes = (_probe_non_ego, _probe_ego)
    for probe in probes:
        hit = probe()
        if hit is not None:
            return hit
    return None, None
def _get_heading_observation(det, prefer_ego=True, heading_source='auto'):
    """Pick the preferred heading value and name the field it came from.

    Args:
        det: Detection dict.
        prefer_ego: Only consulted when ``heading_source`` is none of the
            explicit choices below; selects the probing order.
        heading_source: ``'decoded'`` reads only
            ``heading_debug['rot_y_decoded']``; ``'camera_reg'`` reads only
            ``object_3d``; ``'ego'`` reads only ``object_3d_ego`` (converted
            from degrees to radians). Any other value probes all of them.

    Returns:
        ``(heading, source_name)`` or ``(None, None)`` when nothing usable
        is found.
    """
    debug_payload = det.get('heading_debug')

    def _probe(token):
        # 'decoded' is the heading_debug field; anything else is an object key.
        if token == 'decoded':
            if not isinstance(debug_payload, dict):
                return None
            decoded = safe_float(debug_payload.get('rot_y_decoded'))
            if decoded is None:
                return None
            return decoded, 'heading_debug.rot_y_decoded'
        is_ego = token == 'object_3d_ego'
        yaw = _extract_object_rotation(det.get(token), assume_degrees=is_ego)
        if yaw is None:
            return None
        return yaw, ('object_3d_ego_deg_to_rad' if is_ego else token)

    if heading_source == 'decoded':
        order = ('decoded',)
    elif heading_source == 'camera_reg':
        order = ('object_3d',)
    elif heading_source == 'ego':
        order = ('object_3d_ego',)
    elif prefer_ego:
        order = ('decoded', 'object_3d_ego', 'object_3d')
    else:
        order = ('object_3d', 'object_3d_ego', 'decoded')

    for token in order:
        hit = _probe(token)
        if hit is not None:
            return hit
    return None, None
def _get_frame_id(det, fallback_frame_idx):
    """Return the detection's frame id, or ``fallback_frame_idx`` if absent.

    ``'frameId'`` is tried before ``'frame_id'``; the first one that
    ``safe_int`` can convert is returned.
    """
    converted = (safe_int(det.get(name)) for name in ('frameId', 'frame_id'))
    return next(
        (candidate for candidate in converted if candidate is not None),
        fallback_frame_idx,
    )
def _get_timestamp(det):
"""Extract timestamp from a detection dict."""
timestamp = det.get('timestamp')
if timestamp is None:
return None
try:
return float(timestamp)
except (TypeError, ValueError):
return None
def extract_trajectories(tracking_data):
    """Extract trajectories for each tracked object, organized by class.

    A detection is kept only when it has a non-``None`` ``track_id`` and
    ``class_id`` and carries at least one of the keys ``'object_3d'``,
    ``'object_3d_ego'`` or ``'heading_debug'``.

    Args:
        tracking_data: List of frame data from tracking.json

    Returns:
        Dictionary mapping class_id to dict of
        {track_id: list of (frame_idx, detection) tuples}. ``frame_idx`` is
        the position of the frame inside ``tracking_data``.
    """
    temporal_keys = ('object_3d', 'object_3d_ego', 'heading_debug')
    trajectories_by_class = defaultdict(lambda: defaultdict(list))
    for frame_idx, frame_data in enumerate(tracking_data):
        # ``or ()`` tolerates frames serialized with "detections": null.
        for det in frame_data.get('detections') or ():
            track_id = det.get('track_id')
            class_id = det.get('class_id')
            if track_id is None or class_id is None:
                continue
            if not any(key in det for key in temporal_keys):
                continue
            trajectories_by_class[class_id][track_id].append((frame_idx, det))
    return trajectories_by_class
def _get_position(det):
"""Extract 3D position [x, y, z] from a detection dict.
Priority:
1. det['box_center_3d'] — 3D box center (preferred, more stable)
2. det['object_3d'][0:3] — face-center fallback
"""
if 'box_center_3d' in det and det['box_center_3d'] is not None:
c = det['box_center_3d']
return float(c[0]), float(c[1]), float(c[2])
object_3d = det.get('object_3d')
if object_3d is None:
return None
if isinstance(object_3d, dict):
loc = object_3d['location']
return float(loc[0]), float(loc[1]), float(loc[2])
return float(object_3d[0]), float(object_3d[1]), float(object_3d[2])
def compute_position_stability(trajectory):
    """Compute position stability metrics for a trajectory.

    Detections without a position (``_get_position`` returns ``None``) are
    dropped before any metric is computed.

    Args:
        trajectory: List of (frame_idx, detection) tuples

    Returns:
        Dictionary with jitter / velocity / acceleration metrics, or ``None``
        when fewer than two positions are available.
    """
    if len(trajectory) < 2:
        return None

    located = []
    for frame_idx, det in trajectory:
        xyz = _get_position(det)
        if xyz is not None:
            located.append((frame_idx, list(xyz)))
    if len(located) < 2:
        return None

    frame_indices = np.array([frame_idx for frame_idx, _ in located])
    positions = np.array([xyz for _, xyz in located])

    # Consecutive-sample displacement vectors and the frame gap between them.
    displacements = positions[1:] - positions[:-1]
    frame_gaps = frame_indices[1:] - frame_indices[:-1]
    position_diffs = np.array([np.linalg.norm(step) for step in displacements])

    # Normalize by frame gap (for non-consecutive frames)
    position_diffs_per_frame = position_diffs / np.maximum(frame_gaps, 1)

    # Velocities (m/frame); pairs with a non-positive gap are skipped.
    velocities = np.array([
        step / gap
        for step, gap in zip(displacements, frame_gaps)
        if gap > 0
    ])

    # Accelerations (m/frame^2): magnitude of the change between velocities.
    accelerations = np.array([
        np.linalg.norm(current - previous)
        for previous, current in zip(velocities[:-1], velocities[1:])
    ])

    if len(velocities) > 0:
        speeds = np.linalg.norm(velocities, axis=1)
        velocity_mean = float(np.mean(speeds))
        velocity_std = float(np.std(speeds))
    else:
        velocity_mean = 0.0
        velocity_std = 0.0

    if len(accelerations) > 0:
        acceleration_mean = float(np.mean(accelerations))
        acceleration_std = float(np.std(accelerations))
    else:
        acceleration_mean = 0.0
        acceleration_std = 0.0

    return {
        'position_jitter_mean': float(np.mean(position_diffs_per_frame)),
        'position_jitter_std': float(np.std(position_diffs_per_frame)),
        'position_jitter_max': float(np.max(position_diffs_per_frame)),
        'velocity_mean': velocity_mean,
        'velocity_std': velocity_std,
        'acceleration_mean': acceleration_mean,
        'acceleration_std': acceleration_std,
        'trajectory_length': len(trajectory),
        'total_distance': float(np.sum(position_diffs)),
    }
def compute_dimension_stability(trajectory):
    """Compute dimension stability metrics for a trajectory.

    Detections for which ``_get_dimensions`` returns ``None`` are dropped.

    Args:
        trajectory: List of (frame_idx, detection) tuples

    Returns:
        Dictionary with per-component (length / height / width) jitter,
        relative-change, mean and std metrics, or ``None`` when fewer than
        two dimension samples are available.
    """
    if len(trajectory) < 2:
        return None

    samples = [_get_dimensions(det, prefer_ego=False) for _, det in trajectory]
    samples = [list(sample) for sample in samples if sample is not None]
    if len(samples) < 2:
        return None
    dimensions = np.array(samples)

    # Absolute frame-to-frame change, and the same change relative to the
    # previous sample (epsilon keeps the division defined for zero sizes).
    dim_diffs = np.abs(dimensions[1:] - dimensions[:-1])
    dim_relative_changes = dim_diffs / (dimensions[:-1] + 1e-6)

    def _per_component(reducer, table):
        return {
            name: float(reducer(table[:, column]))
            for column, name in enumerate(('length', 'height', 'width'))
        }

    return {
        'dimension_jitter_mean': _per_component(np.mean, dim_diffs),
        'dimension_jitter_std': _per_component(np.std, dim_diffs),
        'dimension_relative_change_mean': _per_component(np.mean, dim_relative_changes),
        'dimension_mean': _per_component(np.mean, dimensions),
        'dimension_std': _per_component(np.std, dimensions),
    }
def compute_rotation_stability(trajectory, heading_source='auto'):
    """Compute rotation stability metrics for a trajectory.

    Args:
        trajectory: List of (frame_idx, detection) tuples
        heading_source: Forwarded to ``_get_heading_observation`` to choose
            which heading field is read.

    Returns:
        Dictionary with rotation jitter (absolute wrapped frame-to-frame
        difference) and rotation mean/std, or ``None`` when fewer than two
        headings are available.
    """
    if len(trajectory) < 2:
        return None

    headings = []
    for _, det in trajectory:
        heading, _source = _get_heading_observation(
            det, prefer_ego=False, heading_source=heading_source
        )
        if heading is not None:
            headings.append(heading)
    if len(headings) < 2:
        return None

    rotations = np.array(headings)
    rot_diffs = np.array([
        abs(compute_angle_diff(current, previous))
        for previous, current in zip(rotations[:-1], rotations[1:])
    ])

    return {
        'rotation_jitter_mean': float(np.mean(rot_diffs)),
        'rotation_jitter_std': float(np.std(rot_diffs)),
        'rotation_jitter_max': float(np.max(rot_diffs)),
        'rotation_mean': float(np.mean(rotations)),
        'rotation_std': float(np.std(rotations)),
    }
def evaluate_trajectory(track_id, trajectory, class_id, heading_source='auto'):
    """Evaluate all stability metrics for a single trajectory.

    Args:
        track_id: Track ID
        trajectory: List of (frame_idx, detection) tuples
        class_id: Object class ID
        heading_source: Forwarded to ``compute_rotation_stability``.

    Returns:
        Dictionary with all stability metrics, or ``None`` when the
        trajectory has fewer than two entries or any of the three metric
        groups could not be computed.
    """
    if len(trajectory) < 2:
        return None

    sections = {
        'position_stability': compute_position_stability(trajectory),
        'dimension_stability': compute_dimension_stability(trajectory),
        'rotation_stability': compute_rotation_stability(
            trajectory, heading_source=heading_source
        ),
    }
    if any(section is None for section in sections.values()):
        return None

    first_frame = min(frame_idx for frame_idx, _ in trajectory)
    last_frame = max(frame_idx for frame_idx, _ in trajectory)
    result = {
        'track_id': track_id,
        'class_id': class_id,
        'trajectory_length': len(trajectory),
        'frame_start': first_frame,
        'frame_end': last_frame,
        'frame_span': last_frame - first_frame + 1,
    }
    result.update(sections)
    return result
def compute_aggregate_statistics(trajectory_metrics, by_class=True):
    """Compute aggregate statistics across all trajectories.

    Args:
        trajectory_metrics: List of trajectory metric dictionaries
        by_class: Whether to add one ``'class_<id>'`` entry per class

    Returns:
        Dictionary with the optional per-class entries followed by an
        ``'overall'`` entry; empty when ``trajectory_metrics`` is empty.
    """
    if not trajectory_metrics:
        return {}

    # Bucket the per-trajectory records by their class id.
    grouped = {}
    for record in trajectory_metrics:
        grouped.setdefault(record['class_id'], []).append(record)

    aggregate = {}
    if by_class:
        for class_id, members in grouped.items():
            aggregate[f'class_{class_id}'] = _compute_class_statistics(members)

    aggregate['overall'] = _compute_class_statistics(trajectory_metrics)
    return aggregate
def _compute_class_statistics(metrics):
"""Compute statistics for a list of trajectory metrics."""
if not metrics:
return {}
stats = {
'num_trajectories': len(metrics),
'trajectory_length': {
'mean': float(np.mean([m['trajectory_length'] for m in metrics])),
'std': float(np.std([m['trajectory_length'] for m in metrics])),
'min': int(np.min([m['trajectory_length'] for m in metrics])),
'max': int(np.max([m['trajectory_length'] for m in metrics])),
},
'position_jitter': {
'mean': float(np.mean([m['position_stability']['position_jitter_mean'] for m in metrics])),
'std': float(np.std([m['position_stability']['position_jitter_mean'] for m in metrics])),
'max': float(np.max([m['position_stability']['position_jitter_max'] for m in metrics])),
},
'velocity': {
'mean': float(np.mean([m['position_stability']['velocity_mean'] for m in metrics])),
'std': float(np.std([m['position_stability']['velocity_mean'] for m in metrics])),
},
'acceleration': {
'mean': float(np.mean([m['position_stability']['acceleration_mean'] for m in metrics])),
'std': float(np.std([m['position_stability']['acceleration_mean'] for m in metrics])),
},
'dimension_jitter': {
'length_mean': float(np.mean([m['dimension_stability']['dimension_jitter_mean']['length'] for m in metrics])),
'height_mean': float(np.mean([m['dimension_stability']['dimension_jitter_mean']['height'] for m in metrics])),
'width_mean': float(np.mean([m['dimension_stability']['dimension_jitter_mean']['width'] for m in metrics])),
},
'dimension_relative_change': {
'length_mean': float(np.mean([m['dimension_stability']['dimension_relative_change_mean']['length'] for m in metrics])),
'height_mean': float(np.mean([m['dimension_stability']['dimension_relative_change_mean']['height'] for m in metrics])),
'width_mean': float(np.mean([m['dimension_stability']['dimension_relative_change_mean']['width'] for m in metrics])),
},
'rotation_jitter': {
'mean': float(np.mean([m['rotation_stability']['rotation_jitter_mean'] for m in metrics])),
'std': float(np.std([m['rotation_stability']['rotation_jitter_mean'] for m in metrics])),
'max': float(np.max([m['rotation_stability']['rotation_jitter_max'] for m in metrics])),
},
}
return stats
def build_track_observation_series(track_id, trajectory, prefer_ego=True, heading_source='auto'):
    """Build per-frame observation records for one track.

    Each record carries the position (with 3D and x/z-plane range), the
    heading, the wrapped heading change since the last frame that had a
    heading, and an unwrapped heading accumulated from those changes.

    Args:
        track_id: Track ID written into every record.
        trajectory: List of (frame_idx, detection) tuples.
        prefer_ego: Forwarded to the position / heading extractors.
        heading_source: Forwarded to ``_get_heading_observation``.

    Returns:
        List of dicts, one per trajectory entry.
    """
    def _to_degrees(angle):
        return None if angle is None else float(np.degrees(angle))

    records = []
    last_heading = None
    last_unwrapped = None

    for frame_idx, det in trajectory:
        position, position_source = _get_position_observation(det, prefer_ego=prefer_ego)
        heading_rad, heading_source_name = _get_heading_observation(
            det,
            prefer_ego=prefer_ego,
            heading_source=heading_source,
        )

        if position is None:
            x_value = y_value = z_value = None
            range_3d = None
            range_xz = None
        else:
            x_value, y_value, z_value = position
            range_3d = float(np.linalg.norm(np.array([x_value, y_value, z_value], dtype=np.float64)))
            range_xz = float(np.linalg.norm(np.array([x_value, z_value], dtype=np.float64)))

        heading_delta_rad = None
        heading_unwrapped_rad = None
        if heading_rad is not None:
            if last_heading is not None:
                heading_delta_rad = float(compute_angle_diff(heading_rad, last_heading))
                if last_unwrapped is not None:
                    heading_unwrapped_rad = float(last_unwrapped + heading_delta_rad)
            if heading_unwrapped_rad is None:
                # First heading of the track seeds the unwrapped sequence.
                heading_unwrapped_rad = float(heading_rad)
            last_heading = heading_rad
            last_unwrapped = heading_unwrapped_rad

        records.append({
            'track_id': track_id,
            'class_id': det.get('class_id'),
            'frame_idx': frame_idx,
            'frame_id': _get_frame_id(det, frame_idx),
            'timestamp': _get_timestamp(det),
            'confidence': safe_float(det.get('confidence')),
            'type_name': det.get('type_name'),
            'position_source': position_source,
            'heading_source': heading_source_name,
            'x_ego': x_value,
            'y_ego': y_value,
            'z_ego': z_value,
            'range_3d': range_3d,
            'range_xz': range_xz,
            'heading_rad': heading_rad,
            'heading_deg': _to_degrees(heading_rad),
            'heading_unwrapped_rad': heading_unwrapped_rad,
            'heading_unwrapped_deg': _to_degrees(heading_unwrapped_rad),
            'heading_delta_rad': heading_delta_rad,
            'heading_delta_deg': _to_degrees(heading_delta_rad),
        })
    return records
def summarize_track_observation(track_id, class_id, series):
    """Summarize one track's distance / heading observation series.

    Args:
        track_id: Track ID copied into the summary.
        class_id: Class ID copied into the summary.
        series: List of per-frame records as produced for one track.

    Returns:
        Dict with sample count, frame / timestamp extents, the sorted sets of
        position and heading source names, and min/max/mean/std summaries.
        Every statistic is ``None`` when no record provides the field.
    """
    def _present(field):
        return [row[field] for row in series if row.get(field) is not None]

    def _distinct(field):
        return sorted({row[field] for row in series if row.get(field)})

    def _reduce(reducer, values):
        return float(reducer(values)) if values else None

    def _extreme(picker, values):
        return picker(values) if values else None

    def _spread(values):
        return {
            'min': _reduce(np.min, values),
            'max': _reduce(np.max, values),
            'mean': _reduce(np.mean, values),
            'std': _reduce(np.std, values),
        }

    ranges = _present('range_xz')
    headings = _present('heading_deg')
    abs_deltas = [abs(delta) for delta in _present('heading_delta_deg')]
    timestamps = _present('timestamp')
    frame_ids = _present('frame_id')
    frame_indices = [row['frame_idx'] for row in series]

    return {
        'track_id': track_id,
        'class_id': class_id,
        'num_samples': len(series),
        'frame_idx_start': _extreme(min, frame_indices),
        'frame_idx_end': _extreme(max, frame_indices),
        'frame_id_start': _extreme(min, frame_ids),
        'frame_id_end': _extreme(max, frame_ids),
        'timestamp_start': _extreme(min, timestamps),
        'timestamp_end': _extreme(max, timestamps),
        'position_sources': _distinct('position_source'),
        'heading_sources': _distinct('heading_source'),
        'range_xz': _spread(ranges),
        'heading_deg': _spread(headings),
        'heading_delta_deg': {
            'mean_abs': _reduce(np.mean, abs_deltas),
            'max_abs': _reduce(np.max, abs_deltas),
        },
    }
def filter_trajectory_by_frame_id_range(trajectory, frame_id_start=None, frame_id_end=None):
    """Keep the trajectory entries whose frame id lies in an inclusive range.

    Either bound may be ``None`` to leave that side open; with both bounds
    ``None`` a shallow copy of the trajectory is returned. The frame id of an
    entry is resolved with ``_get_frame_id`` (falling back to ``frame_idx``).
    """
    if frame_id_start is None and frame_id_end is None:
        return list(trajectory)

    def _inside(frame_id):
        too_early = frame_id_start is not None and frame_id < frame_id_start
        too_late = frame_id_end is not None and frame_id > frame_id_end
        return not (too_early or too_late)

    return [
        (frame_idx, det)
        for frame_idx, det in trajectory
        if _inside(_get_frame_id(det, frame_idx))
    ]
def export_track_series_json(series, output_path):
    """Serialize one track's observation series to a UTF-8 JSON file.

    Missing parent directories of ``output_path`` are created first.
    """
    target_dir = output_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    with output_path.open(mode='w', encoding='utf-8') as handle:
        json.dump(series, handle, indent=2, ensure_ascii=False)
def export_track_series_csv(series, output_path):
    """Serialize one track's observation series to a UTF-8 CSV file.

    Missing parent directories of ``output_path`` are created first. The
    header row is always written, followed by one row per series record.
    """
    columns = (
        'track_id class_id frame_idx frame_id timestamp confidence type_name '
        'position_source heading_source x_ego y_ego z_ego range_3d range_xz '
        'heading_rad heading_deg heading_unwrapped_rad heading_unwrapped_deg '
        'heading_delta_rad heading_delta_deg'
    ).split()

    output_path.parent.mkdir(parents=True, exist_ok=True)
    # newline='' lets the csv module control line endings itself.
    with output_path.open('w', encoding='utf-8', newline='') as handle:
        table = csv.DictWriter(handle, fieldnames=columns)
        table.writeheader()
        table.writerows(series)
def _select_x_value(item, x_axis):
"""Pick x-axis values for a track observation row."""
if x_axis == 'timestamp' and item.get('timestamp') is not None:
return item['timestamp']
if x_axis == 'frame_id' and item.get('frame_id') is not None:
return item['frame_id']
return item['frame_idx']
def _collect_plot_xy(series, y_key, x_axis):
    """Collect aligned x/y lists for plotting one field of a series.

    Rows whose ``y_key`` value is missing or ``None`` are skipped; the x
    value of each kept row comes from ``_select_x_value``.
    """
    points = [
        (_select_x_value(row, x_axis), row[y_key])
        for row in series
        if row.get(y_key) is not None
    ]
    x_values = [x_value for x_value, _ in points]
    y_values = [y_value for _, y_value in points]
    return x_values, y_values
def plot_focus_track_series(track_id, class_id, series, output_path, x_axis='frame'):
    """Plot one track's distance and heading observation series.

    Draws a 3x2 grid: the left column shows distance, heading and
    frame-to-frame heading delta; the right column shows the x / y / z
    position. The figure is written to ``output_path`` as a PNG at 150 dpi.

    Args:
        track_id: Track ID, used in the figure title.
        class_id: Class ID, used in the figure title.
        series: List of per-frame observation records for the track.
        output_path: ``Path`` of the image file; parents are created.
        x_axis: ``'timestamp'`` or ``'frame_id'`` to plot against that field;
            any other value plots against the frame index.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    x_label = {'timestamp': 'Timestamp', 'frame_id': 'Frame ID'}.get(x_axis, 'Frame Index')

    def _decorate(axis, title, y_label, *, zero_line=False):
        axis.set_title(title)
        axis.set_xlabel(x_label)
        axis.set_ylabel(y_label)
        axis.grid(True, alpha=0.3)
        if zero_line:
            axis.axhline(y=0.0, color='k', linestyle='--', alpha=0.2)

    fig, axes = plt.subplots(3, 2, figsize=(16, 12))
    try:
        # Distance panel: planar (x/z) range and full 3D range.
        distance_axis = axes[0, 0]
        range_x, range_xz = _collect_plot_xy(series, 'range_xz', x_axis)
        range3_x, range3 = _collect_plot_xy(series, 'range_3d', x_axis)
        if range_xz:
            distance_axis.plot(range_x, range_xz, marker='o', markersize=3, label='range_xz', alpha=0.8)
        if range3:
            distance_axis.plot(range3_x, range3, marker='o', markersize=3, label='range_3d', alpha=0.6)
        _decorate(distance_axis, 'Distance Over Time', 'Distance (m)')
        if range_xz or range3:
            distance_axis.legend()

        # Right column: one panel per position component.
        ego_axis_specs = (
            ('x_ego', 'tab:blue', axes[0, 1], 'Ego X Position', 'X Position (m)'),
            ('y_ego', 'tab:orange', axes[1, 1], 'Ego Y Position', 'Y Position (m)'),
            ('z_ego', 'tab:green', axes[2, 1], 'Ego Z Position', 'Z Position (m)'),
        )
        for field, color, axis, title, y_label in ego_axis_specs:
            x_values, y_values = _collect_plot_xy(series, field, x_axis)
            if y_values:
                axis.plot(x_values, y_values, marker='o', markersize=3, label=field, color=color, alpha=0.8)
            _decorate(axis, title, y_label)
            if axis.lines:
                axis.legend()

        # Heading panel: prefer the unwrapped heading, fall back to the raw one.
        heading_x, heading_deg = _collect_plot_xy(series, 'heading_unwrapped_deg', x_axis)
        if not heading_deg:
            heading_x, heading_deg = _collect_plot_xy(series, 'heading_deg', x_axis)
        if heading_deg:
            axes[1, 0].plot(heading_x, heading_deg, marker='o', markersize=3, color='tab:red', alpha=0.8)
        _decorate(axes[1, 0], 'Heading Over Time', 'Heading (deg)', zero_line=True)

        delta_x, heading_delta_deg = _collect_plot_xy(series, 'heading_delta_deg', x_axis)
        if heading_delta_deg:
            axes[2, 0].plot(delta_x, heading_delta_deg, marker='o', markersize=3, color='tab:purple', alpha=0.8)
        _decorate(axes[2, 0], 'Frame-to-Frame Heading Delta', 'Heading Delta (deg)', zero_line=True)

        fig.suptitle(f'Track {track_id} Class {class_id}: Distance and Heading Observation', fontsize=14)
        fig.tight_layout(rect=[0, 0, 1, 0.97])
        fig.savefig(output_path, dpi=150)
    finally:
        # Always release the figure: pyplot keeps every open figure alive,
        # so an exception above would otherwise leak it.
        plt.close(fig)
def export_focus_track_artifacts(
    trajectories,
    output_base_dir,
    *,
    prefer_ego=True,
    x_axis='frame',
    export_series=False,
    focus_track_plots=False,
    frame_id_start=None,
    frame_id_end=None,
    heading_source='auto',
):
    """Export per-track observation summaries, series, and plots.

    For every non-empty trajectory a ``track_<id>`` directory is created
    under ``output_base_dir`` and a summary JSON is written into it. The
    per-frame series (JSON + CSV) and the distance/heading plot are written
    only when ``export_series`` / ``focus_track_plots`` are set.

    Args:
        trajectories: Mapping of track_id to list of (frame_idx, detection).
        output_base_dir: Directory receiving the per-track folders.
        prefer_ego: Forwarded to ``build_track_observation_series``.
        x_axis: Forwarded to ``plot_focus_track_series``.
        export_series: Whether to write the series JSON and CSV files.
        focus_track_plots: Whether to write the per-track plot.
        frame_id_start: Recorded in each summary as ``requested_frame_id_start``.
        frame_id_end: Recorded in each summary as ``requested_frame_id_end``.
        heading_source: Forwarded to the series builder and recorded in each
            summary as ``requested_heading_source``.

    Returns:
        List of the summary dicts, in sorted track order.
    """
    base_dir = Path(output_base_dir)
    base_dir.mkdir(parents=True, exist_ok=True)

    summaries = []
    for track_id, trajectory in sorted(trajectories.items()):
        if not trajectory:
            continue

        stem = f'track_{track_id}'
        track_dir = base_dir / stem
        class_id = trajectory[0][1].get('class_id')
        series = build_track_observation_series(
            track_id,
            trajectory,
            prefer_ego=prefer_ego,
            heading_source=heading_source,
        )
        track_dir.mkdir(parents=True, exist_ok=True)

        summary = summarize_track_observation(track_id, class_id, series)
        summary.update(
            requested_frame_id_start=frame_id_start,
            requested_frame_id_end=frame_id_end,
            requested_heading_source=heading_source,
        )
        with (track_dir / f'{stem}_summary.json').open('w', encoding='utf-8') as handle:
            json.dump(summary, handle, indent=2, ensure_ascii=False)

        if export_series:
            export_track_series_json(series, track_dir / f'{stem}_series.json')
            export_track_series_csv(series, track_dir / f'{stem}_series.csv')
        if focus_track_plots:
            plot_focus_track_series(
                track_id,
                class_id,
                series,
                track_dir / f'{stem}_distance_heading.png',
                x_axis=x_axis,
            )
        summaries.append(summary)
    return summaries
def plot_temporal_distributions(trajectories, output_dir, max_tracks_per_class=5, heading_source='auto'):
    """Generate temporal distribution plots for 3D metrics.

    For every class, the longest tracks are selected and four PNG files are
    written to ``output_dir``: position, dimension and rotation over the
    frame index, plus an x/z bird's-eye view of the trajectories.

    Args:
        trajectories: Dictionary mapping track_id to list of (frame_idx, detection) tuples
        output_dir: Output directory for plots
        max_tracks_per_class: Maximum number of tracks to plot per class
        heading_source: Forwarded to ``_get_heading_observation`` for the
            rotation plot; the default keeps the previous behaviour.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Group trajectories by class; very short tracks are not worth plotting.
    trajectories_by_class = defaultdict(list)
    for track_id, trajectory in trajectories.items():
        if len(trajectory) >= 3:
            class_id = trajectory[0][1]['class_id']
            trajectories_by_class[class_id].append((track_id, trajectory))

    for class_id, class_trajectories in sorted(trajectories_by_class.items()):
        # Longest tracks first; the sort is stable, so ties keep input order.
        class_trajectories.sort(key=lambda entry: len(entry[1]), reverse=True)
        selected = class_trajectories[:max_tracks_per_class]

        _temporal_component_figure(
            selected,
            _get_position,
            ('X Position (m)', 'Y Position (m)', 'Z Position (m)'),
            f'Class {class_id}: Position Temporal Distribution',
            output_dir / f'class_{class_id}_position_temporal.png',
        )
        _temporal_component_figure(
            selected,
            lambda det: _get_dimensions(det, prefer_ego=False),
            ('Length (m)', 'Height (m)', 'Width (m)'),
            f'Class {class_id}: Dimension Temporal Distribution',
            output_dir / f'class_{class_id}_dimension_temporal.png',
        )
        _temporal_rotation_figure(
            selected,
            heading_source,
            f'Class {class_id}: Rotation Temporal Distribution',
            output_dir / f'class_{class_id}_rotation_temporal.png',
        )
        _temporal_birds_eye_figure(
            selected,
            f"Class {class_id}: 3D Trajectories (Bird's Eye View)",
            output_dir / f'class_{class_id}_trajectory_birds_eye.png',
        )
        print(f"Temporal distribution plots for class {class_id} saved")
    print(f"All temporal distribution plots saved to {output_dir}")


def _temporal_finish_axis(axis, y_label, *, x_label=None, title=None):
    """Apply labels, grid and (only if something was drawn) a legend to an axis."""
    axis.set_ylabel(y_label)
    if x_label is not None:
        axis.set_xlabel(x_label)
    if title is not None:
        axis.set_title(title)
    if axis.lines:
        # Skipping the legend on an empty axis avoids matplotlib's
        # "no artists with labels" warning.
        axis.legend(fontsize=8)
    axis.grid(True, alpha=0.3)


def _temporal_component_figure(selected, extract, y_labels, title, output_path):
    """Plot three per-detection components (one panel each) against frame index."""
    fig, axes = plt.subplots(3, 1, figsize=(12, 10))
    try:
        for track_id, trajectory in selected:
            # Pair each frame with its value so skipped detections cannot
            # misalign the x and y arrays.
            samples = [(frame_idx, extract(det)) for frame_idx, det in trajectory]
            samples = [(frame_idx, value) for frame_idx, value in samples if value is not None]
            if not samples:
                continue
            frames = [frame_idx for frame_idx, _ in samples]
            values = np.array([list(value) for _, value in samples])
            for column, axis in enumerate(axes):
                axis.plot(frames, values[:, column], marker='o', markersize=3,
                          label=f'Track {track_id}', alpha=0.7)
        _temporal_finish_axis(axes[0], y_labels[0], title=title)
        _temporal_finish_axis(axes[1], y_labels[1])
        _temporal_finish_axis(axes[2], y_labels[2], x_label='Frame Index')
        fig.tight_layout()
        fig.savefig(output_path, dpi=150)
    finally:
        plt.close(fig)


def _temporal_rotation_figure(selected, heading_source, title, output_path):
    """Plot the heading (rot_y, radians) of each selected track against frame index."""
    fig, ax = plt.subplots(figsize=(12, 5))
    try:
        for track_id, trajectory in selected:
            frames = []
            rotations = []
            for frame_idx, det in trajectory:
                rotation, _source = _get_heading_observation(
                    det, prefer_ego=False, heading_source=heading_source
                )
                if rotation is None:
                    continue
                frames.append(frame_idx)
                rotations.append(rotation)
            if not rotations:
                continue
            ax.plot(frames, np.array(rotations), marker='o', markersize=3,
                    label=f'Track {track_id}', alpha=0.7)
        # Finish the axis before adding the reference lines so that they do
        # not count as drawn data for the legend check.
        _temporal_finish_axis(ax, 'Rotation Y (rad)', x_label='Frame Index', title=title)
        for level in (0, np.pi, -np.pi):
            ax.axhline(y=level, color='k', linestyle='--', alpha=0.3)
        fig.tight_layout()
        fig.savefig(output_path, dpi=150)
    finally:
        plt.close(fig)


def _temporal_birds_eye_figure(selected, title, output_path):
    """Plot the x/z path of each selected track, marking its start and end."""
    fig, ax = plt.subplots(figsize=(10, 10))
    try:
        for track_id, trajectory in selected:
            located = [_get_position(det) for _, det in trajectory]
            ground = np.array([[xyz[0], xyz[2]] for xyz in located if xyz is not None])
            if len(ground) == 0:
                continue
            ax.plot(ground[:, 0], ground[:, 1], marker='o', markersize=4,
                    label=f'Track {track_id}', alpha=0.7, linewidth=2)
            # Square marks the first sample, triangle the last one.
            ax.scatter(ground[0, 0], ground[0, 1], s=100, marker='s',
                       edgecolors='black', linewidths=2, zorder=5)
            ax.scatter(ground[-1, 0], ground[-1, 1], s=100, marker='^',
                       edgecolors='black', linewidths=2, zorder=5)
        _temporal_finish_axis(ax, 'Z Position (m)', x_label='X Position (m)', title=title)
        ax.axis('equal')
        fig.tight_layout()
        fig.savefig(output_path, dpi=150)
    finally:
        plt.close(fig)
def _draw_class_histograms(ax, metrics_by_class, value_getter, xlabel, title):
    """Draw one overlaid histogram per class on ``ax`` and decorate the axes.

    Args:
        ax: Matplotlib axes to draw on
        metrics_by_class: Mapping of class_id -> list of trajectory metric dictionaries
        value_getter: Callable extracting the plotted scalar from one metric dictionary
        xlabel: Label for the x-axis
        title: Title of the axes
    """
    # Sorted so that legend order and colors are stable between runs.
    for class_id, metrics in sorted(metrics_by_class.items()):
        values = [value_getter(metric) for metric in metrics]
        ax.hist(values, bins=30, alpha=0.5, label=f'Class {class_id}')
    ax.set_xlabel(xlabel)
    ax.set_ylabel('Number of Trajectories')
    ax.set_title(title)
    ax.legend()
    ax.grid(True, alpha=0.3)


def _save_and_close_figure(fig, path):
    """Finalize layout, write ``fig`` to ``path`` at 150 dpi and release the figure."""
    fig.tight_layout()
    fig.savefig(path, dpi=150)
    # Close the figure explicitly instead of relying on pyplot's "current figure".
    plt.close(fig)


def plot_stability_metrics(trajectory_metrics, output_dir):
    """Generate visualization plots for stability metrics.

    Five PNG files are written to ``output_dir`` (created if missing):
    ``position_jitter_distribution.png``, ``dimension_jitter_distribution.png``,
    ``rotation_jitter_distribution.png``, ``velocity_distribution.png`` and
    ``trajectory_length_distribution.png``. Every plot overlays one histogram
    per class.

    Args:
        trajectory_metrics: List of trajectory metric dictionaries. Each entry is
            read via the keys 'class_id', 'trajectory_length',
            'position_stability', 'dimension_stability' and 'rotation_stability'.
        output_dir: Output directory for plots
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Group by class
    metrics_by_class = defaultdict(list)
    for metric in trajectory_metrics:
        metrics_by_class[metric['class_id']].append(metric)

    # Plot 1: Position jitter distribution by class
    fig, ax = plt.subplots(figsize=(10, 6))
    _draw_class_histograms(
        ax,
        metrics_by_class,
        lambda m: m['position_stability']['position_jitter_mean'],
        xlabel='Position Jitter (m/frame)',
        title='Position Jitter Distribution by Class',
    )
    _save_and_close_figure(fig, output_dir / 'position_jitter_distribution.png')

    # Plot 2: Dimension jitter by component (one subplot per component)
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    for axis, comp in zip(axes, ('length', 'height', 'width')):
        _draw_class_histograms(
            axis,
            metrics_by_class,
            # Bind comp as a default argument to avoid late-binding closures.
            lambda m, comp=comp: m['dimension_stability']['dimension_jitter_mean'][comp],
            xlabel=f'{comp.capitalize()} Jitter (m/frame)',
            title=f'{comp.capitalize()} Stability',
        )
    _save_and_close_figure(fig, output_dir / 'dimension_jitter_distribution.png')

    # Plots 3-5: remaining single-axis distributions, in the original order
    remaining_plots = (
        (
            lambda m: m['rotation_stability']['rotation_jitter_mean'],
            'Rotation Jitter (rad/frame)',
            'Rotation Jitter Distribution by Class',
            'rotation_jitter_distribution.png',
        ),
        (
            lambda m: m['position_stability']['velocity_mean'],
            'Average Velocity (m/frame)',
            'Average Velocity Distribution by Class',
            'velocity_distribution.png',
        ),
        (
            lambda m: m['trajectory_length'],
            'Trajectory Length (frames)',
            'Trajectory Length Distribution by Class',
            'trajectory_length_distribution.png',
        ),
    )
    for value_getter, xlabel, title, filename in remaining_plots:
        fig, ax = plt.subplots(figsize=(10, 6))
        _draw_class_histograms(ax, metrics_by_class, value_getter, xlabel=xlabel, title=title)
        _save_and_close_figure(fig, output_dir / filename)

    print(f"Plots saved to {output_dir}")
def _build_stability_arg_parser():
    """Build the command-line parser for the temporal stability evaluation."""
    parser = argparse.ArgumentParser(description='Evaluate temporal stability of 3D predictions')
    parser.add_argument('--input', type=str, default="runs/val_viz/20251210222153/tracking.json", help='Input tracking.json file path')
    parser.add_argument('--output', type=str, default='', help='Output stability report JSON file path')
    parser.add_argument('--plots', action='store_true', help='Generate visualization plots')
    parser.add_argument('--min-length', type=int, default=3, help='Minimum trajectory length to evaluate (default: 3)')
    parser.add_argument('--track-ids', type=int, nargs='+', default=None,
                        help='Specific track IDs to evaluate (e.g., --track-ids 1 2 3). If not specified, evaluate all tracks.')
    parser.add_argument('--class-id', type=int, default=None,
                        help='Filter by class ID (0=Car, 1=Pedestrian, 2=Cyclist, etc.). If not specified, all classes are included.')
    parser.add_argument('--frame-id-start', type=int, default=None,
                        help='Inclusive frame_id lower bound for temporal observation.')
    parser.add_argument('--frame-id-end', type=int, default=None,
                        help='Inclusive frame_id upper bound for temporal observation.')
    parser.add_argument('--focus-track-plots', action='store_true',
                        help='Generate per-track distance and heading plots for the selected trajectories.')
    parser.add_argument('--export-series', action='store_true',
                        help='Export per-track distance and heading series as JSON and CSV.')
    parser.add_argument('--x-axis', choices=('frame', 'frame_id', 'timestamp'), default='frame_id',
                        help='X-axis used for focus-track plots (default: frame_id).')
    parser.add_argument('--heading-source', choices=('auto', 'camera_reg', 'ego', 'decoded'), default='auto',
                        help='Heading source used for temporal observation (default: auto).')
    parser.add_argument('--prefer-ego', dest='prefer_ego', action='store_true',
                        help='Prefer ego-frame position and heading signals for per-track observation export (default).')
    parser.add_argument('--no-prefer-ego', dest='prefer_ego', action='store_false',
                        help='Use non-ego object_3d signals first for per-track observation export.')
    parser.set_defaults(prefer_ego=True)
    return parser


def _format_frame_id_range(frame_id_start, frame_id_end):
    """Render the inclusive frame_id range, showing open bounds as -inf / +inf."""
    lower = frame_id_start if frame_id_start is not None else '-inf'
    upper = frame_id_end if frame_id_end is not None else '+inf'
    return f"[{lower}, {upper}]"


def _select_trajectories_for_evaluation(args, trajectories_by_class):
    """Apply the class, track-id, frame_id-range and min-length filters, in that order.

    Args:
        args: Parsed command-line namespace
        trajectories_by_class: Mapping of class_id -> {track_id: trajectory}

    Returns:
        Dict of track_id -> trajectory containing only non-empty trajectories
        with at least ``args.min_length`` samples.
    """
    # Filter by class ID first
    if args.class_id is not None:
        if args.class_id in trajectories_by_class:
            trajectories = trajectories_by_class[args.class_id]
            print(f"Filtered to class_id={args.class_id}: {len(trajectories)} tracks")
        else:
            print(f"Warning: No tracks found for class_id={args.class_id}")
            trajectories = {}
    else:
        # Flatten all classes into single dict if no class filter specified
        trajectories = {}
        for class_tracks in trajectories_by_class.values():
            trajectories.update(class_tracks)
        print(f"Using all classes: {len(trajectories)} total tracks")

    # Then filter by specified track IDs
    if args.track_ids is not None:
        specified_ids = set(args.track_ids)
        original_count = len(trajectories)
        trajectories = {track_id: traj for track_id, traj in trajectories.items()
                        if track_id in specified_ids}
        missing_ids = specified_ids - set(trajectories)
        if missing_ids:
            if args.class_id is not None:
                print(f"Warning: Specified track IDs not found in class {args.class_id}: {sorted(missing_ids)}")
            else:
                print(f"Warning: Specified track IDs not found in data: {sorted(missing_ids)}")
        print(f"Filtered to {len(trajectories)} specified tracks (out of {original_count} total)")

    # Restrict every trajectory to the requested frame_id window
    if args.frame_id_start is not None or args.frame_id_end is not None:
        original_count = len(trajectories)
        trajectories = {
            track_id: filter_trajectory_by_frame_id_range(
                trajectory,
                frame_id_start=args.frame_id_start,
                frame_id_end=args.frame_id_end,
            )
            for track_id, trajectory in trajectories.items()
        }
        non_empty_tracks = sum(1 for trajectory in trajectories.values() if trajectory)
        print(
            "Filtered trajectories by frame_id range "
            f"{_format_frame_id_range(args.frame_id_start, args.frame_id_end)}: "
            f"{non_empty_tracks}/{original_count} tracks still have samples"
        )

    # Filter by minimum length. Empty trajectories are always dropped: with
    # --min-length <= 0 they would otherwise pass this filter and crash the
    # evaluation loop, which reads the class from the first detection.
    required_length = max(args.min_length, 1)
    trajectories = {track_id: traj for track_id, traj in trajectories.items()
                    if len(traj) >= required_length}
    print(f"Evaluating {len(trajectories)} tracks with length >= {args.min_length}")
    return trajectories


def _print_stability_summary(args, aggregate_stats, trajectory_metrics, observation_summaries, output_path):
    """Print the human-readable evaluation summary to stdout."""
    exported_focus = args.export_series or args.focus_track_plots
    has_frame_range = args.frame_id_start is not None or args.frame_id_end is not None

    print("\n" + "="*80)
    print("TEMPORAL STABILITY EVALUATION SUMMARY")
    print("="*80)
    if args.class_id is not None or args.track_ids or has_frame_range:
        print("\nFilter Settings:")
        if args.class_id is not None:
            print(f" Class ID: {args.class_id}")
        if args.track_ids:
            print(f" Track IDs: {args.track_ids}")
        if has_frame_range:
            print(
                " Frame ID Range: "
                f"{_format_frame_id_range(args.frame_id_start, args.frame_id_end)}"
            )
        print(f" Heading Source: {args.heading_source}")
        print(f" Successfully evaluated: {len(trajectory_metrics)} trajectories")
        if exported_focus:
            print(f" Exported focus tracks: {len(observation_summaries)}")

    for key, stats in aggregate_stats.items():
        # Groups without any evaluated trajectory have nothing to report.
        if not stats:
            continue
        print(f"\n{key.upper()}:")
        print(f" Number of trajectories: {stats['num_trajectories']}")
        print(f" Average trajectory length: {stats['trajectory_length']['mean']:.1f} ± {stats['trajectory_length']['std']:.1f} frames")
        print(f" Position jitter: {stats['position_jitter']['mean']:.4f} ± {stats['position_jitter']['std']:.4f} m/frame (max: {stats['position_jitter']['max']:.4f})")
        print(f" Dimension jitter: L={stats['dimension_jitter']['length_mean']:.4f}, H={stats['dimension_jitter']['height_mean']:.4f}, W={stats['dimension_jitter']['width_mean']:.4f} m/frame")
        print(f" Dimension relative change: L={stats['dimension_relative_change']['length_mean']:.2%}, H={stats['dimension_relative_change']['height_mean']:.2%}, W={stats['dimension_relative_change']['width_mean']:.2%}")
        print(f" Rotation jitter: {stats['rotation_jitter']['mean']:.4f} ± {stats['rotation_jitter']['std']:.4f} rad/frame (max: {stats['rotation_jitter']['max']:.4f})")
        print(f" Average velocity: {stats['velocity']['mean']:.4f} ± {stats['velocity']['std']:.4f} m/frame")
        print(f" Average acceleration: {stats['acceleration']['mean']:.4f} ± {stats['acceleration']['std']:.4f} m/frame²")
    print("\n" + "="*80)
    print(f"Report saved to: {output_path}")
    if exported_focus:
        print(f"Focus-track artifacts saved under: {output_path.parent}")


def main():
    """Command-line entry point: evaluate temporal stability of tracked 3D predictions.

    Loads the tracking JSON given by ``--input``, extracts per-track
    trajectories, applies the class / track-id / frame_id / min-length filters,
    computes per-trajectory and aggregate stability metrics, writes the JSON
    report and optionally exports per-track artifacts and plots.

    Returns:
        None. Returns early, after printing an error, when the input file does
        not exist.

    Raises:
        SystemExit: Raised by argparse on invalid arguments, including
            ``--frame-id-start`` greater than ``--frame-id-end``.
    """
    parser = _build_stability_arg_parser()
    args = parser.parse_args()
    if (
        args.frame_id_start is not None
        and args.frame_id_end is not None
        and args.frame_id_start > args.frame_id_end
    ):
        parser.error('--frame-id-start must be less than or equal to --frame-id-end')

    # Read input tracking data
    input_path = Path(args.input)
    if not input_path.exists():
        print(f"Error: Input file not found: {input_path}")
        return
    print(f"Loading tracking data from {input_path}")
    with open(input_path, 'r', encoding='utf-8') as f:
        tracking_data = json.load(f)
    print(f"Loaded {len(tracking_data)} frames")

    # Extract trajectories
    print("Extracting trajectories...")
    trajectories_by_class = extract_trajectories(tracking_data)
    total_tracks = sum(len(tracks) for tracks in trajectories_by_class.values())
    print(f"Found {total_tracks} unique tracks across {len(trajectories_by_class)} classes")
    for class_id, tracks in sorted(trajectories_by_class.items()):
        print(f" Class {class_id}: {len(tracks)} tracks")

    trajectories = _select_trajectories_for_evaluation(args, trajectories_by_class)

    # Evaluate each trajectory
    print("Computing stability metrics...")
    trajectory_metrics = []
    for track_id, trajectory in trajectories.items():
        # Get class_id from first detection (trajectories are non-empty here)
        class_id = trajectory[0][1]['class_id']
        metrics = evaluate_trajectory(track_id, trajectory, class_id, heading_source=args.heading_source)
        if metrics is not None:
            trajectory_metrics.append(metrics)
    print(f"Successfully evaluated {len(trajectory_metrics)} trajectories")

    # Compute aggregate statistics
    print("Computing aggregate statistics...")
    aggregate_stats = compute_aggregate_statistics(trajectory_metrics)

    # Prepare output report
    report = {
        'summary': {
            'total_frames': len(tracking_data),
            'total_tracks': len(trajectories),
            'evaluated_tracks': len(trajectory_metrics),
            'min_trajectory_length': args.min_length,
            'specified_class_id': args.class_id if args.class_id is not None else 'all',
            'specified_track_ids': args.track_ids if args.track_ids is not None else 'all',
            'frame_id_start': args.frame_id_start,
            'frame_id_end': args.frame_id_end,
            'heading_source': args.heading_source,
            'prefer_ego_for_focus_observation': args.prefer_ego,
            'focus_plot_x_axis': args.x_axis,
        },
        'aggregate_statistics': aggregate_stats,
        'per_trajectory_metrics': trajectory_metrics,
    }

    # Determine output path (defaults to a report next to the input file)
    if not args.output:
        output_path = input_path.parent / 'stability_report.json'
    else:
        output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    observation_summaries = []
    if args.export_series or args.focus_track_plots:
        print("Exporting per-track observation artifacts...")
        observation_summaries = export_focus_track_artifacts(
            trajectories,
            output_path.parent,
            prefer_ego=args.prefer_ego,
            x_axis=args.x_axis,
            export_series=args.export_series,
            focus_track_plots=args.focus_track_plots,
            frame_id_start=args.frame_id_start,
            frame_id_end=args.frame_id_end,
            heading_source=args.heading_source,
        )
        report['focus_track_observation'] = observation_summaries

    # Save report
    print(f"Saving stability report to {output_path}")
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    # Print summary
    _print_stability_summary(args, aggregate_stats, trajectory_metrics, observation_summaries, output_path)

    # Generate plots
    if args.plots and trajectory_metrics:
        print("\nGenerating visualization plots...")
        plot_dir = output_path.parent / 'stability_plots'
        plot_stability_metrics(trajectory_metrics, plot_dir)
        print("\nGenerating temporal distribution plots...")
        temporal_dir = output_path.parent / 'temporal_plots'
        plot_temporal_distributions(trajectories, temporal_dir, max_tracks_per_class=5)
    elif args.plots:
        print("\nSkipping aggregate plots because no trajectories were evaluated.")
    print("\nEvaluation completed!")
# Run the CLI only when this file is executed as a script, so that importing
# the module (e.g. to reuse the plotting helpers) has no side effects.
if __name__ == '__main__':
    main()