#!/usr/bin/env python3
"""
Heading Error Analysis Tool

This tool performs comprehensive analysis of heading errors between two models,
focusing on understanding why one model has larger heading errors.

Usage:
    python eval_tools/analyze_heading_errors.py \\
        --common-matches eval_results/common_matches.json \\
        --model1-matches eval_results/model1/detailed_3d_matches.json \\
        --model2-matches eval_results/model2/detailed_3d_matches.json \\
        --model1-name "mono3d" \\
        --model2-name "yolov5s-300w" \\
        --output-dir heading_analysis_results
"""

import argparse
import json
from collections import defaultdict
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy import stats


def _describe_errors(errors):
    """Return mean/median/std and the p50..p99 percentiles of *errors* as floats."""
    summary = {
        'mean': float(np.mean(errors)),
        'median': float(np.median(errors)),
        'std': float(np.std(errors)),
    }
    for pct in (50, 75, 90, 95, 99):
        summary[f'p{pct}'] = float(np.percentile(errors, pct))
    return summary


def _percent_change(new_value, old_value):
    """Return the relative change in percent, or None when the baseline is zero."""
    if old_value == 0:
        return None
    return (new_value / old_value - 1) * 100


def _format_percent(pct, spec):
    """Format *pct* with format spec *spec*, or 'n/a' when it is undefined."""
    return "n/a" if pct is None else f"{pct:{spec}}%"


