Files
yolov26_3d/tools/temporal_analysis/analyze_tracking_loss.py

438 lines
16 KiB
Python
Raw Permalink Normal View History

2026-06-24 09:35:46 +08:00
#!/usr/bin/env python3
"""Analyze whether detections are lost between raw per-frame JSONs and tracking output.
Primary use case:
compare {case_dir}/roi0/*.json against {case_dir}/roi0.json
The script reports:
- frame-level coverage differences
- total / per-class detection count deltas
- per-frame missing/new detection counts
- detailed missing/new samples for debugging
Usage:
python tools/temporal_analysis/analyze_tracking_loss.py \
--case-dir /path/to/case --source roi0
python tools/temporal_analysis/analyze_tracking_loss.py \
--raw-dir /path/to/roi0 \
--tracking /path/to/roi0.json \
--output /path/to/roi0_loss_report.json
"""
import argparse
import json
from collections import Counter, defaultdict
from pathlib import Path
from merge_tracking_results import normalize_image_name
def round_float(value, digits=6):
    """Coerce *value* to ``float`` and round it to *digits* decimal places.

    Used to build comparison keys that do not differ because of tiny
    floating-point noise.
    """
    as_float = float(value)
    return round(as_float, digits)
def normalize_bbox(bbox):
    """Return the first four entries of *bbox* as a tuple of rounded floats."""
    leading = bbox[:4]
    return tuple([round_float(coord) for coord in leading])
def normalize_vector(values, limit=None):
    """Return *values* as a tuple of rounded floats, or ``None`` if absent.

    When *limit* is given, only the first *limit* entries are kept.
    """
    if values is None:
        return None
    if limit is not None:
        values = values[:limit]
    return tuple([round_float(item) for item in values])
def detection_signature(det):
    """Return a hashable tuple identifying one detection for comparison.

    The tuple combines class id, rounded 2D box, rounded confidence, the
    type/face/cut/anchor labels and the first seven values of the optional
    3D vectors. Missing fields fall back to fixed defaults so that two
    detections with the same content always produce the same signature.
    """
    # Fields are evaluated in the same order as they appear in the tuple.
    class_id = int(det.get("class_id", -1))
    bbox = normalize_bbox(det.get("bbox", [0, 0, 0, 0]))
    confidence = round_float(det.get("confidence", 0.0))
    type_name = str(det.get("type_name", ""))
    face_cls = str(det.get("face_cls", ""))
    cut_cls = int(det.get("cut_cls", -1))
    cut_cls_name = str(det.get("cut_cls_name", ""))
    anchor = str(det.get("anchor", ""))
    box3d = normalize_vector(det.get("object_3d"), limit=7)
    box3d_ego = normalize_vector(det.get("object_3d_ego"), limit=7)
    return (
        class_id,
        bbox,
        confidence,
        type_name,
        face_cls,
        cut_cls,
        cut_cls_name,
        anchor,
        box3d,
        box3d_ego,
    )
def serialize_detection(det):
    """Build a small JSON-friendly summary of one detection.

    ``class_id``, ``bbox`` and ``confidence`` are always present (``None``
    when the detection lacks them); the remaining fields are copied only
    when the detection actually carries them.
    """
    always = ("class_id", "bbox", "confidence")
    when_present = (
        "track_id",
        "type_name",
        "face_cls",
        "cut_cls",
        "cut_cls_name",
        "anchor",
        "frameId",
    )
    summary = {name: det.get(name) for name in always}
    summary.update((name, det.get(name)) for name in when_present if name in det)
    return summary
