800 lines
30 KiB
Python
Executable File
800 lines
30 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Analyze 3D bad cases from saved detailed matches or by rebuilding matches.
|
|
|
|
Preferred input is ``detailed_3d_matches.json`` produced by the evaluator.
|
|
If that file is unavailable, this tool can rebuild the detailed matches from
|
|
the evaluation config and the underlying det/gt directories.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import json
|
|
import math
|
|
import sys
|
|
from collections import Counter, defaultdict
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from statistics import mean, median
|
|
|
|
|
|
# Repository root, taken as two directory levels above the folder holding this
# file. It is put at the front of sys.path (once) so the ``eval_tools`` imports
# below resolve when the file is executed directly as a script.
REPO_ROOT = Path(__file__).resolve().parents[2]
_repo_root_entry = str(REPO_ROOT)
if _repo_root_entry not in sys.path:
    sys.path.insert(0, _repo_root_entry)
|
|
|
|
from eval_tools.analysis.analyze_2d_fp_fn import (
|
|
build_config,
|
|
build_case_key,
|
|
class_name,
|
|
parse_class_ids,
|
|
round_float,
|
|
)
|
|
from eval_tools.evaluator.evaluator import Evaluator
|
|
|
|
|
|
# Fallback longitudinal distance bins as [lower, upper] pairs: ten bins of
# width 10 covering 0-100, followed by one catch-all bin [100, 999].
DEFAULT_DISTANCE_RANGES = [[start, start + 10] for start in range(0, 100, 10)] + [[100, 999]]
|
|
# Fallback lateral distance bins as [lower, upper] pairs: ten bins of width 10
# spanning -50 to 50.
DEFAULT_LATERAL_DISTANCE_RANGES = [[start, start + 10] for start in range(-50, 50, 10)]
|
|
# Every metric name this tool can rank/export; also used as the allowed
# choices (and default) of the ``--metrics`` command-line option.
METRIC_KEYS = (
    # position errors
    "lateral_error", "longitudinal_error", "longitudinal_relative_error",
    # orientation errors
    "heading_error", "heading_error_relaxed", "reversal",
)
|
|
# Column order of the per-metric CSV exports. Each name is looked up on the
# exported example dicts; the order below is the order written to the file.
CSV_FIELDNAMES = [
    # where the sample comes from
    "case_name", "frame_name", "class_id", "class_name", "gt_id",
    # match quality
    "confidence", "iou",
    # object position and the bins it falls into
    "distance_longitudinal_m", "distance_lateral_m", "distance_bin", "lateral_bin",
    # the metric this row was ranked by
    "metric_name", "metric_value", "metric_value_display",
    # all individual error values
    "is_reversal", "lateral_error_m", "longitudinal_error_m", "longitudinal_relative_error",
    "heading_error_rad", "heading_error_deg",
    "heading_error_relaxed_rad", "heading_error_relaxed_deg",
    # raw geometry of the matched pair
    "gt_bbox", "det_bbox", "gt_center_3d", "det_center_3d",
    "gt_rotation_rad", "det_rotation_rad",
]
|
|
|
|
|
|
def parse_args():
    """Build the command-line parser and return the parsed arguments.

    Options are registered in the same order they appear in ``--help``:
    input sources, config overrides, class/metric selection, top-k limits,
    confidence/IoU filters, bad-case thresholds and output settings.
    """
    parser = argparse.ArgumentParser(
        description="Analyze 3D bad cases from detailed_3d_matches.json or rebuild them from config."
    )
    add = parser.add_argument

    # --- input sources -------------------------------------------------
    add(
        "--detailed-matches",
        type=str,
        default=None,
        help="Path to detailed_3d_matches.json generated by evaluator.",
    )
    add(
        "--evaluation-report",
        type=str,
        default=None,
        help="Path to evaluation_report.json. Used to infer sibling detailed_3d_matches.json.",
    )
    add("--config", type=str, default=None, help="Path to YAML evaluation config.")

    # --- config overrides (no explicit default, so they parse to None) --
    add("--det-path", type=str, help="Detection results root directory")
    add("--gt-path", type=str, help="Ground-truth labels root directory")
    add("--path-depth", type=int, choices=[1, 2], help="Directory depth")
    for flag, help_text in (
        ("--det-format", "Detection file format"),
        ("--gt-format", "Ground-truth file format"),
    ):
        add(flag, type=str, choices=["auto", "json", "txt"], help=help_text)
    add("--img-width", type=int, help="Image width")
    add("--img-height", type=int, help="Image height")
    add(
        "--coord-system",
        type=str,
        choices=["camera", "ego"],
        help="Coordinate system used by the parser/evaluator",
    )
    add("--iou-threshold", type=float, help="IoU threshold used for evaluator loading")
    add(
        "--conf-threshold",
        type=float,
        help="Confidence threshold for rebuilding detailed matches",
    )

    # --- what to analyse -----------------------------------------------
    add(
        "--classes",
        nargs="+",
        default=None,
        help="Optional class filter, e.g. car suv pedestrian or numeric IDs",
    )
    add(
        "--metrics",
        nargs="+",
        default=list(METRIC_KEYS),
        choices=METRIC_KEYS,
        help="Metrics to rank and export.",
    )

    # --- how many entries to keep --------------------------------------
    for flag, default, help_text in (
        ("--top-k", 200, "Top bad cases to keep per metric overall."),
        ("--top-k-per-class", 100, "Top bad cases to keep per metric and class."),
        ("--top-k-frames", 50, "Number of worst frames to keep in the summary."),
    ):
        add(flag, type=int, default=default, help=help_text)

    # --- optional sample filters ---------------------------------------
    for flag, help_text in (
        ("--min-confidence", "Optional minimum confidence filter on matched detections."),
        ("--max-confidence", "Optional maximum confidence filter on matched detections."),
        ("--min-iou", "Optional minimum 2D IoU filter on matched samples."),
        ("--max-iou", "Optional maximum 2D IoU filter on matched samples."),
    ):
        add(flag, type=float, default=None, help=help_text)

    # --- thresholds that define a "bad" sample -------------------------
    for flag, default, help_text in (
        ("--bad-lateral-threshold", 1.0, "Threshold in meters for counting bad lateral errors."),
        (
            "--bad-longitudinal-threshold",
            3.0,
            "Threshold in meters for counting bad longitudinal errors.",
        ),
        (
            "--bad-longitudinal-relative-threshold",
            0.2,
            "Threshold for counting bad longitudinal relative errors.",
        ),
        (
            "--bad-heading-threshold-deg",
            15.0,
            "Threshold in degrees for counting bad heading errors.",
        ),
    ):
        add(flag, type=float, default=default, help=help_text)

    # --- rebuild / output settings -------------------------------------
    add(
        "--num-workers",
        type=int,
        default=None,
        help="Worker count for rebuilding detailed matches (default: evaluator auto-detect).",
    )
    add(
        "--save-rebuilt-matches",
        action="store_true",
        help="When rebuilding detailed matches, also save them into the output directory.",
    )
    add(
        "--output-dir",
        type=str,
        default=None,
        help="Output directory. Defaults to eval_tools/analysis/results_3d/<timestamp>.",
    )
    return parser.parse_args()
