#!/usr/bin/env python3
"""Convert merge_tracking.json to ObjectPerceptionObjectList protobuf format.

Produces three files that can be consumed by make_jl.py:
  - ObjectPerceptionObjectList.data.json
  - ObjectPerceptionObjectList.bin
  - ObjectPerceptionObjectList.index.json

Usage:
    python convert_merge_tracking.py merge_tracking.json [-o output_dir] [--cam-id N]
"""

import argparse
import json
import os
import re
import sys

# The generated protobuf bindings live in a "pyproto" directory next to this
# script. Resolve it to an absolute path so the entry stays valid even if the
# working directory changes after start-up.
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "pyproto"))

import object_pb2_new as object_pb2
import geometry_pb2
import camera_pb2

# mono3d_ground.yaml class_map numeric ids -> canonical class names.
CLASS_ID_TO_NAME = {
    0: "car",
    1: "suv",
    2: "pickup",
    3: "medium_car",
    4: "van",
    5: "bus",
    6: "truck",
    7: "special_vehicle",
    8: "unknown",
    9: "pedestrian",
    10: "bicyclist",
    11: "bicycle",
    12: "tricycle",
    13: "traffic_sign",
    14: "wheel",
    15: "plate",
    16: "face",
    17: "car_fake",
    18: "bicyclist_fake",
    19: "pedestrian_fake",
}

# Fake detector categories are carried by ObjectCategory so they do not overwrite
# the attribute-origin VehicleClass / PedCls semantics.
# Each constant falls back to the matching non-fake category when the loaded
# bindings do not define the fake enum value yet.
CAR_FAKE_CATEGORY = getattr(
    object_pb2.Object, "kCategoryCarFake", object_pb2.Object.kCategoryCar
)
PEDESTRIAN_FAKE_CATEGORY = getattr(
    object_pb2.Object, "kCategoryPedestrianFake", object_pb2.Object.kCategoryPedestrian
)
CYCLIST_FAKE_CATEGORY = getattr(
    object_pb2.Object, "kCategoryCyclistFake", object_pb2.Object.kCategoryCyclist
)

# Warn about every fake category that fell back, not only the car one: the
# three lookups above fall back independently of each other.
_MISSING_FAKE_CATEGORIES = [
    enum_name
    for enum_name in (
        "kCategoryCarFake",
        "kCategoryPedestrianFake",
        "kCategoryCyclistFake",
    )
    if not hasattr(object_pb2.Object, enum_name)
]
if _MISSING_FAKE_CATEGORIES:
    print(
        "[WARN] object_pb2_new.py does not expose "
        + "/".join(_MISSING_FAKE_CATEGORIES)
        + " yet. "
        "Detector-origin fake categories will currently fall back to legacy non-fake ObjectCategory values until "
        "the protobuf Python bindings are regenerated from object.proto.",
        file=sys.stderr,
    )

