563 lines
20 KiB
Python
Executable File
563 lines
20 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Track exported inference outputs at event scope across multiple clips.
|
|
|
|
This tool aggregates all clip-level per-frame JSON predictions under one
|
|
event directory, orders frames globally by timestamp parsed from filenames,
|
|
and then reuses the existing tracking logic from track_objects.py to produce
|
|
one tracking result set per event.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
|
|
# Resolve this script's own location and make its directory importable, so
# modules living next to this file can be imported when it is run directly.
FILE = Path(__file__).resolve()
THIS_DIR = FILE.parent
if sys.path.count(str(THIS_DIR)) == 0:
    # Prepend (rather than append) so the local directory takes precedence.
    sys.path.insert(0, str(THIS_DIR))
|
|
|
|
from merge_tracking_results import TRACK_ID_OFFSET_PER_SOURCE # noqa: E402
|
|
from track_objects import ( # noqa: E402
|
|
TRACKED_CLASS_IDS,
|
|
count_unique_tracks,
|
|
parse_det_format,
|
|
save_tracking_results,
|
|
track_objects,
|
|
)
|
|
|
|
|
|
# Directory name used for event-level outputs when no date name is available.
DEFAULT_EVENT_OUTPUT_DIRNAME = "event_tracking"

# One entry per prediction source: (sub-directory name under "predictions",
# file name of that source's tracking output). The position of an entry is
# also used as the source index when per-source results are merged.
SOURCE_SPECS = (
    ("roi0", "roi0.json"),
    ("roi1", "roi1.json"),
    ("merge", "merge.json"),
)
|
|
|
|
|
|
@dataclass
class EventFrameRecord:
    """One exported frame of one clip case, with its per-source JSON files.

    The first six fields identify the frame as it was exported; the remaining
    fields are filled in later, once all frames of the event have been
    collected and ordered.
    """

    # --- identity, known as soon as the first JSON file is discovered -------
    # "<clip case directory name>:<JSON file stem>"; unique within one event.
    key: str
    # Name of the clip-case directory the frame was found under.
    clip_case_name: str
    # Leading, non-numeric part of the exported frame name.
    clip_token: str
    # Stem of the exported per-frame JSON file.
    original_image_name: str
    # Frame id parsed from the file name, or None when it has none.
    original_frame_id: Optional[int]
    # Timestamp parsed from the file name, or None when it has none.
    timestamp: Optional[int]

    # --- per-source inputs --------------------------------------------------
    # Source name -> path of that source's JSON file for this frame.
    source_files: dict[str, Path] = field(default_factory=dict)

    # --- event-global numbering; -1 / "" mean "not assigned yet" ------------
    event_frame_index: int = -1
    event_frame_id: int = -1
    image_name: str = ""
|
|
|
|
|
|
def _safe_int(value: Any) -> Optional[int]:
|
|
if value is None:
|
|
return None
|
|
try:
|
|
return int(str(value).strip())
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
|
|
def _normalize_output_dir_token(value: Any) -> str:
|
|
token = re.sub(r'[\\/:*?"<>|\s]+', "_", str(value or "").strip())
|
|
return token.strip("._")
|
|
|
|
|
|
def _extract_date_name_from_records(records: Any) -> Optional[str]:
|
|
if not isinstance(records, list):
|
|
return None
|
|
|
|
for record in records:
|
|
if not isinstance(record, dict):
|
|
continue
|
|
source_record = record.get("source_record")
|
|
if not isinstance(source_record, dict):
|
|
continue
|
|
for key in ("date_name", "datename", "datetime", "date"):
|
|
value = source_record.get(key)
|
|
normalized = _normalize_output_dir_token(value)
|
|
if normalized:
|
|
return normalized
|
|
return None
|
|
|
|
|
|
def parse_frame_name_metadata(image_name: str) -> tuple[str, Optional[int], Optional[int]]:
    """Split an exported frame stem into ``(clip_token, frame_id, timestamp)``.

    The stem is split on underscores and up to two trailing all-digit pieces
    are consumed:

    * two pieces   -> ``(prefix, frame_id, timestamp)``
    * one piece    -> ``(prefix, frame_id, None)``
    * no piece     -> ``(stem, None, None)``
    * empty input  -> ``("", None, None)``

    ``prefix`` is the remaining pieces re-joined with underscores and stripped
    of leading/trailing underscores.
    """
    stem = str(image_name or "").strip()
    if not stem:
        return "", None, None

    def _to_int(token: str) -> Optional[int]:
        # str.isdigit() also accepts characters int() rejects (e.g. "²"),
        # so the conversion itself can still fail.
        try:
            return int(token)
        except ValueError:
            return None

    pieces = stem.split("_")

    # Count how many trailing pieces are purely numeric, capped at two.
    tail_len = 0
    for piece in reversed(pieces):
        if tail_len == 2 or not piece.isdigit():
            break
        tail_len += 1

    if tail_len == 0:
        return stem, None, None

    head, tail = pieces[:-tail_len], pieces[-tail_len:]
    clip_token = "_".join(head).strip("_")
    frame_id = _to_int(tail[0])
    timestamp = _to_int(tail[1]) if tail_len == 2 else None
    return clip_token, frame_id, timestamp