|
|
|
|
|
|
def metric_display_value(metric_name, sample):
    """Return the human-facing value of ``metric_name`` for one sample.

    Heading metrics are read from the ``*_deg`` fields, the other numeric
    metrics from their plain fields, and ``reversal`` maps the
    ``is_reversal`` flag to 1.0 / 0.0.

    Raises:
        KeyError: if ``metric_name`` is not a supported metric.
    """
    display_fields = (
        ("lateral_error", "lateral_error_m"),
        ("longitudinal_error", "longitudinal_error_m"),
        ("longitudinal_relative_error", "longitudinal_relative_error"),
        ("heading_error", "heading_error_deg"),
        ("heading_error_relaxed", "heading_error_relaxed_deg"),
    )
    for known_name, field in display_fields:
        if metric_name == known_name:
            return float(sample[field])
    if metric_name == "reversal":
        if sample["is_reversal"]:
            return 1.0
        return 0.0
    raise KeyError(f"Unsupported metric: {metric_name}")
|
|
|
|
|
|
def metric_raw_value(metric_name, sample):
    """Return the raw (statistics-facing) value of ``metric_name`` for one sample.

    Identical to the display value except that heading metrics are read from
    the ``*_rad`` fields. ``reversal`` maps ``is_reversal`` to 1.0 / 0.0.

    Raises:
        KeyError: if ``metric_name`` is not a supported metric.
    """
    raw_fields = (
        ("lateral_error", "lateral_error_m"),
        ("longitudinal_error", "longitudinal_error_m"),
        ("longitudinal_relative_error", "longitudinal_relative_error"),
        ("heading_error", "heading_error_rad"),
        ("heading_error_relaxed", "heading_error_relaxed_rad"),
    )
    for known_name, field in raw_fields:
        if metric_name == known_name:
            return float(sample[field])
    if metric_name == "reversal":
        if sample["is_reversal"]:
            return 1.0
        return 0.0
    raise KeyError(f"Unsupported metric: {metric_name}")
|
|
|
|
|
|
def metric_unit(metric_name):
    """Return the display unit for a metric: "m", "deg", or "" when unitless/unknown."""
    units = (
        ("m", ("lateral_error", "longitudinal_error")),
        ("deg", ("heading_error", "heading_error_relaxed")),
    )
    for unit, metric_names in units:
        if metric_name in metric_names:
            return unit
    return ""