def parse_det_format(det_dict, image_name=None):
    """Convert one raw per-frame detection JSON into the tracking input schema.

    *det_dict* is either a mapping of detections or a mapping with a nested
    ``"detections"`` dict. Each raw entry must provide ``type``, ``box2d`` and
    ``score``; the 3D vectors are kept only when non-empty and their first
    value is not ``-1``.

    Returns a dict with ``image_name`` and the list of parsed ``detections``.
    """
    anchor_by_face = {
        "front": "kMonocular3DFront",
        "tail": "kMonocular3DRear",
        "back": "kMonocular3DRear",
        "left": "kMonocular3DLeft",
        "right": "kMonocular3DRight",
        "center": "kMonocular3DCenter",
        "none": "kMonocular3DCenter",
    }

    def to_box3d(raw):
        # An empty vector or a leading -1 marks "no 3D result".
        if raw and float(raw[0]) != -1:
            return [float(v) for v in raw]
        return None

    # Use the nested mapping only when it really is a dict.
    raw_detections = det_dict
    if "detections" in det_dict:
        nested = det_dict["detections"]
        if isinstance(nested, dict):
            raw_detections = nested

    parsed = []
    for item in raw_detections.values():
        class_id = int(item["type"])
        box2d = [float(coord) for coord in item["box2d"]]
        confidence = float(item["score"])
        box3d = to_box3d(item.get("xyzlhwyaw", []))
        box3d_ego = to_box3d(item.get("xyzlhwyaw_ego", []))

        type_name = item.get("type_name", "")
        face_cls = item.get("face_cls", "none")
        cut_cls = int(item.get("cut_cls", -1))
        cut_cls_name = item.get("cut_cls_name", "none")
        if image_name:
            # Frame id is the last "_"-separated token of the normalized name.
            frame_id = normalize_image_name(image_name).split("_")[-1]
        else:
            frame_id = None

        record = {
            "bbox": box2d,
            "confidence": confidence,
            "class_id": class_id,
            "type_name": type_name,
            "face_cls": face_cls,
            "cut_cls": cut_cls,
            "cut_cls_name": cut_cls_name,
            "frameId": frame_id,
            "version": "20260228",
            "timestamp": 0,
        }
        record["anchor"] = anchor_by_face.get(face_cls, "kMonocular3DCenter")
        if box3d is not None:
            record["object_3d"] = box3d
        if box3d_ego is not None:
            record["object_3d_ego"] = box3d_ego
        parsed.append(record)

    return {"image_name": image_name, "detections": parsed}
def load_predictions_from_dir(input_dir, pattern="*.json"):
    """Parse every raw frame JSON in *input_dir* matching *pattern*.

    Files are visited in sorted path order; each one is parsed with
    ``parse_det_format`` using the file stem as the image name. Returns the
    list of parsed frames.
    """
    frames = []
    for frame_path in sorted(Path(input_dir).glob(pattern)):
        with open(frame_path, "r", encoding="utf-8") as handle:
            payload = json.load(handle)
        frames.append(parse_det_format(payload, image_name=frame_path.stem))
    return frames
def load_tracking_frames(tracking_path):
    """Read the tracking JSON at *tracking_path* and return its frame list.

    Raises:
        ValueError: if the top-level JSON value is not a list.
    """
    with open(tracking_path, "r", encoding="utf-8") as handle:
        frames = json.load(handle)
    if isinstance(frames, list):
        return frames
    raise ValueError(f"Expected a list of frames in {tracking_path}, got {type(frames).__name__}")
def index_frames(frames):
    """Map each frame to a key derived from its ``image_name``.

    The key is ``normalize_image_name`` applied to the stem of the frame's
    ``image_name`` (or to ``""`` when the name is missing or empty).

    Returns ``(lookup, order)``: ``lookup`` maps key -> frame (a later frame
    with the same key replaces an earlier one) and ``order`` lists the key of
    every frame in input order, repeats included.
    """
    lookup = {}
    order = []
    for frame in frames:
        name = frame.get("image_name") or ""
        stem = Path(name).stem if name else ""
        key = normalize_image_name(stem)
        lookup[key] = frame
        order.append(key)
    return lookup, order
def counter_to_detections(detections):
    """Group detections by signature and count each group.

    Returns ``(counts, grouped)`` where ``grouped`` maps each signature to the
    detections that produced it (in input order) and ``counts`` is a
    ``Counter`` holding the size of each group.
    """
    grouped = defaultdict(list)
    for det in detections:
        grouped[detection_signature(det)].append(det)
    counts = Counter({sig: len(members) for sig, members in grouped.items()})
    return counts, grouped
