Files
yolov26_3d/eval_tools/heading_analysis/visualize_heading_errors.py
2026-06-24 09:35:46 +08:00

1163 lines
46 KiB
Python
Executable File

"""
Visualize heading error cases from extracted bad_heading_cases.json
This tool generates BEV, image projection, and angle analysis visualizations
for cases with large heading errors.
Usage:
python eval_tools/heading_analysis/visualize_heading_errors.py \
--input bad_heading_cases.json \
--image-root /path/to/images \
--output runs/heading_viz
"""
import argparse
import json
import sys
from pathlib import Path
import cv2
import numpy as np
import matplotlib
matplotlib.use('Agg') # Non-interactive backend for multiprocessing
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle, FancyArrow
from tqdm import tqdm
from multiprocessing import Pool, cpu_count
from functools import partial
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from utils.plots import Annotator, colors
def parse_args():
    """Define the command-line interface and parse the process arguments.

    Returns:
        argparse.Namespace carrying ``input``, ``image_root``, ``output``,
        ``max_cases``, ``viz_types``, ``dpi``, ``merge_same_frame`` and
        ``workers``.
    """
    viz_choices = ['bev', 'image', 'angle', 'combined']
    cli = argparse.ArgumentParser(description='Visualize heading error cases')

    # Input / output locations.
    cli.add_argument(
        '--input', type=str, required=True,
        help='Path to bad_heading_cases.json',
    )
    cli.add_argument(
        '--image-root', type=str, required=True,
        help='Root directory containing case images',
    )
    cli.add_argument(
        '--output', type=str, default='runs/heading_viz',
        help='Output directory for visualizations',
    )

    # What to render and how much of it.
    cli.add_argument(
        '--max-cases', type=int, default=None,
        help='Maximum number of cases to visualize',
    )
    cli.add_argument(
        '--viz-types', nargs='+',
        default=list(viz_choices),
        choices=viz_choices,
        help='Types of visualizations to generate',
    )
    cli.add_argument(
        '--dpi', type=int, default=100,
        help='DPI for saved figures',
    )
    cli.add_argument(
        '--merge-same-frame', action='store_true',
        help='Merge multiple objects in the same frame into one visualization (BEV/image only)',
    )

    # Parallelism.
    cli.add_argument(
        '--workers', type=int, default=4,
        help='Number of worker processes (default: 4, use 1 to disable multiprocessing)',
    )
    return cli.parse_args()
def find_image_file(image_root, case_id, frame_id):
    """Locate the image that belongs to a case/frame pair.

    A trailing ``.jpg`` or ``.png`` on ``frame_id`` is dropped, then the
    following directories are probed in order, trying ``.jpg`` before
    ``.png`` in each:

    1. ``image_root/case_id/images/``
    2. ``image_root/case_id/``
    3. ``image_root/images/``
    4. ``image_root/``

    Args:
        image_root: Root directory (str or Path) containing case images.
        case_id: Case identifier, used as a sub-directory name.
        frame_id: Frame identifier, with or without an image extension.

    Returns:
        Path of the first existing candidate, or None when nothing matches.
    """
    extensions = ('.jpg', '.png')
    root = Path(image_root)

    # Strip the extension only when it is a real suffix; a plain
    # str.replace would also mangle names such as "cam.jpg_0001".
    frame_base = frame_id
    for ext in extensions:
        if frame_base.endswith(ext):
            frame_base = frame_base[:-len(ext)]
            break

    search_dirs = (
        root / case_id / 'images',
        root / case_id,
        root / 'images',
        root,
    )
    for directory in search_dirs:
        for ext in extensions:
            candidate = directory / f'{frame_base}{ext}'
            if candidate.exists():
                return candidate
    return None
def draw_bev_blank(bev_range=(-50, 50, 0, 100), bev_res=0.1):
    """Create a white BEV canvas with a 10 m grid and an ego marker.

    Args:
        bev_range: (x_min, x_max, z_min, z_max) in meters.
        bev_res: Resolution in meters per pixel.

    Returns:
        bev_canvas: uint8 image of shape (height, width, 3).
        bev_info: Dict with the range, resolution and pixel size of the
            canvas, as consumed by ``world_to_bev``.
    """
    x_min, x_max, z_min, z_max = bev_range
    bev_width = int((x_max - x_min) / bev_res)
    bev_height = int((z_max - z_min) / bev_res)
    bev_canvas = np.full((bev_height, bev_width, 3), 255, dtype=np.uint8)

    grid_color = (200, 200, 200)
    # Vertical grid lines every 10 m along x.
    for x in range(int(x_min), int(x_max), 10):
        px = int((x - x_min) / bev_res)
        cv2.line(bev_canvas, (px, 0), (px, bev_height), grid_color, 1)
    # Horizontal grid lines every 10 m along z. The row uses the same
    # flipped mapping as the ego marker below and as world_to_bev
    # (row = (z_max - z) / res), so grid lines sit on the same z values as
    # the drawn boxes even when the z range is not a multiple of 10 m.
    for z in range(int(z_min), int(z_max), 10):
        py = int((z_max - z) / bev_res)
        cv2.line(bev_canvas, (0, py), (bev_width, py), grid_color, 1)

    # Ego marker at world (x=0, z=0); z is flipped so z=0 lies toward the
    # bottom of the image.
    ego_x = int((0 - x_min) / bev_res)
    ego_z = int((z_max - 0) / bev_res)
    cv2.circle(bev_canvas, (ego_x, ego_z), 5, (0, 0, 255), -1)

    bev_info = {
        'range': bev_range,
        'res': bev_res,
        'width': bev_width,
        'height': bev_height,
        'x_min': x_min,
        'x_max': x_max,
        'z_min': z_min,
        'z_max': z_max,
    }
    return bev_canvas, bev_info
def world_to_bev(x, z, bev_info):
    """Map a world (x, z) position to integer BEV pixel coordinates.

    Columns grow with x; rows grow as z decreases, so larger z values are
    drawn higher up in the image.

    Args:
        x: Lateral world coordinate in meters.
        z: Forward world coordinate in meters.
        bev_info: Dict providing ``x_min``, ``z_max`` and ``res``.

    Returns:
        (px, py) tuple of ints, truncated toward zero.
    """
    meters_per_pixel = bev_info['res']
    column = int((x - bev_info['x_min']) / meters_per_pixel)
    # Row index is measured down from z_max (the flipped z-axis).
    row = int((bev_info['z_max'] - z) / meters_per_pixel)
    return column, row