|
|
|
|
|
|
def threshold_hit(metric_name, sample, thresholds):
    """Tell whether ``sample`` counts as a bad case for ``metric_name``.

    Numeric metrics are bad when the sample value is greater than or equal
    to the matching entry of ``thresholds``; both heading metrics are
    compared in degrees against ``bad_heading_threshold_deg``. ``reversal``
    is bad whenever ``is_reversal`` is truthy.

    Raises:
        KeyError: if ``metric_name`` is not a supported metric.
    """
    rules = (
        ("lateral_error", "lateral_error_m", "bad_lateral_threshold"),
        ("longitudinal_error", "longitudinal_error_m", "bad_longitudinal_threshold"),
        (
            "longitudinal_relative_error",
            "longitudinal_relative_error",
            "bad_longitudinal_relative_threshold",
        ),
        ("heading_error", "heading_error_deg", "bad_heading_threshold_deg"),
        ("heading_error_relaxed", "heading_error_relaxed_deg", "bad_heading_threshold_deg"),
    )
    for known_name, sample_key, threshold_key in rules:
        if metric_name == known_name:
            return sample[sample_key] >= thresholds[threshold_key]
    if metric_name == "reversal":
        return bool(sample["is_reversal"])
    raise KeyError(f"Unsupported metric: {metric_name}")
|
|
|
|
|
|
def make_stats(values):
    """Summarise ``values`` as mean, median, std and 90th percentile.

    The standard deviation is the population form (divides by ``len``), and
    the percentile is taken by nearest rank on the sorted values. All four
    results are passed through ``round_float``. Empty input yields zeros.
    """
    if not values:
        return {"mean": 0.0, "median": 0.0, "std": 0.0, "percentile_90": 0.0}

    data = [float(v) for v in values]
    count = len(data)
    center = mean(data)
    middle = median(data)
    squared_spread = sum((x - center) ** 2 for x in data) / count
    ordered = sorted(data)
    # Nearest-rank index, clamped into the valid range of the sorted list.
    rank = math.ceil(0.9 * count) - 1
    p90_index = min(count - 1, max(0, rank))
    return {
        "mean": round_float(center),
        "median": round_float(middle),
        "std": round_float(math.sqrt(squared_spread)),
        "percentile_90": round_float(ordered[p90_index]),
    }
|
|
|
|
|
|
def bucket_label(prefix, range_pair):
    """Format a bin label such as ``long_0-10m`` from a prefix and a [lower, upper] pair."""
    lower = range_pair[0]
    upper = range_pair[1]
    return "{}_{}-{}m".format(prefix, lower, upper)
|
|
|
|
|
|
def build_distance_ranges(config):
    """Return ``metrics_3d.distance_ranges`` from ``config``, or the defaults.

    The defaults are used when ``config`` is empty/None or when the setting
    is missing or empty.
    """
    if config:
        metrics_cfg = config.get("metrics_3d", {})
    else:
        metrics_cfg = {}
    configured = metrics_cfg.get("distance_ranges")
    if configured:
        return configured
    return DEFAULT_DISTANCE_RANGES
|
|
|
|
|
|
def build_lateral_ranges(config):
    """Return ``metrics_3d.lateral_distance_ranges`` from ``config``, or the defaults.

    The defaults are used when ``config`` is empty/None or when the setting
    is missing or empty.
    """
    if config:
        metrics_cfg = config.get("metrics_3d", {})
    else:
        metrics_cfg = {}
    configured = metrics_cfg.get("lateral_distance_ranges")
    if configured:
        return configured
    return DEFAULT_LATERAL_DISTANCE_RANGES
|
|
|
|
|
|
def classify_range(value, prefix, ranges):
    """Return the label of the first range containing ``value``, else ``None``.

    A range matches when ``lower <= value < upper`` (upper bound exclusive).
    Whole-number bounds are shown as ints in the label. ``None`` is returned
    for a ``None`` value or when no range matches.
    """
    if value is None:
        return None
    for range_pair in ranges:
        lower = float(range_pair[0])
        upper = float(range_pair[1])
        if not (lower <= float(value) < upper):
            continue
        shown = [int(bound) if bound.is_integer() else bound for bound in (lower, upper)]
        return bucket_label(prefix, shown)
    return None
|
|
|
|
|
|
def infer_detailed_matches_path(args):
    """Work out which detailed-matches file to read, if any.

    ``--detailed-matches`` wins and is returned without an existence check.
    Otherwise the file ``detailed_3d_matches.json`` next to
    ``--evaluation-report`` is returned when it exists. ``None`` means no
    candidate was found.
    """
    explicit = args.detailed_matches
    if explicit:
        return Path(explicit)
    if not args.evaluation_report:
        return None
    candidate = Path(args.evaluation_report).parent / "detailed_3d_matches.json"
    return candidate if candidate.exists() else None
|
|
|
|
|
|
def load_saved_detailed_matches(path):
    """Read and return the JSON content of a saved detailed-matches file.

    The file is decoded as UTF-8 explicitly rather than with the platform's
    locale encoding, so case/frame/class names containing non-ASCII
    characters load the same way on every machine.

    Args:
        path: Location of the JSON file (string or path-like).

    Returns:
        The parsed JSON document.
    """
    with open(path, "r", encoding="utf-8") as file:
        return json.load(file)