def expand_counter_delta(delta_counter, det_by_sig):
    """Turn signature counts back into serialized example detections.

    For every ``(signature, count)`` in *delta_counter*, the first *count*
    detections stored under that signature in *det_by_sig* are serialized.
    """
    return [
        serialize_detection(det_by_sig[sig][position])
        for sig, count in delta_counter.items()
        for position in range(count)
    ]
def compute_class_counts(detections):
    """Return a ``Counter`` of detections per integer ``class_id``.

    Detections without a ``class_id`` key are counted under ``-1``.
    """
    return Counter(int(det.get("class_id", -1)) for det in detections)
def analyze_pair(raw_frames, tracking_frames, top_k_frames=20, top_k_samples=200):
    """Compare raw parsed frames with tracking output frames.

    Frames are paired by the key produced by ``index_frames``. Within each
    frame, detections are compared as multisets of ``detection_signature``
    values: a raw detection with no equal tracking detection is "missing",
    a tracking detection with no equal raw detection is "new".

    Args:
        raw_frames: Parsed raw frames (dicts with ``image_name`` and
            ``detections``).
        tracking_frames: Frames loaded from the tracking JSON.
        top_k_frames: How many of the most-different frames to include in
            ``top_frames_by_diff``.
        top_k_samples: Upper bound on the number of stored missing samples
            and, separately, of stored new samples.

    Returns:
        A dict with ``summary`` (totals plus loss/new rates), ``per_class``
        (keyed by stringified class id), ``top_frames_by_diff``,
        ``missing_samples``, ``new_samples`` and ``all_frame_reports``.
    """

    def class_of(summary):
        # serialize_detection always emits "class_id" and sets it to None when
        # the detection lacks one; bucket that under -1 exactly like
        # compute_class_counts does instead of crashing on int(None).
        cls = summary.get("class_id")
        return -1 if cls is None else int(cls)

    raw_index, raw_order = index_frames(raw_frames)
    tracking_index, tracking_order = index_frames(tracking_frames)

    # Union of frame keys, first-seen order (raw frames first), no repeats.
    all_keys = list(dict.fromkeys(raw_order + tracking_order))

    frame_reports = []
    totals = {
        "raw_frames": len(raw_frames),
        "tracking_frames": len(tracking_frames),
        "shared_frames": 0,
        "raw_only_frames": 0,
        "tracking_only_frames": 0,
        "raw_detections": 0,
        "tracking_detections": 0,
        "matched_detections": 0,
        "missing_detections": 0,
        "new_detections": 0,
    }
    per_class = defaultdict(lambda: {"raw": 0, "tracking": 0, "missing": 0, "new": 0})
    missing_samples = []
    new_samples = []

    for key in all_keys:
        raw_frame = raw_index.get(key)
        tracking_frame = tracking_index.get(key)
        # Presence is decided by "is not None" everywhere so that the frame
        # counters agree with the raw_present / tracking_present flags below,
        # even for a frame dict that happens to be empty.
        raw_present = raw_frame is not None
        tracking_present = tracking_frame is not None
        raw_dets = raw_frame.get("detections", []) if raw_present else []
        tracking_dets = tracking_frame.get("detections", []) if tracking_present else []

        if raw_present and tracking_present:
            totals["shared_frames"] += 1
        elif raw_present:
            totals["raw_only_frames"] += 1
        else:
            totals["tracking_only_frames"] += 1

        raw_count = len(raw_dets)
        tracking_count = len(tracking_dets)
        totals["raw_detections"] += raw_count
        totals["tracking_detections"] += tracking_count

        for cls_id, count in compute_class_counts(raw_dets).items():
            per_class[cls_id]["raw"] += count
        for cls_id, count in compute_class_counts(tracking_dets).items():
            per_class[cls_id]["tracking"] += count

        raw_counter, raw_det_by_sig = counter_to_detections(raw_dets)
        tracking_counter, tracking_det_by_sig = counter_to_detections(tracking_dets)

        # Counter multiset algebra: "&" keeps the common multiplicity, "-"
        # keeps only the positive surplus of the left operand.
        matched_counter = raw_counter & tracking_counter
        missing_counter = raw_counter - tracking_counter
        new_counter = tracking_counter - raw_counter

        matched_count = sum(matched_counter.values())
        missing_count = sum(missing_counter.values())
        new_count = sum(new_counter.values())
        totals["matched_detections"] += matched_count
        totals["missing_detections"] += missing_count
        totals["new_detections"] += new_count

        missing_examples = expand_counter_delta(missing_counter, raw_det_by_sig)
        new_examples = expand_counter_delta(new_counter, tracking_det_by_sig)

        for det in missing_examples:
            per_class[class_of(det)]["missing"] += 1
        for det in new_examples:
            per_class[class_of(det)]["new"] += 1

        # Store samples only until the global cap is reached.
        missing_room = max(0, top_k_samples - len(missing_samples))
        missing_samples.extend(
            {"image_name": key, "detection": det} for det in missing_examples[:missing_room]
        )
        new_room = max(0, top_k_samples - len(new_samples))
        new_samples.extend(
            {"image_name": key, "detection": det} for det in new_examples[:new_room]
        )

        frame_reports.append({
            "image_name": key,
            "raw_present": raw_present,
            "tracking_present": tracking_present,
            "raw_count": raw_count,
            "tracking_count": tracking_count,
            "matched_count": matched_count,
            "missing_count": missing_count,
            "new_count": new_count,
        })

    # Most-different frames first: by missing, then new, then count gap.
    frame_reports_sorted = sorted(
        frame_reports,
        key=lambda item: (
            item["missing_count"],
            item["new_count"],
            abs(item["raw_count"] - item["tracking_count"]),
        ),
        reverse=True,
    )

    per_class_report = {
        str(cls_id): values
        for cls_id, values in sorted(per_class.items(), key=lambda item: item[0])
    }

    # Rates are 0.0 when the denominator is empty.
    loss_rate = (
        totals["missing_detections"] / totals["raw_detections"]
        if totals["raw_detections"] > 0 else 0.0
    )
    new_rate = (
        totals["new_detections"] / totals["tracking_detections"]
        if totals["tracking_detections"] > 0 else 0.0
    )

    return {
        "summary": {
            **totals,
            "loss_rate_vs_raw": loss_rate,
            "new_rate_vs_tracking": new_rate,
        },
        "per_class": per_class_report,
        "top_frames_by_diff": frame_reports_sorted[:top_k_frames],
        "missing_samples": missing_samples,
        "new_samples": new_samples,
        "all_frame_reports": frame_reports,
    }
