438 lines
16 KiB
Python
438 lines
16 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""Analyze whether detections are lost between raw per-frame JSONs and tracking output.

Primary use case:
    compare {case_dir}/roi0/*.json against {case_dir}/roi0.json

The script reports:
    - frame-level coverage differences
    - total / per-class detection count deltas
    - per-frame missing/new detection counts
    - detailed missing/new samples for debugging

Usage:
    python tools/temporal_analysis/analyze_tracking_loss.py \
        --case-dir /path/to/case --source roi0

    python tools/temporal_analysis/analyze_tracking_loss.py \
        --raw-dir /path/to/roi0 \
        --tracking /path/to/roi0.json \
        --output /path/to/roi0_loss_report.json
"""
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
import json
|
||
|
|
from collections import Counter, defaultdict
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
from merge_tracking_results import normalize_image_name
|
||
|
|
|
||
|
|
|
||
|
|
def round_float(value, digits=6):
    """Coerce *value* to ``float`` and round it to *digits* decimal places.

    Rounding keeps comparison keys stable when the same quantity was
    serialized with slightly different precision.
    """
    as_float = float(value)
    return round(as_float, digits)
|
||
|
|
|
||
|
|
|
||
|
|
def normalize_bbox(bbox):
    """Return the first four entries of *bbox* as a tuple of rounded floats.

    Entries beyond the fourth are ignored; each kept entry goes through
    ``round_float`` with its default precision.
    """
    first_four = bbox[:4]
    return tuple(round_float(coord) for coord in first_four)
|
||
|
|
|
||
|
|
|
||
|
|
def normalize_vector(values, limit=None):
    """Turn an optional numeric sequence into a tuple of rounded floats.

    ``None`` is passed through unchanged. When *limit* is given, only the
    first *limit* entries are kept before rounding.
    """
    if values is None:
        return None
    if limit is not None:
        values = values[:limit]
    return tuple(round_float(item) for item in values)
|
||
|
|
|
||
|
|
|
||
|
|
def detection_signature(det):
    """Return a hashable 10-tuple used to compare detections for equality.

    The tuple holds, in order: class id, rounded 2D bbox, rounded confidence,
    type name, face class, cut class, cut class name, anchor, and the first
    seven rounded values of ``object_3d`` and ``object_3d_ego`` (``None``
    when absent). Missing scalar fields fall back to ``-1`` / ``""`` /
    ``0.0`` / ``[0, 0, 0, 0]`` as coded below.
    """
    lookup = det.get
    class_id = int(lookup("class_id", -1))
    bbox = normalize_bbox(lookup("bbox", [0, 0, 0, 0]))
    confidence = round_float(lookup("confidence", 0.0))
    labels = (
        str(lookup("type_name", "")),
        str(lookup("face_cls", "")),
        int(lookup("cut_cls", -1)),
        str(lookup("cut_cls_name", "")),
        str(lookup("anchor", "")),
    )
    boxes_3d = (
        normalize_vector(lookup("object_3d"), limit=7),
        normalize_vector(lookup("object_3d_ego"), limit=7),
    )
    return (class_id, bbox, confidence, *labels, *boxes_3d)
|
||
|
|
|
||
|
|
|
||
|
|
def serialize_detection(det):
    """Build a compact, JSON-friendly summary of one detection.

    ``class_id``, ``bbox`` and ``confidence`` are always present (``None``
    when the detection lacks them). The remaining identifying fields are
    copied only if the detection actually carries them.
    """
    summary = {name: det.get(name) for name in ("class_id", "bbox", "confidence")}
    extra_fields = (
        "track_id",
        "type_name",
        "face_cls",
        "cut_cls",
        "cut_cls_name",
        "anchor",
        "frameId",
    )
    summary.update((name, det.get(name)) for name in extra_fields if name in det)
    return summary
|
||
|
|
|
||
|
|
|
||
|
|
def parse_det_format(det_dict, image_name=None):
    """Parse raw single-frame detection JSON into the tracking input schema.

    Args:
        det_dict: Decoded per-frame JSON. Either a mapping whose values are
            detection records, or a wrapper whose ``"detections"`` value is
            such a mapping.
        image_name: Optional frame name. When truthy, the last ``_``-separated
            piece of ``normalize_image_name(image_name)`` is stored as each
            detection's ``frameId``; otherwise ``frameId`` is ``None``.

    Returns:
        ``{"image_name": image_name, "detections": [...]}`` with one dict per
        raw detection record.

    Raises:
        KeyError: If a record lacks ``type``, ``box2d`` or ``score``.
    """
    if "detections" in det_dict and isinstance(det_dict["detections"], dict):
        raw_detections = det_dict["detections"]
    else:
        raw_detections = det_dict

    # Raw face label -> anchor name; unknown labels fall back to the center anchor.
    anchor_by_face = {
        "front": "kMonocular3DFront",
        "tail": "kMonocular3DRear",
        "back": "kMonocular3DRear",
        "left": "kMonocular3DLeft",
        "right": "kMonocular3DRight",
        "center": "kMonocular3DCenter",
        "none": "kMonocular3DCenter",
    }

    # The frame id depends only on image_name, so compute it once per frame
    # instead of once per detection.
    frame_id = normalize_image_name(image_name).split("_")[-1] if image_name else None

    detections = []
    for det in raw_detections.values():
        class_id = int(det["type"])
        bbox = [float(v) for v in det["box2d"]]
        score = float(det["score"])

        object_3d = _parse_xyzlhwyaw(det.get("xyzlhwyaw", []))
        object_3d_ego = _parse_xyzlhwyaw(det.get("xyzlhwyaw_ego", []))

        detection = {
            "bbox": bbox,
            "confidence": score,
            "class_id": class_id,
            "type_name": det.get("type_name", ""),
            "face_cls": det.get("face_cls", "none"),
            "cut_cls": int(det.get("cut_cls", -1)),
            "cut_cls_name": det.get("cut_cls_name", "none"),
            "frameId": frame_id,
            # Hard-coded constants in this script; NOTE(review): presumably
            # placeholders expected by the tracking schema -- confirm.
            "version": "20260228",
            "timestamp": 0,
        }
        detection["anchor"] = anchor_by_face.get(detection["face_cls"], "kMonocular3DCenter")

        # 3D fields are only emitted when a usable vector was present.
        if object_3d is not None:
            detection["object_3d"] = object_3d
        if object_3d_ego is not None:
            detection["object_3d_ego"] = object_3d_ego

        detections.append(detection)

    return {
        "image_name": image_name,
        "detections": detections,
    }


def _parse_xyzlhwyaw(raw):
    """Return *raw* as a list of floats, or ``None`` when it is unusable.

    A missing/empty vector, or one whose first element equals ``-1``, is
    treated as "no 3D box".
    """
    if not raw or float(raw[0]) == -1:
        return None
    return [float(v) for v in raw]
|
||
|
|
|
||
|
|
|
||
|
|
def load_predictions_from_dir(input_dir, pattern="*.json"):
    """Read every file in *input_dir* matching *pattern* and parse it as one frame.

    Files are visited in sorted path order. Each is decoded as JSON and
    handed to ``parse_det_format`` with the file stem as the image name.
    Returns the list of parsed frames.
    """
    frames = []
    for path in sorted(Path(input_dir).glob(pattern)):
        with open(path, "r", encoding="utf-8") as handle:
            payload = json.load(handle)
        frames.append(parse_det_format(payload, image_name=path.stem))
    return frames
|
||
|
|
|
||
|
|
|
||
|
|
def load_tracking_frames(tracking_path):
    """Load the tracking JSON and return its top-level list of frames.

    Raises:
        ValueError: If the decoded JSON document is not a list.
    """
    with open(tracking_path, "r", encoding="utf-8") as handle:
        frames = json.load(handle)
    if isinstance(frames, list):
        return frames
    raise ValueError(f"Expected a list of frames in {tracking_path}, got {type(frames).__name__}")
|
||
|
|
|
||
|
|
|
||
|
|
def index_frames(frames):
    """Map frames by normalized image name and record the key of every frame.

    Returns ``(by_key, key_sequence)``: ``by_key`` maps each normalized key to
    a frame (a later frame with the same key replaces an earlier one), and
    ``key_sequence`` lists the key of each input frame in input order. A
    missing or empty ``image_name`` is normalized from the empty string.
    """
    by_key = {}
    key_sequence = []
    for frame in frames:
        name = frame.get("image_name") or ""
        stem = Path(name).stem if name else ""
        frame_key = normalize_image_name(stem)
        by_key[frame_key] = frame
        key_sequence.append(frame_key)
    return by_key, key_sequence
|
||
|
|
|
||
|
|
|
||
|
|
def counter_to_detections(detections):
    """Group detections by signature and count each group.

    Returns ``(counts, grouped)``: ``counts`` is a ``Counter`` of signature
    occurrences and ``grouped`` is a ``defaultdict(list)`` holding the
    detections that produced each signature, in input order.
    """
    grouped = defaultdict(list)
    for det in detections:
        grouped[detection_signature(det)].append(det)
    counts = Counter({sig: len(members) for sig, members in grouped.items()})
    return counts, grouped
|
||
|
|
|
||
|
|
|
||
|
|
def expand_counter_delta(delta_counter, det_by_sig):
    """Turn a signature-count delta back into serialized example detections.

    For each signature with count ``c`` in *delta_counter*, the first ``c``
    detections stored under that signature in *det_by_sig* are summarized
    with ``serialize_detection``.
    """
    return [
        serialize_detection(det_by_sig[sig][position])
        for sig, count in delta_counter.items()
        for position in range(count)
    ]
|
||
|
|
|
||
|
|
|
||
|
|
def compute_class_counts(detections):
    """Return a ``Counter`` of detections keyed by integer ``class_id``.

    Detections without a ``class_id`` key are counted under ``-1``.
    """
    return Counter(int(det.get("class_id", -1)) for det in detections)
|
||
|
|
|
||
|
|
|
||
|
|
def analyze_pair(raw_frames, tracking_frames, top_k_frames=20, top_k_samples=200):
    """Compare raw parsed frames with tracking output frames.

    Frames are paired by the key produced by ``index_frames``. Within each
    pair, detections are compared as multisets of ``detection_signature``
    values: signatures present on both sides are "matched", raw-only ones are
    "missing", tracking-only ones are "new".

    Args:
        raw_frames: Frames parsed from the raw per-frame JSON files.
        tracking_frames: Frames loaded from the tracking JSON.
        top_k_frames: How many of the most-different frames to list in
            ``top_frames_by_diff`` (negative values are treated as 0).
        top_k_samples: Cap on the number of stored missing samples and,
            separately, on the number of stored new samples.

    Returns:
        A dict with keys ``summary``, ``per_class``, ``top_frames_by_diff``,
        ``missing_samples``, ``new_samples`` and ``all_frame_reports``.
    """
    raw_index, raw_order = index_frames(raw_frames)
    tracking_index, tracking_order = index_frames(tracking_frames)

    # Union of frame keys, raw frames first; dict.fromkeys drops repeats while
    # keeping first-seen order.
    all_keys = list(dict.fromkeys(raw_order + tracking_order))

    frame_reports = []
    totals = {
        "raw_frames": len(raw_frames),
        "tracking_frames": len(tracking_frames),
        "shared_frames": 0,
        "raw_only_frames": 0,
        "tracking_only_frames": 0,
        "raw_detections": 0,
        "tracking_detections": 0,
        "matched_detections": 0,
        "missing_detections": 0,
        "new_detections": 0,
    }
    per_class = defaultdict(lambda: {"raw": 0, "tracking": 0, "missing": 0, "new": 0})
    missing_samples = []
    new_samples = []

    for key in all_keys:
        raw_frame = raw_index.get(key)
        tracking_frame = tracking_index.get(key)
        # Presence is decided by `is not None` everywhere so the counters
        # below always agree with the raw_present/tracking_present flags.
        raw_present = raw_frame is not None
        tracking_present = tracking_frame is not None
        # `or []` also covers a frame whose "detections" value is null.
        raw_dets = (raw_frame.get("detections") or []) if raw_present else []
        tracking_dets = (tracking_frame.get("detections") or []) if tracking_present else []

        if raw_present and tracking_present:
            totals["shared_frames"] += 1
        elif raw_present:
            totals["raw_only_frames"] += 1
        else:
            totals["tracking_only_frames"] += 1

        raw_count = len(raw_dets)
        tracking_count = len(tracking_dets)
        totals["raw_detections"] += raw_count
        totals["tracking_detections"] += tracking_count

        for cls_id, count in compute_class_counts(raw_dets).items():
            per_class[cls_id]["raw"] += count
        for cls_id, count in compute_class_counts(tracking_dets).items():
            per_class[cls_id]["tracking"] += count

        raw_counter, raw_det_by_sig = counter_to_detections(raw_dets)
        tracking_counter, tracking_det_by_sig = counter_to_detections(tracking_dets)
        # Counter multiset algebra: & keeps the per-signature minimum,
        # - keeps only positive differences.
        matched_counter = raw_counter & tracking_counter
        missing_counter = raw_counter - tracking_counter
        new_counter = tracking_counter - raw_counter

        matched_count = sum(matched_counter.values())
        missing_count = sum(missing_counter.values())
        new_count = sum(new_counter.values())

        totals["matched_detections"] += matched_count
        totals["missing_detections"] += missing_count
        totals["new_detections"] += new_count

        missing_examples = expand_counter_delta(missing_counter, raw_det_by_sig)
        new_examples = expand_counter_delta(new_counter, tracking_det_by_sig)

        for det in missing_examples:
            per_class[_summary_class_id(det)]["missing"] += 1
        for det in new_examples:
            per_class[_summary_class_id(det)]["new"] += 1

        frame_report = {
            "image_name": key,
            "raw_present": raw_present,
            "tracking_present": tracking_present,
            "raw_count": raw_count,
            "tracking_count": tracking_count,
            "matched_count": matched_count,
            "missing_count": missing_count,
            "new_count": new_count,
        }

        # Store samples only until each list reaches top_k_samples.
        missing_room = max(0, top_k_samples - len(missing_samples))
        missing_samples.extend(
            {"image_name": key, "detection": det} for det in missing_examples[:missing_room]
        )
        new_room = max(0, top_k_samples - len(new_samples))
        new_samples.extend(
            {"image_name": key, "detection": det} for det in new_examples[:new_room]
        )

        frame_reports.append(frame_report)

    # Worst frames first: most missing, then most new, then largest count gap.
    frame_reports_sorted = sorted(
        frame_reports,
        key=lambda item: (
            item["missing_count"],
            item["new_count"],
            abs(item["raw_count"] - item["tracking_count"]),
        ),
        reverse=True,
    )
    per_class_report = {
        str(cls_id): values
        for cls_id, values in sorted(per_class.items(), key=lambda item: item[0])
    }

    loss_rate = (
        totals["missing_detections"] / totals["raw_detections"]
        if totals["raw_detections"] > 0 else 0.0
    )
    new_rate = (
        totals["new_detections"] / totals["tracking_detections"]
        if totals["tracking_detections"] > 0 else 0.0
    )

    return {
        "summary": {
            **totals,
            "loss_rate_vs_raw": loss_rate,
            "new_rate_vs_tracking": new_rate,
        },
        "per_class": per_class_report,
        # Clamp so a negative value yields an empty list rather than
        # "everything except the last k".
        "top_frames_by_diff": frame_reports_sorted[: max(0, top_k_frames)],
        "missing_samples": missing_samples,
        "new_samples": new_samples,
        "all_frame_reports": frame_reports,
    }


def _summary_class_id(det_summary):
    """Return the class id of a serialized detection, or -1 when it is None.

    ``serialize_detection`` always emits a ``class_id`` key, holding ``None``
    for detections that had no class id, so ``int(...)`` cannot be applied
    blindly; -1 matches the bucket ``compute_class_counts`` uses for them.
    """
    value = det_summary.get("class_id")
    return -1 if value is None else int(value)
|
||
|
|
|
||
|
|
|
||
|
|
def resolve_inputs(case_dir, source, raw_dir, tracking_path):
    """Work out the raw directory and tracking JSON path from the CLI values.

    With *case_dir* set, the pair is ``case_dir/source`` and
    ``case_dir/source.json`` (*raw_dir* and *tracking_path* are not used).
    Otherwise both *raw_dir* and *tracking_path* must be given.

    Returns:
        ``(raw_directory, tracking_file)`` as ``Path`` objects.

    Raises:
        ValueError: If neither a case directory nor both explicit paths are given.
        FileNotFoundError: If the raw directory or the tracking file does not exist.
    """
    if case_dir is None:
        if raw_dir is None or tracking_path is None:
            raise ValueError("Provide either --case-dir or both --raw-dir and --tracking.")
        raw_location = Path(raw_dir)
        tracking_location = Path(tracking_path)
    else:
        base = Path(case_dir)
        raw_location = base / source
        tracking_location = base / f"{source}.json"

    if not raw_location.is_dir():
        raise FileNotFoundError(f"Raw input directory not found: {raw_location}")
    if not tracking_location.is_file():
        raise FileNotFoundError(f"Tracking JSON not found: {tracking_location}")
    return raw_location, tracking_location
|
||
|
|
|
||
|
|
|
||
|
|
def print_report(report, raw_dir, tracking_path, source):
    """Write the headline numbers of *report* to stdout.

    Prints a banner with the inputs and the summary counters, then one line
    per class from ``report["per_class"]``, then one line per entry of
    ``report["top_frames_by_diff"]`` (or a placeholder when that list is empty).
    """
    stats = report["summary"]
    rule = "======================================================================"
    banner = [
        "",
        rule,
        f"Tracking loss analysis for source: {source}",
        rule,
        f"Raw directory : {raw_dir}",
        f"Tracking JSON : {tracking_path}",
        f"Raw frames : {stats['raw_frames']}",
        f"Tracking frames : {stats['tracking_frames']}",
        f"Shared frames : {stats['shared_frames']}",
        f"Raw-only frames : {stats['raw_only_frames']}",
        f"Track-only frames: {stats['tracking_only_frames']}",
        f"Raw detections : {stats['raw_detections']}",
        f"Track detections: {stats['tracking_detections']}",
        f"Matched : {stats['matched_detections']}",
        f"Missing : {stats['missing_detections']} ({stats['loss_rate_vs_raw']:.2%} of raw)",
        f"New : {stats['new_detections']} ({stats['new_rate_vs_tracking']:.2%} of tracking)",
    ]
    for text in banner:
        print(text)

    print("\nPer-class summary:")
    for cls_id, counts in report["per_class"].items():
        class_line = (
            f" class {cls_id}: raw={counts['raw']}, tracking={counts['tracking']}, "
            f"missing={counts['missing']}, new={counts['new']}"
        )
        print(class_line)

    print("\nTop frames by difference:")
    top_frames = report["top_frames_by_diff"]
    if not top_frames:
        print(" (no frames)")
    for entry in top_frames:
        frame_line = (
            f" {entry['image_name']}: raw={entry['raw_count']}, tracking={entry['tracking_count']}, "
            f"missing={entry['missing_count']}, new={entry['new_count']}"
        )
        print(frame_line)
|
||
|
|
|
||
|
|
|
||
|
|
def main():
    """CLI entry point: resolve inputs, run the comparison, print and optionally save the report.

    Invalid or missing inputs reported by ``resolve_inputs`` are turned into
    an argparse usage error (message on stderr, exit status 2) rather than an
    uncaught traceback. When ``--output`` is given, the full report is written
    there as JSON, creating parent directories as needed.
    """
    parser = argparse.ArgumentParser(
        description="Analyze whether detections are lost from raw per-frame JSONs to tracking JSON."
    )
    parser.add_argument("--case-dir", type=str, default=None,
                        help="Case directory containing <source>/ and <source>.json")
    parser.add_argument("--source", type=str, default="roi0",
                        help="Source name under case-dir, e.g. roi0 / roi1 / merge")
    parser.add_argument("--raw-dir", type=str, default=None,
                        help="Raw per-frame JSON directory, used when --case-dir is omitted")
    parser.add_argument("--tracking", type=str, default=None,
                        help="Tracking JSON path, used when --case-dir is omitted")
    parser.add_argument("--file-pattern", type=str, default="*.json",
                        help="Glob pattern for raw frame JSON files (default: %(default)s)")
    parser.add_argument("--output", type=str, default=None,
                        help="Optional JSON report output path")
    parser.add_argument("--top-k-frames", type=int, default=20,
                        help="Number of most-different frames to include in the summary (default: %(default)s)")
    parser.add_argument("--top-k-samples", type=int, default=200,
                        help="Maximum missing/new samples to store in the JSON report (default: %(default)s)")
    args = parser.parse_args()

    try:
        raw_dir, tracking_path = resolve_inputs(args.case_dir, args.source, args.raw_dir, args.tracking)
    except (ValueError, FileNotFoundError) as exc:
        # Both exceptions describe bad command-line input; parser.error prints
        # the usage line plus the message and exits, so it never returns here.
        parser.error(str(exc))

    raw_frames = load_predictions_from_dir(raw_dir, pattern=args.file_pattern)
    tracking_frames = load_tracking_frames(tracking_path)

    report = analyze_pair(
        raw_frames=raw_frames,
        tracking_frames=tracking_frames,
        top_k_frames=args.top_k_frames,
        top_k_samples=args.top_k_samples,
    )
    print_report(report, raw_dir, tracking_path, args.source)

    if args.output:
        output_path = Path(args.output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        print(f"\nReport written to: {output_path}")
|
||
|
|
|
||
|
|
|
||
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|