|
|
|
|
|
|
def rebuild_detailed_matches(args):
    """Re-run the 3D evaluation to regenerate detailed matches.

    Builds the config from ``args``, creates an ``Evaluator`` with detailed
    match saving switched on, loads detections/ground truth from the
    configured paths and runs ``evaluate_3d``.

    Returns:
        A ``(detailed_matches, config)`` tuple; ``detailed_matches`` is an
        empty dict when the evaluator produced nothing.
    """
    config = build_config(args)
    matching_cfg = config.get("matching", {})
    evaluator = Evaluator(
        config=config,
        iou_threshold=float(matching_cfg.get("iou_threshold", 0.5)),
        num_workers=args.num_workers,
        save_detailed_matches=True,
    )

    dataset_cfg = config["dataset"]
    image_cfg = config["image"]
    load_kwargs = {
        "det_root": dataset_cfg["det_path"],
        "gt_root": dataset_cfg["gt_path"],
        "img_width": image_cfg.get("width", 1920),
        "img_height": image_cfg.get("height", 1080),
        "path_depth": dataset_cfg.get("path_depth", 1),
        "det_format": dataset_cfg.get("det_format", "auto"),
        "gt_format": dataset_cfg.get("gt_format", "auto"),
    }
    evaluator.load_data_from_paths(**load_kwargs)
    evaluator.evaluate_3d()

    matches = evaluator.detailed_3d_matches or {}
    return matches, config
|
|
|
|
|
|
def load_detailed_matches(args):
    """Load detailed matches from disk, or rebuild them from the config.

    Returns:
        A ``(matches, path, config)`` tuple. ``path`` is the file the matches
        were read from, or ``None`` when they were rebuilt. ``config`` is
        ``None`` when no config-related option was given.

    Raises:
        FileNotFoundError: if no matches file exists and no config-related
            option was supplied to rebuild from.
    """
    detailed_path = infer_detailed_matches_path(args)

    # A config is only built when at least one config-related option is set.
    wants_config = any(
        (
            args.config,
            args.det_path,
            args.gt_path,
            args.path_depth is not None,
            args.det_format,
            args.gt_format,
            args.img_width is not None,
            args.img_height is not None,
            args.coord_system,
            args.iou_threshold is not None,
            args.conf_threshold is not None,
        )
    )
    config = build_config(args) if wants_config else None

    if detailed_path and detailed_path.exists():
        saved = load_saved_detailed_matches(detailed_path)
        return saved, detailed_path, config

    if config is None:
        raise FileNotFoundError(
            "Failed to locate detailed_3d_matches.json. Please provide --detailed-matches, "
            "--evaluation-report with a sibling matches file, or --config to rebuild matches."
        )

    matches, rebuilt_config = rebuild_detailed_matches(args)
    return matches, None, rebuilt_config
|
|
|
|
|
|
def collect_samples(detailed_matches, class_ids, distance_ranges, lateral_ranges, args):
    """Flatten the nested detailed matches into a list of per-object sample dicts.

    ``detailed_matches`` is walked as case -> frame -> class name -> items.
    Only classes named by ``class_ids`` are kept, and items outside the
    optional confidence / IoU bounds on ``args`` are dropped. Each surviving
    item becomes a flat dict with rounded values, heading errors in both
    radians and degrees, and its longitudinal / lateral bin labels.
    """
    # First id wins when several ids share one class name.
    id_by_name = {}
    for candidate_id in class_ids:
        id_by_name.setdefault(class_name(candidate_id), candidate_id)

    def passes_filters(confidence, iou):
        if args.min_confidence is not None and confidence < args.min_confidence:
            return False
        if args.max_confidence is not None and confidence > args.max_confidence:
            return False
        if args.min_iou is not None and iou < args.min_iou:
            return False
        if args.max_iou is not None and iou > args.max_iou:
            return False
        return True

    def rounded_list(item, key):
        return [round_float(v) for v in item.get(key, [])]

    def rounded_or_none(value):
        return None if value is None else round_float(value)

    samples = []
    for case_name, frames in detailed_matches.items():
        for frame_name, class_groups in frames.items():
            for class_name_str, items in class_groups.items():
                class_id = id_by_name.get(class_name_str)
                if class_id is None:
                    continue
                for item in items:
                    confidence = float(item.get("confidence", 0.0))
                    iou = float(item.get("iou", 0.0))
                    if not passes_filters(confidence, iou):
                        continue

                    distance = item.get("distance") or {}
                    errors = item.get("errors") or {}
                    longitudinal_distance = distance.get("longitudinal")
                    lateral_distance = distance.get("lateral")
                    heading = errors.get("heading", 0.0)
                    # The relaxed heading falls back to the strict one when absent.
                    heading_relaxed = errors.get("heading_relaxed", heading)

                    samples.append(
                        {
                            "case_name": case_name,
                            "frame_name": frame_name,
                            "class_id": class_id,
                            "class_name": class_name_str,
                            "gt_id": str(item.get("gt_id")),
                            "confidence": round_float(confidence),
                            "iou": round_float(iou),
                            "distance_longitudinal_m": rounded_or_none(longitudinal_distance),
                            "distance_lateral_m": rounded_or_none(lateral_distance),
                            "distance_bin": classify_range(longitudinal_distance, "long", distance_ranges),
                            "lateral_bin": classify_range(lateral_distance, "lat", lateral_ranges),
                            "gt_bbox": rounded_list(item, "gt_bbox"),
                            "det_bbox": rounded_list(item, "det_bbox"),
                            "gt_center_3d": rounded_list(item, "gt_center_3d"),
                            "det_center_3d": rounded_list(item, "det_center_3d"),
                            "gt_rotation_rad": round_float(item.get("gt_rotation", 0.0)),
                            "det_rotation_rad": round_float(item.get("det_rotation", 0.0)),
                            "lateral_error_m": round_float(errors.get("lateral", 0.0)),
                            "longitudinal_error_m": round_float(errors.get("longitudinal", 0.0)),
                            "longitudinal_relative_error": round_float(
                                errors.get("longitudinal_relative", 0.0)
                            ),
                            "heading_error_rad": round_float(heading),
                            "heading_error_deg": round_float(math.degrees(float(heading))),
                            "heading_error_relaxed_rad": round_float(heading_relaxed),
                            "heading_error_relaxed_deg": round_float(
                                math.degrees(float(heading_relaxed))
                            ),
                            "is_reversal": bool(errors.get("is_reversal", False)),
                        }
                    )
    return samples