def resolve_inputs(case_dir, source, raw_dir, tracking_path):
    """Work out the raw directory and tracking JSON path from CLI values.

    With *case_dir* set, the inputs are ``<case_dir>/<source>`` and
    ``<case_dir>/<source>.json``; otherwise *raw_dir* and *tracking_path*
    are used as given.

    Raises:
        ValueError: if neither *case_dir* nor both explicit paths are given.
        FileNotFoundError: if the raw directory or the tracking file is absent.
    """
    if case_dir is None:
        raw_location = None if raw_dir is None else Path(raw_dir)
        tracking_location = None if tracking_path is None else Path(tracking_path)
    else:
        root = Path(case_dir)
        raw_location = root / source
        tracking_location = root / f"{source}.json"

    if raw_location is None or tracking_location is None:
        raise ValueError("Provide either --case-dir or both --raw-dir and --tracking.")
    if not raw_location.is_dir():
        raise FileNotFoundError(f"Raw input directory not found: {raw_location}")
    if not tracking_location.is_file():
        raise FileNotFoundError(f"Tracking JSON not found: {tracking_location}")
    return raw_location, tracking_location
def print_report(report, raw_dir, tracking_path, source):
    """Print the headline numbers of *report* to standard output.

    Shows the input locations, the frame/detection totals from
    ``report["summary"]``, one line per class from ``report["per_class"]`` and
    one line per entry of ``report["top_frames_by_diff"]``.
    """
    summary = report["summary"]
    rule = "======================================================================"

    def report_lines():
        # Lines are produced lazily so each one is printed before the next
        # lookup into the report happens.
        yield ""
        yield rule
        yield f"Tracking loss analysis for source: {source}"
        yield rule
        yield f"Raw directory : {raw_dir}"
        yield f"Tracking JSON : {tracking_path}"
        yield f"Raw frames : {summary['raw_frames']}"
        yield f"Tracking frames : {summary['tracking_frames']}"
        yield f"Shared frames : {summary['shared_frames']}"
        yield f"Raw-only frames : {summary['raw_only_frames']}"
        yield f"Track-only frames: {summary['tracking_only_frames']}"
        yield f"Raw detections : {summary['raw_detections']}"
        yield f"Track detections: {summary['tracking_detections']}"
        yield f"Matched : {summary['matched_detections']}"
        yield f"Missing : {summary['missing_detections']} ({summary['loss_rate_vs_raw']:.2%} of raw)"
        yield f"New : {summary['new_detections']} ({summary['new_rate_vs_tracking']:.2%} of tracking)"

        yield "\nPer-class summary:"
        for cls_id, stats in report["per_class"].items():
            yield (
                f" class {cls_id}: raw={stats['raw']}, tracking={stats['tracking']}, "
                f"missing={stats['missing']}, new={stats['new']}"
            )

        yield "\nTop frames by difference:"
        if not report["top_frames_by_diff"]:
            yield " (no frames)"
        for item in report["top_frames_by_diff"]:
            yield (
                f" {item['image_name']}: raw={item['raw_count']}, tracking={item['tracking_count']}, "
                f"missing={item['missing_count']}, new={item['new_count']}"
            )

    for line in report_lines():
        print(line)
