""" Visualize heading error cases from extracted bad_heading_cases.json This tool generates BEV, image projection, and angle analysis visualizations for cases with large heading errors. Usage: python eval_tools/visualize_heading_errors.py \ --input bad_heading_cases.json \ --image-root /path/to/images \ --output runs/heading_viz """ import argparse import json import sys from pathlib import Path import cv2 import numpy as np import matplotlib matplotlib.use('Agg') # Non-interactive backend for multiprocessing import matplotlib.pyplot as plt from matplotlib.patches import Rectangle, FancyArrow from tqdm import tqdm from multiprocessing import Pool, cpu_count from functools import partial # Add project root to path project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) from utils.plots import Annotator, colors def parse_args(): parser = argparse.ArgumentParser(description='Visualize heading error cases') parser.add_argument('--input', type=str, required=True, help='Path to bad_heading_cases.json') parser.add_argument('--image-root', type=str, required=True, help='Root directory containing case images') parser.add_argument('--output', type=str, default='runs/heading_viz', help='Output directory for visualizations') parser.add_argument('--max-cases', type=int, default=None, help='Maximum number of cases to visualize') parser.add_argument('--viz-types', nargs='+', default=['bev', 'image', 'angle', 'combined'], choices=['bev', 'image', 'angle', 'combined'], help='Types of visualizations to generate') parser.add_argument('--dpi', type=int, default=100, help='DPI for saved figures') parser.add_argument('--merge-same-frame', action='store_true', help='Merge multiple objects in the same frame into one visualization (BEV/image only)') parser.add_argument('--workers', type=int, default=4, help='Number of worker processes (default: 4, use 1 to disable multiprocessing)') return parser.parse_args() def find_image_file(image_root, case_id, frame_id): """Find image file for a case/frame. Try multiple possible paths: 1. image_root/case_id/images/frame_id.jpg 2. image_root/case_id/frame_id.jpg 3. image_root/images/frame_id.jpg """ image_root = Path(image_root) # Extract base frame name (remove case prefix if exists) frame_base = frame_id.replace('.jpg', '').replace('.png', '') # Try different patterns patterns = [ image_root / case_id / 'images' / f'{frame_base}.jpg', image_root / case_id / 'images' / f'{frame_base}.png', image_root / case_id / f'{frame_base}.jpg', image_root / case_id / f'{frame_base}.png', image_root / 'images' / f'{frame_base}.jpg', image_root / 'images' / f'{frame_base}.png', image_root / f'{frame_base}.jpg', image_root / f'{frame_base}.png', ] for pattern in patterns: if pattern.exists(): return pattern return None def draw_bev_blank(bev_range=(-50, 50, 0, 100), bev_res=0.1): """Create blank BEV canvas. Args: bev_range: (x_min, x_max, z_min, z_max) in meters bev_res: Resolution in meters per pixel Returns: bev_canvas: Blank BEV image bev_info: Dict with transformation info """ x_min, x_max, z_min, z_max = bev_range bev_width = int((x_max - x_min) / bev_res) bev_height = int((z_max - z_min) / bev_res) bev_canvas = np.ones((bev_height, bev_width, 3), dtype=np.uint8) * 255 # Draw grid lines for x in range(int(x_min), int(x_max), 10): px = int((x - x_min) / bev_res) cv2.line(bev_canvas, (px, 0), (px, bev_height), (200, 200, 200), 1) for z in range(int(z_min), int(z_max), 10): py = int((z - z_min) / bev_res) cv2.line(bev_canvas, (0, py), (bev_width, py), (200, 200, 200), 1) # Draw ego vehicle position (center at z=0) # In camera coordinates: ego at origin, z=0 at bottom of image (near) ego_x = int((0 - x_min) / bev_res) ego_z = int((z_max - 0) / bev_res) # Flipped: z=0 at bottom cv2.circle(bev_canvas, (ego_x, ego_z), 5, (0, 0, 255), -1) bev_info = { 'range': bev_range, 'res': bev_res, 'width': bev_width, 'height': bev_height, 'x_min': x_min, 'x_max': x_max, 'z_min': z_min, 'z_max': z_max } return bev_canvas, bev_info def world_to_bev(x, z, bev_info): """Convert world coordinates to BEV pixel coordinates. Camera coordinate system: x right, y down, z forward BEV image: x maps to columns (right), z maps to rows (up=forward) """ px = int((x - bev_info['x_min']) / bev_info['res']) # Flip z-axis: z increases upward (forward) in the image py = int((bev_info['z_max'] - z) / bev_info['res']) return px, py def draw_3d_box_bev(canvas, center, dimensions, rotation, bev_info, color, thickness=2, draw_arrow=True): """Draw 3D bounding box on BEV. Args: canvas: BEV image center: [x, y, z] center in world coordinates dimensions: [l, w, h] dimensions rotation: Rotation angle in radians (around Y-axis) bev_info: BEV transformation info color: RGB color tuple thickness: Line thickness draw_arrow: Whether to draw direction arrow """ x, y, z = center l, w, h = dimensions # Calculate 4 corners in world coordinates # Box is centered at (x, z), extends l/2 forward/backward, w/2 left/right cos_r = np.cos(rotation) sin_r = np.sin(rotation) # Corners in local coordinates (before rotation) corners_local = np.array([ [-l/2, -w/2], # back-left [ l/2, -w/2], # front-left [ l/2, w/2], # front-right [-l/2, w/2], # back-right ]) # Rotate corners # Current state: rotation=0° points to +z (up in image), rotation=90° points to -x (left) # Target state: rotation=0° points to +x (right), rotation=90° points to -z (down/forward) # This means we need to rotate the coordinate frame by -90° (clockwise) adjusted_rotation = rotation - np.pi / 2 cos_r_adj = np.cos(adjusted_rotation) sin_r_adj = np.sin(adjusted_rotation) rotation_matrix = np.array([ [cos_r_adj, -sin_r_adj], [sin_r_adj, cos_r_adj] ]) corners_rotated = corners_local @ rotation_matrix.T # Translate to world position # Note: Change to [x, z] order so rotated[0]→x, rotated[1]→z corners_world = corners_rotated + np.array([x, z]) # Convert to BEV pixels corners_bev = [] for corner in corners_world: px, py = world_to_bev(corner[0], corner[1], bev_info) # corner[0]=x, corner[1]=z corners_bev.append([px, py]) corners_bev = np.array(corners_bev, dtype=np.int32) # Draw box cv2.polylines(canvas, [corners_bev], True, color, thickness) # Draw direction arrow if draw_arrow: # Arrow from center to front center center_px, center_py = world_to_bev(x, z, bev_info) front_center = corners_world[1:3].mean(axis=0) # Average of front-left and front-right front_px, front_py = world_to_bev(front_center[0], front_center[1], bev_info) # [0]=x, [1]=z cv2.arrowedLine(canvas, (center_px, center_py), (front_px, front_py), color, thickness, tipLength=0.3) return canvas def visualize_bev(case, bev_range=(-20, 20, 0, 80), bev_res=0.1): """Generate BEV visualization for a case. Args: case: Case dict from bad_heading_cases.json bev_range: (x_min, x_max, z_min, z_max) bev_res: Resolution in meters per pixel Returns: bev_canvas: BEV visualization image """ # Create blank BEV bev_canvas, bev_info = draw_bev_blank(bev_range, bev_res) # Extract data gt_center = case['gt_center'] det_center = case['det_center'] gt_rotation = case['gt_rotation'] det_rotation = case['det_rotation'] # Use fixed dimensions if not available (from typical vehicle size) # Note: dimensions might not be in the JSON, use defaults dimensions = [4.5, 1.8, 1.5] # [length, width, height] # Draw GT (green) draw_3d_box_bev(bev_canvas, gt_center, dimensions, gt_rotation, bev_info, color=(0, 255, 0), thickness=2) # Draw Detection (red) draw_3d_box_bev(bev_canvas, det_center, dimensions, det_rotation, bev_info, color=(0, 0, 255), thickness=2) # Add legend legend_x = 10 legend_y = 30 cv2.putText(bev_canvas, 'GT', (legend_x, legend_y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) cv2.rectangle(bev_canvas, (legend_x - 5, legend_y - 20), (legend_x + 30, legend_y + 5), (0, 255, 0), 2) cv2.putText(bev_canvas, 'Pred', (legend_x, legend_y + 40), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2) cv2.rectangle(bev_canvas, (legend_x - 5, legend_y + 20), (legend_x + 50, legend_y + 45), (0, 0, 255), 2) # Add error info error_text = f"Heading Error: {case['heading_error_deg']:.1f}°" if case['is_reversal']: error_text += " (REVERSAL)" cv2.putText(bev_canvas, error_text, (legend_x, bev_info['height'] - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2) return bev_canvas def visualize_bev_multi(cases, bev_range=(-20, 20, 0, 80), bev_res=0.1): """Generate BEV visualization for multiple objects in the same frame. Args: cases: List of case dicts from bad_heading_cases.json bev_range: (x_min, x_max, z_min, z_max) bev_res: Resolution in meters per pixel Returns: bev_canvas: BEV visualization image with all objects """ # Create blank BEV bev_canvas, bev_info = draw_bev_blank(bev_range, bev_res) # Color palette for different objects (alternating colors) colors_gt = [(0, 255, 0), (0, 200, 100), (0, 150, 150)] # Green shades colors_det = [(0, 0, 255), (100, 0, 200), (150, 0, 150)] # Red/purple shades dimensions = [4.5, 1.8, 1.5] # Default dimensions # Draw all objects for idx, case in enumerate(cases): gt_center = case['gt_center'] det_center = case['det_center'] gt_rotation = case['gt_rotation'] det_rotation = case['det_rotation'] # Use different colors for different objects color_idx = idx % len(colors_gt) gt_color = colors_gt[color_idx] det_color = colors_det[color_idx] # Draw GT draw_3d_box_bev(bev_canvas, gt_center, dimensions, gt_rotation, bev_info, color=gt_color, thickness=2) # Draw Detection draw_3d_box_bev(bev_canvas, det_center, dimensions, det_rotation, bev_info, color=det_color, thickness=2) # Add object number label near the box center_px, center_py = world_to_bev(det_center[0], det_center[2], bev_info) cv2.putText(bev_canvas, f'#{idx+1}', (center_px + 10, center_py), cv2.FONT_HERSHEY_SIMPLEX, 0.6, det_color, 2) # Add legend legend_x = 10 legend_y = 30 cv2.putText(bev_canvas, 'GT', (legend_x, legend_y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) cv2.rectangle(bev_canvas, (legend_x - 5, legend_y - 20), (legend_x + 30, legend_y + 5), (0, 255, 0), 2) cv2.putText(bev_canvas, 'Pred', (legend_x, legend_y + 40), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2) cv2.rectangle(bev_canvas, (legend_x - 5, legend_y + 20), (legend_x + 50, legend_y + 45), (0, 0, 255), 2) # Add info info_text = f"{len(cases)} objects with heading errors" cv2.putText(bev_canvas, info_text, (legend_x, bev_info['height'] - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2) return bev_canvas def visualize_image(case, image_path): """Generate image visualization with 2D boxes. Args: case: Case dict from bad_heading_cases.json image_path: Path to image file Returns: vis_image: Annotated image """ if image_path is None or not Path(image_path).exists(): # Create placeholder vis_image = np.ones((720, 1280, 3), dtype=np.uint8) * 128 cv2.putText(vis_image, 'Image not found', (100, 360), cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 255, 255), 3) return vis_image # Load image image = cv2.imread(str(image_path)) if image is None: vis_image = np.ones((720, 1280, 3), dtype=np.uint8) * 128 cv2.putText(vis_image, f'Failed to load: {image_path.name}', (100, 360), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2) return vis_image vis_image = image.copy() # Draw GT bbox (green) gt_bbox = case['gt_bbox_2d'] if gt_bbox and len(gt_bbox) == 4: x1, y1, x2, y2 = [int(x) for x in gt_bbox] cv2.rectangle(vis_image, (x1, y1), (x2, y2), (0, 255, 0), 3) cv2.putText(vis_image, 'GT', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2) # Draw Detection bbox (red) det_bbox = case['det_bbox_2d'] if det_bbox and len(det_bbox) == 4: x1, y1, x2, y2 = [int(x) for x in det_bbox] cv2.rectangle(vis_image, (x1, y1), (x2, y2), (0, 0, 255), 3) cv2.putText(vis_image, f"Pred (conf={case['confidence']:.2f})", (x1, y2 + 25), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) # Add error info info_y = 30 cv2.putText(vis_image, f"Class: {case['class']}", (10, info_y), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2) cv2.putText(vis_image, f"Heading Error: {case['heading_error_deg']:.1f}°", (10, info_y + 35), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 0), 2) cv2.putText(vis_image, f"Distance: {case['distance']:.1f}m", (10, info_y + 70), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2) if case['is_reversal']: cv2.putText(vis_image, "REVERSAL ERROR", (10, info_y + 105), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 3) return vis_image def visualize_image_multi(cases, image_path): """Generate image visualization for multiple objects in the same frame. Args: cases: List of case dicts from bad_heading_cases.json image_path: Path to the image file Returns: image_canvas: Image with all 2D boxes """ # Load image if not image_path.exists(): # Return placeholder image vis_image = np.ones((720, 1280, 3), dtype=np.uint8) * 128 cv2.putText(vis_image, f'Failed to load: {image_path.name}', (100, 360), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2) return vis_image image = cv2.imread(str(image_path)) if image is None: vis_image = np.ones((720, 1280, 3), dtype=np.uint8) * 128 cv2.putText(vis_image, f'Failed to load: {image_path.name}', (100, 360), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2) return vis_image vis_image = image.copy() # Color palette for different objects colors_gt = [(0, 255, 0), (0, 200, 100), (0, 150, 150), (100, 255, 100)] # Green shades colors_det = [(0, 0, 255), (100, 0, 200), (150, 0, 150), (200, 0, 100)] # Red/purple shades # Draw all objects for idx, case in enumerate(cases): # Use different colors for different objects color_idx = idx % len(colors_gt) gt_color = colors_gt[color_idx] det_color = colors_det[color_idx] # Draw GT bbox gt_bbox = case['gt_bbox_2d'] if gt_bbox and len(gt_bbox) == 4: x1, y1, x2, y2 = [int(x) for x in gt_bbox] cv2.rectangle(vis_image, (x1, y1), (x2, y2), gt_color, 3) cv2.putText(vis_image, f'GT#{idx+1}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, gt_color, 2) # Draw Detection bbox det_bbox = case['det_bbox_2d'] if det_bbox and len(det_bbox) == 4: x1, y1, x2, y2 = [int(x) for x in det_bbox] cv2.rectangle(vis_image, (x1, y1), (x2, y2), det_color, 3) cv2.putText(vis_image, f"Pred#{idx+1} ({case['confidence']:.2f})", (x1, y2 + 25), cv2.FONT_HERSHEY_SIMPLEX, 0.7, det_color, 2) # Add summary info info_y = 30 cv2.putText(vis_image, f"{len(cases)} objects with heading errors", (10, info_y), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 0), 2) # Show error details for each object for idx, case in enumerate(cases): info_text = f"#{idx+1}: {case['heading_error_deg']:.1f}° ({case['class']})" if case['is_reversal']: info_text += " REVERSAL" cv2.putText(vis_image, info_text, (10, info_y + 35 + idx * 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) return vis_image def visualize_angle(case): """Generate angle comparison visualization. Creates a polar plot showing GT and predicted angles. Args: case: Case dict from bad_heading_cases.json Returns: fig: Matplotlib figure """ fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5)) # Polar plot ax1 = plt.subplot(121, projection='polar') gt_angle = case['gt_rotation'] det_angle = case['det_rotation'] error = case['heading_error'] # Plot angles ax1.plot([gt_angle, gt_angle], [0, 1], 'g-', linewidth=3, label='GT') ax1.plot([det_angle, det_angle], [0, 1], 'r-', linewidth=3, label='Pred') ax1.scatter([gt_angle], [1], c='green', s=200, marker='^', zorder=10) ax1.scatter([det_angle], [1], c='red', s=200, marker='v', zorder=10) ax1.set_ylim(0, 1.2) ax1.set_theta_zero_location('E') # 0° points right (camera x-axis) ax1.set_theta_direction(-1) # Clockwise (90° points down) ax1.legend(loc='upper right', bbox_to_anchor=(1.3, 1.1)) ax1.set_title(f'Heading Comparison\nError: {np.degrees(error):.1f}°', fontsize=12, pad=20) # Bar plot ax2 = plt.subplot(122) categories = ['GT', 'Pred'] angles_deg = [np.degrees(gt_angle), np.degrees(det_angle)] colors_bar = ['green', 'red'] bars = ax2.bar(categories, angles_deg, color=colors_bar, alpha=0.7, edgecolor='black', linewidth=2) ax2.set_ylabel('Rotation Angle (degrees)', fontsize=11) ax2.set_title(f'Rotation Comparison', fontsize=12) ax2.grid(True, axis='y', alpha=0.3) ax2.axhline(0, color='black', linewidth=0.5) # Add value labels on bars for bar, angle in zip(bars, angles_deg): height = bar.get_height() ax2.text(bar.get_x() + bar.get_width()/2., height, f'{angle:.1f}°', ha='center', va='bottom' if height > 0 else 'top', fontsize=10) # Add case info fig.text(0.5, 0.02, f"Case: {case['case_id'][:20]}... | Frame: {case['frame_id'][:30]}... | " f"Class: {case['class']} | Distance: {case['distance']:.1f}m | " f"Confidence: {case['confidence']:.2f}", ha='center', fontsize=9, style='italic') plt.tight_layout(rect=[0, 0.05, 1, 1]) return fig def visualize_combined(case, image_path, bev_range=(-20, 20, 0, 80)): """Generate combined visualization with BEV, image, and stats. Args: case: Case dict from bad_heading_cases.json image_path: Path to image file bev_range: BEV range for visualization Returns: fig: Matplotlib figure with combined view """ fig = plt.figure(figsize=(16, 10)) gs = fig.add_gridspec(2, 2, height_ratios=[1, 1], width_ratios=[1, 1], hspace=0.3, wspace=0.3) # BEV view (top-left) ax1 = fig.add_subplot(gs[0, 0]) bev_canvas = visualize_bev(case, bev_range) ax1.imshow(cv2.cvtColor(bev_canvas, cv2.COLOR_BGR2RGB)) ax1.set_title('Bird\'s Eye View (BEV)', fontsize=14, fontweight='bold') ax1.axis('off') # Image view (top-right) ax2 = fig.add_subplot(gs[0, 1]) vis_image = visualize_image(case, image_path) ax2.imshow(cv2.cvtColor(vis_image, cv2.COLOR_BGR2RGB)) ax2.set_title('Image View', fontsize=14, fontweight='bold') ax2.axis('off') # Polar angle plot (bottom-left) ax3 = fig.add_subplot(gs[1, 0], projection='polar') gt_angle = case['gt_rotation'] det_angle = case['det_rotation'] ax3.plot([gt_angle, gt_angle], [0, 1], 'g-', linewidth=3, label='GT') ax3.plot([det_angle, det_angle], [0, 1], 'r-', linewidth=3, label='Pred') ax3.scatter([gt_angle], [1], c='green', s=200, marker='^', zorder=10) ax3.scatter([det_angle], [1], c='red', s=200, marker='v', zorder=10) ax3.set_ylim(0, 1.2) ax3.set_theta_zero_location('E') # 0° points right (camera x-axis) ax3.set_theta_direction(-1) # Clockwise (90° points down) ax3.legend(loc='upper right', fontsize=10) ax3.set_title('Heading Comparison', fontsize=14, fontweight='bold', pad=20) # Statistics panel (bottom-right) ax4 = fig.add_subplot(gs[1, 1]) ax4.axis('off') # Create statistics text stats_text = f""" CASE INFORMATION ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Case ID: {case['case_id'][:40]}... Frame ID: {case['frame_id'][:40]}... Class: {case['class'].upper()} DETECTION METRICS ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Confidence: {case['confidence']:.3f} IoU: {case['iou']:.3f} Distance: {case['distance']:.2f} m HEADING ERROR ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Error: {case['heading_error_deg']:.2f}° ({case['heading_error']:.3f} rad) GT Angle: {case['gt_rotation_deg']:.2f}° Pred Angle: {case['det_rotation_deg']:.2f}° Reversal: {'YES - 180° ERROR' if case['is_reversal'] else 'No'} POSITION ERROR ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Lateral: {case['lateral_error']:.3f} m Longitudinal: {case['longitudinal_error']:.3f} m 3D CENTERS ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ GT: [{case['gt_center'][0]:6.2f}, {case['gt_center'][1]:6.2f}, {case['gt_center'][2]:6.2f}] Pred: [{case['det_center'][0]:6.2f}, {case['det_center'][1]:6.2f}, {case['det_center'][2]:6.2f}] """ # Add colored background for reversal errors if case['is_reversal']: ax4.add_patch(Rectangle((0, 0), 1, 1, transform=ax4.transAxes, facecolor='red', alpha=0.1)) ax4.text(0.05, 0.95, stats_text.strip(), transform=ax4.transAxes, fontsize=10, verticalalignment='top', fontfamily='monospace', bbox=dict(boxstyle='round', facecolor='wheat' if case['is_reversal'] else 'lightblue', alpha=0.3, edgecolor='black', linewidth=2)) # Add title fig.suptitle(f"Heading Error Analysis - {case['class'].upper()} (Error: {case['heading_error_deg']:.1f}°)", fontsize=16, fontweight='bold', y=0.98) return fig def visualize_combined_multi(cases, image_path, bev_range=(-20, 20, 0, 80)): """Generate combined visualization for multiple objects in the same frame. Args: cases: List of case dicts from bad_heading_cases.json image_path: Path to image file bev_range: BEV range for visualization Returns: fig: Matplotlib figure with combined view """ fig = plt.figure(figsize=(16, 10)) gs = fig.add_gridspec(2, 2, height_ratios=[1, 1], width_ratios=[1, 1], hspace=0.3, wspace=0.3) # BEV view (top-left) - merged ax1 = fig.add_subplot(gs[0, 0]) bev_canvas = visualize_bev_multi(cases, bev_range) ax1.imshow(cv2.cvtColor(bev_canvas, cv2.COLOR_BGR2RGB)) ax1.set_title('Bird\'s Eye View (BEV)', fontsize=14, fontweight='bold') ax1.axis('off') # Image view (top-right) - merged ax2 = fig.add_subplot(gs[0, 1]) vis_image = visualize_image_multi(cases, image_path) ax2.imshow(cv2.cvtColor(vis_image, cv2.COLOR_BGR2RGB)) ax2.set_title('Image View', fontsize=14, fontweight='bold') ax2.axis('off') # Polar angle plot (bottom-left) - show all objects ax3 = fig.add_subplot(gs[1, 0], projection='polar') colors_gt = ['green', 'darkgreen', 'lightgreen', 'seagreen'] colors_det = ['red', 'darkred', 'lightcoral', 'crimson'] for idx, case in enumerate(cases): gt_angle = case['gt_rotation'] det_angle = case['det_rotation'] color_gt = colors_gt[idx % len(colors_gt)] color_det = colors_det[idx % len(colors_det)] # Plot with different line styles for different objects linestyle = ['-', '--', '-.', ':'][idx % 4] ax3.plot([gt_angle, gt_angle], [0, 1], color=color_gt, linewidth=2, linestyle=linestyle, label=f'GT#{idx+1}', alpha=0.8) ax3.plot([det_angle, det_angle], [0, 1], color=color_det, linewidth=2, linestyle=linestyle, label=f'Pred#{idx+1}', alpha=0.8) ax3.scatter([gt_angle], [1], c=color_gt, s=150, marker='^', zorder=10, alpha=0.8) ax3.scatter([det_angle], [1], c=color_det, s=150, marker='v', zorder=10, alpha=0.8) ax3.set_ylim(0, 1.2) ax3.set_theta_zero_location('E') # 0° points right (camera x-axis) ax3.set_theta_direction(-1) # Clockwise (90° points down) ax3.legend(loc='upper right', fontsize=8, ncol=2) ax3.set_title('Heading Comparison (All Objects)', fontsize=14, fontweight='bold', pad=20) # Statistics panel (bottom-right) - summary of all objects ax4 = fig.add_subplot(gs[1, 1]) ax4.axis('off') # Create statistics text for all objects stats_lines = [] stats_lines.append("MULTIPLE OBJECTS ANALYSIS") stats_lines.append("═" * 42) stats_lines.append(f"Frame: {cases[0]['frame_id'][:35]}...") stats_lines.append(f"Case: {cases[0]['case_id'][:35]}...") stats_lines.append(f"Total objects: {len(cases)}") stats_lines.append("") for idx, case in enumerate(cases): stats_lines.append(f"OBJECT #{idx+1} - {case['class'].upper()}") stats_lines.append("─" * 42) stats_lines.append(f"Heading Error: {case['heading_error_deg']:6.1f}° {'(REVERSAL)' if case['is_reversal'] else ''}") stats_lines.append(f"GT Angle: {case['gt_rotation_deg']:6.1f}°") stats_lines.append(f"Pred Angle: {case['det_rotation_deg']:6.1f}°") stats_lines.append(f"Confidence: {case['confidence']:6.3f}") stats_lines.append(f"Distance: {case['distance']:6.1f} m") stats_lines.append(f"IoU: {case['iou']:6.3f}") stats_lines.append(f"Lateral Error: {case['lateral_error']:6.3f} m") stats_lines.append(f"Longitud. Err: {case['longitudinal_error']:6.3f} m") if idx < len(cases) - 1: stats_lines.append("") stats_text = "\n".join(stats_lines) # Add colored background if any reversal errors has_reversal = any(c['is_reversal'] for c in cases) if has_reversal: ax4.add_patch(Rectangle((0, 0), 1, 1, transform=ax4.transAxes, facecolor='red', alpha=0.1)) # Adjust font size based on number of objects fontsize = max(7, 10 - len(cases)) ax4.text(0.05, 0.95, stats_text, transform=ax4.transAxes, fontsize=fontsize, verticalalignment='top', fontfamily='monospace', bbox=dict(boxstyle='round', facecolor='wheat' if has_reversal else 'lightblue', alpha=0.3, edgecolor='black', linewidth=2)) # Add title max_error = max(c['heading_error_deg'] for c in cases) fig.suptitle(f"Multi-Object Heading Error Analysis - {len(cases)} Objects (Max Error: {max_error:.1f}°)", fontsize=16, fontweight='bold', y=0.98) return fig def process_single_case(case_data): """Process a single case for visualization. Args: case_data: Tuple of (case, args_dict, image_root, output_dir, frame_groups, obj_idx, suffix) Returns: Tuple of (success, case_id, error_msg) """ case, args_dict, image_root, output_dir, frame_groups, obj_idx, suffix = case_data case_id = case['case_id'] frame_id = case['frame_id'] viz_types = args_dict['viz_types'] dpi = args_dict['dpi'] try: # Find image file image_path = find_image_file(image_root, case_id, frame_id) # Generate BEV if 'bev' in viz_types: bev_canvas = visualize_bev(case) bev_case_dir = output_dir / 'bev' / case_id bev_case_dir.mkdir(parents=True, exist_ok=True) bev_path = bev_case_dir / f'{frame_id}{suffix}_bev.jpg' cv2.imwrite(str(bev_path), bev_canvas) # Generate image visualization if 'image' in viz_types: vis_image = visualize_image(case, image_path) img_case_dir = output_dir / 'image' / case_id img_case_dir.mkdir(parents=True, exist_ok=True) img_path = img_case_dir / f'{frame_id}{suffix}_image.jpg' cv2.imwrite(str(img_path), vis_image) # Generate angle plot if 'angle' in viz_types: fig_angle = visualize_angle(case) angle_case_dir = output_dir / 'angle' / case_id angle_case_dir.mkdir(parents=True, exist_ok=True) angle_path = angle_case_dir / f'{frame_id}{suffix}_angle.png' fig_angle.savefig(angle_path, dpi=dpi, bbox_inches='tight') plt.close(fig_angle) # Generate combined view if 'combined' in viz_types: fig_combined = visualize_combined(case, image_path) combined_case_dir = output_dir / 'combined' / case_id combined_case_dir.mkdir(parents=True, exist_ok=True) combined_path = combined_case_dir / f'{frame_id}{suffix}_combined.png' fig_combined.savefig(combined_path, dpi=dpi, bbox_inches='tight') plt.close(fig_combined) return (True, case_id, None) except Exception as e: return (False, case_id, str(e)) def process_merged_frame(frame_data): """Process a frame with merged objects. Args: frame_data: Tuple of (frame_key, frame_cases, args_dict, image_root, output_dir) Returns: Tuple of (success, case_id, error_msg) """ frame_key, frame_cases, args_dict, image_root, output_dir = frame_data case_id, frame_id = frame_key viz_types = args_dict['viz_types'] dpi = args_dict['dpi'] try: # Find image file image_path = find_image_file(image_root, case_id, frame_id) # Generate merged BEV and image if 'bev' in viz_types: bev_canvas = visualize_bev_multi(frame_cases) bev_case_dir = output_dir / 'bev' / case_id bev_case_dir.mkdir(parents=True, exist_ok=True) bev_path = bev_case_dir / f'{frame_id}_bev.jpg' cv2.imwrite(str(bev_path), bev_canvas) if 'image' in viz_types: vis_image = visualize_image_multi(frame_cases, image_path) img_case_dir = output_dir / 'image' / case_id img_case_dir.mkdir(parents=True, exist_ok=True) img_path = img_case_dir / f'{frame_id}_image.jpg' cv2.imwrite(str(img_path), vis_image) # Generate merged combined view for all objects if 'combined' in viz_types: fig_combined = visualize_combined_multi(frame_cases, image_path) combined_case_dir = output_dir / 'combined' / case_id combined_case_dir.mkdir(parents=True, exist_ok=True) combined_path = combined_case_dir / f'{frame_id}_combined.png' fig_combined.savefig(combined_path, dpi=dpi, bbox_inches='tight') plt.close(fig_combined) # Generate separate angle plots for each object (cannot be merged) for obj_idx, case in enumerate(frame_cases): if 'angle' in viz_types: fig_angle = visualize_angle(case) angle_case_dir = output_dir / 'angle' / case_id angle_case_dir.mkdir(parents=True, exist_ok=True) angle_path = angle_case_dir / f'{frame_id}_obj{obj_idx+1}_angle.png' fig_angle.savefig(angle_path, dpi=dpi, bbox_inches='tight') plt.close(fig_angle) return (True, case_id, None) except Exception as e: return (False, case_id, str(e)) def main(): args = parse_args() # Load bad cases input_path = Path(args.input) if not input_path.exists(): print(f"Error: Input file not found: {input_path}") sys.exit(1) print(f"Loading bad cases from {input_path}...") with open(input_path, 'r') as f: data = json.load(f) cases = data['cases'] metadata = data['metadata'] print(f"Loaded {len(cases)} bad cases") print(f"Source: {metadata['source']}") print(f"Threshold: {metadata['threshold']:.2f} rad ({metadata['threshold_degrees']:.1f}°)") # Limit cases if requested if args.max_cases: cases = cases[:args.max_cases] print(f"Limited to {len(cases)} cases") # Group cases by frame for statistics from collections import defaultdict frame_groups = defaultdict(list) for idx, case in enumerate(cases): key = (case['case_id'], case['frame_id']) frame_groups[key].append((idx, case)) multi_target_frames = {k: v for k, v in frame_groups.items() if len(v) > 1} print(f"\nFrame statistics:") print(f" Total cases: {len(cases)}") print(f" Unique frames: {len(frame_groups)}") print(f" Frames with multiple targets: {len(multi_target_frames)}") if multi_target_frames: max_targets = max(len(v) for v in multi_target_frames.values()) print(f" Max targets in one frame: {max_targets}") if args.merge_same_frame and multi_target_frames: print(f"\n⚠️ Merge mode enabled: BEV/image/combined will show all targets in the same frame") print(f" Angle plots will still generate separate files for each target") elif multi_target_frames: print(f"\n⚠️ Multiple targets per frame detected!") print(f" Each target will get a unique file: {{frame_id}}_obj{{N}}_{{viz_type}}.ext") print(f" Use --merge-same-frame to draw all targets on the same BEV/image") # Create output directory output_dir = Path(args.output) output_dir.mkdir(parents=True, exist_ok=True) for viz_type in args.viz_types: (output_dir / viz_type).mkdir(exist_ok=True) print(f"\nOutput directory: {output_dir}") print(f"Visualization types: {args.viz_types}") # Determine number of workers num_workers = args.workers if num_workers < 1: num_workers = 1 elif num_workers > cpu_count(): print(f"Warning: Requested {num_workers} workers but only {cpu_count()} CPUs available") num_workers = cpu_count() use_multiprocessing = num_workers > 1 print(f"\nProcessing mode: {'Multiprocessing' if use_multiprocessing else 'Single process'}") if use_multiprocessing: print(f"Workers: {num_workers}") # Process cases print(f"\nGenerating visualizations...") # Prepare args dict for worker processes args_dict = { 'viz_types': args.viz_types, 'dpi': args.dpi } if args.merge_same_frame and ('bev' in args.viz_types or 'image' in args.viz_types or 'combined' in args.viz_types): # Prepare data for merged frame processing frame_data_list = [] for frame_key, objects in frame_groups.items(): frame_cases = [c for _, c in objects] frame_data = (frame_key, frame_cases, args_dict, args.image_root, output_dir) frame_data_list.append(frame_data) # Process with multiprocessing or single process if use_multiprocessing: with Pool(num_workers) as pool: results = list(tqdm( pool.imap(process_merged_frame, frame_data_list), total=len(frame_data_list), desc="Processing frames" )) else: results = [] for frame_data in tqdm(frame_data_list, desc="Processing frames"): results.append(process_merged_frame(frame_data)) # Report errors errors = [(case_id, err) for success, case_id, err in results if not success] if errors: print(f"\n{len(errors)} frames had errors:") for case_id, err in errors[:10]: # Show first 10 print(f" {case_id}: {err}") if len(errors) > 10: print(f" ... and {len(errors) - 10} more") # Original sequential code (kept for reference, but not executed) if False: processed_frames = set() for i, case in enumerate(tqdm(cases, desc="Processing cases")): case_id = case['case_id'] frame_id = case['frame_id'] frame_key = (case_id, frame_id) # Find image file image_path = find_image_file(args.image_root, case_id, frame_id) # Get all objects in this frame frame_objects = frame_groups[frame_key] obj_idx = next(idx for idx, (j, _) in enumerate(frame_objects) if j == i) try: # For BEV and image: merge all objects in the same frame if frame_key not in processed_frames: if 'bev' in args.viz_types: bev_canvas = visualize_bev_multi([c for _, c in frame_objects]) bev_case_dir = output_dir / 'bev' / case_id bev_case_dir.mkdir(parents=True, exist_ok=True) bev_path = bev_case_dir / f'{frame_id}_bev.jpg' cv2.imwrite(str(bev_path), bev_canvas) if 'image' in args.viz_types: vis_image = visualize_image_multi([c for _, c in frame_objects], image_path) img_case_dir = output_dir / 'image' / case_id img_case_dir.mkdir(parents=True, exist_ok=True) img_path = img_case_dir / f'{frame_id}_image.jpg' cv2.imwrite(str(img_path), vis_image) processed_frames.add(frame_key) # For angle and combined: generate separate files for each object if 'angle' in args.viz_types: fig_angle = visualize_angle(case) angle_case_dir = output_dir / 'angle' / case_id angle_case_dir.mkdir(parents=True, exist_ok=True) angle_path = angle_case_dir / f'{frame_id}_obj{obj_idx+1}_angle.png' fig_angle.savefig(angle_path, dpi=args.dpi, bbox_inches='tight') plt.close(fig_angle) if 'combined' in args.viz_types: fig_combined = visualize_combined(case, image_path) combined_case_dir = output_dir / 'combined' / case_id combined_case_dir.mkdir(parents=True, exist_ok=True) combined_path = combined_case_dir / f'{frame_id}_obj{obj_idx+1}_combined.png' fig_combined.savefig(combined_path, dpi=args.dpi, bbox_inches='tight') plt.close(fig_combined) except Exception as e: print(f"\nError processing case {i} ({case_id}): {e}") continue else: # Prepare data for separate object processing case_data_list = [] for i, case in enumerate(cases): case_id = case['case_id'] frame_id = case['frame_id'] frame_key = (case_id, frame_id) # Determine object index for this frame frame_objects = frame_groups[frame_key] obj_idx = next(idx for idx, (j, _) in enumerate(frame_objects) if j == i) # Add object index to filename if multiple objects in frame if len(frame_objects) > 1: suffix = f'_obj{obj_idx+1}' else: suffix = '' case_data = (case, args_dict, args.image_root, output_dir, frame_groups, obj_idx, suffix) case_data_list.append(case_data) # Process with multiprocessing or single process if use_multiprocessing: with Pool(num_workers) as pool: results = list(tqdm( pool.imap(process_single_case, case_data_list), total=len(case_data_list), desc="Processing cases" )) else: results = [] for case_data in tqdm(case_data_list, desc="Processing cases"): results.append(process_single_case(case_data)) # Report errors errors = [(case_id, err) for success, case_id, err in results if not success] if errors: print(f"\n{len(errors)} cases had errors:") for case_id, err in errors[:10]: # Show first 10 print(f" {case_id}: {err}") if len(errors) > 10: print(f" ... and {len(errors) - 10} more") # Original sequential code (kept for reference, but not executed) if False: for i, case in enumerate(tqdm(cases, desc="Processing cases")): case_id = case['case_id'] frame_id = case['frame_id'] frame_key = (case_id, frame_id) # Find image file image_path = find_image_file(args.image_root, case_id, frame_id) # Determine object index for this frame frame_objects = frame_groups[frame_key] obj_idx = next(idx for idx, (j, _) in enumerate(frame_objects) if j == i) # Add object index to filename if multiple objects in frame if len(frame_objects) > 1: suffix = f'_obj{obj_idx+1}' else: suffix = '' try: # Generate BEV if 'bev' in args.viz_types: bev_canvas = visualize_bev(case) bev_case_dir = output_dir / 'bev' / case_id bev_case_dir.mkdir(parents=True, exist_ok=True) bev_path = bev_case_dir / f'{frame_id}{suffix}_bev.jpg' cv2.imwrite(str(bev_path), bev_canvas) # Generate image visualization if 'image' in args.viz_types: vis_image = visualize_image(case, image_path) img_case_dir = output_dir / 'image' / case_id img_case_dir.mkdir(parents=True, exist_ok=True) img_path = img_case_dir / f'{frame_id}{suffix}_image.jpg' cv2.imwrite(str(img_path), vis_image) # Generate angle plot if 'angle' in args.viz_types: fig_angle = visualize_angle(case) angle_case_dir = output_dir / 'angle' / case_id angle_case_dir.mkdir(parents=True, exist_ok=True) angle_path = angle_case_dir / f'{frame_id}{suffix}_angle.png' fig_angle.savefig(angle_path, dpi=args.dpi, bbox_inches='tight') plt.close(fig_angle) # Generate combined view if 'combined' in args.viz_types: fig_combined = visualize_combined(case, image_path) combined_case_dir = output_dir / 'combined' / case_id combined_case_dir.mkdir(parents=True, exist_ok=True) combined_path = combined_case_dir / f'{frame_id}{suffix}_combined.png' fig_combined.savefig(combined_path, dpi=args.dpi, bbox_inches='tight') plt.close(fig_combined) except Exception as e: print(f"\nError processing case {i} ({case_id}): {e}") continue print(f"\n{'='*80}") print(f"Visualization complete!") print(f"{'='*80}") print(f"Output directory: {output_dir}") print(f"Total cases processed: {len(cases)}") # Count files by traversing directories for viz_type in args.viz_types: viz_dir = output_dir / viz_type # Count all files recursively num_files = len(list(viz_dir.rglob('*.*'))) num_cases = len(list(viz_dir.glob('*'))) print(f" - {viz_type}: {num_files} files in {num_cases} case directories") print(f"\nOutput structure:") print(f" {output_dir}/") for viz_type in args.viz_types: print(f" ├── {viz_type}/") print(f" │ ├── {{case_id}}/") print(f" │ │ └── {{frame_id}}_{viz_type}.{{'jpg' if viz_type in ['bev', 'image'] else 'png'}}") print(f"\nYou can now:") print(f" 1. Browse visualizations in {output_dir}") print(f" 2. Use heading_error_viewer.py for interactive viewing") print(f" 3. Generate report with generate_heading_report.py") print(f"\nExample files:") # Show first file from each type for viz_type in args.viz_types: viz_dir = output_dir / viz_type files = list(viz_dir.rglob('*.*')) if files: rel_path = files[0].relative_to(output_dir) print(f" {rel_path}") if __name__ == '__main__': main()