def draw_3d_box_bev(canvas, center, dimensions, rotation, bev_info, color, thickness=2, draw_arrow=True):
    """Draw the BEV footprint of a 3D box, optionally with a heading arrow.

    Args:
        canvas: BEV image, modified in place.
        center: [x, y, z] box center; only x and z are used.
        dimensions: [l, w, h] box size; only l and w are used.
        rotation: Rotation angle in radians (around the Y-axis).
        bev_info: BEV transformation info as returned by draw_bev_blank.
        color: Color tuple handed straight to OpenCV (callers in this file
            pass BGR, e.g. (0, 0, 255) for red).
        thickness: Line thickness.
        draw_arrow: Whether to draw an arrow from the center to the middle
            of the front edge.

    Returns:
        The same ``canvas`` object.
    """
    x, _, z = center
    l, w, _ = dimensions

    # Footprint corners in the box frame: first axis along the length,
    # second axis along the width.
    corners_local = np.array([
        [-l / 2, -w / 2],  # back-left
        [l / 2, -w / 2],   # front-left
        [l / 2, w / 2],    # front-right
        [-l / 2, w / 2],   # back-right
    ])

    # The box frame is rotated by (rotation - 90 deg) before being placed
    # in the (x, z) plane.
    # NOTE(review): with this matrix the front axis maps to
    # (sin(rotation), -cos(rotation)) in (x, z), i.e. rotation=0 points
    # toward -z and rotation=90 deg toward +x. The earlier inline comment
    # described the target as 0 deg -> +x and 90 deg -> -z; confirm which
    # convention the dataset uses.
    adjusted_rotation = rotation - np.pi / 2
    cos_adj = np.cos(adjusted_rotation)
    sin_adj = np.sin(adjusted_rotation)
    rotation_matrix = np.array([
        [cos_adj, -sin_adj],
        [sin_adj, cos_adj],
    ])

    # Column 0 of the result is world x, column 1 is world z.
    corners_world = corners_local @ rotation_matrix.T + np.array([x, z])

    corners_bev = np.array(
        [world_to_bev(cx, cz, bev_info) for cx, cz in corners_world],
        dtype=np.int32,
    )
    cv2.polylines(canvas, [corners_bev], True, color, thickness)

    if draw_arrow:
        # Arrow runs from the box center to the midpoint of the front edge
        # (average of the front-left and front-right corners).
        center_px, center_py = world_to_bev(x, z, bev_info)
        front_center = corners_world[1:3].mean(axis=0)
        front_px, front_py = world_to_bev(front_center[0], front_center[1], bev_info)
        cv2.arrowedLine(canvas, (center_px, center_py), (front_px, front_py),
                        color, thickness, tipLength=0.3)
    return canvas
def visualize_bev(case, bev_range=(-20, 20, 0, 80), bev_res=0.1):
    """Generate the BEV visualization for a single case.

    Draws the ground-truth box in green and the detection in red (BGR
    tuples), plus a legend and the heading error.

    Args:
        case: Case dict from bad_heading_cases.json. Reads ``gt_center``,
            ``det_center``, ``gt_rotation``, ``det_rotation``,
            ``heading_error_deg`` and ``is_reversal``.
        bev_range: (x_min, x_max, z_min, z_max) in meters.
        bev_res: Resolution in meters per pixel.

    Returns:
        bev_canvas: BEV visualization image.
    """
    bev_canvas, bev_info = draw_bev_blank(bev_range, bev_res)

    gt_center = case['gt_center']
    det_center = case['det_center']
    gt_rotation = case['gt_rotation']
    det_rotation = case['det_rotation']

    # Box dimensions are not read from the case; a fixed
    # [length, width, height] footprint is used for both boxes.
    dimensions = [4.5, 1.8, 1.5]

    gt_color = (0, 255, 0)
    det_color = (0, 0, 255)
    draw_3d_box_bev(bev_canvas, gt_center, dimensions, gt_rotation,
                    bev_info, color=gt_color, thickness=2)
    draw_3d_box_bev(bev_canvas, det_center, dimensions, det_rotation,
                    bev_info, color=det_color, thickness=2)

    # Legend in the top-left corner.
    legend_x = 10
    legend_y = 30
    font = cv2.FONT_HERSHEY_SIMPLEX
    cv2.putText(bev_canvas, 'GT', (legend_x, legend_y), font, 0.6, gt_color, 2)
    cv2.rectangle(bev_canvas, (legend_x - 5, legend_y - 20),
                  (legend_x + 30, legend_y + 5), gt_color, 2)
    cv2.putText(bev_canvas, 'Pred', (legend_x, legend_y + 40), font, 0.6, det_color, 2)
    cv2.rectangle(bev_canvas, (legend_x - 5, legend_y + 20),
                  (legend_x + 50, legend_y + 45), det_color, 2)

    # OpenCV's Hershey fonts cannot render the degree sign (it comes out as
    # "??"), so the unit is spelled with ASCII.
    error_text = f"Heading Error: {case['heading_error_deg']:.1f} deg"
    if case['is_reversal']:
        error_text += " (REVERSAL)"
    cv2.putText(bev_canvas, error_text, (legend_x, bev_info['height'] - 20),
                font, 0.7, (255, 0, 0), 2)
    return bev_canvas