|
|
|
|
|
|
def build_event_sort_key(record: EventFrameRecord) -> tuple[Any, ...]:
    """Return the key used to order frames across all clips of an event.

    Frames sort by timestamp, then clip-case name, then original frame id,
    then original image name. A missing timestamp or frame id is replaced by
    infinity and flagged, so such frames sort after those that have one.
    """
    unknown = float("inf")
    timestamp = record.timestamp
    frame_id = record.original_frame_id
    return (
        timestamp is None,
        unknown if timestamp is None else timestamp,
        record.clip_case_name,
        frame_id is None,
        unknown if frame_id is None else frame_id,
        record.original_image_name,
    )
|
|
|
|
|
|
def find_event_case_dirs(event_dir: Path) -> list[Path]:
    """List the clip-case directories directly under *event_dir*.

    A clip-case directory is any immediate child that contains a
    ``predictions`` sub-directory. Results are sorted by path.
    """
    candidates = sorted(event_dir.glob("*/predictions"))
    # The glob can also match a plain file named "predictions"; keep dirs only.
    return [candidate.parent for candidate in candidates if candidate.is_dir()]
|
|
|
|
|
|
def collect_event_frames(
    event_dir: Path,
    pattern: str,
    *,
    verbose: bool = True,
) -> tuple[list[EventFrameRecord], list[Path]]:
    """Gather every per-frame JSON under one event and order the frames.

    Each clip-case directory is scanned for ``predictions/<source>/`` folders
    (one per entry of ``SOURCE_SPECS``). JSON files matching *pattern* that
    share the same clip case and file stem are grouped into one
    ``EventFrameRecord``. The records are sorted with
    ``build_event_sort_key`` and then numbered: ``event_frame_index`` from 0,
    ``event_frame_id`` from 1, and an event-level ``image_name``.

    Returns:
        ``(ordered_frames, case_dirs)``.

    Raises:
        FileNotFoundError: if the event has no clip-case directories, or none
            of them contains a JSON file matching *pattern*.
    """
    case_dirs = find_event_case_dirs(event_dir)
    if not case_dirs:
        raise FileNotFoundError(f"No clip case predictions found under event directory: {event_dir}")

    records_by_key: dict[str, EventFrameRecord] = {}
    json_files_seen = 0

    for case_dir in case_dirs:
        case_name = case_dir.name
        for source_name, _ in SOURCE_SPECS:
            source_dir = case_dir / "predictions" / source_name
            if not source_dir.is_dir():
                continue

            for json_path in sorted(source_dir.glob(pattern)):
                json_files_seen += 1
                stem = json_path.stem
                clip_token, frame_id, timestamp = parse_frame_name_metadata(stem)
                key = f"{case_name}:{stem}"

                record = records_by_key.get(key)
                if record is not None:
                    # Frame already seen via another source: only backfill
                    # metadata that the earlier file name did not provide.
                    if record.timestamp is None and timestamp is not None:
                        record.timestamp = timestamp
                    elif record.original_frame_id is None and frame_id is not None:
                        record.original_frame_id = frame_id
                else:
                    record = EventFrameRecord(
                        key=key,
                        clip_case_name=case_name,
                        clip_token=clip_token,
                        original_image_name=stem,
                        original_frame_id=frame_id,
                        timestamp=timestamp,
                    )
                    records_by_key[key] = record

                record.source_files[source_name] = json_path

    ordered_frames = list(records_by_key.values())
    ordered_frames.sort(key=build_event_sort_key)
    if not ordered_frames:
        raise FileNotFoundError(
            f"No source frame JSON files matching {pattern!r} were found under event directory: {event_dir}"
        )

    for event_frame_id, record in enumerate(ordered_frames, start=1):
        # Frames without a parsed timestamp reuse their event frame id so the
        # generated name always ends in an integer.
        timestamp_token = event_frame_id if record.timestamp is None else record.timestamp
        record.event_frame_index = event_frame_id - 1
        record.event_frame_id = event_frame_id
        record.image_name = f"camera4_{event_frame_id:06d}_{int(timestamp_token)}"

    if verbose:
        print(f"Discovered {len(case_dirs)} clip case(s) under {event_dir}")
        print(f"Collected {json_files_seen} source frame JSON file(s)")
        print(f"Built {len(ordered_frames)} event frame(s) after cross-clip ordering")

    return ordered_frames, case_dirs
