#!/usr/bin/env python3
"""Convert merge_tracking.json to ObjectPerceptionObjectList protobuf format.

Produces three files that can be consumed by make_jl.py:
  - ObjectPerceptionObjectList.data.json
  - ObjectPerceptionObjectList.bin
  - ObjectPerceptionObjectList.index.json

Usage:
    python convert_merge_tracking.py merge_tracking.json [-o output_dir] [--cam-id N]
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
|
|
sys.path.append(os.path.join(os.path.dirname(__file__), "pyproto"))
|
|
import object_pb2_new as object_pb2
|
|
import geometry_pb2
|
|
import camera_pb2
|
|
|
|
# mono3d_ground.yaml class_map numeric ids -> canonical class names.
# The tuple position of each name is its numeric class id, so the order below
# must not be changed.
CLASS_ID_TO_NAME = dict(enumerate((
    "car",              # 0
    "suv",              # 1
    "pickup",           # 2
    "medium_car",       # 3
    "van",              # 4
    "bus",              # 5
    "truck",            # 6
    "special_vehicle",  # 7
    "unknown",          # 8
    "pedestrian",       # 9
    "bicyclist",        # 10
    "bicycle",          # 11
    "tricycle",         # 12
    "traffic_sign",     # 13
    "wheel",            # 14
    "plate",            # 15
    "face",             # 16
    "car_fake",         # 17
    "bicyclist_fake",   # 18
    "pedestrian_fake",  # 19
)))
|
|
|
|
# Fake detector categories are carried by ObjectCategory so they do not overwrite
# the attribute-origin VehicleClass / PedCls semantics.
# Each lookup falls back to the matching non-fake category when the generated
# bindings do not define the fake enum value.
CAR_FAKE_CATEGORY = getattr(object_pb2.Object, "kCategoryCarFake", object_pb2.Object.kCategoryCar)
PEDESTRIAN_FAKE_CATEGORY = getattr(object_pb2.Object, "kCategoryPedestrianFake", object_pb2.Object.kCategoryPedestrian)
CYCLIST_FAKE_CATEGORY = getattr(object_pb2.Object, "kCategoryCyclistFake", object_pb2.Object.kCategoryCyclist)

# Report every fake category that had to fall back, not only the car one, so a
# partially regenerated binding cannot degrade pedestrian/cyclist silently.
_MISSING_FAKE_CATEGORIES = [
    enum_name
    for enum_name in ("kCategoryCarFake", "kCategoryPedestrianFake", "kCategoryCyclistFake")
    if not hasattr(object_pb2.Object, enum_name)
]

if _MISSING_FAKE_CATEGORIES:
    print(
        f"[WARN] object_pb2_new.py does not expose {'/'.join(_MISSING_FAKE_CATEGORIES)} yet. "
        "Detector-origin fake categories will currently fall back to legacy non-fake ObjectCategory values until "
        "the protobuf Python bindings are regenerated from object.proto.",
        file=sys.stderr,
    )
|
|
|
|
# Canonical / legacy class names -> (ObjectType, ObjectCategory).
# `_OBJ` is only a shorthand for the generated Object message class whose
# enum constants are referenced in every row.
_OBJ = object_pb2.Object
CLASS_NAME_TO_PROTO = {
    # Vehicles.
    "car": (_OBJ.kVehicle, _OBJ.kCategoryCar),
    "car_fake": (_OBJ.kVehicle, CAR_FAKE_CATEGORY),
    "suv": (_OBJ.kVehicle, _OBJ.kCategorySuv),
    "pickup": (_OBJ.kVehicle, _OBJ.kCategoryPickup),
    "medium_car": (_OBJ.kVehicle, _OBJ.kCategoryMediumCar),
    "van": (_OBJ.kVehicle, _OBJ.kCategoryVan),
    "bus": (_OBJ.kVehicle, _OBJ.kCategoryBus),
    "truck": (_OBJ.kVehicle, _OBJ.kCategoryTruck),
    "tanker": (_OBJ.kVehicle, _OBJ.kCategoryTruck),
    "large_truck": (_OBJ.kVehicle, _OBJ.kCategoryTruck),
    "construction_vehicle": (_OBJ.kVehicle, _OBJ.kCategoryTruck),
    "special_vehicle": (_OBJ.kVehicle, _OBJ.kCategorySpecialVehicle),
    "unknown": (_OBJ.kVehicle, _OBJ.kCategoryUnknownVehicle),
    # Pedestrians, cyclists and two-/three-wheelers.
    "pedestrian": (_OBJ.kPed, _OBJ.kCategoryPedestrian),
    "pedestrian_fake": (_OBJ.kPed, PEDESTRIAN_FAKE_CATEGORY),
    "bicyclist": (_OBJ.kCyclist, _OBJ.kCategoryCyclist),
    "bicyclist_fake": (_OBJ.kCyclist, CYCLIST_FAKE_CATEGORY),
    "motorcyclist": (_OBJ.kCyclist, _OBJ.kCategoryCyclist),
    "bicycle": (_OBJ.kBike, _OBJ.kCategoryBike),
    "motorcycle": (_OBJ.kBike, _OBJ.kCategoryBike),
    "tricycle": (_OBJ.kThreeWheeledVehicle, _OBJ.kCategoryTricycle),
    "tricyclist": (_OBJ.kThreeWheeledVehicle, _OBJ.kCategoryTricycle),
    # The new schema keeps traffic-sign category fine-grained in ObjectCategory only.
    "traffic_sign": (_OBJ.kSmallTrafficSign, _OBJ.kCategoryTrafficSign),
    "wheel": (_OBJ.kVehicleWheel, _OBJ.kCategoryVehicleWheel),
    "plate": (_OBJ.kVehiclePlate, _OBJ.kCategoryLicensePlate),
    "face": (_OBJ.kPedHead, _OBJ.kCategoryHead),
    # Legacy names from the pre-2026 class table.
    "vehicle": (_OBJ.kVehicle, _OBJ.kCategoryNone),
    "rider": (_OBJ.kCyclist, _OBJ.kCategoryCyclist),
    "roadblock": (_OBJ.kRoadBarrier, _OBJ.kCategoryNone),
    "head": (_OBJ.kPedHead, _OBJ.kCategoryHead),
    "tsr": (_OBJ.kSmallTrafficSign, _OBJ.kCategoryTrafficSign),
    "guideboard": (_OBJ.kBigTrafficSign, _OBJ.kCategoryTrafficSign),
    "tl_border": (_OBJ.kTrafficLight, _OBJ.kCategoryNone),
    "tl_wick": (_OBJ.kTrafficLightBulb, _OBJ.kCategoryNone),
    "tl_num": (_OBJ.kTrafficLightDigit, _OBJ.kCategoryNone),
}
|
|
|
|
# Anchor name (as found in the input JSON) -> AnchorPtInfo enum value.
# The integers are hard-coded copies of the enum members named in the comments.
ANCHOR_MAP = dict(
    kMonocular3DRear=22,    # AnchorPtInfo.kMonocular3DRear
    kMonocular3DFront=21,   # AnchorPtInfo.kMonocular3DFront
    kMonocular3DCenter=25,  # AnchorPtInfo.kMonocular3DCenter
    kMonocular3DLeft=23,    # AnchorPtInfo.kMonocular3DLeft
    kMonocular3DRight=24,   # AnchorPtInfo.kMonocular3DRight
)
|
|
|
|
# face_cls string -> VehiclePose enum value.
# The integers are hard-coded copies of the enum members named in the comments.
FACE_CLS_MAP = dict(
    tail=2,   # kMidTail
    head=5,   # kMidHead
    side=14,  # kSide
    none=0,   # kInvalid
)
|
|
|
|
# Measure type stamped on every object / world_info built by this script.
MEASURE_MONO_3D = object_pb2.Object.kMeasureMono3D

# Object types for which a VehicleClass value is resolved.
VEHICLE_HIT_TYPES = {object_pb2.Object.kVehicle, object_pb2.Object.kThreeWheeledVehicle}

# Inclusive bounds used to validate VehicleClass values, plus the fallback
# used when no valid class can be resolved.
VEHICLE_CLASS_MIN, VEHICLE_CLASS_MAX = object_pb2.Object.kNegative, object_pb2.Object.kFakeCar
VEHICLE_CLASS_UNKNOWN = object_pb2.Object.kVehicleUnknown
|
|
|
|
|
|
def parse_numeric_value(value):
    """Convert a JSON scalar to ``int`` / ``float`` when possible.

    Args:
        value: ``None``, a number, or any object whose ``str()`` form may
            spell a number.

    Returns:
        ``None`` for ``None``, blank text or non-numeric text. ``int`` and
        ``float`` inputs are returned unchanged. Text is returned as ``int``
        when it is an integer literal or a float with an integral value
        (``"3.0"`` -> ``3``), otherwise as ``float``.
    """
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return value

    value_str = str(value).strip()
    if not value_str:
        return None

    # Parse integer literals directly. Going through float() first silently
    # rounds anything above 2**53, e.g. nanosecond timestamps kept as strings.
    try:
        return int(value_str)
    except ValueError:
        pass

    try:
        numeric = float(value_str)
    except ValueError:
        return None

    if numeric.is_integer():
        return int(numeric)
    return numeric
|
|
|
|
|
|
def safe_int(value):
    """Best-effort integer conversion.

    Args:
        value: Any JSON scalar; parsed with ``parse_numeric_value``.

    Returns:
        The value truncated to ``int``, or ``None`` when it is missing,
        non-numeric, NaN or infinite.
    """
    numeric = parse_numeric_value(value)
    if numeric is None:
        return None
    try:
        return int(numeric)
    except (TypeError, ValueError, OverflowError):
        # int(nan) raises ValueError and int(+/-inf) raises OverflowError;
        # both mean "no usable integer" here.
        return None
|
|
|
|
|
|
def safe_float(value):
    """Best-effort float conversion.

    Args:
        value: Any JSON scalar; parsed with ``parse_numeric_value``.

    Returns:
        The value as ``float``, or ``None`` when it is missing, non-numeric,
        or an integer too large to be represented as a float.
    """
    numeric = parse_numeric_value(value)
    if numeric is None:
        return None
    try:
        return float(numeric)
    except (TypeError, ValueError, OverflowError):
        # float() of an arbitrarily large int raises OverflowError.
        return None
|
|
|
|
|
|
def normalize_name_token(value):
    """Return *value* as a lowercase, underscore-separated token.

    Falsy input becomes the empty string. Every run of characters other than
    ``a-z`` / ``0-9`` collapses to one underscore, and leading or trailing
    underscores are removed.
    """
    lowered = str(value or "").strip().lower()
    collapsed = re.sub(r"[^a-z0-9]+", "_", lowered)
    return collapsed.strip("_")
|
|
|
|
|
|
def parse_image_name(image_name):
    """Extract ``(frame_id, cam_id)`` from an image name.

    Example: "000005_camera4_000006_merged" -> frame_id=5, cam_id=4

    Args:
        image_name: Image name text. ``None`` is treated as an empty name and
            other non-string values are converted with ``str()``.

    Returns:
        A ``(frame_id, cam_id)`` tuple of ints; each part is 0 when it cannot
        be found in the name.
    """
    # A JSON null reaches this function as None; the regex calls below would
    # raise TypeError on it.
    name = "" if image_name is None else str(image_name)

    frame_id = 0
    cam_id = 0

    # Extract first number segment as frame index.
    # Fall back to the penultimate tail number for names like:
    #   G1M3_xxx_uuid_000000_289998
    m = re.match(r"(\d+)", name)
    if m is None:
        m = re.search(r"_(\d+)_(\d+)(?:_merged)?$", name)
    if m:
        frame_id = int(m.group(1))

    # Extract camera number: "camera<N>" first, then the <N> of "G<d>M<N>".
    m = re.search(r"camera(\d+)", name) or re.search(r"G\d+M(\d+)", name)
    if m:
        cam_id = int(m.group(1))

    return frame_id, cam_id
|
|
|
|
|
|
def get_detection_frame_id(det):
    """Return the detection's frame id from ``frame_id`` or ``frameId``.

    ``frame_id`` wins when it is present and not ``None``; otherwise the
    ``frameId`` value is returned, which may itself be ``None``.
    """
    for key in ("frame_id", "frameId"):
        candidate = det.get(key)
        if candidate is not None:
            return candidate
    return None
|
|
|
|
|
|
def get_frame_timestamp(frame, detections):
    """Return the frame timestamp as an int, or ``None`` when unavailable.

    The frame-level ``timestamp`` is preferred. When it is missing or not
    numeric, the ``timestamp`` of the first detection is used instead.
    """
    frame_ts = safe_int(frame.get("timestamp"))
    if frame_ts is None and detections:
        return safe_int(detections[0].get("timestamp"))
    return frame_ts
|
|
|
|
|
|
def get_detection_class_id(det):
    """Return the first integer class id found under the known schema keys.

    Keys are tried in the order ``class_id``, ``cls_id``, ``type``; ``None``
    is returned when none of them holds a numeric value.
    """
    # The generator is lazy, so later keys are only read if earlier ones fail.
    parsed_ids = (safe_int(det.get(key)) for key in ("class_id", "cls_id", "type"))
    return next((class_id for class_id in parsed_ids if class_id is not None), None)
|
|
|
|
|
|
def get_detection_class_name(det):
    """Resolve the class name used for proto mapping.

    The first non-blank value among ``cls_name`` / ``class_name`` /
    ``type_name`` is lowercased and stripped. It is returned as-is when it is
    a key of ``CLASS_NAME_TO_PROTO``. Otherwise the numeric class id is
    translated through ``CLASS_ID_TO_NAME``. If that also fails, the
    unrecognized name is returned, or ``""`` when no name was present.
    """
    raw_names = (det.get(key) for key in ("cls_name", "class_name", "type_name"))
    cleaned_names = (str(raw).strip().lower() for raw in raw_names if raw is not None)
    # Only the first non-blank name is considered; later keys are not consulted.
    first_name = next((name for name in cleaned_names if name), None)

    if first_name is not None and first_name in CLASS_NAME_TO_PROTO:
        return first_name

    class_id = get_detection_class_id(det)
    id_name = CLASS_ID_TO_NAME.get(class_id) if class_id is not None else None
    if id_name is not None:
        return id_name

    return "" if first_name is None else first_name
|
|
|
|
|
|
def is_detector_fake_vehicle_class(class_name):
    """Return True when *class_name* normalizes to the ``car_fake`` token."""
    token = normalize_name_token(class_name)
    return token == "car_fake"
|
|
|
|
|
|
def resolve_proto_class(det):
    """Resolve ``(ObjectType, ObjectCategory, class_name)`` for one detection.

    Names that normalize to ``car_fake`` always map to
    ``(kVehicle, CAR_FAKE_CATEGORY)``. Any other name is looked up in
    ``CLASS_NAME_TO_PROTO`` and defaults to ``(kNone, kCategoryNone)``.
    """
    class_name = get_detection_class_name(det)

    if is_detector_fake_vehicle_class(class_name):
        proto_pair = (object_pb2.Object.kVehicle, CAR_FAKE_CATEGORY)
    else:
        unmapped = (object_pb2.Object.kNone, object_pb2.Object.kCategoryNone)
        proto_pair = CLASS_NAME_TO_PROTO.get(class_name, unmapped)

    return (*proto_pair, class_name)
|
|
|
|
|
|
def is_valid_vehicle_class(value):
    """Return True when *value* lies within the inclusive VehicleClass bounds.

    ``None`` is never valid; any other value is compared after ``int()``.
    """
    if value is None:
        return False
    candidate = int(value)
    return not (candidate < VEHICLE_CLASS_MIN or candidate > VEHICLE_CLASS_MAX)
|
|
|
|
|
|
def resolve_vehicle_class_from_attribute(det):
    """Derive a VehicleClass value from the detection's raw ``attribute`` dict.

    Returns ``None`` unless ``attribute`` is a dict whose ``task`` normalizes
    to ``"vehicle"`` and whose ``attr_cls`` is numeric. The result is not
    range-checked here.
    """
    attribute = det.get("attribute")
    if not isinstance(attribute, dict):
        return None
    if normalize_name_token(attribute.get("task")) != "vehicle":
        return None

    raw_cls = safe_int(attribute.get("attr_cls"))
    if raw_cls is None:
        return None

    # An explicit fake flag overrides whatever attr_cls says.
    if safe_int(attribute.get("is_fake")) == 1:
        return object_pb2.Object.kFakeCar

    # NOTE(review): the 11 / 23 / +3 constants presumably mirror the upstream
    # attribute-to-VehicleClass table - confirm against object.proto.
    if raw_cls == 23:
        return object_pb2.Object.kSpecialCar
    return raw_cls if raw_cls <= 11 else raw_cls + 3
|
|
|
|
|
|
def resolve_vehicle_class(det, hit_type):
    """Resolve VehicleClass for vehicle-like detections.

    Returns ``None`` when *hit_type* is not in ``VEHICLE_HIT_TYPES``.
    Otherwise the tracked ``sub_cls`` field is preferred, because it already
    encodes the upstream attribute-to-subclass mapping. When it is absent or
    out of range, the class is re-derived from the raw ``attribute`` payload,
    and ``VEHICLE_CLASS_UNKNOWN`` is the last resort.
    """
    if hit_type not in VEHICLE_HIT_TYPES:
        return None

    # Sources in priority order; each is evaluated only if the previous one
    # did not yield a valid class.
    sources = (
        lambda: safe_int(det.get("sub_cls")),
        lambda: resolve_vehicle_class_from_attribute(det),
    )
    for read_source in sources:
        candidate = read_source()
        if is_valid_vehicle_class(candidate):
            return int(candidate)

    return VEHICLE_CLASS_UNKNOWN
|
|
|
|
|
|
def get_detection_bbox(det):
    """Return the first usable bbox as a list of four floats, else ``None``.

    ``bbox`` is tried before ``box2d``. A candidate is usable when it is a
    list/tuple with at least four entries whose first four are all numeric.
    """
    for key in ("bbox", "box2d"):
        raw_box = det.get(key)
        if isinstance(raw_box, (list, tuple)) and len(raw_box) >= 4:
            corners = [safe_float(item) for item in raw_box[:4]]
            if not any(corner is None for corner in corners):
                return corners
    return None
|
|
|
|
|
|
def build_mono_measure_component(det, anchor_str=None):
    """Build the Mono3D measurement component attached to a mono object.

    Reads the first seven values of ``det["object_3d_ego"]`` as
    ``x, y, z, l, h, w, yaw``. Returns ``None`` when that field is missing or
    has fewer than seven entries, otherwise a new ``object_pb2.Object``.
    """
    box_ego = det.get("object_3d_ego")
    if not box_ego or len(box_ego) < 7:
        return None

    pos_x, pos_y, pos_z, length, height, width, heading = box_ego[:7]

    component = object_pb2.Object()
    component.measure_type = MEASURE_MONO_3D

    world = component.world_info
    world.pos.x = float(pos_x)
    world.pos.y = float(pos_y)
    world.pos.z = float(pos_z)
    world.size.l = float(length)
    world.size.h = float(height)
    world.size.w = float(width)
    world.pose_angle.yaw = float(heading)
    world.measure_type = MEASURE_MONO_3D

    # The anchor is only written for names known to ANCHOR_MAP.
    if anchor_str and anchor_str in ANCHOR_MAP:
        world.anchor = ANCHOR_MAP[anchor_str]

    return component
|
|
|
|
|
|
def populate_object_fields(det, obj, anchor_str=None, cam_id=None,
                           include_image_info=False, include_model_3d=False):
    """Populate a protobuf object with the common fields from one detection.

    Args:
        det: Detection dict from the input JSON.
        obj: ``object_pb2.Object`` to fill; it is mutated in place.
        anchor_str: Anchor name; read from ``det["anchor"]`` when ``None``.
        cam_id: Camera id written to ``image_info.camera_id`` when
            *include_image_info* is set.
        include_image_info: Also fill ``image_info`` (detection rectangle and
            camera id).
        include_model_3d: Also fill ``world_info.monocular_3d`` from
            ``det["object_3d"]``.

    Returns:
        None.
    """
    if anchor_str is None:
        anchor_str = det.get("anchor")
    # Resolve the anchor enum once; it may be written in two places below.
    anchor_value = None
    if anchor_str and anchor_str in ANCHOR_MAP:
        anchor_value = ANCHOR_MAP[anchor_str]

    class_id = get_detection_class_id(det)
    hit_type, object_category, class_name = resolve_proto_class(det)
    obj.hit_type = hit_type
    obj.hit_id = hit_type
    obj.object_category = object_category
    if class_name:
        obj.hit_type_str = class_name

    # Identity / timing scalars go through safe_int, like the frame-level
    # fields, so numeric strings such as "12.0" are accepted and blank or
    # non-numeric values leave the field unset instead of aborting the run.
    track_id = safe_int(det.get("track_id"))
    if track_id is not None:
        obj.id = track_id

    frame_id = safe_int(get_detection_frame_id(det))
    if frame_id is not None:
        obj.frame_id = frame_id

    timestamp = safe_int(det.get("timestamp"))
    if timestamp is not None:
        obj.timestamp = timestamp

    lane_assignment = safe_int(det.get("lane_assignment"))
    if lane_assignment is not None:
        obj.lane_assignment.val = lane_assignment

    if include_image_info:
        bbox = get_detection_bbox(det)
        if bbox is not None:
            # Input bbox is [x1, y1, x2, y2]; det_rect stores x, y, w, h.
            x1, y1, x2, y2 = bbox
            obj.image_info.det_rect.x = float(x1)
            obj.image_info.det_rect.y = float(y1)
            obj.image_info.det_rect.w = float(x2 - x1)
            obj.image_info.det_rect.h = float(y2 - y1)
        # NOTE(review): nesting reconstructed from a whitespace-stripped
        # source - confirm camera_id should be written even without a bbox.
        if cam_id is not None:
            obj.image_info.camera_id.id = int(cam_id)

    if anchor_value is not None:
        obj.world_info.anchor = anchor_value

    obj.world_info.id = obj.id
    obj.world_info.hit_type = hit_type
    obj.world_info.object_category = object_category

    # Vehicle-like objects carry a VehicleClass; everything else falls back
    # to the raw numeric class id when one is available.
    vehicle_class = resolve_vehicle_class(det, hit_type)
    if vehicle_class is not None:
        obj.world_info.cls.val = int(vehicle_class)
        obj.world_info.cls_ori.val = int(vehicle_class)
    elif class_id is not None:
        obj.world_info.cls.val = int(class_id)
        obj.world_info.cls_ori.val = int(class_id)

    face_cls = det.get("face_cls")
    if face_cls is not None:
        face_str = str(face_cls)
        if face_str in FACE_CLS_MAP:
            obj.world_info.pose.val = FACE_CLS_MAP[face_str]

    obj_3d_ego = det.get("object_3d_ego")
    if obj_3d_ego and len(obj_3d_ego) >= 7:
        x, y, z, length, height, width, yaw = obj_3d_ego[:7]
        wi = obj.world_info
        wi.pos.x = float(x)
        wi.pos.y = float(y)
        wi.pos.z = float(z)
        wi.size.l = float(length)
        wi.size.h = float(height)
        wi.size.w = float(width)
        wi.pose_angle.yaw = float(yaw)

    if include_model_3d:
        obj_3d = det.get("object_3d")
        if obj_3d and len(obj_3d) >= 7:
            # Only position and heading are stored; the size entries of
            # object_3d are not used here.
            x, y, z, _length, _height, _width, yaw = obj_3d[:7]
            m3d = obj.world_info.monocular_3d.model_3d_pos
            m3d.x3d = float(x)
            m3d.y3d = float(y)
            m3d.z3d = float(z)
            m3d.heading = float(yaw)
        # NOTE(review): nesting reconstructed from a whitespace-stripped
        # source - confirm the anchor should be written without object_3d.
        if anchor_value is not None:
            obj.world_info.monocular_3d.anchor = anchor_value
|
|
|
|
|
|
|
|
def build_object(det, cam_id=None):
    """Build the final Object protobuf for one detection.

    The returned object carries the common fields and one key component: the
    "source" object. The source object additionally holds image info, model
    3D data, the Mono3D measure type and, when ``object_3d_ego`` is usable,
    a nested Mono3D measurement component.
    """
    anchor = det.get("anchor")

    # Inner (source) object: full detail plus the Mono3D measurement.
    source = object_pb2.Object()
    populate_object_fields(
        det,
        source,
        anchor_str=anchor,
        cam_id=cam_id,
        include_image_info=True,
        include_model_3d=True,
    )
    source.measure_type = MEASURE_MONO_3D
    source.world_info.measure_type = MEASURE_MONO_3D

    measurement = build_mono_measure_component(det, anchor)
    if measurement is not None:
        source.key_components.add().CopyFrom(measurement)

    # Outer object: common fields only, wrapping the source object.
    wrapper = object_pb2.Object()
    populate_object_fields(det, wrapper, anchor_str=anchor)
    wrapper.key_components.add().CopyFrom(source)
    return wrapper
|
|
|
|
|
|
def build_object_list(frame, cam_id_override=None):
    """Build an ObjectList protobuf message from a frame dict.

    Args:
        frame: Frame dict with ``image_name``, optional ``timestamp`` and a
            ``detections`` list.
        cam_id_override: Camera id to use instead of the one parsed from
            ``image_name``; ``None`` keeps the parsed value.

    Returns:
        ``(obj_list, frame_id)`` where ``frame_id`` is the value parsed from
        ``image_name``.
    """
    # `or` also covers explicit JSON nulls, which dict.get's default does not.
    image_name = frame.get("image_name") or ""
    frame_id, cam_id = parse_image_name(image_name)

    if cam_id_override is not None:
        cam_id = cam_id_override

    obj_list = object_pb2.ObjectList()
    obj_list.frame_id = frame_id
    obj_list.cam_id.id = cam_id

    detections = frame.get("detections") or []
    frame_timestamp = get_frame_timestamp(frame, detections)
    if frame_timestamp is not None:
        obj_list.timestamp = int(frame_timestamp)

    # Extract frame_id and version from the first detection.
    if detections:
        first_det = detections[0]
        # safe_int keeps this consistent with the timestamp handling and
        # tolerates numeric strings such as "12.0".
        det_frame_id = safe_int(get_detection_frame_id(first_det))
        if det_frame_id is not None:
            obj_list.frame_id = det_frame_id
        det_version = first_det.get("version")
        if det_version is not None:
            obj_list.version = str(det_version)

    for det in detections:
        obj = build_object(det, cam_id=cam_id)
        obj_list.list.append(obj)

    # NOTE(review): the returned frame_id is the image-name value, which can
    # differ from obj_list.frame_id after the per-detection override above -
    # confirm which one index consumers expect.
    return obj_list, frame_id
|
|
|
|
|
|
def convert(input_path, output_dir, cam_id_override=None):
    """Convert merge_tracking.json to the three-file protobuf set.

    Args:
        input_path: Path to the input JSON, an iterable of frame dicts.
        output_dir: Directory for the output files; created if missing.
        cam_id_override: Optional camera id forced onto every frame.

    Returns:
        Path of the written ``ObjectPerceptionObjectList.data.json``.
    """
    # Read as UTF-8 explicitly so the result does not depend on the
    # platform's default locale encoding.
    with open(input_path, "r", encoding="utf-8") as f:
        frames = json.load(f)

    os.makedirs(output_dir, exist_ok=True)

    bin_path = os.path.join(output_dir, "ObjectPerceptionObjectList.bin")
    index_path = os.path.join(output_dir, "ObjectPerceptionObjectList.index.json")
    data_path = os.path.join(output_dir, "ObjectPerceptionObjectList.data.json")

    # One [frame_id, byte offset, byte size] entry per serialized frame.
    index_entries = []
    offset = 0

    with open(bin_path, "wb") as bin_file:
        for i, frame in enumerate(frames):
            obj_list, frame_idx = build_object_list(frame, cam_id_override)
            serialized = obj_list.SerializeToString()
            size = len(serialized)

            bin_file.write(serialized)
            index_entries.append([frame_idx, offset, size])
            offset += size

            # `or []` keeps the progress line working for null detections.
            detection_count = len(frame.get("detections") or [])
            print(f"Convert frame {i} (frame_id={frame_idx}, "
                  f"detections={detection_count})",
                  end="\r", file=sys.stderr)

    print(f"\nTotal frames: {len(frames)}", file=sys.stderr)

    # Write index.json
    with open(index_path, "w", encoding="utf-8") as f:
        json.dump({"index": index_entries}, f)

    # Write data.json
    with open(data_path, "w", encoding="utf-8") as f:
        json.dump({
            "data": ["ObjectPerceptionObjectList.bin"],
            "index": ["ObjectPerceptionObjectList.index.json"],
            "elem_count": len(frames),
        }, f, indent=2)

    print(f"Output files written to: {output_dir}", file=sys.stderr)
    print(f"  {data_path}", file=sys.stderr)
    print(f"  {bin_path}", file=sys.stderr)
    print(f"  {index_path}", file=sys.stderr)

    return data_path
|
|
|
|
|
|
def main():
    """Parse the command line and run the conversion."""
    parser = argparse.ArgumentParser(
        description="Convert merge_tracking.json to ObjectPerceptionObjectList protobuf format",
    )
    parser.add_argument("input", help="Path to merge_tracking.json")
    parser.add_argument(
        "-o", "--output-dir",
        default=".",
        help="Output directory (default: current directory)",
    )
    parser.add_argument(
        "--cam-id",
        type=int,
        default=None,
        help="Override camera ID (default: parsed from image_name)",
    )
    args = parser.parse_args()

    convert(args.input, args.output_dir, args.cam_id)
|
|
|
|
|
|
# Run the converter only when this file is executed as a script.
if __name__ == "__main__":
    main()
|