def visualize_bev_multi(cases, bev_range=(-20, 20, 0, 80), bev_res=0.1):
    """Render every object of one frame onto a shared BEV canvas.

    Args:
        cases: List of case dicts from bad_heading_cases.json.
        bev_range: (x_min, x_max, z_min, z_max) in meters.
        bev_res: Resolution in meters per pixel.

    Returns:
        bev_canvas: BEV image containing all GT / predicted boxes, a
        numeric label per object, a legend and an object count.
    """
    canvas, info = draw_bev_blank(bev_range, bev_res)
    font = cv2.FONT_HERSHEY_SIMPLEX

    # Palettes cycle when a frame holds more objects than colors.
    gt_palette = [(0, 255, 0), (0, 200, 100), (0, 150, 150)]      # Green shades
    det_palette = [(0, 0, 255), (100, 0, 200), (150, 0, 150)]     # Red/purple shades
    box_dims = [4.5, 1.8, 1.5]  # Default [length, width, height]

    for number, obj in enumerate(cases, start=1):
        gt_pos = obj['gt_center']
        det_pos = obj['det_center']
        gt_yaw = obj['gt_rotation']
        det_yaw = obj['det_rotation']

        slot = (number - 1) % len(gt_palette)
        gt_bgr = gt_palette[slot]
        det_bgr = det_palette[slot]

        draw_3d_box_bev(canvas, gt_pos, box_dims, gt_yaw,
                        info, color=gt_bgr, thickness=2)
        draw_3d_box_bev(canvas, det_pos, box_dims, det_yaw,
                        info, color=det_bgr, thickness=2)

        # Object number next to the predicted box center.
        label_px, label_py = world_to_bev(det_pos[0], det_pos[2], info)
        cv2.putText(canvas, f'#{number}', (label_px + 10, label_py),
                    font, 0.6, det_bgr, 2)

    # Legend always uses the first GT / prediction colors.
    left, top = 10, 30
    cv2.putText(canvas, 'GT', (left, top), font, 0.6, (0, 255, 0), 2)
    cv2.rectangle(canvas, (left - 5, top - 20), (left + 30, top + 5), (0, 255, 0), 2)
    cv2.putText(canvas, 'Pred', (left, top + 40), font, 0.6, (0, 0, 255), 2)
    cv2.rectangle(canvas, (left - 5, top + 20), (left + 50, top + 45), (0, 0, 255), 2)

    # Object count along the bottom edge.
    summary = f"{len(cases)} objects with heading errors"
    cv2.putText(canvas, summary, (left, info['height'] - 20), font, 0.7, (255, 0, 0), 2)
    return canvas
def visualize_image(case, image_path):
    """Generate the camera-image visualization with 2D boxes for one case.

    Args:
        case: Case dict from bad_heading_cases.json. Reads ``gt_bbox_2d``,
            ``det_bbox_2d``, ``confidence``, ``class``,
            ``heading_error_deg``, ``distance`` and ``is_reversal``.
        image_path: Path (str or Path) to the image file, or None.

    Returns:
        vis_image: Annotated image. A gray 1280x720 placeholder is returned
        when the path is None, the file is missing or it cannot be decoded.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    # Normalize to Path so that str inputs also work for the `.name` lookup.
    path = Path(image_path) if image_path is not None else None

    if path is None or not path.exists():
        vis_image = np.full((720, 1280, 3), 128, dtype=np.uint8)
        cv2.putText(vis_image, 'Image not found', (100, 360),
                    font, 2, (255, 255, 255), 3)
        return vis_image

    vis_image = cv2.imread(str(path))
    if vis_image is None:
        vis_image = np.full((720, 1280, 3), 128, dtype=np.uint8)
        cv2.putText(vis_image, f'Failed to load: {path.name}', (100, 360),
                    font, 1, (255, 255, 255), 2)
        return vis_image

    # GT bbox (green).
    gt_bbox = case['gt_bbox_2d']
    if gt_bbox and len(gt_bbox) == 4:
        x1, y1, x2, y2 = [int(v) for v in gt_bbox]
        cv2.rectangle(vis_image, (x1, y1), (x2, y2), (0, 255, 0), 3)
        cv2.putText(vis_image, 'GT', (x1, y1 - 10), font, 0.8, (0, 255, 0), 2)

    # Detection bbox (red), labelled below the box.
    det_bbox = case['det_bbox_2d']
    if det_bbox and len(det_bbox) == 4:
        x1, y1, x2, y2 = [int(v) for v in det_bbox]
        cv2.rectangle(vis_image, (x1, y1), (x2, y2), (0, 0, 255), 3)
        cv2.putText(vis_image, f"Pred (conf={case['confidence']:.2f})", (x1, y2 + 25),
                    font, 0.8, (0, 0, 255), 2)

    # Case summary in the top-left corner. The degree sign is spelled out
    # because OpenCV's Hershey fonts render non-ASCII characters as "??".
    info_y = 30
    cv2.putText(vis_image, f"Class: {case['class']}", (10, info_y),
                font, 0.8, (255, 255, 255), 2)
    cv2.putText(vis_image, f"Heading Error: {case['heading_error_deg']:.1f} deg", (10, info_y + 35),
                font, 0.8, (255, 255, 0), 2)
    cv2.putText(vis_image, f"Distance: {case['distance']:.1f}m", (10, info_y + 70),
                font, 0.8, (255, 255, 255), 2)
    if case['is_reversal']:
        cv2.putText(vis_image, "REVERSAL ERROR", (10, info_y + 105),
                    font, 0.9, (0, 0, 255), 3)
    return vis_image
def visualize_image_multi(cases, image_path):
    """Generate the camera-image visualization for all objects of a frame.

    Args:
        cases: List of case dicts from bad_heading_cases.json.
        image_path: Path (str or Path) to the image file, or None when no
            image was found for the frame.

    Returns:
        vis_image: Image with every GT / predicted 2D box and a per-object
        error summary. A gray 1280x720 placeholder is returned when the
        path is None, the file is missing or it cannot be decoded.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX

    def _placeholder(message, scale, thickness):
        # Gray canvas carrying a single status message.
        blank = np.full((720, 1280, 3), 128, dtype=np.uint8)
        cv2.putText(blank, message, (100, 360), font, scale, (255, 255, 255), thickness)
        return blank

    # find_image_file returns None when nothing matches; calling .exists()
    # on that would raise AttributeError, so it is handled explicitly.
    if image_path is None:
        return _placeholder('Image not found', 2, 3)
    path = Path(image_path)
    if not path.exists():
        return _placeholder(f'Failed to load: {path.name}', 1, 2)

    vis_image = cv2.imread(str(path))
    if vis_image is None:
        return _placeholder(f'Failed to load: {path.name}', 1, 2)

    # Color palettes cycle when there are more objects than entries.
    colors_gt = [(0, 255, 0), (0, 200, 100), (0, 150, 150), (100, 255, 100)]   # Green shades
    colors_det = [(0, 0, 255), (100, 0, 200), (150, 0, 150), (200, 0, 100)]    # Red/purple shades

    for idx, case in enumerate(cases):
        color_idx = idx % len(colors_gt)
        gt_color = colors_gt[color_idx]
        det_color = colors_det[color_idx]

        gt_bbox = case['gt_bbox_2d']
        if gt_bbox and len(gt_bbox) == 4:
            x1, y1, x2, y2 = [int(v) for v in gt_bbox]
            cv2.rectangle(vis_image, (x1, y1), (x2, y2), gt_color, 3)
            cv2.putText(vis_image, f'GT#{idx+1}', (x1, y1 - 10), font, 0.7, gt_color, 2)

        det_bbox = case['det_bbox_2d']
        if det_bbox and len(det_bbox) == 4:
            x1, y1, x2, y2 = [int(v) for v in det_bbox]
            cv2.rectangle(vis_image, (x1, y1), (x2, y2), det_color, 3)
            cv2.putText(vis_image, f"Pred#{idx+1} ({case['confidence']:.2f})", (x1, y2 + 25),
                        font, 0.7, det_color, 2)

    # Summary header followed by one line per object.
    info_y = 30
    cv2.putText(vis_image, f"{len(cases)} objects with heading errors", (10, info_y),
                font, 0.8, (255, 255, 0), 2)
    for idx, case in enumerate(cases):
        # "deg" instead of the degree sign: Hershey fonts render non-ASCII
        # characters as "??".
        info_text = f"#{idx+1}: {case['heading_error_deg']:.1f} deg ({case['class']})"
        if case['is_reversal']:
            info_text += " REVERSAL"
        cv2.putText(vis_image, info_text, (10, info_y + 35 + idx * 30),
                    font, 0.7, (255, 255, 255), 2)
    return vis_image