# Canonical / legacy class names -> (ObjectType, ObjectCategory).
CLASS_NAME_TO_PROTO = {
    "car": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryCar),
    "car_fake": (object_pb2.Object.kVehicle, CAR_FAKE_CATEGORY),
    "suv": (object_pb2.Object.kVehicle, object_pb2.Object.kCategorySuv),
    "pickup": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryPickup),
    "medium_car": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryMediumCar),
    "van": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryVan),
    "bus": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryBus),
    "truck": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryTruck),
    "tanker": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryTruck),
    "large_truck": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryTruck),
    "construction_vehicle": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryTruck),
    "special_vehicle": (object_pb2.Object.kVehicle, object_pb2.Object.kCategorySpecialVehicle),
    "unknown": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryUnknownVehicle),
    "pedestrian": (object_pb2.Object.kPed, object_pb2.Object.kCategoryPedestrian),
    "pedestrian_fake": (object_pb2.Object.kPed, PEDESTRIAN_FAKE_CATEGORY),
    "bicyclist": (object_pb2.Object.kCyclist, object_pb2.Object.kCategoryCyclist),
    "bicyclist_fake": (object_pb2.Object.kCyclist, CYCLIST_FAKE_CATEGORY),
    "motorcyclist": (object_pb2.Object.kCyclist, object_pb2.Object.kCategoryCyclist),
    "bicycle": (object_pb2.Object.kBike, object_pb2.Object.kCategoryBike),
    "motorcycle": (object_pb2.Object.kBike, object_pb2.Object.kCategoryBike),
    "tricycle": (object_pb2.Object.kThreeWheeledVehicle, object_pb2.Object.kCategoryTricycle),
    "tricyclist": (object_pb2.Object.kThreeWheeledVehicle, object_pb2.Object.kCategoryTricycle),
    # The new schema keeps traffic-sign category fine-grained in ObjectCategory only.
    "traffic_sign": (object_pb2.Object.kSmallTrafficSign, object_pb2.Object.kCategoryTrafficSign),
    "wheel": (object_pb2.Object.kVehicleWheel, object_pb2.Object.kCategoryVehicleWheel),
    "plate": (object_pb2.Object.kVehiclePlate, object_pb2.Object.kCategoryLicensePlate),
    "face": (object_pb2.Object.kPedHead, object_pb2.Object.kCategoryHead),
    # Legacy names from the pre-2026 class table.
    "vehicle": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryNone),
    "rider": (object_pb2.Object.kCyclist, object_pb2.Object.kCategoryCyclist),
    "roadblock": (object_pb2.Object.kRoadBarrier, object_pb2.Object.kCategoryNone),
    "head": (object_pb2.Object.kPedHead, object_pb2.Object.kCategoryHead),
    "tsr": (object_pb2.Object.kSmallTrafficSign, object_pb2.Object.kCategoryTrafficSign),
    "guideboard": (object_pb2.Object.kBigTrafficSign, object_pb2.Object.kCategoryTrafficSign),
    "tl_border": (object_pb2.Object.kTrafficLight, object_pb2.Object.kCategoryNone),
    "tl_wick": (object_pb2.Object.kTrafficLightBulb, object_pb2.Object.kCategoryNone),
    "tl_num": (object_pb2.Object.kTrafficLightDigit, object_pb2.Object.kCategoryNone),
}

# anchor string → AnchorPtInfo enum value
ANCHOR_MAP = {
    "kMonocular3DRear": 22,    # AnchorPtInfo.kMonocular3DRear
    "kMonocular3DFront": 21,   # AnchorPtInfo.kMonocular3DFront
    "kMonocular3DCenter": 25,  # AnchorPtInfo.kMonocular3DCenter
    "kMonocular3DLeft": 23,    # AnchorPtInfo.kMonocular3DLeft
    "kMonocular3DRight": 24,   # AnchorPtInfo.kMonocular3DRight
}

# face_cls → VehiclePose enum value
FACE_CLS_MAP = {
    "tail": 2,   # kMidTail
    "head": 5,   # kMidHead
    "side": 14,  # kSide
    "none": 0,   # kInvalid
}

MEASURE_MONO_3D = object_pb2.Object.kMeasureMono3D

# ObjectType values for which a VehicleClass is resolved.
VEHICLE_HIT_TYPES = {
    object_pb2.Object.kVehicle,
    object_pb2.Object.kThreeWheeledVehicle,
}

# Inclusive bounds used by is_valid_vehicle_class(), plus the fallback value.
VEHICLE_CLASS_MIN = object_pb2.Object.kNegative
VEHICLE_CLASS_MAX = object_pb2.Object.kFakeCar
VEHICLE_CLASS_UNKNOWN = object_pb2.Object.kVehicleUnknown

# Patterns are compiled once; the helpers below run for every frame/detection.
_NON_ALNUM_RE = re.compile(r"[^a-z0-9]+")
_LEADING_NUMBER_RE = re.compile(r"(\d+)")
_TAIL_NUMBERS_RE = re.compile(r"_(\d+)_(\d+)(?:_merged)?$")
_CAMERA_RE = re.compile(r"camera(\d+)")
_GM_CAMERA_RE = re.compile(r"G\d+M(\d+)")