|
|
|
|
|
|
def summarize_metric(samples, metric_name, thresholds):
    """Summarise one metric over a group of samples.

    For ``reversal`` the result is ``{"count", "percentage"}``. For every
    other metric it is ``{"stats", "bad_count", "bad_percentage"}``, where
    ``stats`` comes from ``make_stats`` over the raw metric values and the
    bad count uses ``threshold_hit``. Percentages are 0.0 for an empty group.

    The bad count is computed once and reused for the percentage, and the
    ``reversal`` branch returns early instead of first computing statistics
    that would be thrown away.

    Raises:
        KeyError: if ``metric_name`` is unsupported and ``samples`` is non-empty.
    """
    total = len(samples)

    if metric_name == "reversal":
        count = sum(1 for sample in samples if sample["is_reversal"])
        return {
            "count": count,
            "percentage": round_float(100.0 * count / total) if samples else 0.0,
        }

    values = [metric_raw_value(metric_name, sample) for sample in samples]
    bad_count = sum(1 for sample in samples if threshold_hit(metric_name, sample, thresholds))
    return {
        "stats": make_stats(values),
        "bad_count": bad_count,
        "bad_percentage": round_float(100.0 * bad_count / total) if samples else 0.0,
    }
|
|
|
|
|
|
def summarize_sample_group(samples, metrics, thresholds):
    """Return ``{"num_samples": ...}`` plus one ``summarize_metric`` entry per metric."""
    result = {"num_samples": len(samples)}
    result.update(
        (metric_name, summarize_metric(samples, metric_name, thresholds))
        for metric_name in metrics
    )
    return result
|
|
|
|
|
|
def build_top_frames(samples, metrics, thresholds, top_k_frames):
    """Rank frames from worst to best and return the first ``top_k_frames``.

    Samples are grouped by ``(case_name, frame_name)``. Each frame record
    holds the number of samples, how many of them are bad on at least one
    metric, the bad count per metric and the mean lateral / longitudinal /
    heading errors. Frames are ordered by bad object count, then reversal
    count, then mean longitudinal error, then mean heading error, all
    descending.
    """
    frames = defaultdict(list)
    for sample in samples:
        frames[(sample["case_name"], sample["frame_name"])].append(sample)

    def is_bad(sample, metric_name):
        return threshold_hit(metric_name, sample, thresholds)

    def frame_mean(frame_samples, field):
        return round_float(mean(sample[field] for sample in frame_samples))

    def describe(frame_key, frame_samples):
        case_name, frame_name = frame_key
        bad_by_metric = {}
        for metric_name in metrics:
            bad_by_metric[metric_name] = sum(1 for sample in frame_samples if is_bad(sample, metric_name))
        bad_objects = sum(
            1
            for sample in frame_samples
            if any(is_bad(sample, metric_name) for metric_name in metrics)
        )
        return {
            "case_name": case_name,
            "frame_name": frame_name,
            "num_samples": len(frame_samples),
            "bad_objects": bad_objects,
            "bad_by_metric": bad_by_metric,
            "mean_lateral_error_m": frame_mean(frame_samples, "lateral_error_m"),
            "mean_longitudinal_error_m": frame_mean(frame_samples, "longitudinal_error_m"),
            "mean_heading_error_deg": frame_mean(frame_samples, "heading_error_deg"),
        }

    def severity(record):
        return (
            record["bad_objects"],
            record["bad_by_metric"].get("reversal", 0),
            record["mean_longitudinal_error_m"],
            record["mean_heading_error_deg"],
        )

    records = [describe(frame_key, frame_samples) for frame_key, frame_samples in frames.items()]
    worst_first = sorted(records, key=severity, reverse=True)
    return worst_first[:top_k_frames]