def visualize_angle(case):
    """Generate the angle comparison figure for one case.

    The left panel is a polar plot of the GT and predicted rotation; the
    right panel is a bar chart of the same two angles in degrees.

    Args:
        case: Case dict from bad_heading_cases.json. Reads ``gt_rotation``,
            ``det_rotation`` and ``heading_error`` (converted here with
            ``np.degrees``), plus ``case_id``, ``frame_id``, ``class``,
            ``distance`` and ``confidence`` for the footer.

    Returns:
        fig: Matplotlib figure; the caller is responsible for closing it.
    """
    # Read the angles before creating the figure so a missing key cannot
    # leave an orphaned figure behind.
    gt_angle = case['gt_rotation']
    det_angle = case['det_rotation']
    error = case['heading_error']

    # Build both axes explicitly on the figure. Creating a Cartesian axes
    # with plt.subplots and then calling plt.subplot(121, projection='polar')
    # stacks a second axes on the first one in recent Matplotlib releases.
    fig = plt.figure(figsize=(12, 5))
    ax1 = fig.add_subplot(1, 2, 1, projection='polar')
    ax2 = fig.add_subplot(1, 2, 2)

    # Polar plot of both headings.
    ax1.plot([gt_angle, gt_angle], [0, 1], 'g-', linewidth=3, label='GT')
    ax1.plot([det_angle, det_angle], [0, 1], 'r-', linewidth=3, label='Pred')
    ax1.scatter([gt_angle], [1], c='green', s=200, marker='^', zorder=10)
    ax1.scatter([det_angle], [1], c='red', s=200, marker='v', zorder=10)
    ax1.set_ylim(0, 1.2)
    ax1.set_theta_zero_location('E')  # 0° points right (camera x-axis)
    ax1.set_theta_direction(-1)       # Clockwise (90° points down)
    ax1.legend(loc='upper right', bbox_to_anchor=(1.3, 1.1))
    ax1.set_title(f'Heading Comparison\nError: {np.degrees(error):.1f}°', fontsize=12, pad=20)

    # Bar plot of the raw angles in degrees.
    categories = ['GT', 'Pred']
    angles_deg = [np.degrees(gt_angle), np.degrees(det_angle)]
    bars = ax2.bar(categories, angles_deg, color=['green', 'red'], alpha=0.7,
                   edgecolor='black', linewidth=2)
    ax2.set_ylabel('Rotation Angle (degrees)', fontsize=11)
    ax2.set_title('Rotation Comparison', fontsize=12)
    ax2.grid(True, axis='y', alpha=0.3)
    ax2.axhline(0, color='black', linewidth=0.5)

    # Value labels sit above positive bars and below negative ones.
    for bar, angle in zip(bars, angles_deg):
        height = bar.get_height()
        ax2.text(bar.get_x() + bar.get_width() / 2., height,
                 f'{angle:.1f}°',
                 ha='center', va='bottom' if height > 0 else 'top', fontsize=10)

    # Footer with the case identity.
    fig.text(0.5, 0.02,
             f"Case: {case['case_id'][:20]}... | Frame: {case['frame_id'][:30]}... | "
             f"Class: {case['class']} | Distance: {case['distance']:.1f}m | "
             f"Confidence: {case['confidence']:.2f}",
             ha='center', fontsize=9, style='italic')
    # Leave the bottom 5% free for the footer text.
    fig.tight_layout(rect=[0, 0.05, 1, 1])
    return fig