def parse_numeric_value(value):
    """Convert JSON scalar strings to int / float when possible.

    Args:
        value: Any JSON scalar. ``int`` / ``float`` instances are returned
            unchanged; everything else is stringified and parsed.

    Returns:
        An ``int`` when the text denotes an integral number, a ``float``
        otherwise, or ``None`` for ``None``, blank, or non-numeric input.
    """
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return value
    value_str = str(value).strip()
    if not value_str:
        return None
    # Parse integers directly: going through float() first would round
    # integer strings above 2**53 (e.g. nanosecond timestamps).
    try:
        return int(value_str)
    except ValueError:
        pass
    try:
        numeric = float(value_str)
    except ValueError:
        return None
    if numeric.is_integer():
        return int(numeric)
    return numeric


def safe_int(value):
    """Best-effort integer conversion.

    Returns:
        ``int(value)`` after numeric parsing, or ``None`` when the value is
        missing, non-numeric, NaN, or infinite.
    """
    numeric = parse_numeric_value(value)
    if numeric is None:
        return None
    try:
        return int(numeric)
    except (TypeError, ValueError, OverflowError):
        # int(nan) raises ValueError, int(inf) raises OverflowError.
        return None


def safe_float(value):
    """Best-effort float conversion.

    Returns:
        ``float(value)`` after numeric parsing, or ``None`` when the value is
        missing, non-numeric, or too large to be represented as a float.
    """
    numeric = parse_numeric_value(value)
    if numeric is None:
        return None
    try:
        return float(numeric)
    except (TypeError, ValueError, OverflowError):
        return None


def normalize_name_token(value):
    """Normalize class/task-like names to a lowercase underscore form.

    Falsy input is treated as the empty string. Runs of characters other than
    ``a-z`` / ``0-9`` collapse into one underscore, and leading / trailing
    underscores are removed.
    """
    normalized = _NON_ALNUM_RE.sub("_", str(value or "").strip().lower())
    return normalized.strip("_")


def parse_image_name(image_name):
    """Extract frame_id and cam_id from image_name.

    Example: "000005_camera4_000006_merged" → frame_id=5, cam_id=4

    Args:
        image_name: Image name string; ``None`` is treated as empty.

    Returns:
        ``(frame_id, cam_id)``; each part is 0 when it cannot be found.
    """
    image_name = "" if image_name is None else str(image_name)
    frame_id = 0
    cam_id = 0

    # Extract first number segment as frame index.
    # Fall back to the penultimate tail number for names like:
    #   G1M3_xxx_uuid_000000_289998
    m = _LEADING_NUMBER_RE.match(image_name)
    if m is None:
        m = _TAIL_NUMBERS_RE.search(image_name)
    if m:
        frame_id = int(m.group(1))

    # Extract camera number
    m = _CAMERA_RE.search(image_name) or _GM_CAMERA_RE.search(image_name)
    if m:
        cam_id = int(m.group(1))

    return frame_id, cam_id


def get_detection_frame_id(det):
    """Read frame id from either the old or new input schema.

    Returns the raw ``frame_id`` value, else the raw ``frameId`` value, else
    ``None``. No numeric conversion is applied here.
    """
    frame_id = det.get("frame_id")
    if frame_id is None:
        frame_id = det.get("frameId")
    return frame_id


def get_frame_timestamp(frame, detections):
    """Read the frame timestamp from frame-level data or the first detection.

    Returns:
        The timestamp as ``int``, or ``None`` when neither source provides a
        numeric value.
    """
    timestamp = safe_int(frame.get("timestamp"))
    if timestamp is not None:
        return timestamp
    if not detections:
        return None
    return safe_int(detections[0].get("timestamp"))


def get_detection_class_id(det):
    """Read class id from old/new tracking or prediction schemas.

    Tries the keys ``class_id``, ``cls_id`` and ``type`` in that order and
    returns the first one that converts to an integer, else ``None``.
    """
    for key in ("class_id", "cls_id", "type"):
        class_id = safe_int(det.get(key))
        if class_id is not None:
            return class_id
    return None