|
|
|
|
|
|
def load_event_metadata(event_dir: Path) -> dict[str, Any]:
    """Load optional event manifest metadata.

    Reads ``<event_dir>/_status/event_manifest.json`` when it exists. The
    date name is taken from that manifest's ``records``; if none is found
    there, the matching event's records in
    ``<event_dir>/../_status/scene_event_manifest.json`` are tried, and
    finally ``DEFAULT_EVENT_OUTPUT_DIRNAME`` is used.

    Manifests whose content has an unexpected shape (top level is not a JSON
    object, ``records`` is not a list, ``clip_count`` is not numeric) are
    treated as if the affected field were absent instead of raising. Invalid
    JSON still raises, so a corrupt manifest is not silently ignored.

    Returns:
        A dict with ``event_id``, ``scene``, ``manifest_path`` (empty string
        when the event manifest does not exist), ``clip_ids``, ``clip_count``
        and ``date_name``.
    """
    manifest_path = event_dir / "_status" / "event_manifest.json"
    manifest_found = manifest_path.is_file()

    payload: Any = {}
    if manifest_found:
        with manifest_path.open("r", encoding="utf-8") as file:
            payload = json.load(file)
    if not isinstance(payload, dict):
        # e.g. a manifest that is a bare list: no named fields to read.
        payload = {}

    event_id = payload.get("event_id", event_dir.name)
    scene = payload.get("scene", event_dir.parent.name)
    date_name = _extract_date_name_from_records(payload.get("records"))

    if not date_name:
        scene_manifest_path = event_dir.parent / "_status" / "scene_event_manifest.json"
        if scene_manifest_path.is_file():
            with scene_manifest_path.open("r", encoding="utf-8") as file:
                scene_payload = json.load(file)
            scene_records = scene_payload.get("records") if isinstance(scene_payload, dict) else None
            if isinstance(scene_records, list):
                # Compare both ids in the same normalized form.
                wanted_event_id = str(event_id).strip()
                matched_records = [
                    record
                    for record in scene_records
                    if isinstance(record, dict)
                    and str(record.get("event_id", "")).strip() == wanted_event_id
                ]
                date_name = _extract_date_name_from_records(matched_records)

    if not date_name:
        date_name = DEFAULT_EVENT_OUTPUT_DIRNAME

    try:
        clip_count = int(payload.get("clip_count", 0) or 0)
    except (TypeError, ValueError):
        clip_count = 0

    return {
        "event_id": event_id,
        "scene": scene,
        "manifest_path": str(manifest_path) if manifest_found else "",
        "clip_ids": payload.get("clip_ids", []),
        "clip_count": clip_count,
        "date_name": date_name,
    }
|
|
|
|
|
|
def build_frame_info(record: EventFrameRecord, source_name: str) -> dict[str, Any]:
    """Describe one frame of one source as a plain dict.

    The dict carries the event-level numbering, the clip/frame identity
    parsed from the original file name, and the path of the JSON file that
    *source_name* contributed for this frame.

    Raises:
        KeyError: if *record* has no file for *source_name*.
    """
    source_json_path = record.source_files[source_name]
    return dict(
        event_frame_index=record.event_frame_index,
        event_frame_id=record.event_frame_id,
        source=source_name,
        clip_case_name=record.clip_case_name,
        clip_token=record.clip_token,
        original_image_name=record.original_image_name,
        original_frame_id=record.original_frame_id,
        timestamp=record.timestamp,
        source_json_path=str(source_json_path),
    )