def visualize_combined(case, image_path, bev_range=(-20, 20, 0, 80)):
    """Build a 2x2 summary figure for one case.

    Panels: BEV (top-left), annotated camera image (top-right), polar
    heading plot (bottom-left) and a text statistics panel (bottom-right).

    Args:
        case: Case dict from bad_heading_cases.json.
        image_path: Path to the image file (may be None).
        bev_range: (x_min, x_max, z_min, z_max) for the BEV panel.

    Returns:
        fig: Matplotlib figure; the caller closes it.
    """
    fig = plt.figure(figsize=(16, 10))
    grid = fig.add_gridspec(2, 2, height_ratios=[1, 1], width_ratios=[1, 1],
                            hspace=0.3, wspace=0.3)
    reversal = case['is_reversal']

    # --- Top-left: BEV (OpenCV BGR -> RGB for Matplotlib) ---
    bev_ax = fig.add_subplot(grid[0, 0])
    bev_ax.imshow(cv2.cvtColor(visualize_bev(case, bev_range), cv2.COLOR_BGR2RGB))
    bev_ax.set_title('Bird\'s Eye View (BEV)', fontsize=14, fontweight='bold')
    bev_ax.axis('off')

    # --- Top-right: camera image ---
    img_ax = fig.add_subplot(grid[0, 1])
    img_ax.imshow(cv2.cvtColor(visualize_image(case, image_path), cv2.COLOR_BGR2RGB))
    img_ax.set_title('Image View', fontsize=14, fontweight='bold')
    img_ax.axis('off')

    # --- Bottom-left: polar heading comparison ---
    polar_ax = fig.add_subplot(grid[1, 0], projection='polar')
    headings = (
        (case['gt_rotation'], 'g-', 'GT', 'green', '^'),
        (case['det_rotation'], 'r-', 'Pred', 'red', 'v'),
    )
    for theta, fmt, label, _, _ in headings:
        polar_ax.plot([theta, theta], [0, 1], fmt, linewidth=3, label=label)
    for theta, _, _, marker_color, marker in headings:
        polar_ax.scatter([theta], [1], c=marker_color, s=200, marker=marker, zorder=10)
    polar_ax.set_ylim(0, 1.2)
    polar_ax.set_theta_zero_location('E')  # 0° points right (camera x-axis)
    polar_ax.set_theta_direction(-1)       # Clockwise (90° points down)
    polar_ax.legend(loc='upper right', fontsize=10)
    polar_ax.set_title('Heading Comparison', fontsize=14, fontweight='bold', pad=20)

    # --- Bottom-right: statistics text ---
    stats_ax = fig.add_subplot(grid[1, 1])
    stats_ax.axis('off')

    rule = "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
    reversal_label = 'YES - 180° ERROR' if reversal else 'No'
    gt_c = case['gt_center']
    det_c = case['det_center']
    stats_text = "\n".join([
        "CASE INFORMATION",
        rule,
        f"Case ID: {case['case_id'][:40]}...",
        f"Frame ID: {case['frame_id'][:40]}...",
        f"Class: {case['class'].upper()}",
        "DETECTION METRICS",
        rule,
        f"Confidence: {case['confidence']:.3f}",
        f"IoU: {case['iou']:.3f}",
        f"Distance: {case['distance']:.2f} m",
        "HEADING ERROR",
        rule,
        f"Error: {case['heading_error_deg']:.2f}° ({case['heading_error']:.3f} rad)",
        f"GT Angle: {case['gt_rotation_deg']:.2f}°",
        f"Pred Angle: {case['det_rotation_deg']:.2f}°",
        f"Reversal: {reversal_label}",
        "POSITION ERROR",
        rule,
        f"Lateral: {case['lateral_error']:.3f} m",
        f"Longitudinal: {case['longitudinal_error']:.3f} m",
        "3D CENTERS",
        rule,
        f"GT: [{gt_c[0]:6.2f}, {gt_c[1]:6.2f}, {gt_c[2]:6.2f}]",
        f"Pred: [{det_c[0]:6.2f}, {det_c[1]:6.2f}, {det_c[2]:6.2f}]",
    ])

    # Reversal cases get a light red wash behind the panel and a
    # wheat-colored text box instead of the light blue one.
    if reversal:
        stats_ax.add_patch(Rectangle((0, 0), 1, 1, transform=stats_ax.transAxes,
                                     facecolor='red', alpha=0.1))
    box_face = 'wheat' if reversal else 'lightblue'
    stats_ax.text(0.05, 0.95, stats_text, transform=stats_ax.transAxes,
                  fontsize=10, verticalalignment='top', fontfamily='monospace',
                  bbox=dict(boxstyle='round', facecolor=box_face,
                            alpha=0.3, edgecolor='black', linewidth=2))

    fig.suptitle(f"Heading Error Analysis - {case['class'].upper()} (Error: {case['heading_error_deg']:.1f}°)",
                 fontsize=16, fontweight='bold', y=0.98)
    return fig