def get_detection_class_name(det):
    """Resolve the most reliable class name for mapping.

    The first non-blank name among ``cls_name`` / ``class_name`` /
    ``type_name`` decides the result: it is returned when it is a known key of
    CLASS_NAME_TO_PROTO; otherwise the name derived from the numeric class id
    is preferred, and the unknown name itself is the last resort. Without any
    name, the class id alone is mapped through CLASS_ID_TO_NAME.

    Returns:
        A lowercase class name, or "" when nothing can be resolved.
    """
    for key in ("cls_name", "class_name", "type_name"):
        value = det.get(key)
        if value is None:
            continue
        class_name = str(value).strip().lower()
        if not class_name:
            continue
        if class_name in CLASS_NAME_TO_PROTO:
            return class_name
        class_id = get_detection_class_id(det)
        if class_id is not None and class_id in CLASS_ID_TO_NAME:
            return CLASS_ID_TO_NAME[class_id]
        return class_name

    class_id = get_detection_class_id(det)
    if class_id is None:
        return ""
    return CLASS_ID_TO_NAME.get(class_id, "")


def is_detector_fake_vehicle_class(class_name):
    """Return whether a normalized detector class name denotes a fake vehicle class."""
    return normalize_name_token(class_name) == "car_fake"


def resolve_proto_class(det):
    """Resolve ObjectType / ObjectCategory from one detection.

    Returns:
        ``(hit_type, object_category, class_name)``. Names missing from
        CLASS_NAME_TO_PROTO map to ``(kNone, kCategoryNone)``.
    """
    class_name = get_detection_class_name(det)
    # Checked on the normalized token, so spelling variants that are not
    # literal table keys still resolve to the fake-car category.
    if is_detector_fake_vehicle_class(class_name):
        return object_pb2.Object.kVehicle, CAR_FAKE_CATEGORY, class_name

    hit_type, object_category = CLASS_NAME_TO_PROTO.get(
        class_name,
        (object_pb2.Object.kNone, object_pb2.Object.kCategoryNone),
    )
    return hit_type, object_category, class_name


def is_valid_vehicle_class(value):
    """Check whether a numeric value fits the VehicleClass enum range."""
    if value is None:
        return False
    return VEHICLE_CLASS_MIN <= int(value) <= VEHICLE_CLASS_MAX


def resolve_vehicle_class_from_attribute(det):
    """Map raw vehicle attribute outputs into the proto VehicleClass enum.

    Returns:
        The mapped value, or ``None`` when ``det["attribute"]`` is not a dict,
        its ``task`` does not normalize to "vehicle", or ``attr_cls`` is not
        numeric. The result is not range-checked here.
    """
    attribute = det.get("attribute")
    if not isinstance(attribute, dict):
        return None
    if normalize_name_token(attribute.get("task")) != "vehicle":
        return None

    attr_cls = safe_int(attribute.get("attr_cls"))
    if attr_cls is None:
        return None

    is_fake = safe_int(attribute.get("is_fake")) or 0
    if is_fake == 1:
        return object_pb2.Object.kFakeCar
    # Ids up to 11 map one-to-one, 23 is the special-car case, and every
    # other id is shifted by 3.
    if attr_cls <= 11:
        return attr_cls
    if attr_cls == 23:
        return object_pb2.Object.kSpecialCar
    return attr_cls + 3


def resolve_vehicle_class(det, hit_type):
    """Resolve VehicleClass for vehicle-like detections.

    Prefer the tracked `sub_cls` field because it already encodes the upstream
    attribute-to-subclass mapping. Fall back to re-deriving the same mapping from
    the raw `attribute` payload when `sub_cls` is absent or invalid.

    Returns:
        ``None`` when ``hit_type`` is not in VEHICLE_HIT_TYPES, otherwise an
        int (VEHICLE_CLASS_UNKNOWN when neither source yields a valid value).
    """
    if hit_type not in VEHICLE_HIT_TYPES:
        return None

    sub_cls = safe_int(det.get("sub_cls"))
    if is_valid_vehicle_class(sub_cls):
        return int(sub_cls)

    attr_vehicle_class = resolve_vehicle_class_from_attribute(det)
    if is_valid_vehicle_class(attr_vehicle_class):
        return int(attr_vehicle_class)

    return VEHICLE_CLASS_UNKNOWN


