Files
yolov26_3d/tools/temporal_analysis/track_event_objects.py
2026-06-24 09:35:46 +08:00

563 lines
20 KiB
Python
Executable File

#!/usr/bin/env python3
"""Track exported inference outputs at event scope across multiple clips.
This tool aggregates all clip-level per-frame JSON predictions under one
event directory, orders frames globally by timestamp parsed from filenames,
and then reuses the existing tracking logic from track_objects.py to produce
one tracking result set per event.
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Optional
# Absolute path of this script and of the directory that contains it.
FILE = Path(__file__).resolve()
THIS_DIR = FILE.parent

# The sibling tool modules imported below are loaded by bare module name, so
# this script's own directory has to be on sys.path regardless of the current
# working directory. Insert it only once, and ahead of other entries.
if str(THIS_DIR) not in sys.path:
    sys.path.insert(0, str(THIS_DIR))
from merge_tracking_results import TRACK_ID_OFFSET_PER_SOURCE # noqa: E402
from track_objects import ( # noqa: E402
TRACKED_CLASS_IDS,
count_unique_tracks,
parse_det_format,
save_tracking_results,
track_objects,
)
# Prediction sources processed for every event, as (source_name, output_name)
# pairs: ``source_name`` is the sub-directory under ``<clip>/predictions/``
# that holds the per-frame JSON files, ``output_name`` is the file the
# event-level tracking result for that source is written to. The position of
# an entry is also used as its index when the per-source results are merged.
SOURCE_SPECS = (
    ("roi0", "roi0.json"),
    ("roi1", "roi1.json"),
    ("merge", "merge.json"),
)

# Fallback directory name used when no date name can be read from a manifest.
DEFAULT_EVENT_OUTPUT_DIRNAME = "event_tracking"
@dataclass
class EventFrameRecord:
    """One frame of an event, together with every source JSON exported for it.

    The identifying fields are filled in when the frame is first discovered;
    the ``event_*`` fields and ``image_name`` keep their placeholder defaults
    until the frames of the whole event have been ordered.
    """

    # Unique identity of the frame: "<clip_case_name>:<original_image_name>".
    key: str
    # Name of the clip-case directory the frame was found in.
    clip_case_name: str
    # Leading, non-numeric part of the exported file stem.
    clip_token: str
    # File stem of the exported per-frame JSON, unchanged.
    original_image_name: str
    # Frame id parsed from the file stem, or None when it has none.
    original_frame_id: Optional[int]
    # Timestamp parsed from the file stem, or None when it has none.
    timestamp: Optional[int]
    # Maps a source name (e.g. "roi0") to that source's JSON file for this frame.
    source_files: dict[str, Path] = field(default_factory=dict)
    # Zero-based position in the event-wide ordering (-1 = not assigned yet).
    event_frame_index: int = -1
    # One-based frame id in the event-wide ordering (-1 = not assigned yet).
    event_frame_id: int = -1
    # Event-scoped image name generated after ordering ("" = not assigned yet).
    image_name: str = ""
def _safe_int(value: Any) -> Optional[int]:
if value is None:
return None
try:
return int(str(value).strip())
except (TypeError, ValueError):
return None
def _normalize_output_dir_token(value: Any) -> str:
token = re.sub(r'[\\/:*?"<>|\s]+', "_", str(value or "").strip())
return token.strip("._")
def _extract_date_name_from_records(records: Any) -> Optional[str]:
if not isinstance(records, list):
return None
for record in records:
if not isinstance(record, dict):
continue
source_record = record.get("source_record")
if not isinstance(source_record, dict):
continue
for key in ("date_name", "datename", "datetime", "date"):
value = source_record.get(key)
normalized = _normalize_output_dir_token(value)
if normalized:
return normalized
return None
def parse_frame_name_metadata(image_name: str) -> tuple[str, Optional[int], Optional[int]]:
    """Split an exported frame stem into (clip_token, frame_id, timestamp).

    The stem is split on ``_`` and up to two trailing all-digit fields are
    consumed:

    * two numeric fields  -> ``(<prefix>, frame_id, timestamp)``
    * one numeric field   -> ``(<prefix>, frame_id, None)``
    * no numeric field    -> ``(<stem>, None, None)``
    * empty / falsy input -> ``("", None, None)``

    ``<prefix>`` is the remaining fields re-joined with ``_`` and stripped of
    leading/trailing underscores.
    """
    stem = str(image_name or "").strip()
    if not stem:
        return "", None, None

    fields = stem.split("_")
    # Count how many of the last fields (at most two) consist only of digits.
    numeric_count = 0
    while (
        numeric_count < 2
        and numeric_count < len(fields)
        and fields[-1 - numeric_count].isdigit()
    ):
        numeric_count += 1

    if numeric_count == 0:
        return stem, None, None

    split_at = len(fields) - numeric_count
    clip_token = "_".join(fields[:split_at]).strip("_")
    numeric_fields = fields[split_at:]
    frame_id = _safe_int(numeric_fields[0])
    timestamp = _safe_int(numeric_fields[1]) if numeric_count == 2 else None
    return clip_token, frame_id, timestamp
def build_event_sort_key(record: EventFrameRecord) -> tuple[Any, ...]:
    """Return the key used to order frames across all clips of one event.

    Frames sort by timestamp first, then clip-case name, then original frame
    id, then original image name. A missing timestamp or frame id is replaced
    by ``+inf`` and flagged with a leading ``True`` so that such frames sort
    after the ones that carry a value.
    """
    timestamp = record.timestamp
    frame_id = record.original_frame_id
    return (
        timestamp is None,
        float("inf") if timestamp is None else timestamp,
        record.clip_case_name,
        frame_id is None,
        float("inf") if frame_id is None else frame_id,
        record.original_image_name,
    )
def find_event_case_dirs(event_dir: Path) -> list[Path]:
    """Return the clip-case directories directly under *event_dir*.

    A clip-case directory is any immediate child that contains a
    ``predictions`` sub-directory. Results follow the sorted order of the
    matching ``predictions`` paths.
    """
    return [
        candidate.parent
        for candidate in sorted(event_dir.glob("*/predictions"))
        if candidate.is_dir()
    ]
def collect_event_frames(
    event_dir: Path,
    pattern: str,
    *,
    verbose: bool = True,
) -> tuple[list[EventFrameRecord], list[Path]]:
    """Collect and globally order all per-frame JSON files under one event.

    For every clip-case directory and every source in ``SOURCE_SPECS`` the
    files matching *pattern* under ``<clip>/predictions/<source>/`` are
    gathered. Files that share the same clip case and file stem are folded
    into a single ``EventFrameRecord`` whose ``source_files`` lists each
    contributing source. The records are then sorted with
    ``build_event_sort_key`` and given their event-wide index, id and
    generated image name.

    Args:
        event_dir: Event directory holding the clip-case directories.
        pattern: Glob pattern applied inside each source directory.
        verbose: Print a short discovery summary when true.

    Returns:
        ``(ordered_frames, case_dirs)``: the ordered frame records and the
        clip-case directories that were scanned.

    Raises:
        FileNotFoundError: If no clip-case directory exists, or none of them
            contains a file matching *pattern*.
    """
    case_dirs = find_event_case_dirs(event_dir)
    if not case_dirs:
        raise FileNotFoundError(f"No clip case predictions found under event directory: {event_dir}")

    frame_map: dict[str, EventFrameRecord] = {}
    source_file_count = 0
    for case_dir in case_dirs:
        predictions_dir = case_dir / "predictions"
        for source_name, _ in SOURCE_SPECS:
            source_dir = predictions_dir / source_name
            if not source_dir.is_dir():
                continue
            for json_file in sorted(source_dir.glob(pattern)):
                # A permissive pattern can match sub-directories; they cannot be
                # loaded as frame JSON later, so leave them out here.
                if not json_file.is_file():
                    continue
                source_file_count += 1
                original_image_name = json_file.stem
                clip_token, frame_id, timestamp = parse_frame_name_metadata(original_image_name)
                key = f"{case_dir.name}:{original_image_name}"
                record = frame_map.get(key)
                if record is None:
                    record = EventFrameRecord(
                        key=key,
                        clip_case_name=case_dir.name,
                        clip_token=clip_token,
                        original_image_name=original_image_name,
                        original_frame_id=frame_id,
                        timestamp=timestamp,
                    )
                    frame_map[key] = record
                else:
                    # Back-fill each missing field independently: filling in the
                    # timestamp must not prevent the frame id from being filled.
                    if record.timestamp is None and timestamp is not None:
                        record.timestamp = timestamp
                    if record.original_frame_id is None and frame_id is not None:
                        record.original_frame_id = frame_id
                record.source_files[source_name] = json_file

    ordered_frames = sorted(frame_map.values(), key=build_event_sort_key)
    if not ordered_frames:
        raise FileNotFoundError(
            f"No source frame JSON files matching {pattern!r} were found under event directory: {event_dir}"
        )

    for frame_index, record in enumerate(ordered_frames):
        event_frame_id = frame_index + 1
        # Frames without a parsed timestamp reuse their event frame id as the
        # trailing token of the generated image name.
        timestamp_token = record.timestamp if record.timestamp is not None else event_frame_id
        record.event_frame_index = frame_index
        record.event_frame_id = event_frame_id
        record.image_name = f"camera4_{event_frame_id:06d}_{int(timestamp_token)}"

    if verbose:
        print(f"Discovered {len(case_dirs)} clip case(s) under {event_dir}")
        print(f"Collected {source_file_count} source frame JSON file(s)")
        print(f"Built {len(ordered_frames)} event frame(s) after cross-clip ordering")
    return ordered_frames, case_dirs
def load_event_metadata(event_dir: Path) -> dict[str, Any]:
    """Load optional event manifest metadata for one event directory.

    Reads ``<event_dir>/_status/event_manifest.json`` when it exists. The date
    name is taken from that manifest's records; failing that, from the records
    of ``<scene_dir>/_status/scene_event_manifest.json`` whose ``event_id``
    matches; failing that, it is ``DEFAULT_EVENT_OUTPUT_DIRNAME``. Besides
    being reported, ``date_name`` is what ``main()`` uses as the default
    output sub-directory name.

    A manifest whose top-level JSON value is not an object is treated as
    empty, and a ``clip_count`` that cannot be converted to ``int`` is
    reported as 0, so that malformed optional metadata does not abort a run.
    Invalid JSON still raises.

    Args:
        event_dir: Event directory; its name and its parent's name are the
            fallbacks for ``event_id`` and ``scene``.

    Returns:
        Dict with the keys ``event_id``, ``scene``, ``manifest_path`` (empty
        string when no manifest file exists), ``clip_ids``, ``clip_count``
        and ``date_name``.
    """
    manifest_path = event_dir / "_status" / "event_manifest.json"
    manifest_exists = manifest_path.is_file()
    payload: dict[str, Any] = {}
    if manifest_exists:
        with manifest_path.open("r", encoding="utf-8") as file:
            loaded = json.load(file)
        # json.load may return any JSON type; only an object carries metadata.
        if isinstance(loaded, dict):
            payload = loaded

    event_id = payload.get("event_id", event_dir.name)
    scene = payload.get("scene", event_dir.parent.name)

    date_name = _extract_date_name_from_records(payload.get("records"))
    if not date_name:
        scene_manifest_path = event_dir.parent / "_status" / "scene_event_manifest.json"
        if scene_manifest_path.is_file():
            with scene_manifest_path.open("r", encoding="utf-8") as file:
                scene_payload = json.load(file)
            scene_records = scene_payload.get("records", []) if isinstance(scene_payload, dict) else []
            if not isinstance(scene_records, list):
                scene_records = []
            # Compare both ids in the same normalized form.
            wanted_event_id = str(event_id).strip()
            matched_records = [
                record
                for record in scene_records
                if isinstance(record, dict) and str(record.get("event_id", "")).strip() == wanted_event_id
            ]
            date_name = _extract_date_name_from_records(matched_records)
    if not date_name:
        date_name = DEFAULT_EVENT_OUTPUT_DIRNAME

    try:
        clip_count = int(payload.get("clip_count", 0) or 0)
    except (TypeError, ValueError):
        clip_count = 0

    return {
        "event_id": event_id,
        "scene": scene,
        "manifest_path": str(manifest_path) if manifest_exists else "",
        "clip_ids": payload.get("clip_ids", []),
        "clip_count": clip_count,
        "date_name": date_name,
    }
def build_frame_info(record: EventFrameRecord, source_name: str) -> dict[str, Any]:
    """Describe where one event frame of one source came from.

    The returned dict carries the frame's event-wide position, its original
    clip/frame identity, and the path of the JSON file that *source_name*
    exported for it.

    Raises:
        KeyError: If *record* has no file registered for *source_name*.
    """
    frame_info: dict[str, Any] = {
        "event_frame_index": record.event_frame_index,
        "event_frame_id": record.event_frame_id,
        "source": source_name,
        "clip_case_name": record.clip_case_name,
        "clip_token": record.clip_token,
        "original_image_name": record.original_image_name,
        "original_frame_id": record.original_frame_id,
        "timestamp": record.timestamp,
    }
    frame_info["source_json_path"] = str(record.source_files[source_name])
    return frame_info
def load_source_predictions(
    ordered_frames: list[EventFrameRecord],
    source_name: str,
    *,
    model_version: Optional[str] = None,
) -> list[dict[str, Any]]:
    """Load one source's frames in event-wide order.

    Frames for which *source_name* exported no JSON file are skipped. Each
    remaining file is read, converted with ``parse_det_format`` under the
    frame's event-scoped image name, and tagged with its frame metadata.

    Args:
        ordered_frames: Frame records, already in event order.
        source_name: Source whose files should be loaded (e.g. ``"roi0"``).
        model_version: Forwarded unchanged to ``parse_det_format``.

    Returns:
        One parsed frame dict per available frame, in input order.
    """
    loaded_frames: list[dict[str, Any]] = []
    for event_frame in ordered_frames:
        json_path = event_frame.source_files.get(source_name)
        if json_path is None:
            continue

        with json_path.open("r", encoding="utf-8") as handle:
            raw_detections = json.load(handle)

        info = build_frame_info(event_frame, source_name)
        parsed_frame = parse_det_format(
            raw_detections,
            image_name=event_frame.image_name,
            timestamp_lookup=None,
            model_version=model_version,
            frame_info=info,
        )
        # Store the metadata on the frame explicitly as well, independent of
        # what parse_det_format does with its frame_info argument.
        parsed_frame["frame_info"] = info
        loaded_frames.append(parsed_frame)
    return loaded_frames
def merge_event_tracking_results(
    *,
    ordered_frames: list[EventFrameRecord],
    tracking_results_by_source: dict[str, list[dict[str, Any]]],
) -> list[dict[str, Any]]:
    """Merge per-source event tracking results while preserving event order.

    For each event frame the detections of every source (in ``SOURCE_SPECS``
    order) are copied into one list. Each copy gets ``lane_assignment`` set
    to the source's index, and a non-None ``track_id`` is shifted by
    ``index * TRACK_ID_OFFSET_PER_SOURCE`` (presumably so that ids coming from
    different sources cannot collide). Per-source ``tracking_stats`` are
    gathered under the source name, and the first truthy ``frame_info`` seen
    is kept. Frames that end up with neither detections nor stats are left
    out of the result.

    Args:
        ordered_frames: Frame records in event order.
        tracking_results_by_source: Tracking output per source name; frames
            are matched to records through their ``image_name``.

    Returns:
        The merged frame dicts, in event order.
    """
    # Index every source's frames by image name for direct lookup.
    frames_by_source: dict[str, dict[Any, dict[str, Any]]] = {}
    for source_name, source_frames in tracking_results_by_source.items():
        frames_by_source[source_name] = {
            source_frame.get("image_name"): source_frame for source_frame in source_frames
        }

    merged_frames: list[dict[str, Any]] = []
    for event_frame in ordered_frames:
        image_name = event_frame.image_name
        detections: list[dict[str, Any]] = []
        stats_by_source: dict[str, Any] = {}
        frame_info = None

        for source_idx, (source_name, _) in enumerate(SOURCE_SPECS):
            source_frame = frames_by_source.get(source_name, {}).get(image_name)
            if source_frame is None:
                continue
            frame_info = frame_info or source_frame.get("frame_info")
            for detection in source_frame.get("detections", []):
                # Work on a copy so the per-source results stay untouched.
                tagged = dict(detection)
                tagged["lane_assignment"] = source_idx
                if tagged.get("track_id") is not None:
                    tagged["track_id"] = tagged["track_id"] + source_idx * TRACK_ID_OFFSET_PER_SOURCE
                detections.append(tagged)
            if "tracking_stats" in source_frame:
                stats_by_source[source_name] = source_frame["tracking_stats"]

        if not (detections or stats_by_source):
            continue

        merged_frame: dict[str, Any] = {
            "image_name": image_name,
            "detections": detections,
        }
        if frame_info is not None:
            merged_frame["frame_info"] = frame_info
        if stats_by_source:
            merged_frame["tracking_stats"] = stats_by_source
        merged_frames.append(merged_frame)
    return merged_frames
def build_frame_manifest_payload(
    *,
    event_dir: Path,
    output_dir: Path,
    event_metadata: dict[str, Any],
    case_dirs: list[Path],
    ordered_frames: list[EventFrameRecord],
    source_summaries: dict[str, dict[str, Any]],
    merge_output_path: Path,
) -> dict[str, Any]:
    """Assemble the JSON-serializable frame-order manifest of one event run.

    Args:
        event_dir: Event directory that was processed.
        output_dir: Directory the tracking results were written to.
        event_metadata: Metadata dict as returned by ``load_event_metadata``;
            missing keys fall back to values derived from the other arguments.
        case_dirs: Clip-case directories that were scanned.
        ordered_frames: Frame records in event order.
        source_summaries: Per-source tracking summary, stored as given.
        merge_output_path: Path of the merged tracking result file.

    Returns:
        Dict with the event identity, the output locations, the per-source
        summaries and one entry per event frame (paths rendered as strings).
    """
    # The metadata loader reports clip_count as 0 when the manifest has no
    # count, so treat any falsy value as "unknown" and report the number of
    # clip-case directories that were actually discovered.
    clip_count = event_metadata.get("clip_count") or len(case_dirs)
    date_name = (
        event_metadata["date_name"]
        if "date_name" in event_metadata
        else DEFAULT_EVENT_OUTPUT_DIRNAME
    )

    frame_entries = [
        {
            "event_frame_index": record.event_frame_index,
            "event_frame_id": record.event_frame_id,
            "image_name": record.image_name,
            "timestamp": record.timestamp,
            "clip_case_name": record.clip_case_name,
            "clip_token": record.clip_token,
            "original_image_name": record.original_image_name,
            "original_frame_id": record.original_frame_id,
            # Sorted by source name so the manifest is stable across runs.
            "source_files": {
                source_name: str(path)
                for source_name, path in sorted(record.source_files.items())
            },
        }
        for record in ordered_frames
    ]

    return {
        "event_dir": str(event_dir),
        "output_dir": str(output_dir),
        "event_id": event_metadata.get("event_id", event_dir.name),
        "scene": event_metadata.get("scene", event_dir.parent.name),
        "date_name": date_name,
        "event_manifest_path": event_metadata.get("manifest_path", ""),
        "clip_ids": event_metadata.get("clip_ids", []),
        "clip_count": clip_count,
        "clip_case_dirs": [str(case_dir) for case_dir in case_dirs],
        "source_summaries": source_summaries,
        "merge_output_path": str(merge_output_path),
        "event_frame_count": len(ordered_frames),
        "frames": frame_entries,
    }
def run_event_tracking(
    *,
    event_dir: Path,
    output_dir: Path,
    file_pattern: str,
    classes: list[int],
    iou_threshold: float,
    max_age: int,
    min_hits: int,
    distance_threshold: float,
    use_3d: bool,
    max_3d_distance: float,
    model_version: Optional[str],
    merge_output_name: str,
    manifest_name: str,
    verbose: bool = True,
) -> dict[str, Any]:
    """Run tracking for every source of one event and write all outputs.

    Steps, in order: load the event metadata, collect and order the event's
    frames, create *output_dir*, track each source in ``SOURCE_SPECS`` that
    has frames and save its result, merge the per-source results into
    *merge_output_name*, and write the frame-order manifest to
    *manifest_name*.

    Args:
        event_dir: Event directory containing the clip-case outputs.
        output_dir: Directory that receives every output file.
        file_pattern: Glob pattern for per-frame JSON files in a source dir.
        classes: Class ids handed to ``track_objects`` as ``target_classes``.
        iou_threshold, max_age, min_hits, distance_threshold, use_3d,
            max_3d_distance: Forwarded unchanged to ``track_objects``.
        model_version: Forwarded to the prediction loader.
        merge_output_name: File name of the merged tracking result.
        manifest_name: File name of the frame-order manifest.
        verbose: Print progress and a final summary when true.

    Returns:
        Summary dict with the input/output locations, the number of event
        frames and clip cases, and the per-source summaries.

    Raises:
        RuntimeError: If none of the sources yielded any frame.
    """
    event_metadata = load_event_metadata(event_dir)
    ordered_frames, case_dirs = collect_event_frames(event_dir, file_pattern, verbose=verbose)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Tracker settings are identical for every source.
    tracker_options: dict[str, Any] = {
        "target_classes": classes,
        "iou_threshold": iou_threshold,
        "max_age": max_age,
        "min_hits": min_hits,
        "distance_threshold": distance_threshold,
        "use_3d": use_3d,
        "max_3d_distance": max_3d_distance,
        "verbose": verbose,
    }

    tracked_by_source: dict[str, list[dict[str, Any]]] = {}
    source_summaries: dict[str, dict[str, Any]] = {}
    for source_name, output_name in SOURCE_SPECS:
        source_frames = load_source_predictions(
            ordered_frames,
            source_name,
            model_version=model_version,
        )
        source_output_path = output_dir / output_name

        if not source_frames:
            # Record the gap but keep going: other sources may still have data.
            source_summaries[source_name] = {
                "ok": False,
                "reason": "no_frames",
                "frames": 0,
                "unique_tracks": 0,
                "output_path": str(source_output_path),
            }
            if verbose:
                print(f"Warning: no frames found for source {source_name} under {event_dir}")
            continue

        if verbose:
            for line in (
                "",
                f"--- Tracking {source_name} at event scope ---",
                f"Frames: {len(source_frames)}",
                f"Output: {source_output_path}",
            ):
                print(line)

        tracked_frames = track_objects(source_frames, **tracker_options)
        save_tracking_results(tracked_frames, source_output_path)
        tracked_by_source[source_name] = tracked_frames
        source_summaries[source_name] = {
            "ok": True,
            "frames": len(source_frames),
            "unique_tracks": count_unique_tracks(tracked_frames),
            "output_path": str(source_output_path),
        }

    combined_output_path = output_dir / merge_output_name
    if not tracked_by_source:
        raise RuntimeError(f"No valid source predictions were loaded for event: {event_dir}")

    combined_tracking = merge_event_tracking_results(
        ordered_frames=ordered_frames,
        tracking_results_by_source=tracked_by_source,
    )
    save_tracking_results(combined_tracking, combined_output_path)

    manifest_path = output_dir / manifest_name
    manifest_payload = build_frame_manifest_payload(
        event_dir=event_dir,
        output_dir=output_dir,
        event_metadata=event_metadata,
        case_dirs=case_dirs,
        ordered_frames=ordered_frames,
        source_summaries=source_summaries,
        merge_output_path=combined_output_path,
    )
    with manifest_path.open("w", encoding="utf-8") as manifest_file:
        json.dump(manifest_payload, manifest_file, indent=2, ensure_ascii=False)

    if verbose:
        banner = "=========================================="
        print("")
        print(banner)
        print(f"Event : {event_metadata.get('event_id', event_dir.name)}")
        print(f"Scene : {event_metadata.get('scene', event_dir.parent.name)}")
        print(f"Date : {event_metadata.get('date_name', DEFAULT_EVENT_OUTPUT_DIRNAME)}")
        print(f"Clips : {len(case_dirs)}")
        print(f"Frames : {len(ordered_frames)}")
        print(f"Merge : {combined_output_path}")
        print(f"Manifest: {manifest_path}")
        for source_name, _ in SOURCE_SPECS:
            summary = source_summaries.get(source_name, {})
            if summary.get("ok"):
                status = "ok"
            else:
                status = summary.get("reason", "skipped")
            print(
                f" - {source_name}: {status}, frames={summary.get('frames', 0)}, "
                f"tracks={summary.get('unique_tracks', 0)}"
            )
        print(banner)

    return {
        "event_dir": str(event_dir),
        "output_dir": str(output_dir),
        "manifest_path": str(manifest_path),
        "merge_output_path": str(combined_output_path),
        "event_frame_count": len(ordered_frames),
        "clip_case_count": len(case_dirs),
        "source_summaries": source_summaries,
    }
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the event tracking tool.

    Returns:
        The parsed namespace. ``output_dir`` and ``classes`` are ``None``
        when not given, which lets the caller apply its own defaults.
    """
    parser = argparse.ArgumentParser(
        description="Track all clip-level exported inference results under one event directory."
    )
    parser.add_argument("--event-dir", required=True, help="Event directory containing multiple clip-case outputs")
    parser.add_argument(
        "--output-dir",
        default=None,
        # The default directory is named after the date found in the event or
        # scene manifest; "event_tracking" is only the fallback name.
        help=(
            "Output directory for event-level tracking results "
            "(default: <event-dir>/<date name from the event manifest>, "
            "or <event-dir>/event_tracking when no date name is available)"
        ),
    )
    parser.add_argument("--file-pattern", default="*.json", help="Glob pattern for per-frame JSONs in each source dir")
    parser.add_argument(
        "--classes",
        type=int,
        nargs="+",
        default=None,
        help="Class IDs to track (default: track_objects.py defaults)",
    )
    parser.add_argument("--iou-threshold", type=float, default=0.3)
    parser.add_argument("--max-age", type=int, default=5)
    parser.add_argument("--min-hits", type=int, default=1)
    parser.add_argument("--distance-threshold", type=float, default=100.0)
    parser.add_argument("--model-version", type=str, default=None)
    parser.add_argument("--use-3d", action="store_true")
    parser.add_argument("--max-3d-distance", type=float, default=10.0)
    parser.add_argument("--merge-output-name", type=str, default="combined_tracking.json")
    parser.add_argument("--manifest-name", type=str, default="frame_order_manifest.json")
    parser.add_argument("--quiet", action="store_true", help="Reduce progress logging")
    return parser.parse_args()