def visualize_combined_multi(cases, image_path, bev_range=(-20, 20, 0, 80)):
    """Generate the combined 2x2 figure for all objects of one frame.

    Panels: merged BEV (top-left), merged camera image (top-right), polar
    plot with every object's GT / predicted heading (bottom-left) and a
    per-object statistics panel (bottom-right).

    Args:
        cases: Non-empty list of case dicts from bad_heading_cases.json
            that share the same frame.
        image_path: Path to the image file (may be None).
        bev_range: (x_min, x_max, z_min, z_max) for the BEV panel.

    Returns:
        fig: Matplotlib figure; the caller closes it.

    Raises:
        ValueError: If ``cases`` is empty.
    """
    # Fail before any figure is created; the frame/case header and the
    # max-error title both need at least one entry.
    if not cases:
        raise ValueError("visualize_combined_multi requires at least one case")

    fig = plt.figure(figsize=(16, 10))
    gs = fig.add_gridspec(2, 2, height_ratios=[1, 1], width_ratios=[1, 1],
                          hspace=0.3, wspace=0.3)

    # BEV view (top-left) - merged
    ax1 = fig.add_subplot(gs[0, 0])
    bev_canvas = visualize_bev_multi(cases, bev_range)
    ax1.imshow(cv2.cvtColor(bev_canvas, cv2.COLOR_BGR2RGB))
    ax1.set_title('Bird\'s Eye View (BEV)', fontsize=14, fontweight='bold')
    ax1.axis('off')

    # Image view (top-right) - merged
    ax2 = fig.add_subplot(gs[0, 1])
    vis_image = visualize_image_multi(cases, image_path)
    ax2.imshow(cv2.cvtColor(vis_image, cv2.COLOR_BGR2RGB))
    ax2.set_title('Image View', fontsize=14, fontweight='bold')
    ax2.axis('off')

    # Polar angle plot (bottom-left) - one GT/Pred pair per object, told
    # apart by color and line style (both cycle every four objects).
    ax3 = fig.add_subplot(gs[1, 0], projection='polar')
    colors_gt = ['green', 'darkgreen', 'lightgreen', 'seagreen']
    colors_det = ['red', 'darkred', 'lightcoral', 'crimson']
    linestyles = ['-', '--', '-.', ':']
    for idx, case in enumerate(cases):
        gt_angle = case['gt_rotation']
        det_angle = case['det_rotation']
        color_gt = colors_gt[idx % len(colors_gt)]
        color_det = colors_det[idx % len(colors_det)]
        linestyle = linestyles[idx % len(linestyles)]
        ax3.plot([gt_angle, gt_angle], [0, 1], color=color_gt, linewidth=2,
                 linestyle=linestyle, label=f'GT#{idx+1}', alpha=0.8)
        ax3.plot([det_angle, det_angle], [0, 1], color=color_det, linewidth=2,
                 linestyle=linestyle, label=f'Pred#{idx+1}', alpha=0.8)
        ax3.scatter([gt_angle], [1], c=color_gt, s=150, marker='^', zorder=10, alpha=0.8)
        ax3.scatter([det_angle], [1], c=color_det, s=150, marker='v', zorder=10, alpha=0.8)
    ax3.set_ylim(0, 1.2)
    ax3.set_theta_zero_location('E')  # 0° points right (camera x-axis)
    ax3.set_theta_direction(-1)       # Clockwise (90° points down)
    ax3.legend(loc='upper right', fontsize=8, ncol=2)
    ax3.set_title('Heading Comparison (All Objects)', fontsize=14, fontweight='bold', pad=20)

    # Statistics panel (bottom-right) - summary of all objects
    ax4 = fig.add_subplot(gs[1, 1])
    ax4.axis('off')

    # Horizontal rule under each heading. The previous `"" * 42` evaluated
    # to an empty string, so no rule was ever drawn.
    separator = "━" * 42
    stats_lines = [
        "MULTIPLE OBJECTS ANALYSIS",
        separator,
        f"Frame: {cases[0]['frame_id'][:35]}...",
        f"Case: {cases[0]['case_id'][:35]}...",
        f"Total objects: {len(cases)}",
        "",
    ]
    for idx, case in enumerate(cases):
        stats_lines.append(f"OBJECT #{idx+1} - {case['class'].upper()}")
        stats_lines.append(separator)
        stats_lines.append(f"Heading Error: {case['heading_error_deg']:6.1f}° {'(REVERSAL)' if case['is_reversal'] else ''}")
        stats_lines.append(f"GT Angle: {case['gt_rotation_deg']:6.1f}°")
        stats_lines.append(f"Pred Angle: {case['det_rotation_deg']:6.1f}°")
        stats_lines.append(f"Confidence: {case['confidence']:6.3f}")
        stats_lines.append(f"Distance: {case['distance']:6.1f} m")
        stats_lines.append(f"IoU: {case['iou']:6.3f}")
        stats_lines.append(f"Lateral Error: {case['lateral_error']:6.3f} m")
        stats_lines.append(f"Longitud. Err: {case['longitudinal_error']:6.3f} m")
        # Blank line between objects, but not after the last one.
        if idx < len(cases) - 1:
            stats_lines.append("")
    stats_text = "\n".join(stats_lines)

    # Light red wash behind the panel if any object is a reversal.
    has_reversal = any(c['is_reversal'] for c in cases)
    if has_reversal:
        ax4.add_patch(Rectangle((0, 0), 1, 1, transform=ax4.transAxes,
                                facecolor='red', alpha=0.1))

    # Shrink the font as the object count grows, but never below 7 pt.
    fontsize = max(7, 10 - len(cases))
    ax4.text(0.05, 0.95, stats_text, transform=ax4.transAxes,
             fontsize=fontsize, verticalalignment='top', fontfamily='monospace',
             bbox=dict(boxstyle='round', facecolor='wheat' if has_reversal else 'lightblue',
                       alpha=0.3, edgecolor='black', linewidth=2))

    max_error = max(c['heading_error_deg'] for c in cases)
    fig.suptitle(f"Multi-Object Heading Error Analysis - {len(cases)} Objects (Max Error: {max_error:.1f}°)",
                 fontsize=16, fontweight='bold', y=0.98)
    return fig
def process_single_case(case_data):
    """Render and save the requested visualizations for a single case.

    Output files are written to
    ``output_dir/<viz_type>/<case_id>/<frame_id><suffix>_<viz_type>.<ext>``.

    Args:
        case_data: Tuple of (case, args_dict, image_root, output_dir,
            frame_groups, obj_idx, suffix). ``args_dict`` provides
            ``viz_types`` and ``dpi``; ``frame_groups`` and ``obj_idx`` are
            accepted for the caller's convenience but not used here.

    Returns:
        Tuple of (success, case_id, error_msg); ``error_msg`` is None on
        success and the stringified exception otherwise.
    """
    case, args_dict, image_root, output_dir, _frame_groups, _obj_idx, suffix = case_data
    case_id = case['case_id']
    frame_id = case['frame_id']
    viz_types = args_dict['viz_types']
    dpi = args_dict['dpi']
    try:
        image_path = find_image_file(image_root, case_id, frame_id)

        if 'bev' in viz_types:
            bev_canvas = visualize_bev(case)
            bev_case_dir = output_dir / 'bev' / case_id
            bev_case_dir.mkdir(parents=True, exist_ok=True)
            bev_path = bev_case_dir / f'{frame_id}{suffix}_bev.jpg'
            # cv2.imwrite signals failure through its return value; turn
            # that into an error so the case is not reported as a success.
            if not cv2.imwrite(str(bev_path), bev_canvas):
                raise OSError(f'Failed to write {bev_path}')

        if 'image' in viz_types:
            vis_image = visualize_image(case, image_path)
            img_case_dir = output_dir / 'image' / case_id
            img_case_dir.mkdir(parents=True, exist_ok=True)
            img_path = img_case_dir / f'{frame_id}{suffix}_image.jpg'
            if not cv2.imwrite(str(img_path), vis_image):
                raise OSError(f'Failed to write {img_path}')

        if 'angle' in viz_types:
            fig_angle = visualize_angle(case)
            try:
                angle_case_dir = output_dir / 'angle' / case_id
                angle_case_dir.mkdir(parents=True, exist_ok=True)
                angle_path = angle_case_dir / f'{frame_id}{suffix}_angle.png'
                fig_angle.savefig(angle_path, dpi=dpi, bbox_inches='tight')
            finally:
                # Release the figure even when saving fails.
                plt.close(fig_angle)

        if 'combined' in viz_types:
            fig_combined = visualize_combined(case, image_path)
            try:
                combined_case_dir = output_dir / 'combined' / case_id
                combined_case_dir.mkdir(parents=True, exist_ok=True)
                combined_path = combined_case_dir / f'{frame_id}{suffix}_combined.png'
                fig_combined.savefig(combined_path, dpi=dpi, bbox_inches='tight')
            finally:
                plt.close(fig_combined)

        return (True, case_id, None)
    except Exception as e:
        # Worker boundary: report the failure instead of raising. Figures
        # that a visualize_* helper created before failing are unreachable
        # from here, so close everything this process still holds open.
        plt.close('all')
        return (False, case_id, str(e))