def get_detection_bbox(det):
    """Read a bbox from either tracking or prediction-style keys.

    Returns:
        The first four values of ``bbox`` (or else ``box2d``) as floats, or
        ``None`` when neither key holds at least four numeric values.
    """
    for key in ("bbox", "box2d"):
        bbox = det.get(key)
        if not isinstance(bbox, (list, tuple)) or len(bbox) < 4:
            continue
        values = [safe_float(v) for v in bbox[:4]]
        if all(v is not None for v in values):
            return [float(v) for v in values]
    return None


def _read_box7(values):
    """Return the first seven entries of *values* as floats, or None if unusable."""
    if not isinstance(values, (list, tuple)) or len(values) < 7:
        return None
    floats = [safe_float(v) for v in values[:7]]
    if any(v is None for v in floats):
        return None
    return floats


def _fill_world_pose(world_info, box7):
    """Write an (x, y, z, l, h, w, yaw) box into a world_info message."""
    x, y, z, l, h, w, yaw = box7
    world_info.pos.x = x
    world_info.pos.y = y
    world_info.pos.z = z
    world_info.size.l = l
    world_info.size.h = h
    world_info.size.w = w
    world_info.pose_angle.yaw = yaw


def build_mono_measure_component(det, anchor_str=None):
    """Build the Mono3D measurement component attached to a mono object.

    Args:
        det: Detection dict; ``object_3d_ego`` is read as
            (x, y, z, l, h, w, yaw).
        anchor_str: Optional anchor name; applied only if it is a key of
            ANCHOR_MAP.

    Returns:
        A new ``object_pb2.Object``, or ``None`` when ``object_3d_ego`` is
        missing or does not hold seven numeric values.
    """
    box7 = _read_box7(det.get("object_3d_ego"))
    if box7 is None:
        return None

    comp = object_pb2.Object()
    comp.measure_type = MEASURE_MONO_3D

    wi = comp.world_info
    _fill_world_pose(wi, box7)
    wi.measure_type = MEASURE_MONO_3D

    if anchor_str and anchor_str in ANCHOR_MAP:
        wi.anchor = ANCHOR_MAP[anchor_str]

    return comp