|
|
|
|
|
|
def rank_key(metric_name, sample):
    """Return the sort key used to rank ``sample`` for ``metric_name``.

    Larger keys mean worse cases. Confidence and IoU serve as tie-breakers;
    for ``reversal`` the reversal flag comes first, then the heading error
    in degrees.
    """
    tie_breakers = (sample["confidence"], sample["iou"])
    if metric_name != "reversal":
        return (metric_display_value(metric_name, sample),) + tie_breakers
    reversal_flag = 1 if sample["is_reversal"] else 0
    return (reversal_flag, sample["heading_error_deg"]) + tie_breakers
|
|
|
|
|
|
def sample_to_example(sample, metric_name):
    """Return a copy of ``sample`` annotated with the metric it was ranked by.

    Adds ``metric_name``, the rounded raw and display values, and the unit.
    The input sample is left unmodified.
    """
    raw_value = round_float(metric_raw_value(metric_name, sample))
    display_value = round_float(metric_display_value(metric_name, sample))
    return {
        **sample,
        "metric_name": metric_name,
        "metric_value": raw_value,
        "metric_value_display": display_value,
        "metric_unit": metric_unit(metric_name),
    }
|
|
|
|
|
|
def build_badcase_examples(samples, metrics, top_k, top_k_per_class):
    """Pick the worst samples per metric, overall and per class.

    Samples are ranked by ``rank_key`` in descending order. For ``reversal``
    only samples flagged as reversals are kept.

    Returns:
        ``(overall, per_class)`` where ``overall`` maps metric name to at
        most ``top_k`` examples and ``per_class`` maps class name (sorted)
        to metric name to at most ``top_k_per_class`` examples.
    """

    def worst_examples(pool, metric_name, limit):
        ordered = sorted(pool, key=lambda sample: rank_key(metric_name, sample), reverse=True)
        if metric_name == "reversal":
            ordered = [sample for sample in ordered if sample["is_reversal"]]
        return [sample_to_example(sample, metric_name) for sample in ordered[:limit]]

    samples_by_class = defaultdict(list)
    for sample in samples:
        samples_by_class[sample["class_name"]].append(sample)

    overall = {}
    per_class = defaultdict(dict)
    for metric_name in metrics:
        overall[metric_name] = worst_examples(samples, metric_name, top_k)
        for class_name_str, class_samples in samples_by_class.items():
            per_class[class_name_str][metric_name] = worst_examples(
                class_samples, metric_name, top_k_per_class
            )

    return overall, dict(sorted(per_class.items()))
|
|
|
|
|
|
def build_bin_summary(samples, key_name, metrics, thresholds):
    """Summarise samples per bin label stored under ``key_name``.

    Samples whose bin label is missing or empty are skipped. Bins are
    returned in numeric order of their bounds rather than alphabetical order
    of the label: a plain string sort puts ``long_100-999m`` before
    ``long_20-30m`` and scrambles negative lateral bins. Labels that do not
    end in ``_<lower>-<upper>m`` are placed last, in alphabetical order.

    Returns:
        A dict mapping bin label to its ``summarize_sample_group`` result.
    """
    import re  # local import so this block adds no file-level dependency

    bounds_pattern = re.compile(r"_(-?\d+(?:\.\d+)?)-(-?\d+(?:\.\d+)?)m$")

    def numeric_order(label):
        text = str(label)
        found = bounds_pattern.search(text)
        if found is None:
            return (1, 0.0, 0.0, text)
        return (0, float(found.group(1)), float(found.group(2)), text)

    grouped = defaultdict(list)
    for sample in samples:
        bucket = sample.get(key_name)
        if bucket:
            grouped[bucket].append(sample)

    return {
        bucket: summarize_sample_group(grouped[bucket], metrics, thresholds)
        for bucket in sorted(grouped, key=numeric_order)
    }
|
|
|
|
|
|
def write_csv_exports(output_dir, badcase_examples):
    """Write one ``csv/top_<metric>.csv`` file per metric under ``output_dir``.

    Files are written as UTF-8 explicitly, so non-ASCII case/frame/class
    names do not depend on the platform's locale encoding. Columns follow
    ``CSV_FIELDNAMES``; keys outside that list are ignored and missing keys
    are written as empty cells.

    Args:
        output_dir: Path-like object supporting ``/``; the ``csv``
            sub-directory is created if needed.
        badcase_examples: Mapping of metric name to a list of example dicts.
    """
    csv_dir = output_dir / "csv"
    csv_dir.mkdir(parents=True, exist_ok=True)
    for metric_name, examples in badcase_examples.items():
        csv_path = csv_dir / f"top_{metric_name}.csv"
        with open(csv_path, "w", newline="", encoding="utf-8") as file:
            writer = csv.DictWriter(
                file,
                fieldnames=CSV_FIELDNAMES,
                restval="",
                extrasaction="ignore",
            )
            writer.writeheader()
            writer.writerows(examples)