def process_merged_frame(frame_data):
    """Render and save merged visualizations for all objects of one frame.

    BEV, image and combined views show every object together and are saved
    as ``<frame_id>_<viz_type>.<ext>``; angle plots are written per object
    as ``<frame_id>_obj<N>_angle.png``.

    Args:
        frame_data: Tuple of (frame_key, frame_cases, args_dict,
            image_root, output_dir), where ``frame_key`` is
            ``(case_id, frame_id)`` and ``args_dict`` provides
            ``viz_types`` and ``dpi``.

    Returns:
        Tuple of (success, case_id, error_msg); ``error_msg`` is None on
        success and the stringified exception otherwise.
    """
    frame_key, frame_cases, args_dict, image_root, output_dir = frame_data
    case_id, frame_id = frame_key
    viz_types = args_dict['viz_types']
    dpi = args_dict['dpi']
    try:
        image_path = find_image_file(image_root, case_id, frame_id)

        if 'bev' in viz_types:
            bev_canvas = visualize_bev_multi(frame_cases)
            bev_case_dir = output_dir / 'bev' / case_id
            bev_case_dir.mkdir(parents=True, exist_ok=True)
            bev_path = bev_case_dir / f'{frame_id}_bev.jpg'
            # cv2.imwrite signals failure through its return value; turn
            # that into an error so the frame is not reported as a success.
            if not cv2.imwrite(str(bev_path), bev_canvas):
                raise OSError(f'Failed to write {bev_path}')

        if 'image' in viz_types:
            vis_image = visualize_image_multi(frame_cases, image_path)
            img_case_dir = output_dir / 'image' / case_id
            img_case_dir.mkdir(parents=True, exist_ok=True)
            img_path = img_case_dir / f'{frame_id}_image.jpg'
            if not cv2.imwrite(str(img_path), vis_image):
                raise OSError(f'Failed to write {img_path}')

        if 'combined' in viz_types:
            fig_combined = visualize_combined_multi(frame_cases, image_path)
            try:
                combined_case_dir = output_dir / 'combined' / case_id
                combined_case_dir.mkdir(parents=True, exist_ok=True)
                combined_path = combined_case_dir / f'{frame_id}_combined.png'
                fig_combined.savefig(combined_path, dpi=dpi, bbox_inches='tight')
            finally:
                # Release the figure even when saving fails.
                plt.close(fig_combined)

        # Angle plots are per object and cannot be merged; the viz-type
        # check and directory creation are done once, outside the loop.
        if 'angle' in viz_types and frame_cases:
            angle_case_dir = output_dir / 'angle' / case_id
            angle_case_dir.mkdir(parents=True, exist_ok=True)
            for obj_idx, case in enumerate(frame_cases):
                fig_angle = visualize_angle(case)
                try:
                    angle_path = angle_case_dir / f'{frame_id}_obj{obj_idx+1}_angle.png'
                    fig_angle.savefig(angle_path, dpi=dpi, bbox_inches='tight')
                finally:
                    plt.close(fig_angle)

        return (True, case_id, None)
    except Exception as e:
        # Worker boundary: report the failure instead of raising. Figures
        # that a visualize_* helper created before failing are unreachable
        # from here, so close everything this process still holds open.
        plt.close('all')
        return (False, case_id, str(e))
def _run_visualization_tasks(worker, tasks, num_workers, desc):
    """Apply ``worker`` to every item of ``tasks`` and return the results in order.

    Args:
        worker: Picklable callable taking one task tuple.
        tasks: List of task tuples.
        num_workers: Number of processes; values <= 1 run in the current process.
        desc: Label shown on the tqdm progress bar.

    Returns:
        List with one worker result per task, in task order.
    """
    if num_workers > 1:
        with Pool(num_workers) as pool:
            # imap (not map) yields results lazily so tqdm can show progress.
            return list(tqdm(pool.imap(worker, tasks), total=len(tasks), desc=desc))
    return [worker(task) for task in tqdm(tasks, desc=desc)]


def _report_task_errors(results, unit):
    """Print a summary of failed tasks (at most the first 10 are listed).

    Args:
        results: Iterable of ``(success, case_id, error_message)`` tuples.
        unit: Plural noun used in the summary line (e.g. ``'frames'``).

    Returns:
        Number of failed tasks.
    """
    errors = [(case_id, err) for success, case_id, err in results if not success]
    if errors:
        print(f"\n{len(errors)} {unit} had errors:")
        for case_id, err in errors[:10]:  # Show first 10
            print(f" {case_id}: {err}")
        if len(errors) > 10:
            print(f" ... and {len(errors) - 10} more")
    return len(errors)