def main() -> None:
    """Command-line entry point: resolve the options and run event tracking.

    Without ``--output-dir`` the results go to a sub-directory of the event
    directory named after the event's ``date_name`` metadata. Without
    ``--classes`` the tracker's default class ids are used.

    Raises:
        FileNotFoundError: If the given event directory does not exist.
    """
    args = parse_args()

    event_dir = Path(args.event_dir).resolve()
    if not event_dir.is_dir():
        raise FileNotFoundError(f"Event directory does not exist: {event_dir}")

    if args.output_dir is None:
        date_name = load_event_metadata(event_dir).get("date_name", DEFAULT_EVENT_OUTPUT_DIRNAME)
        output_dir = event_dir / date_name
    else:
        output_dir = Path(args.output_dir).resolve()

    if args.classes is None:
        classes = list(TRACKED_CLASS_IDS)
    else:
        classes = [int(cls_id) for cls_id in args.classes]

    run_event_tracking(
        event_dir=event_dir,
        output_dir=output_dir,
        file_pattern=args.file_pattern,
        classes=classes,
        iou_threshold=args.iou_threshold,
        max_age=args.max_age,
        min_hits=args.min_hits,
        distance_threshold=args.distance_threshold,
        use_3d=args.use_3d,
        max_3d_distance=args.max_3d_distance,
        model_version=args.model_version,
        merge_output_name=args.merge_output_name,
        manifest_name=args.manifest_name,
        verbose=not args.quiet,
    )


if __name__ == "__main__":
    main()