424 lines
18 KiB
Python
Executable File
424 lines
18 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Heading Error Analysis Tool
|
|
|
|
This tool performs comprehensive analysis of heading errors between two models,
|
|
focusing on understanding why one model has larger heading errors.
|
|
|
|
Usage:
|
|
python eval_tools/analyze_heading_errors.py \\
|
|
--common-matches eval_results/common_matches.json \\
|
|
--model1-matches eval_results/model1/detailed_3d_matches.json \\
|
|
--model2-matches eval_results/model2/detailed_3d_matches.json \\
|
|
--model1-name "mono3d" \\
|
|
--model2-name "yolov5s-300w" \\
|
|
--output-dir heading_analysis_results
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
import pandas as pd
|
|
from scipy import stats
|
|
|
|
|
|
class HeadingErrorAnalyzer:
|
|
"""Analyze heading errors between two models."""
|
|
|
|
def __init__(self, common_matches_data, model1_matches, model2_matches,
|
|
model1_name="Model1", model2_name="Model2"):
|
|
"""
|
|
Initialize analyzer.
|
|
|
|
Args:
|
|
common_matches_data: dict, common matches data
|
|
model1_matches: dict, model1 detailed matches
|
|
model2_matches: dict, model2 detailed matches
|
|
model1_name: str, model1 name
|
|
model2_name: str, model2 name
|
|
"""
|
|
self.common_matches_data = common_matches_data
|
|
self.model1_matches = model1_matches
|
|
self.model2_matches = model2_matches
|
|
self.model1_name = model1_name
|
|
self.model2_name = model2_name
|
|
|
|
# Extract heading errors for common matches
|
|
self.data = self._extract_heading_data()
|
|
|
|
def _extract_heading_data(self):
|
|
"""Extract heading error data for all common matches."""
|
|
data = {
|
|
'model1': defaultdict(list),
|
|
'model2': defaultdict(list),
|
|
'common': defaultdict(list)
|
|
}
|
|
|
|
common_matches = self.common_matches_data['common_matches']
|
|
|
|
for case_name, frames in common_matches.items():
|
|
for frame_name, classes in frames.items():
|
|
for class_name, match_list in classes.items():
|
|
for match_info in match_list:
|
|
# Get match indices
|
|
m1_idx = match_info['model1_idx']
|
|
m2_idx = match_info['model2_idx']
|
|
|
|
# Get match data
|
|
m1_match = self.model1_matches[case_name][frame_name][class_name][m1_idx]
|
|
m2_match = self.model2_matches[case_name][frame_name][class_name][m2_idx]
|
|
|
|
# Extract information
|
|
item = {
|
|
'case': case_name,
|
|
'frame': frame_name,
|
|
'class': class_name,
|
|
'gt_rotation': m1_match.get('gt_rotation', 0),
|
|
'model1_rotation': m1_match.get('det_rotation', 0),
|
|
'model2_rotation': m2_match.get('det_rotation', 0),
|
|
'model1_error': m1_match['errors']['heading'],
|
|
'model2_error': m2_match['errors']['heading'],
|
|
'lateral_dist': m1_match['distance']['lateral'],
|
|
'longitudinal_dist': m1_match['distance']['longitudinal'],
|
|
'iou': m1_match['iou'],
|
|
'confidence': m1_match['confidence']
|
|
}
|
|
|
|
data['model1'][class_name].append(m1_match['errors']['heading'])
|
|
data['model2'][class_name].append(m2_match['errors']['heading'])
|
|
data['common'][class_name].append(item)
|
|
|
|
return data
|
|
|
|
def generate_distribution_analysis(self, output_dir):
|
|
"""Generate distribution analysis and plots."""
|
|
print("\n" + "="*80)
|
|
print("Heading Error Distribution Analysis")
|
|
print("="*80)
|
|
|
|
output_dir = Path(output_dir) / 'distribution'
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
results = {}
|
|
|
|
for class_name in self.data['model1'].keys():
|
|
m1_errors = np.array(self.data['model1'][class_name])
|
|
m2_errors = np.array(self.data['model2'][class_name])
|
|
|
|
# Calculate statistics
|
|
stats_data = {
|
|
'class': class_name,
|
|
'count': len(m1_errors),
|
|
'model1': {
|
|
'mean': float(np.mean(m1_errors)),
|
|
'median': float(np.median(m1_errors)),
|
|
'std': float(np.std(m1_errors)),
|
|
'p50': float(np.percentile(m1_errors, 50)),
|
|
'p75': float(np.percentile(m1_errors, 75)),
|
|
'p90': float(np.percentile(m1_errors, 90)),
|
|
'p95': float(np.percentile(m1_errors, 95)),
|
|
'p99': float(np.percentile(m1_errors, 99))
|
|
},
|
|
'model2': {
|
|
'mean': float(np.mean(m2_errors)),
|
|
'median': float(np.median(m2_errors)),
|
|
'std': float(np.std(m2_errors)),
|
|
'p50': float(np.percentile(m2_errors, 50)),
|
|
'p75': float(np.percentile(m2_errors, 75)),
|
|
'p90': float(np.percentile(m2_errors, 90)),
|
|
'p95': float(np.percentile(m2_errors, 95)),
|
|
'p99': float(np.percentile(m2_errors, 99))
|
|
}
|
|
}
|
|
|
|
results[class_name] = stats_data
|
|
|
|
# Print results
|
|
print(f"\n{class_name.upper()} (n={stats_data['count']:,}):")
|
|
print(f" {self.model1_name}:")
|
|
print(f" Mean: {stats_data['model1']['mean']:.4f} rad")
|
|
print(f" Median: {stats_data['model1']['median']:.4f} rad")
|
|
print(f" P90: {stats_data['model1']['p90']:.4f} rad")
|
|
print(f" P95: {stats_data['model1']['p95']:.4f} rad")
|
|
print(f" {self.model2_name}:")
|
|
print(f" Mean: {stats_data['model2']['mean']:.4f} rad")
|
|
print(f" Median: {stats_data['model2']['median']:.4f} rad")
|
|
print(f" P90: {stats_data['model2']['p90']:.4f} rad")
|
|
print(f" P95: {stats_data['model2']['p95']:.4f} rad")
|
|
print(f" Change:")
|
|
print(f" Mean: +{(stats_data['model2']['mean'] - stats_data['model1']['mean']):.4f} rad "
|
|
f"({((stats_data['model2']['mean'] / stats_data['model1']['mean'] - 1) * 100):.1f}%)")
|
|
|
|
# Create plots
|
|
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
|
|
fig.suptitle(f'Heading Error Distribution - {class_name}', fontsize=16)
|
|
|
|
# Histogram
|
|
axes[0, 0].hist(m1_errors, bins=50, alpha=0.5, label=self.model1_name, density=True)
|
|
axes[0, 0].hist(m2_errors, bins=50, alpha=0.5, label=self.model2_name, density=True)
|
|
axes[0, 0].set_xlabel('Heading Error (rad)')
|
|
axes[0, 0].set_ylabel('Density')
|
|
axes[0, 0].set_title('Histogram')
|
|
axes[0, 0].legend()
|
|
axes[0, 0].grid(True, alpha=0.3)
|
|
|
|
# CDF
|
|
m1_sorted = np.sort(m1_errors)
|
|
m2_sorted = np.sort(m2_errors)
|
|
axes[0, 1].plot(m1_sorted, np.arange(len(m1_sorted)) / len(m1_sorted), label=self.model1_name)
|
|
axes[0, 1].plot(m2_sorted, np.arange(len(m2_sorted)) / len(m2_sorted), label=self.model2_name)
|
|
axes[0, 1].set_xlabel('Heading Error (rad)')
|
|
axes[0, 1].set_ylabel('Cumulative Probability')
|
|
axes[0, 1].set_title('Cumulative Distribution Function')
|
|
axes[0, 1].legend()
|
|
axes[0, 1].grid(True, alpha=0.3)
|
|
|
|
# Box plot
|
|
axes[1, 0].boxplot([m1_errors, m2_errors], labels=[self.model1_name, self.model2_name])
|
|
axes[1, 0].set_ylabel('Heading Error (rad)')
|
|
axes[1, 0].set_title('Box Plot')
|
|
axes[1, 0].grid(True, alpha=0.3)
|
|
|
|
# Q-Q plot
|
|
stats.probplot(m2_errors - m1_errors, dist="norm", plot=axes[1, 1])
|
|
axes[1, 1].set_title('Q-Q Plot (Error Difference)')
|
|
axes[1, 1].grid(True, alpha=0.3)
|
|
|
|
plt.tight_layout()
|
|
plt.savefig(output_dir / f'{class_name}_distribution.png', dpi=150)
|
|
plt.close()
|
|
|
|
# Save results
|
|
with open(output_dir / 'statistics.json', 'w') as f:
|
|
json.dump(results, f, indent=2)
|
|
|
|
print(f"\n✓ Distribution analysis saved to: {output_dir}")
|
|
return results
|
|
|
|
def generate_distance_analysis(self, output_dir):
|
|
"""Analyze heading errors by distance ranges."""
|
|
print("\n" + "="*80)
|
|
print("Heading Error by Distance Analysis")
|
|
print("="*80)
|
|
|
|
output_dir = Path(output_dir) / 'distance'
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Define distance ranges
|
|
long_ranges = [(0, 20), (20, 40), (40, 60), (60, 80), (80, 100)]
|
|
lat_ranges = [(-30, -10), (-10, 0), (0, 10), (10, 30)]
|
|
|
|
for class_name, items in self.data['common'].items():
|
|
# Analyze by longitudinal distance
|
|
long_stats = {}
|
|
for range_start, range_end in long_ranges:
|
|
range_key = f"{range_start}-{range_end}m"
|
|
m1_errors = []
|
|
m2_errors = []
|
|
|
|
for item in items:
|
|
dist = item['longitudinal_dist']
|
|
if range_start <= dist < range_end:
|
|
m1_errors.append(item['model1_error'])
|
|
m2_errors.append(item['model2_error'])
|
|
|
|
if len(m1_errors) > 0:
|
|
long_stats[range_key] = {
|
|
'count': len(m1_errors),
|
|
'model1_mean': float(np.mean(m1_errors)),
|
|
'model2_mean': float(np.mean(m2_errors)),
|
|
'diff': float(np.mean(m2_errors) - np.mean(m1_errors))
|
|
}
|
|
|
|
# Plot longitudinal distance analysis
|
|
if long_stats:
|
|
fig, ax = plt.subplots(figsize=(12, 6))
|
|
ranges = list(long_stats.keys())
|
|
m1_means = [long_stats[r]['model1_mean'] for r in ranges]
|
|
m2_means = [long_stats[r]['model2_mean'] for r in ranges]
|
|
|
|
x = np.arange(len(ranges))
|
|
width = 0.35
|
|
|
|
ax.bar(x - width/2, m1_means, width, label=self.model1_name)
|
|
ax.bar(x + width/2, m2_means, width, label=self.model2_name)
|
|
|
|
ax.set_xlabel('Longitudinal Distance Range')
|
|
ax.set_ylabel('Mean Heading Error (rad)')
|
|
ax.set_title(f'Heading Error by Longitudinal Distance - {class_name}')
|
|
ax.set_xticks(x)
|
|
ax.set_xticklabels(ranges)
|
|
ax.legend()
|
|
ax.grid(True, alpha=0.3)
|
|
|
|
plt.tight_layout()
|
|
plt.savefig(output_dir / f'{class_name}_longitudinal.png', dpi=150)
|
|
plt.close()
|
|
|
|
print(f"\n{class_name.upper()} - Longitudinal Distance:")
|
|
for range_key, data in long_stats.items():
|
|
print(f" {range_key}: {self.model1_name}={data['model1_mean']:.4f}, "
|
|
f"{self.model2_name}={data['model2_mean']:.4f}, "
|
|
f"diff={data['diff']:+.4f} (n={data['count']})")
|
|
|
|
print(f"\n✓ Distance analysis saved to: {output_dir}")
|
|
|
|
def identify_bad_cases(self, output_dir, threshold=1.0):
|
|
"""Identify cases with large heading errors."""
|
|
print("\n" + "="*80)
|
|
print(f"Identifying Bad Cases (threshold > {threshold} rad)")
|
|
print("="*80)
|
|
|
|
output_dir = Path(output_dir) / 'bad_cases'
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
bad_cases = []
|
|
|
|
for class_name, items in self.data['common'].items():
|
|
for item in items:
|
|
# Check if either model has large error
|
|
if item['model2_error'] > threshold:
|
|
bad_cases.append({
|
|
'case': item['case'],
|
|
'frame': item['frame'],
|
|
'class': class_name,
|
|
'gt_rotation': item['gt_rotation'],
|
|
'model1_rotation': item['model1_rotation'],
|
|
'model2_rotation': item['model2_rotation'],
|
|
'model1_error': item['model1_error'],
|
|
'model2_error': item['model2_error'],
|
|
'error_increase': item['model2_error'] - item['model1_error'],
|
|
'longitudinal_dist': item['longitudinal_dist'],
|
|
'lateral_dist': item['lateral_dist'],
|
|
'iou': item['iou'],
|
|
'confidence': item['confidence']
|
|
})
|
|
|
|
# Sort by model2 error
|
|
bad_cases.sort(key=lambda x: x['model2_error'], reverse=True)
|
|
|
|
# Save to CSV
|
|
if bad_cases:
|
|
df = pd.DataFrame(bad_cases)
|
|
csv_path = output_dir / 'bad_cases.csv'
|
|
df.to_csv(csv_path, index=False)
|
|
|
|
print(f"\nFound {len(bad_cases)} bad cases:")
|
|
print(f" Saved to: {csv_path}")
|
|
print(f"\nTop 10 worst cases:")
|
|
print(df.head(10).to_string(index=False))
|
|
else:
|
|
print(f"\nNo bad cases found with threshold > {threshold} rad")
|
|
|
|
print(f"\n✓ Bad cases analysis saved to: {output_dir}")
|
|
return bad_cases
|
|
|
|
def generate_summary_report(self, output_dir):
|
|
"""Generate summary text report."""
|
|
report_path = output_dir / 'heading_analysis_summary.txt'
|
|
|
|
with open(report_path, 'w') as f:
|
|
f.write("="*80 + "\n")
|
|
f.write("HEADING ERROR ANALYSIS SUMMARY\n")
|
|
f.write("="*80 + "\n\n")
|
|
|
|
f.write(f"Model 1: {self.model1_name}\n")
|
|
f.write(f"Model 2: {self.model2_name}\n\n")
|
|
|
|
f.write("Overall Statistics by Class:\n")
|
|
f.write("-"*80 + "\n\n")
|
|
|
|
for class_name in self.data['model1'].keys():
|
|
m1_errors = np.array(self.data['model1'][class_name])
|
|
m2_errors = np.array(self.data['model2'][class_name])
|
|
|
|
f.write(f"{class_name.upper()} (n={len(m1_errors):,}):\n")
|
|
f.write(f" {self.model1_name}:\n")
|
|
f.write(f" Mean: {np.mean(m1_errors):.4f} rad ({np.degrees(np.mean(m1_errors)):.2f}°)\n")
|
|
f.write(f" Median: {np.median(m1_errors):.4f} rad ({np.degrees(np.median(m1_errors)):.2f}°)\n")
|
|
f.write(f" Std: {np.std(m1_errors):.4f} rad\n")
|
|
f.write(f" {self.model2_name}:\n")
|
|
f.write(f" Mean: {np.mean(m2_errors):.4f} rad ({np.degrees(np.mean(m2_errors)):.2f}°)\n")
|
|
f.write(f" Median: {np.median(m2_errors):.4f} rad ({np.degrees(np.median(m2_errors)):.2f}°)\n")
|
|
f.write(f" Std: {np.std(m2_errors):.4f} rad\n")
|
|
|
|
diff_mean = np.mean(m2_errors) - np.mean(m1_errors)
|
|
pct_change = (np.mean(m2_errors) / np.mean(m1_errors) - 1) * 100
|
|
f.write(f" Change:\n")
|
|
f.write(f" Mean: +{diff_mean:.4f} rad ({pct_change:+.2f}%)\n\n")
|
|
|
|
print(f"\n✓ Summary report saved to: {report_path}")
|
|
|
|
|
|
def main():
|
|
"""Main function."""
|
|
parser = argparse.ArgumentParser(description='Analyze heading errors between two models')
|
|
parser.add_argument('--common-matches', type=str, required=True,
|
|
help='Path to common_matches.json')
|
|
parser.add_argument('--model1-matches', type=str, required=True,
|
|
help='Path to model1 detailed_3d_matches.json')
|
|
parser.add_argument('--model2-matches', type=str, required=True,
|
|
help='Path to model2 detailed_3d_matches.json')
|
|
parser.add_argument('--model1-name', type=str, default='Model1',
|
|
help='Name of model 1')
|
|
parser.add_argument('--model2-name', type=str, default='Model2',
|
|
help='Name of model 2')
|
|
parser.add_argument('--output-dir', type=str, default='heading_analysis',
|
|
help='Output directory for analysis results')
|
|
parser.add_argument('--bad-case-threshold', type=float, default=1.0,
|
|
help='Threshold for identifying bad cases (radians)')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Load data
|
|
print("Loading data...")
|
|
with open(args.common_matches, 'r') as f:
|
|
common_matches_data = json.load(f)
|
|
|
|
with open(args.model1_matches, 'r') as f:
|
|
model1_matches = json.load(f)
|
|
|
|
with open(args.model2_matches, 'r') as f:
|
|
model2_matches = json.load(f)
|
|
|
|
# Create analyzer
|
|
analyzer = HeadingErrorAnalyzer(
|
|
common_matches_data,
|
|
model1_matches,
|
|
model2_matches,
|
|
model1_name=args.model1_name,
|
|
model2_name=args.model2_name
|
|
)
|
|
|
|
# Create output directory
|
|
output_dir = Path(args.output_dir)
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Run analyses
|
|
print("\n" + "="*80)
|
|
print("HEADING ERROR ANALYSIS")
|
|
print("="*80)
|
|
|
|
analyzer.generate_distribution_analysis(output_dir)
|
|
analyzer.generate_distance_analysis(output_dir)
|
|
analyzer.identify_bad_cases(output_dir, threshold=args.bad_case_threshold)
|
|
analyzer.generate_summary_report(output_dir)
|
|
|
|
print("\n" + "="*80)
|
|
print("ANALYSIS COMPLETE!")
|
|
print("="*80)
|
|
print(f"\nResults saved to: {output_dir}/")
|
|
print("\nGenerated files:")
|
|
print(" - heading_analysis_summary.txt: Text summary report")
|
|
print(" - distribution/: Error distribution analysis and plots")
|
|
print(" - distance/: Distance-based analysis and plots")
|
|
print(" - bad_cases/bad_cases.csv: List of cases with large errors")
|
|
print("")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|