|
|
|
|
|
|
def write_markdown_report(report, output_path):
    """Render ``report`` as a Markdown document at ``output_path``.

    Sections, in order: configuration table, overall per-metric summary,
    per-class summary, top frames and the first ten bad cases of each metric.

    The whole document is assembled in memory and written in one go as
    UTF-8, so a failure while formatting does not leave a truncated file and
    non-ASCII names do not depend on the platform's locale encoding.

    Args:
        report: Dict with the keys ``metadata``, ``summary``, ``per_class``,
            ``top_frames`` and ``badcase_examples``.
        output_path: Destination file path.

    Raises:
        KeyError: if a required key is missing from ``report``.
    """
    metadata = report["metadata"]
    summary = report["summary"]
    top_frames = report["top_frames"]
    badcase_examples = report["badcase_examples"]

    parts = []
    emit = parts.append

    emit("# 3D Badcase Analysis Report\n\n")

    # --- configuration ---------------------------------------------------
    emit("## Configuration\n\n")
    emit("| Item | Value |\n")
    emit("| --- | --- |\n")
    config_keys = (
        "source",
        "detailed_matches_path",
        "config_path",
        "coord_system",
        "iou_threshold",
        "conf_threshold",
        "classes",
        "metrics",
    )
    for key in config_keys:
        value = metadata.get(key)
        if isinstance(value, list):
            value = ", ".join(str(v) for v in value)
        emit(f"| {key} | `{value}` |\n")
    emit("\n")

    metric_names = metadata["metrics"]

    # --- overall summary ---------------------------------------------------
    emit("## Overall Summary\n\n")
    emit("| Metric | Samples | Mean | P90 | Bad Count | Bad % |\n")
    emit("| --- | ---: | ---: | ---: | ---: | ---: |\n")
    for metric_name in metric_names:
        item = summary["metrics"][metric_name]
        if metric_name == "reversal":
            # Reversal has no mean/P90, only a count and a percentage.
            emit(
                f"| `{metric_name}` | {summary['num_samples']} | - | - | {item['count']} | {item['percentage']:.2f}% |\n"
            )
        else:
            stats = item["stats"]
            emit(
                f"| `{metric_name}` | {summary['num_samples']} | {stats['mean']:.4f} | {stats['percentile_90']:.4f} "
                f"| {item['bad_count']} | {item['bad_percentage']:.2f}% |\n"
            )
    emit("\n")

    # --- per-class summary -------------------------------------------------
    emit("## Per-Class Summary\n\n")
    emit("| Class | Samples |")
    for metric_name in metric_names:
        if metric_name == "reversal":
            emit(" Reversal % |")
        else:
            emit(f" {metric_name} Mean |")
    emit("\n")
    emit("| --- | ---: |")
    for _metric_name in metric_names:
        emit(" ---: |")
    emit("\n")
    for class_name_str, class_summary in report["per_class"].items():
        emit(f"| `{class_name_str}` | {class_summary['num_samples']} |")
        for metric_name in metric_names:
            metric_summary = class_summary.get(metric_name, {})
            if metric_name == "reversal":
                emit(f" {metric_summary.get('percentage', 0.0):.2f}% |")
            else:
                emit(f" {metric_summary.get('stats', {}).get('mean', 0.0):.4f} |")
        emit("\n")
    emit("\n")

    # --- top frames ----------------------------------------------------------
    emit("## Top Frames\n\n")
    emit("| Case / Frame | Samples | Bad Objects | Longitudinal Mean | Heading Mean(deg) |\n")
    emit("| --- | ---: | ---: | ---: | ---: |\n")
    for item in top_frames:
        emit(
            f"| `{item['case_name']}/{item['frame_name']}` | {item['num_samples']} | {item['bad_objects']} "
            f"| {item['mean_longitudinal_error_m']:.4f} | {item['mean_heading_error_deg']:.4f} |\n"
        )
    emit("\n")

    # --- top bad cases (first ten per metric) ----------------------------------
    emit("## Top Badcases\n\n")
    for metric_name, examples in badcase_examples.items():
        emit(f"### {metric_name}\n\n")
        emit("| Class | Case / Frame | Metric | Conf | IoU |\n")
        emit("| --- | --- | ---: | ---: | ---: |\n")
        for example in examples[:10]:
            emit(
                f"| `{example['class_name']}` | `{example['case_name']}/{example['frame_name']}` | "
                f"{example['metric_value_display']:.4f} | {example['confidence']:.4f} | {example['iou']:.4f} |\n"
            )
        emit("\n")

    with open(output_path, "w", encoding="utf-8") as file:
        file.write("".join(parts))