def main():
    """Command-line entry point: render visualizations for bad heading cases.

    Reads the JSON file given by ``--input`` (expects top-level ``cases`` and
    ``metadata`` keys), groups cases by ``(case_id, frame_id)``, dispatches the
    work to ``process_merged_frame`` (merge mode) or ``process_single_case``
    (one task per case), and finally prints a summary of the generated files.

    Exits with status 1 if the input file does not exist.
    """
    from collections import defaultdict

    args = parse_args()

    # Load bad cases
    input_path = Path(args.input)
    if not input_path.exists():
        print(f"Error: Input file not found: {input_path}")
        sys.exit(1)

    print(f"Loading bad cases from {input_path}...")
    with open(input_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    cases = data['cases']
    metadata = data['metadata']
    print(f"Loaded {len(cases)} bad cases")
    print(f"Source: {metadata['source']}")
    print(f"Threshold: {metadata['threshold']:.2f} rad ({metadata['threshold_degrees']:.1f}°)")

    # Limit cases if requested
    if args.max_cases:
        cases = cases[:args.max_cases]
        print(f"Limited to {len(cases)} cases")

    # Group cases by frame. obj_indices[i] is the position of case i within
    # its frame group; recording it here avoids a per-case linear search later.
    frame_groups = defaultdict(list)
    obj_indices = []
    for idx, case in enumerate(cases):
        key = (case['case_id'], case['frame_id'])
        group = frame_groups[key]
        obj_indices.append(len(group))
        group.append((idx, case))

    multi_target_frames = {k: v for k, v in frame_groups.items() if len(v) > 1}
    print("\nFrame statistics:")
    print(f" Total cases: {len(cases)}")
    print(f" Unique frames: {len(frame_groups)}")
    print(f" Frames with multiple targets: {len(multi_target_frames)}")
    if multi_target_frames:
        max_targets = max(len(v) for v in multi_target_frames.values())
        print(f" Max targets in one frame: {max_targets}")
    if args.merge_same_frame and multi_target_frames:
        print(f"\n⚠️ Merge mode enabled: BEV/image/combined will show all targets in the same frame")
        print(f" Angle plots will still generate separate files for each target")
    elif multi_target_frames:
        print(f"\n⚠️ Multiple targets per frame detected!")
        print(f" Each target will get a unique file: {{frame_id}}_obj{{N}}_{{viz_type}}.ext")
        print(f" Use --merge-same-frame to draw all targets on the same BEV/image")

    # Create output directory
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)
    for viz_type in args.viz_types:
        (output_dir / viz_type).mkdir(exist_ok=True)
    print(f"\nOutput directory: {output_dir}")
    print(f"Visualization types: {args.viz_types}")

    # Determine number of workers (clamped to [1, cpu_count()])
    num_workers = args.workers
    if num_workers < 1:
        num_workers = 1
    elif num_workers > cpu_count():
        print(f"Warning: Requested {num_workers} workers but only {cpu_count()} CPUs available")
        num_workers = cpu_count()
    use_multiprocessing = num_workers > 1
    print(f"\nProcessing mode: {'Multiprocessing' if use_multiprocessing else 'Single process'}")
    if use_multiprocessing:
        print(f"Workers: {num_workers}")

    # Process cases
    print("\nGenerating visualizations...")

    # Plain dict (rather than the argparse namespace) handed to worker processes
    args_dict = {
        'viz_types': args.viz_types,
        'dpi': args.dpi,
    }

    # Merge mode only applies when at least one mergeable view is requested;
    # with only 'angle' selected every case is processed individually.
    mergeable_requested = any(
        viz_type in args.viz_types for viz_type in ('bev', 'image', 'combined')
    )
    if args.merge_same_frame and mergeable_requested:
        # One task per frame, carrying every case of that frame
        frame_data_list = [
            (frame_key, [c for _, c in objects], args_dict, args.image_root, output_dir)
            for frame_key, objects in frame_groups.items()
        ]
        results = _run_visualization_tasks(
            process_merged_frame, frame_data_list, num_workers, "Processing frames"
        )
        _report_task_errors(results, "frames")
    else:
        # One task per case
        case_data_list = []
        for case, obj_idx in zip(cases, obj_indices):
            frame_objects = frame_groups[(case['case_id'], case['frame_id'])]
            # Add object index to filename only if the frame has multiple objects
            suffix = f'_obj{obj_idx+1}' if len(frame_objects) > 1 else ''
            # NOTE(review): frame_groups is part of every task tuple, so it is
            # pickled once per task under multiprocessing; kept because the
            # worker's expected tuple layout is defined outside this block.
            case_data_list.append(
                (case, args_dict, args.image_root, output_dir, frame_groups, obj_idx, suffix)
            )
        results = _run_visualization_tasks(
            process_single_case, case_data_list, num_workers, "Processing cases"
        )
        _report_task_errors(results, "cases")

    print(f"\n{'='*80}")
    print("Visualization complete!")
    print(f"{'='*80}")
    print(f"Output directory: {output_dir}")
    print(f"Total cases processed: {len(cases)}")

    # Count files by traversing directories
    for viz_type in args.viz_types:
        viz_dir = output_dir / viz_type
        # Filter on is_file()/is_dir() so directory names containing dots are
        # not miscounted as files and stray files are not counted as cases.
        num_files = sum(1 for p in viz_dir.rglob('*') if p.is_file())
        num_cases = sum(1 for p in viz_dir.iterdir() if p.is_dir())
        print(f" - {viz_type}: {num_files} files in {num_cases} case directories")

    print("\nOutput structure:")
    print(f" {output_dir}/")
    for viz_type in args.viz_types:
        ext = 'jpg' if viz_type in ('bev', 'image') else 'png'
        print(f" ├── {viz_type}/")
        print(f" │ ├── {{case_id}}/")
        print(f" │ │ └── {{frame_id}}_{viz_type}.{ext}")

    print("\nYou can now:")
    print(f" 1. Browse visualizations in {output_dir}")
    print(" 2. Use heading_error_viewer.py for interactive viewing")
    print(" 3. Generate report with generate_heading_report.py")
    print("\nExample files:")
    # Show first file from each type
    for viz_type in args.viz_types:
        viz_dir = output_dir / viz_type
        first_file = next((p for p in viz_dir.rglob('*') if p.is_file()), None)
        if first_file is not None:
            rel_path = first_file.relative_to(output_dir)
            print(f" {rel_path}")
# Script entry point: run main() only when this file is executed directly,
# not when it is imported. main() may start a multiprocessing Pool, and worker
# processes can re-import this module, so the guard must stay in place.
if __name__ == '__main__':
    main()