class HeadingErrorAnalyzer:
    """Analyze heading errors between two models."""

    # Half-open [start, end) longitudinal bins used by the distance analysis.
    # Matches whose longitudinal distance falls outside every bin are not
    # counted in that analysis.
    LONGITUDINAL_RANGES = ((0, 20), (20, 40), (40, 60), (60, 80), (80, 100))

    def __init__(self, common_matches_data, model1_matches, model2_matches,
                 model1_name="Model1", model2_name="Model2"):
        """
        Initialize analyzer.

        Args:
            common_matches_data: dict, common matches data
            model1_matches: dict, model1 detailed matches
            model2_matches: dict, model2 detailed matches
            model1_name: str, model1 name
            model2_name: str, model2 name
        """
        self.common_matches_data = common_matches_data
        self.model1_matches = model1_matches
        self.model2_matches = model2_matches
        self.model1_name = model1_name
        self.model2_name = model2_name

        # Extract heading errors for common matches
        self.data = self._extract_heading_data()

    def _extract_heading_data(self):
        """
        Extract heading error data for all common matches.

        Walks ``common_matches_data['common_matches']`` (case -> frame ->
        class -> list of index pairs) and looks up the referenced entries in
        both models' detailed matches.

        Returns:
            dict with three per-class mappings: ``'model1'`` and ``'model2'``
            hold lists of heading errors, ``'common'`` holds one record per
            match. Distances, IoU, confidence and ground-truth rotation in
            that record are taken from the model1 match.

        Raises:
            KeyError / IndexError: if an index pair refers to an entry that
                is missing from either model's detailed matches.
        """
        data = {
            'model1': defaultdict(list),
            'model2': defaultdict(list),
            'common': defaultdict(list),
        }

        common_matches = self.common_matches_data['common_matches']
        for case_name, frames in common_matches.items():
            for frame_name, classes in frames.items():
                for class_name, match_list in classes.items():
                    m1_class = self.model1_matches[case_name][frame_name][class_name]
                    m2_class = self.model2_matches[case_name][frame_name][class_name]

                    for match_info in match_list:
                        m1_match = m1_class[match_info['model1_idx']]
                        m2_match = m2_class[match_info['model2_idx']]
                        m1_error = m1_match['errors']['heading']
                        m2_error = m2_match['errors']['heading']

                        item = {
                            'case': case_name,
                            'frame': frame_name,
                            'class': class_name,
                            'gt_rotation': m1_match.get('gt_rotation', 0),
                            'model1_rotation': m1_match.get('det_rotation', 0),
                            'model2_rotation': m2_match.get('det_rotation', 0),
                            'model1_error': m1_error,
                            'model2_error': m2_error,
                            'lateral_dist': m1_match['distance']['lateral'],
                            'longitudinal_dist': m1_match['distance']['longitudinal'],
                            'iou': m1_match['iou'],
                            'confidence': m1_match['confidence'],
                        }

                        data['model1'][class_name].append(m1_error)
                        data['model2'][class_name].append(m2_error)
                        data['common'][class_name].append(item)

        return data

    def generate_distribution_analysis(self, output_dir):
        """
        Generate distribution analysis and plots.

        For every class, prints summary statistics for both models, saves a
        2x2 figure (histogram, CDF, box plot, Q-Q plot of the paired error
        difference) and finally writes all statistics to ``statistics.json``.

        Args:
            output_dir: str or Path, parent directory; results go into its
                ``distribution`` subdirectory, which is created if needed.

        Returns:
            dict mapping class name to its statistics.
        """
        print("\n" + "="*80)
        print("Heading Error Distribution Analysis")
        print("="*80)

        output_dir = Path(output_dir) / 'distribution'
        output_dir.mkdir(parents=True, exist_ok=True)

        results = {}

        for class_name in self.data['model1']:
            m1_errors = np.array(self.data['model1'][class_name])
            m2_errors = np.array(self.data['model2'][class_name])

            # Calculate statistics
            stats_data = {
                'class': class_name,
                'count': len(m1_errors),
                'model1': _describe_errors(m1_errors),
                'model2': _describe_errors(m2_errors),
            }
            results[class_name] = stats_data

            # Print results
            print(f"\n{class_name.upper()} (n={stats_data['count']:,}):")
            for model_name, key in ((self.model1_name, 'model1'),
                                    (self.model2_name, 'model2')):
                summary = stats_data[key]
                print(f" {model_name}:")
                print(f" Mean: {summary['mean']:.4f} rad")
                print(f" Median: {summary['median']:.4f} rad")
                print(f" P90: {summary['p90']:.4f} rad")
                print(f" P95: {summary['p95']:.4f} rad")

            m1_mean = stats_data['model1']['mean']
            m2_mean = stats_data['model2']['mean']
            # The sign comes from the format spec so that an improvement is
            # not rendered as "+-0.1234"; a zero baseline has no percentage.
            pct_text = _format_percent(_percent_change(m2_mean, m1_mean), ".1f")
            print(" Change:")
            print(f" Mean: {m2_mean - m1_mean:+.4f} rad ({pct_text})")

            # Create plots
            fig, axes = plt.subplots(2, 2, figsize=(15, 12))
            fig.suptitle(f'Heading Error Distribution - {class_name}', fontsize=16)

            # Histogram
            hist_ax = axes[0, 0]
            hist_ax.hist(m1_errors, bins=50, alpha=0.5, label=self.model1_name, density=True)
            hist_ax.hist(m2_errors, bins=50, alpha=0.5, label=self.model2_name, density=True)
            hist_ax.set_xlabel('Heading Error (rad)')
            hist_ax.set_ylabel('Density')
            hist_ax.set_title('Histogram')
            hist_ax.legend()
            hist_ax.grid(True, alpha=0.3)

            # CDF
            cdf_ax = axes[0, 1]
            for errors, label in ((m1_errors, self.model1_name),
                                  (m2_errors, self.model2_name)):
                ordered = np.sort(errors)
                cdf_ax.plot(ordered, np.arange(len(ordered)) / len(ordered), label=label)
            cdf_ax.set_xlabel('Heading Error (rad)')
            cdf_ax.set_ylabel('Cumulative Probability')
            cdf_ax.set_title('Cumulative Distribution Function')
            cdf_ax.legend()
            cdf_ax.grid(True, alpha=0.3)

            # Box plot. Tick labels are set separately because the ``labels``
            # keyword of ``boxplot`` is deprecated in newer Matplotlib.
            box_ax = axes[1, 0]
            box_ax.boxplot([m1_errors, m2_errors])
            box_ax.set_xticklabels([self.model1_name, self.model2_name])
            box_ax.set_ylabel('Heading Error (rad)')
            box_ax.set_title('Box Plot')
            box_ax.grid(True, alpha=0.3)

            # Q-Q plot of the paired difference (model2 - model1)
            qq_ax = axes[1, 1]
            stats.probplot(m2_errors - m1_errors, dist="norm", plot=qq_ax)
            qq_ax.set_title('Q-Q Plot (Error Difference)')
            qq_ax.grid(True, alpha=0.3)

            plt.tight_layout()
            plt.savefig(output_dir / f'{class_name}_distribution.png', dpi=150)
            plt.close(fig)

        # Save results
        with open(output_dir / 'statistics.json', 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)

        print(f"\n✓ Distribution analysis saved to: {output_dir}")

        return results

    @staticmethod
    def _stats_by_range(items, dist_key, ranges):
        """Return per-range mean errors for items whose *dist_key* lies in [start, end)."""
        range_stats = {}
        for range_start, range_end in ranges:
            in_range = [item for item in items
                        if range_start <= item[dist_key] < range_end]
            if not in_range:
                # Empty bins are omitted rather than reported as NaN.
                continue

            m1_mean = np.mean([item['model1_error'] for item in in_range])
            m2_mean = np.mean([item['model2_error'] for item in in_range])
            range_stats[f"{range_start}-{range_end}m"] = {
                'count': len(in_range),
                'model1_mean': float(m1_mean),
                'model2_mean': float(m2_mean),
                'diff': float(m2_mean - m1_mean),
            }
        return range_stats

    def generate_distance_analysis(self, output_dir):
        """
        Analyze heading errors by distance ranges.

        For every class, bins the common matches by longitudinal distance
        (see ``LONGITUDINAL_RANGES``), saves a grouped bar chart of the mean
        error per bin and prints the per-bin numbers.

        NOTE: only the longitudinal distance is analyzed; the lateral
        distance is extracted for each match but not binned here.

        Args:
            output_dir: str or Path, parent directory; results go into its
                ``distance`` subdirectory, which is created if needed.
        """
        print("\n" + "="*80)
        print("Heading Error by Distance Analysis")
        print("="*80)

        output_dir = Path(output_dir) / 'distance'
        output_dir.mkdir(parents=True, exist_ok=True)

        for class_name, items in self.data['common'].items():
            # Analyze by longitudinal distance
            long_stats = self._stats_by_range(
                items, 'longitudinal_dist', self.LONGITUDINAL_RANGES)

            # Nothing to plot or print when no match falls into any bin.
            if not long_stats:
                continue

            # Plot longitudinal distance analysis
            fig, ax = plt.subplots(figsize=(12, 6))

            ranges = list(long_stats)
            m1_means = [long_stats[r]['model1_mean'] for r in ranges]
            m2_means = [long_stats[r]['model2_mean'] for r in ranges]

            x = np.arange(len(ranges))
            width = 0.35

            ax.bar(x - width/2, m1_means, width, label=self.model1_name)
            ax.bar(x + width/2, m2_means, width, label=self.model2_name)

            ax.set_xlabel('Longitudinal Distance Range')
            ax.set_ylabel('Mean Heading Error (rad)')
            ax.set_title(f'Heading Error by Longitudinal Distance - {class_name}')
            ax.set_xticks(x)
            ax.set_xticklabels(ranges)
            ax.legend()
            ax.grid(True, alpha=0.3)

            plt.tight_layout()
            plt.savefig(output_dir / f'{class_name}_longitudinal.png', dpi=150)
            plt.close(fig)

            print(f"\n{class_name.upper()} - Longitudinal Distance:")
            for range_key, range_data in long_stats.items():
                print(f" {range_key}: {self.model1_name}={range_data['model1_mean']:.4f}, "
                      f"{self.model2_name}={range_data['model2_mean']:.4f}, "
                      f"diff={range_data['diff']:+.4f} (n={range_data['count']})")

        print(f"\n✓ Distance analysis saved to: {output_dir}")

    def identify_bad_cases(self, output_dir, threshold=1.0):
        """
        Identify cases with large heading errors.

        A match is a bad case when model2's heading error is strictly
        greater than ``threshold``; model1's error is recorded for
        comparison but does not influence the selection.

        Args:
            output_dir: str or Path, parent directory; ``bad_cases.csv`` is
                written into its ``bad_cases`` subdirectory when at least
                one bad case exists.
            threshold: float, error threshold (printed as radians).

        Returns:
            list of dicts, one per bad case, sorted by model2 error in
            descending order.
        """
        print("\n" + "="*80)
        print(f"Identifying Bad Cases (threshold > {threshold} rad)")
        print("="*80)

        output_dir = Path(output_dir) / 'bad_cases'
        output_dir.mkdir(parents=True, exist_ok=True)

        bad_cases = []

        for class_name, items in self.data['common'].items():
            for item in items:
                # Only model2's error decides whether this is a bad case.
                if item['model2_error'] <= threshold:
                    continue

                # Key order defines the CSV column order.
                bad_cases.append({
                    'case': item['case'],
                    'frame': item['frame'],
                    'class': class_name,
                    'gt_rotation': item['gt_rotation'],
                    'model1_rotation': item['model1_rotation'],
                    'model2_rotation': item['model2_rotation'],
                    'model1_error': item['model1_error'],
                    'model2_error': item['model2_error'],
                    'error_increase': item['model2_error'] - item['model1_error'],
                    'longitudinal_dist': item['longitudinal_dist'],
                    'lateral_dist': item['lateral_dist'],
                    'iou': item['iou'],
                    'confidence': item['confidence'],
                })

        # Sort by model2 error
        bad_cases.sort(key=lambda x: x['model2_error'], reverse=True)

        # Save to CSV
        if bad_cases:
            df = pd.DataFrame(bad_cases)
            csv_path = output_dir / 'bad_cases.csv'
            df.to_csv(csv_path, index=False)

            print(f"\nFound {len(bad_cases)} bad cases:")
            print(f" Saved to: {csv_path}")
            print("\nTop 10 worst cases:")
            print(df.head(10).to_string(index=False))
        else:
            print(f"\nNo bad cases found with threshold > {threshold} rad")

        print(f"\n✓ Bad cases analysis saved to: {output_dir}")

        return bad_cases

    def generate_summary_report(self, output_dir):
        """
        Generate summary text report.

        Writes ``heading_analysis_summary.txt`` with per-class mean, median
        and standard deviation for both models plus the change in the mean.

        Args:
            output_dir: str or Path, directory for the report; created if
                it does not exist.
        """
        # Accept plain strings like the other analysis methods do.
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        report_path = output_dir / 'heading_analysis_summary.txt'

        lines = [
            "="*80 + "\n",
            "HEADING ERROR ANALYSIS SUMMARY\n",
            "="*80 + "\n\n",
            f"Model 1: {self.model1_name}\n",
            f"Model 2: {self.model2_name}\n\n",
            "Overall Statistics by Class:\n",
            "-"*80 + "\n\n",
        ]

        for class_name in self.data['model1']:
            m1_errors = np.array(self.data['model1'][class_name])
            m2_errors = np.array(self.data['model2'][class_name])

            lines.append(f"{class_name.upper()} (n={len(m1_errors):,}):\n")
            for model_name, errors in ((self.model1_name, m1_errors),
                                       (self.model2_name, m2_errors)):
                mean = np.mean(errors)
                median = np.median(errors)
                lines.append(f" {model_name}:\n")
                lines.append(f" Mean: {mean:.4f} rad ({np.degrees(mean):.2f}°)\n")
                lines.append(f" Median: {median:.4f} rad ({np.degrees(median):.2f}°)\n")
                lines.append(f" Std: {np.std(errors):.4f} rad\n")

            m1_mean = np.mean(m1_errors)
            m2_mean = np.mean(m2_errors)
            pct_text = _format_percent(_percent_change(m2_mean, m1_mean), "+.2f")
            lines.append(" Change:\n")
            lines.append(f" Mean: {m2_mean - m1_mean:+.4f} rad ({pct_text})\n\n")

        # The report contains non-ASCII characters (degree sign), so the
        # encoding is fixed instead of relying on the locale default.
        with open(report_path, 'w', encoding='utf-8') as f:
            f.writelines(lines)

        print(f"\n✓ Summary report saved to: {report_path}")