|
|
|
|
|
|
def load_source_predictions(
    ordered_frames: list[EventFrameRecord],
    source_name: str,
    *,
    model_version: Optional[str] = None,
) -> list[dict[str, Any]]:
    """Load one source's per-frame predictions, keeping event order.

    Frames that have no JSON file for *source_name* are skipped. Every other
    file is read, converted with ``parse_det_format`` under the frame's
    event-level image name, and tagged with its ``frame_info``.
    """
    loaded: list[dict[str, Any]] = []
    for record in ordered_frames:
        json_path = record.source_files.get(source_name)
        if json_path is None:
            continue

        with json_path.open("r", encoding="utf-8") as handle:
            raw_detections = json.load(handle)

        info = build_frame_info(record, source_name)
        parsed = parse_det_format(
            raw_detections,
            image_name=record.image_name,
            timestamp_lookup=None,
            model_version=model_version,
            frame_info=info,
        )
        # Set explicitly so the key is present whatever the parser returns.
        parsed["frame_info"] = info
        loaded.append(parsed)
    return loaded
|
|
|
|
|
|
def merge_event_tracking_results(
    *,
    ordered_frames: list[EventFrameRecord],
    tracking_results_by_source: dict[str, list[dict[str, Any]]],
) -> list[dict[str, Any]]:
    """Combine per-source tracking results into one frame list in event order.

    For each event frame, detections of every source (visited in
    ``SOURCE_SPECS`` order) are copied, tagged with ``lane_assignment`` set to
    the source index, and their non-null ``track_id`` is shifted by
    ``source_index * TRACK_ID_OFFSET_PER_SOURCE``. Per-source
    ``tracking_stats`` are collected under the source name. Frames that end
    up with neither detections nor stats are left out.
    """
    # source name -> {image_name -> tracked frame}
    frames_by_source: dict[str, dict[Any, dict[str, Any]]] = {}
    for source_name, tracked_frames in tracking_results_by_source.items():
        lookup: dict[Any, dict[str, Any]] = {}
        for tracked_frame in tracked_frames:
            lookup[tracked_frame.get("image_name")] = tracked_frame
        frames_by_source[source_name] = lookup

    merged_frames: list[dict[str, Any]] = []
    for record in ordered_frames:
        image_name = record.image_name
        detections: list[dict[str, Any]] = []
        stats: dict[str, Any] = {}
        frame_info = None

        for source_idx, (source_name, _) in enumerate(SOURCE_SPECS):
            tracked_frame = frames_by_source.get(source_name, {}).get(image_name)
            if tracked_frame is None:
                continue

            # Keep the first non-empty frame_info encountered.
            if not frame_info:
                frame_info = tracked_frame.get("frame_info")

            for det in tracked_frame.get("detections", []):
                tagged = dict(det)
                tagged["lane_assignment"] = source_idx
                if tagged.get("track_id") is not None:
                    tagged["track_id"] = tagged["track_id"] + source_idx * TRACK_ID_OFFSET_PER_SOURCE
                detections.append(tagged)

            if "tracking_stats" in tracked_frame:
                stats[source_name] = tracked_frame["tracking_stats"]

        if not (detections or stats):
            continue

        merged_frame: dict[str, Any] = {"image_name": image_name, "detections": detections}
        if frame_info is not None:
            merged_frame["frame_info"] = frame_info
        if stats:
            merged_frame["tracking_stats"] = stats
        merged_frames.append(merged_frame)

    return merged_frames
|
|
|
|
|
|
def build_frame_manifest_payload(
    *,
    event_dir: Path,
    output_dir: Path,
    event_metadata: dict[str, Any],
    case_dirs: list[Path],
    ordered_frames: list[EventFrameRecord],
    source_summaries: dict[str, dict[str, Any]],
    merge_output_path: Path,
) -> dict[str, Any]:
    """Build the JSON-serializable frame-order manifest for one event.

    The payload records where the event was read from and written to, the
    event-level metadata, the per-source summaries, and one entry per frame
    in event order (with the per-source file paths sorted by source name).

    ``clip_count`` and ``date_name`` fall back to ``len(case_dirs)`` and
    ``DEFAULT_EVENT_OUTPUT_DIRNAME`` when the metadata value is missing *or
    empty*. ``load_event_metadata`` reports a ``clip_count`` of 0 when the
    manifest has none, so a plain ``dict.get`` default would never apply and
    the manifest would claim the event has no clips.
    """
    frame_entries = [
        {
            "event_frame_index": record.event_frame_index,
            "event_frame_id": record.event_frame_id,
            "image_name": record.image_name,
            "timestamp": record.timestamp,
            "clip_case_name": record.clip_case_name,
            "clip_token": record.clip_token,
            "original_image_name": record.original_image_name,
            "original_frame_id": record.original_frame_id,
            "source_files": {
                source_name: str(path)
                for source_name, path in sorted(record.source_files.items())
            },
        }
        for record in ordered_frames
    ]

    return {
        "event_dir": str(event_dir),
        "output_dir": str(output_dir),
        "event_id": event_metadata.get("event_id", event_dir.name),
        "scene": event_metadata.get("scene", event_dir.parent.name),
        "date_name": event_metadata.get("date_name") or DEFAULT_EVENT_OUTPUT_DIRNAME,
        "event_manifest_path": event_metadata.get("manifest_path", ""),
        "clip_ids": event_metadata.get("clip_ids", []),
        "clip_count": event_metadata.get("clip_count") or len(case_dirs),
        "clip_case_dirs": [str(case_dir) for case_dir in case_dirs],
        "source_summaries": source_summaries,
        "merge_output_path": str(merge_output_path),
        "event_frame_count": len(ordered_frames),
        "frames": frame_entries,
    }