|
|
|
|
|
|
def default_output_dir():
    """Return ``<REPO_ROOT>/eval_tools/analysis/results_3d/<YYYYmmdd_HHMMSS>`` for the current local time."""
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    return REPO_ROOT.joinpath("eval_tools", "analysis", "results_3d", stamp)
|
|
|
|
|
|
def main():
    """Run the full analysis: load matches, summarise, and write all reports.

    Writes ``analysis_report.json``, ``analysis_report.md`` and the per-metric
    CSV files into the output directory. With ``--save-rebuilt-matches`` the
    rebuilt matches are also saved, but only when they were not read from an
    existing file.

    Raises:
        RuntimeError: if no detailed matches are available, or if no sample
            survives the class/confidence/IoU filters.
    """
    args = parse_args()
    output_dir = default_output_dir() if not args.output_dir else Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    detailed_matches, detailed_matches_path, config = load_detailed_matches(args)
    if not detailed_matches:
        raise RuntimeError("No detailed 3D matches were available for analysis.")

    class_ids = parse_class_ids(args.classes)
    thresholds = {
        key: float(getattr(args, key))
        for key in (
            "bad_lateral_threshold",
            "bad_longitudinal_threshold",
            "bad_longitudinal_relative_threshold",
            "bad_heading_threshold_deg",
        )
    }
    distance_ranges = build_distance_ranges(config or {})
    lateral_ranges = build_lateral_ranges(config or {})

    samples = collect_samples(detailed_matches, class_ids, distance_ranges, lateral_ranges, args)
    if not samples:
        raise RuntimeError("No 3D samples remained after filtering.")

    metrics = list(args.metrics)

    # --- aggregate views -------------------------------------------------
    case_names = {sample["case_name"] for sample in samples}
    frame_keys = {(sample["case_name"], sample["frame_name"]) for sample in samples}
    summary = {
        "num_cases": len(case_names),
        "num_frames": len(frame_keys),
        "num_samples": len(samples),
        "metrics": {
            metric_name: summarize_metric(samples, metric_name, thresholds)
            for metric_name in metrics
        },
    }

    samples_by_class = defaultdict(list)
    for sample in samples:
        samples_by_class[sample["class_name"]].append(sample)
    per_class = {
        class_name_str: summarize_sample_group(class_samples, metrics, thresholds)
        for class_name_str, class_samples in sorted(samples_by_class.items())
    }

    badcase_examples, badcase_examples_per_class = build_badcase_examples(
        samples=samples,
        metrics=metrics,
        top_k=args.top_k,
        top_k_per_class=args.top_k_per_class,
    )
    top_frames = build_top_frames(samples, metrics, thresholds, args.top_k_frames)

    # --- report assembly -------------------------------------------------
    loaded_from_file = detailed_matches_path is not None
    effective_config = config or {}
    metadata = {
        "created_at": datetime.now().isoformat(timespec="seconds"),
        "source": "detailed_3d_matches" if loaded_from_file else "rebuilt_from_config",
        "detailed_matches_path": str(detailed_matches_path.resolve()) if detailed_matches_path else None,
        "config_path": args.config,
        "coord_system": effective_config.get("metrics_3d", {}).get("coordinate_system", "camera"),
        "iou_threshold": effective_config.get("matching", {}).get("iou_threshold"),
        # Falls back to the 2D confidence threshold when no 3D one is configured.
        "conf_threshold": effective_config.get("metrics_3d", {}).get(
            "conf_threshold",
            effective_config.get("metrics_2d", {}).get("conf_threshold"),
        ),
        "classes": [class_name(class_id) for class_id in class_ids],
        "metrics": metrics,
        "bad_thresholds": thresholds,
        "distance_ranges": distance_ranges,
        "lateral_distance_ranges": lateral_ranges,
        "vehicle_size_split_3d": effective_config.get("metrics_3d", {}).get("vehicle_size_split"),
    }
    report = {
        "metadata": metadata,
        "summary": summary,
        "per_class": per_class,
        "per_distance_bin": build_bin_summary(samples, "distance_bin", metrics, thresholds),
        "per_lateral_bin": build_bin_summary(samples, "lateral_bin", metrics, thresholds),
        "top_frames": top_frames,
        "badcase_examples": badcase_examples,
        "badcase_examples_per_class": badcase_examples_per_class,
    }

    # --- outputs ---------------------------------------------------------
    report_path = output_dir / "analysis_report.json"
    with open(report_path, "w") as file:
        json.dump(report, file, indent=2)

    markdown_path = output_dir / "analysis_report.md"
    write_markdown_report(report, markdown_path)
    write_csv_exports(output_dir, badcase_examples)

    if args.save_rebuilt_matches and not loaded_from_file:
        rebuilt_path = output_dir / "detailed_3d_matches.json"
        with open(rebuilt_path, "w") as file:
            json.dump(detailed_matches, file, indent=2)
        print(f"Rebuilt detailed matches saved to: {rebuilt_path}")

    print(f"JSON report saved to: {report_path}")
    print(f"Markdown report saved to: {markdown_path}")
    print(f"CSV exports saved to: {output_dir / 'csv'}")


if __name__ == "__main__":
    main()
|