Files
yolov26_3d/tools/temporal_analysis/split_tracking_json.py

240 lines
7.5 KiB
Python
Raw Permalink Normal View History

2026-06-24 09:35:46 +08:00
#!/usr/bin/env python3
"""Split tracking JSON into one JSON file per frame while preserving track IDs.
Input format:
[
{
"image_name": "1741824631_0",
"detections": [
{
"class_id": 0,
"confidence": 0.91,
"bbox": [x1, y1, x2, y2],
"track_id": 7,
...
}
]
},
...
]
Output format (one file per frame, evaluator-friendly):
{
"0": {
"type": "0",
"type_name": "vehicle",
"score": "0.91",
"box2d": [x1, y1, x2, y2],
"xyzlhwyaw": [...],
"face_cls": "front",
"cut_cls": "0",
"cut_cls_name": "nocut",
"track_id": 7
},
"1": { ... }
}
Usage:
python tools/temporal_analysis/split_tracking_json.py \
--input /path/to/roi0.json \
--output-dir /path/to/json_results
"""
import argparse
import json
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from merge_tracking_results import normalize_image_name
CLASS_ID_TO_NAME = {
0: "vehicle",
1: "pedestrian",
2: "bicycle",
3: "rider",
4: "roadblock",
5: "head",
6: "tsr",
7: "guideboard",
8: "plate",
9: "wheel",
10: "tl_border",
11: "tl_wick",
12: "tl_num",
13: "tricycle",
}
def format_scalar(value):
"""Keep ints as ints, convert everything else to plain JSON numbers/strings."""
if value is None:
return value
if isinstance(value, bool):
return value
if isinstance(value, int):
return value
if isinstance(value, float):
return float(value)
return value
def normalize_detection(det, keep_all_fields=False):
"""Convert one tracking detection to evaluator-friendly single-frame JSON schema."""
class_id = int(det.get("class_id", -1))
bbox = det.get("bbox", [])
confidence = det.get("confidence", 0.0)
item = {
"type": str(class_id),
"type_name": det.get("type_name") or CLASS_ID_TO_NAME.get(class_id, ""),
"score": confidence,
"box2d": [format_scalar(v) for v in bbox[:4]],
"face_cls": det.get("face_cls", "none"),
"cut_cls": str(det.get("cut_cls", -1)),
"cut_cls_name": det.get("cut_cls_name", "none"),
}
if "track_id" in det and det.get("track_id") is not None:
item["track_id"] = int(det["track_id"])
if "lane_assignment" in det and det.get("lane_assignment") is not None:
item["lane_assignment"] = int(det["lane_assignment"])
if "frameId" in det and det.get("frameId") is not None:
item["frameId"] = det["frameId"]
if "timestamp" in det and det.get("timestamp") is not None:
item["timestamp"] = format_scalar(det["timestamp"])
if "version" in det and det.get("version") is not None:
item["version"] = det["version"]
if "anchor" in det and det.get("anchor") is not None:
item["anchor"] = det["anchor"]
if "roi_id" in det and det.get("roi_id") is not None:
item["roi_id"] = int(det["roi_id"])
object_3d = det.get("object_3d")
if object_3d is not None:
item["xyzlhwyaw"] = [format_scalar(v) for v in object_3d[:7]]
object_3d_ego = det.get("object_3d_ego")
if object_3d_ego is not None:
item["xyzlhwyaw_ego"] = [format_scalar(v) for v in object_3d_ego[:7]]
if keep_all_fields:
for key, value in det.items():
if key in item or key in {"class_id", "confidence", "bbox", "object_3d", "object_3d_ego"}:
continue
item[key] = value
return item
def frame_output_name(frame):
"""Choose output filename stem for one frame."""
image_name = frame.get("image_name") or ""
stem = Path(image_name).stem if image_name else ""
return normalize_image_name(stem) if stem else "unknown"
def build_output_data(frame, keep_all_fields=False):
"""Convert one tracking frame to per-frame output payload."""
output_data = {}
for idx, det in enumerate(frame.get("detections", [])):
output_data[str(idx)] = normalize_detection(det, keep_all_fields=keep_all_fields)
return output_data
def write_one_frame(frame, output_dir, keep_all_fields=False, pretty=False):
"""Write one output JSON file and return its path."""
frame_name = frame_output_name(frame)
output_data = build_output_data(frame, keep_all_fields=keep_all_fields)
output_path = output_dir / f"{frame_name}.json"
dump_kwargs = {
"ensure_ascii": False,
}
if pretty:
dump_kwargs["indent"] = 2
else:
dump_kwargs["separators"] = (",", ":")
with open(output_path, "w", encoding="utf-8") as f:
json.dump(output_data, f, **dump_kwargs)
return output_path
def split_tracking_json(input_path, output_dir, keep_all_fields=False, num_workers=1, pretty=False):
"""Split a tracking JSON list into one frame file per item."""
input_path = Path(input_path)
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
with open(input_path, "r", encoding="utf-8") as f:
frames = json.load(f)
if not isinstance(frames, list):
raise ValueError(f"Expected list of frames in {input_path}, got {type(frames).__name__}")
max_workers = max(1, int(num_workers))
if max_workers == 1 or len(frames) <= 1:
for frame in frames:
write_one_frame(frame, output_dir, keep_all_fields=keep_all_fields, pretty=pretty)
return len(frames)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [
executor.submit(
write_one_frame,
frame,
output_dir,
keep_all_fields,
pretty,
)
for frame in frames
]
for future in futures:
future.result()
return len(frames)
def default_output_dir(input_path):
"""Return a default output directory next to the input JSON."""
input_path = Path(input_path)
return input_path.parent / f"{input_path.stem}_split"
def main():
parser = argparse.ArgumentParser(
description="Split tracking JSON into evaluator-friendly per-frame JSON files."
)
parser.add_argument("--input", required=True, help="Path to roi0.json / tracking.json / merge.json")
parser.add_argument("--output-dir", default=None,
help="Directory to write per-frame JSON files (default: <input_stem>_split)")
parser.add_argument("--keep-all-fields", action="store_true",
help="Keep extra tracking fields beyond the evaluator-friendly schema")
parser.add_argument("--num-workers", type=int, default=max(1, min(16, os.cpu_count() or 1)),
help="Number of parallel workers for per-frame JSON writing (default: %(default)s)")
parser.add_argument("--pretty", action="store_true",
help="Write indented JSON for readability; default uses compact JSON for speed")
args = parser.parse_args()
output_dir = Path(args.output_dir) if args.output_dir else default_output_dir(args.input)
written = split_tracking_json(
input_path=args.input,
output_dir=output_dir,
keep_all_fields=args.keep_all_fields,
num_workers=args.num_workers,
pretty=args.pretty,
)
print(f"Input tracking JSON : {args.input}")
print(f"Output directory : {output_dir}")
print(f"Frames written : {written}")
print(f"Worker count : {args.num_workers}")
print(f"Pretty JSON : {args.pretty}")
if __name__ == "__main__":
main()