|
|
|
|
|
|
def run_event_tracking(
    *,
    event_dir: Path,
    output_dir: Path,
    file_pattern: str,
    classes: list[int],
    iou_threshold: float,
    max_age: int,
    min_hits: int,
    distance_threshold: float,
    use_3d: bool,
    max_3d_distance: float,
    model_version: Optional[str],
    merge_output_name: str,
    manifest_name: str,
    verbose: bool = True,
) -> dict[str, Any]:
    """Run tracking for every source of one event and write all outputs.

    Steps: load the event metadata, collect and order the frames, track each
    source of ``SOURCE_SPECS`` that has frames and save its result, merge the
    per-source results into *merge_output_name*, and write the frame-order
    manifest to *manifest_name* (all inside *output_dir*, which is created if
    needed).

    Returns:
        A summary dict with the input/output locations, the frame and clip
        case counts, and the per-source summaries.

    Raises:
        RuntimeError: if no source produced any frames.
    """
    event_metadata = load_event_metadata(event_dir)
    ordered_frames, case_dirs = collect_event_frames(event_dir, file_pattern, verbose=verbose)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Tracker settings are identical for every source.
    tracker_options = dict(
        target_classes=classes,
        iou_threshold=iou_threshold,
        max_age=max_age,
        min_hits=min_hits,
        distance_threshold=distance_threshold,
        use_3d=use_3d,
        max_3d_distance=max_3d_distance,
        verbose=verbose,
    )

    tracking_results_by_source: dict[str, list[dict[str, Any]]] = {}
    source_summaries: dict[str, dict[str, Any]] = {}

    for source_name, output_name in SOURCE_SPECS:
        predictions_data = load_source_predictions(
            ordered_frames,
            source_name,
            model_version=model_version,
        )
        output_path = output_dir / output_name

        if not predictions_data:
            # Record the gap, but keep going: other sources may have frames.
            source_summaries[source_name] = {
                "ok": False,
                "reason": "no_frames",
                "frames": 0,
                "unique_tracks": 0,
                "output_path": str(output_path),
            }
            if verbose:
                print(f"Warning: no frames found for source {source_name} under {event_dir}")
            continue

        if verbose:
            for line in (
                "",
                f"--- Tracking {source_name} at event scope ---",
                f"Frames: {len(predictions_data)}",
                f"Output: {output_path}",
            ):
                print(line)

        tracking_results = track_objects(predictions_data, **tracker_options)
        save_tracking_results(tracking_results, output_path)
        tracking_results_by_source[source_name] = tracking_results
        source_summaries[source_name] = {
            "ok": True,
            "frames": len(predictions_data),
            "unique_tracks": count_unique_tracks(tracking_results),
            "output_path": str(output_path),
        }

    combined_output_path = output_dir / merge_output_name
    if not tracking_results_by_source:
        raise RuntimeError(f"No valid source predictions were loaded for event: {event_dir}")

    combined_tracking = merge_event_tracking_results(
        ordered_frames=ordered_frames,
        tracking_results_by_source=tracking_results_by_source,
    )
    save_tracking_results(combined_tracking, combined_output_path)

    manifest_path = output_dir / manifest_name
    manifest_payload = build_frame_manifest_payload(
        event_dir=event_dir,
        output_dir=output_dir,
        event_metadata=event_metadata,
        case_dirs=case_dirs,
        ordered_frames=ordered_frames,
        source_summaries=source_summaries,
        merge_output_path=combined_output_path,
    )
    with manifest_path.open("w", encoding="utf-8") as file:
        json.dump(manifest_payload, file, indent=2, ensure_ascii=False)

    if verbose:
        banner = "=========================================="
        for line in (
            "",
            banner,
            f"Event   : {event_metadata.get('event_id', event_dir.name)}",
            f"Scene   : {event_metadata.get('scene', event_dir.parent.name)}",
            f"Date    : {event_metadata.get('date_name', DEFAULT_EVENT_OUTPUT_DIRNAME)}",
            f"Clips   : {len(case_dirs)}",
            f"Frames  : {len(ordered_frames)}",
            f"Merge   : {combined_output_path}",
            f"Manifest: {manifest_path}",
        ):
            print(line)
        for source_name, _ in SOURCE_SPECS:
            summary = source_summaries.get(source_name, {})
            status = "ok" if summary.get("ok") else summary.get("reason", "skipped")
            print(
                f"  - {source_name}: {status}, frames={summary.get('frames', 0)}, "
                f"tracks={summary.get('unique_tracks', 0)}"
            )
        print(banner)

    return {
        "event_dir": str(event_dir),
        "output_dir": str(output_dir),
        "manifest_path": str(manifest_path),
        "merge_output_path": str(combined_output_path),
        "event_frame_count": len(ordered_frames),
        "clip_case_count": len(case_dirs),
        "source_summaries": source_summaries,
    }
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the event tracking tool.

    Only ``--event-dir`` is required. ``--output-dir`` and ``--classes``
    default to ``None`` so the caller can tell "not given" apart from an
    explicit value and apply its own defaults.
    """
    parser = argparse.ArgumentParser(
        description="Track all clip-level exported inference results under one event directory."
    )
    parser.add_argument("--event-dir", required=True, help="Event directory containing multiple clip-case outputs")
    parser.add_argument(
        "--output-dir",
        default=None,
        # The default directory is named after the event's date name; the
        # fixed "event_tracking" name is only the fallback.
        help=(
            "Output directory for event-level tracking results "
            "(default: <event-dir>/<date_name> from the event manifest, "
            "or <event-dir>/event_tracking when no date name is available)"
        ),
    )
    parser.add_argument("--file-pattern", default="*.json", help="Glob pattern for per-frame JSONs in each source dir")
    parser.add_argument(
        "--classes",
        type=int,
        nargs="+",
        default=None,
        help="Class IDs to track (default: track_objects.py defaults)",
    )

    # Tracker settings.
    parser.add_argument("--iou-threshold", type=float, default=0.3)
    parser.add_argument("--max-age", type=int, default=5)
    parser.add_argument("--min-hits", type=int, default=1)
    parser.add_argument("--distance-threshold", type=float, default=100.0)
    parser.add_argument("--model-version", type=str, default=None)
    parser.add_argument("--use-3d", action="store_true")
    parser.add_argument("--max-3d-distance", type=float, default=10.0)

    # Output file names (relative to the output directory) and logging.
    parser.add_argument("--merge-output-name", type=str, default="combined_tracking.json")
    parser.add_argument("--manifest-name", type=str, default="frame_order_manifest.json")
    parser.add_argument("--quiet", action="store_true", help="Reduce progress logging")
    return parser.parse_args()
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: resolve the inputs and run event tracking.

    Raises:
        FileNotFoundError: if ``--event-dir`` is not an existing directory.
    """
    args = parse_args()

    event_dir = Path(args.event_dir).resolve()
    if not event_dir.is_dir():
        raise FileNotFoundError(f"Event directory does not exist: {event_dir}")

    if args.output_dir is None:
        # No explicit output directory: name it after the event's date name.
        dirname = load_event_metadata(event_dir).get("date_name", DEFAULT_EVENT_OUTPUT_DIRNAME)
        output_dir = event_dir / dirname
    else:
        output_dir = Path(args.output_dir).resolve()

    if args.classes is None:
        classes = list(TRACKED_CLASS_IDS)
    else:
        classes = [int(cls_id) for cls_id in args.classes]

    run_event_tracking(
        event_dir=event_dir,
        output_dir=output_dir,
        file_pattern=args.file_pattern,
        classes=classes,
        iou_threshold=args.iou_threshold,
        max_age=args.max_age,
        min_hits=args.min_hits,
        distance_threshold=args.distance_threshold,
        use_3d=args.use_3d,
        max_3d_distance=args.max_3d_distance,
        model_version=args.model_version,
        merge_output_name=args.merge_output_name,
        manifest_name=args.manifest_name,
        verbose=not args.quiet,
    )


if __name__ == "__main__":
    main()
|