def populate_object_fields(det, obj, anchor_str=None, cam_id=None, include_image_info=False, include_model_3d=False):
    """Populate a protobuf object with the common fields from one detection.

    Args:
        det: Detection dict from the input JSON.
        obj: ``object_pb2.Object`` that is filled in place.
        anchor_str: Anchor name; defaults to ``det["anchor"]``.
        cam_id: Camera id written into ``image_info`` when image info is
            requested.
        include_image_info: Also write the detection rectangle and camera id.
        include_model_3d: Also write ``object_3d`` into
            ``world_info.monocular_3d``.

    Numeric fields that cannot be parsed are left unset instead of raising.
    """
    if anchor_str is None:
        anchor_str = det.get("anchor")

    class_id = get_detection_class_id(det)
    hit_type, object_category, class_name = resolve_proto_class(det)
    obj.hit_type = hit_type
    obj.hit_id = hit_type
    obj.object_category = object_category
    if class_name:
        obj.hit_type_str = class_name

    # safe_int keeps these consistent with get_frame_timestamp(): values such
    # as "12.0" are accepted instead of crashing int().
    track_id = safe_int(det.get("track_id"))
    if track_id is not None:
        obj.id = track_id

    frame_id = safe_int(get_detection_frame_id(det))
    if frame_id is not None:
        obj.frame_id = frame_id

    timestamp = safe_int(det.get("timestamp"))
    if timestamp is not None:
        obj.timestamp = timestamp

    lane_assignment = safe_int(det.get("lane_assignment"))
    if lane_assignment is not None:
        obj.lane_assignment.val = lane_assignment

    if include_image_info:
        bbox = get_detection_bbox(det)
        if bbox is not None:
            x1, y1, x2, y2 = bbox
            obj.image_info.det_rect.x = float(x1)
            obj.image_info.det_rect.y = float(y1)
            obj.image_info.det_rect.w = float(x2 - x1)
            obj.image_info.det_rect.h = float(y2 - y1)
        # NOTE(review): nesting reconstructed from a whitespace-collapsed
        # source; both in-file callers behave the same either way.
        if cam_id is not None:
            obj.image_info.camera_id.id = int(cam_id)

    if anchor_str and anchor_str in ANCHOR_MAP:
        obj.world_info.anchor = ANCHOR_MAP[anchor_str]

    obj.world_info.id = obj.id
    obj.world_info.hit_type = hit_type
    obj.world_info.object_category = object_category

    # Vehicle-like objects carry a VehicleClass; everything else keeps the
    # raw detector class id when one is available.
    vehicle_class = resolve_vehicle_class(det, hit_type)
    if vehicle_class is not None:
        obj.world_info.cls.val = int(vehicle_class)
        obj.world_info.cls_ori.val = int(vehicle_class)
    elif class_id is not None:
        obj.world_info.cls.val = int(class_id)
        obj.world_info.cls_ori.val = int(class_id)

    face_cls = det.get("face_cls")
    if face_cls is not None:
        face_str = str(face_cls)
        if face_str in FACE_CLS_MAP:
            obj.world_info.pose.val = FACE_CLS_MAP[face_str]

    ego_box = _read_box7(det.get("object_3d_ego"))
    if ego_box is not None:
        _fill_world_pose(obj.world_info, ego_box)

    if include_model_3d:
        model_box = _read_box7(det.get("object_3d"))
        if model_box is not None:
            x, y, z, _l, _h, _w, yaw = model_box
            m3d = obj.world_info.monocular_3d.model_3d_pos
            m3d.x3d = x
            m3d.y3d = y
            m3d.z3d = z
            m3d.heading = yaw
            # NOTE(review): nesting reconstructed from a whitespace-collapsed
            # source — confirm the anchor belongs under the object_3d branch.
            if anchor_str and anchor_str in ANCHOR_MAP:
                obj.world_info.monocular_3d.anchor = ANCHOR_MAP[anchor_str]


def build_object(det, cam_id=None):
    """Build a final Object protobuf with source-object and Mono3D nesting.

    The returned object holds one key component (the source object, which
    includes image info and the 3D model). The source object in turn holds
    the Mono3D measurement component when ``object_3d_ego`` is usable.
    """
    anchor_str = det.get("anchor")

    source_obj = object_pb2.Object()
    populate_object_fields(
        det,
        source_obj,
        anchor_str=anchor_str,
        cam_id=cam_id,
        include_image_info=True,
        include_model_3d=True,
    )
    source_obj.measure_type = MEASURE_MONO_3D
    source_obj.world_info.measure_type = MEASURE_MONO_3D

    mono_measure = build_mono_measure_component(det, anchor_str)
    if mono_measure is not None:
        source_obj.key_components.add().CopyFrom(mono_measure)

    obj = object_pb2.Object()
    populate_object_fields(det, obj, anchor_str=anchor_str)
    obj.key_components.add().CopyFrom(source_obj)
    return obj