def main():
    """Command-line entry point.

    Parses the arguments, resolves and loads the raw frames and the tracking
    frames, runs the comparison, prints the summary and, when ``--output`` is
    given, writes the full report as JSON.
    """
    cli = argparse.ArgumentParser(
        description="Analyze whether detections are lost from raw per-frame JSONs to tracking JSON."
    )
    cli.add_argument(
        "--case-dir",
        type=str,
        default=None,
        help="Case directory containing <source>/ and <source>.json",
    )
    cli.add_argument(
        "--source",
        type=str,
        default="roi0",
        help="Source name under case-dir, e.g. roi0 / roi1 / merge",
    )
    cli.add_argument(
        "--raw-dir",
        type=str,
        default=None,
        help="Raw per-frame JSON directory, used when --case-dir is omitted",
    )
    cli.add_argument(
        "--tracking",
        type=str,
        default=None,
        help="Tracking JSON path, used when --case-dir is omitted",
    )
    cli.add_argument(
        "--file-pattern",
        type=str,
        default="*.json",
        help="Glob pattern for raw frame JSON files (default: %(default)s)",
    )
    cli.add_argument(
        "--output",
        type=str,
        default=None,
        help="Optional JSON report output path",
    )
    cli.add_argument(
        "--top-k-frames",
        type=int,
        default=20,
        help="Number of most-different frames to include in the summary (default: %(default)s)",
    )
    cli.add_argument(
        "--top-k-samples",
        type=int,
        default=200,
        help="Maximum missing/new samples to store in the JSON report (default: %(default)s)",
    )
    opts = cli.parse_args()

    raw_dir, tracking_path = resolve_inputs(opts.case_dir, opts.source, opts.raw_dir, opts.tracking)
    raw_frames = load_predictions_from_dir(raw_dir, pattern=opts.file_pattern)
    tracking_frames = load_tracking_frames(tracking_path)

    report = analyze_pair(
        raw_frames=raw_frames,
        tracking_frames=tracking_frames,
        top_k_frames=opts.top_k_frames,
        top_k_samples=opts.top_k_samples,
    )
    print_report(report, raw_dir, tracking_path, opts.source)

    if not opts.output:
        return
    # Create the destination folder on demand before writing the report.
    output_path = Path(opts.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as handle:
        json.dump(report, handle, indent=2, ensure_ascii=False)
    print(f"\nReport written to: {output_path}")
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()