def main():
    """Parse command-line arguments, load the match files and run all analyses."""
    parser = argparse.ArgumentParser(description='Analyze heading errors between two models')
    parser.add_argument('--common-matches', type=str, required=True,
                        help='Path to common_matches.json')
    parser.add_argument('--model1-matches', type=str, required=True,
                        help='Path to model1 detailed_3d_matches.json')
    parser.add_argument('--model2-matches', type=str, required=True,
                        help='Path to model2 detailed_3d_matches.json')
    parser.add_argument('--model1-name', type=str, default='Model1',
                        help='Name of model 1')
    parser.add_argument('--model2-name', type=str, default='Model2',
                        help='Name of model 2')
    parser.add_argument('--output-dir', type=str, default='heading_analysis',
                        help='Output directory for analysis results')
    parser.add_argument('--bad-case-threshold', type=float, default=1.0,
                        help='Threshold for identifying bad cases (radians)')

    args = parser.parse_args()

    # Load data
    print("Loading data...")
    with open(args.common_matches, 'r', encoding='utf-8') as f:
        common_matches_data = json.load(f)

    with open(args.model1_matches, 'r', encoding='utf-8') as f:
        model1_matches = json.load(f)

    with open(args.model2_matches, 'r', encoding='utf-8') as f:
        model2_matches = json.load(f)

    # Create analyzer
    analyzer = HeadingErrorAnalyzer(
        common_matches_data,
        model1_matches,
        model2_matches,
        model1_name=args.model1_name,
        model2_name=args.model2_name
    )

    # Create output directory
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Run analyses
    print("\n" + "="*80)
    print("HEADING ERROR ANALYSIS")
    print("="*80)

    analyzer.generate_distribution_analysis(output_dir)
    analyzer.generate_distance_analysis(output_dir)
    analyzer.identify_bad_cases(output_dir, threshold=args.bad_case_threshold)
    analyzer.generate_summary_report(output_dir)

    print("\n" + "="*80)
    print("ANALYSIS COMPLETE!")
    print("="*80)
    print(f"\nResults saved to: {output_dir}/")
    print("\nGenerated files:")
    print(" - heading_analysis_summary.txt: Text summary report")
    print(" - distribution/: Error distribution analysis and plots")
    print(" - distance/: Distance-based analysis and plots")
    print(" - bad_cases/bad_cases.csv: List of cases with large errors")
    print("")


if __name__ == '__main__':
    main()