def build_object_list(frame, cam_id_override=None):
    """Build an ObjectList protobuf message from a frame dict.

    Args:
        frame: One frame dict with ``image_name`` and ``detections``.
        cam_id_override: Camera id that replaces the one parsed from
            ``image_name`` when not ``None``.

    Returns:
        ``(obj_list, frame_id)`` where ``frame_id`` is the value parsed from
        ``image_name``.
    """
    frame_id, cam_id = parse_image_name(frame.get("image_name", ""))
    if cam_id_override is not None:
        cam_id = cam_id_override

    obj_list = object_pb2.ObjectList()
    obj_list.frame_id = frame_id
    obj_list.cam_id.id = cam_id

    # "detections": null is treated like an empty list.
    detections = frame.get("detections") or []
    frame_timestamp = get_frame_timestamp(frame, detections)
    if frame_timestamp is not None:
        obj_list.timestamp = int(frame_timestamp)

    # Extract frame_id and version from the first detection.
    # NOTE(review): this overrides the message's frame_id only; the frame_id
    # returned to the caller stays the image-name one — confirm intended.
    if detections:
        first_det = detections[0]
        det_frame_id = safe_int(get_detection_frame_id(first_det))
        if det_frame_id is not None:
            obj_list.frame_id = det_frame_id
        det_version = first_det.get("version")
        if det_version is not None:
            obj_list.version = str(det_version)

    for det in detections:
        obj = build_object(det, cam_id=cam_id)
        obj_list.list.append(obj)

    return obj_list, frame_id


def convert(input_path, output_dir, cam_id_override=None):
    """Convert merge_tracking.json to the three-file protobuf set.

    Writes the concatenated serialized ObjectList messages to the ``.bin``
    file, one ``[frame_id, offset, size]`` entry per frame to the index file,
    and a small manifest to the data file.

    Args:
        input_path: Path of the input JSON (an iterable of frame dicts).
        output_dir: Directory for the outputs; created when missing.
        cam_id_override: Optional camera id forced onto every frame.

    Returns:
        Path of the written ``ObjectPerceptionObjectList.data.json``.
    """
    with open(input_path, "r", encoding="utf-8") as f:
        frames = json.load(f)

    os.makedirs(output_dir, exist_ok=True)

    bin_path = os.path.join(output_dir, "ObjectPerceptionObjectList.bin")
    index_path = os.path.join(output_dir, "ObjectPerceptionObjectList.index.json")
    data_path = os.path.join(output_dir, "ObjectPerceptionObjectList.data.json")

    index_entries = []
    offset = 0

    with open(bin_path, "wb") as bin_file:
        for i, frame in enumerate(frames):
            obj_list, frame_idx = build_object_list(frame, cam_id_override)
            serialized = obj_list.SerializeToString()
            size = len(serialized)

            bin_file.write(serialized)
            index_entries.append([frame_idx, offset, size])
            offset += size

            # "\r" keeps the progress report on a single terminal line.
            print(f"Convert frame {i} (frame_id={frame_idx}, "
                  f"detections={len(frame.get('detections') or [])})",
                  end="\r", file=sys.stderr)

    print(f"\nTotal frames: {len(frames)}", file=sys.stderr)

    # Write index.json
    with open(index_path, "w", encoding="utf-8") as f:
        json.dump({"index": index_entries}, f)

    # Write data.json
    with open(data_path, "w", encoding="utf-8") as f:
        json.dump({
            "data": ["ObjectPerceptionObjectList.bin"],
            "index": ["ObjectPerceptionObjectList.index.json"],
            "elem_count": len(frames),
        }, f, indent=2)

    print(f"Output files written to: {output_dir}", file=sys.stderr)
    print(f"  {data_path}", file=sys.stderr)
    print(f"  {bin_path}", file=sys.stderr)
    print(f"  {index_path}", file=sys.stderr)

    return data_path


def main():
    """Parse the command line and run the conversion."""
    ap = argparse.ArgumentParser(
        description="Convert merge_tracking.json to ObjectPerceptionObjectList protobuf format")
    ap.add_argument("input", help="Path to merge_tracking.json")
    ap.add_argument("-o", "--output-dir", default=".",
                    help="Output directory (default: current directory)")
    ap.add_argument("--cam-id", type=int, default=None,
                    help="Override camera ID (default: parsed from image_name)")
    opt = ap.parse_args()

    convert(opt.input, opt.output_dir, opt.cam_id)


if __name__ == "__main__":
    main()