Files

575 lines
20 KiB
Python
Raw Permalink Normal View History

2026-06-24 09:35:46 +08:00
#!/usr/bin/env python3
"""Convert merge_tracking.json to ObjectPerceptionObjectList protobuf format.
Produces three files that can be consumed by make_jl.py:
- ObjectPerceptionObjectList.data.json
- ObjectPerceptionObjectList.bin
- ObjectPerceptionObjectList.index.json
Usage:
python convert_merge_tracking.py merge_tracking.json [-o output_dir] [--cam-id N]
"""
import argparse
import json
import os
import re
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), "pyproto"))
import object_pb2_new as object_pb2
import geometry_pb2
import camera_pb2
# Numeric class ids from the mono3d_ground.yaml class_map -> canonical class
# names. The ids are contiguous from 0, so the table is built positionally:
# each name's position in the tuple is its class id.
CLASS_ID_TO_NAME = dict(enumerate((
    "car",               # 0
    "suv",               # 1
    "pickup",            # 2
    "medium_car",        # 3
    "van",               # 4
    "bus",               # 5
    "truck",             # 6
    "special_vehicle",   # 7
    "unknown",           # 8
    "pedestrian",        # 9
    "bicyclist",         # 10
    "bicycle",           # 11
    "tricycle",          # 12
    "traffic_sign",      # 13
    "wheel",             # 14
    "plate",             # 15
    "face",              # 16
    "car_fake",          # 17
    "bicyclist_fake",    # 18
    "pedestrian_fake",   # 19
)))
# Fake detector categories are carried by ObjectCategory so they do not overwrite
# the attribute-origin VehicleClass / PedCls semantics.
# Each lookup falls back to the matching non-fake category when the generated
# Python bindings do not define the fake enum value.
CAR_FAKE_CATEGORY = getattr(object_pb2.Object, "kCategoryCarFake", object_pb2.Object.kCategoryCar)
PEDESTRIAN_FAKE_CATEGORY = getattr(object_pb2.Object, "kCategoryPedestrianFake", object_pb2.Object.kCategoryPedestrian)
CYCLIST_FAKE_CATEGORY = getattr(object_pb2.Object, "kCategoryCyclistFake", object_pb2.Object.kCategoryCyclist)
# The three values are resolved independently, so warn when ANY of them fell
# back to its legacy category, not only the car one.
if (
    CAR_FAKE_CATEGORY == object_pb2.Object.kCategoryCar
    or PEDESTRIAN_FAKE_CATEGORY == object_pb2.Object.kCategoryPedestrian
    or CYCLIST_FAKE_CATEGORY == object_pb2.Object.kCategoryCyclist
):
    print(
        "[WARN] object_pb2_new.py does not expose kCategoryCarFake/kCategoryPedestrianFake/kCategoryCyclistFake yet. "
        "Detector-origin fake categories will currently fall back to legacy non-fake ObjectCategory values until "
        "the protobuf Python bindings are regenerated from object.proto.",
        file=sys.stderr,
    )
# Canonical / legacy class names -> (ObjectType, ObjectCategory).
# Keys are lowercase class names. resolve_proto_class() looks names up here and
# uses (kNone, kCategoryNone) for any name that is absent. Several source names
# intentionally share one proto pair (e.g. the truck-like names).
CLASS_NAME_TO_PROTO = {
    "car": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryCar),
    "car_fake": (object_pb2.Object.kVehicle, CAR_FAKE_CATEGORY),
    "suv": (object_pb2.Object.kVehicle, object_pb2.Object.kCategorySuv),
    "pickup": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryPickup),
    "medium_car": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryMediumCar),
    "van": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryVan),
    "bus": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryBus),
    "truck": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryTruck),
    "tanker": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryTruck),
    "large_truck": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryTruck),
    "construction_vehicle": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryTruck),
    "special_vehicle": (object_pb2.Object.kVehicle, object_pb2.Object.kCategorySpecialVehicle),
    "unknown": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryUnknownVehicle),
    "pedestrian": (object_pb2.Object.kPed, object_pb2.Object.kCategoryPedestrian),
    "pedestrian_fake": (object_pb2.Object.kPed, PEDESTRIAN_FAKE_CATEGORY),
    "bicyclist": (object_pb2.Object.kCyclist, object_pb2.Object.kCategoryCyclist),
    "bicyclist_fake": (object_pb2.Object.kCyclist, CYCLIST_FAKE_CATEGORY),
    "motorcyclist": (object_pb2.Object.kCyclist, object_pb2.Object.kCategoryCyclist),
    "bicycle": (object_pb2.Object.kBike, object_pb2.Object.kCategoryBike),
    "motorcycle": (object_pb2.Object.kBike, object_pb2.Object.kCategoryBike),
    "tricycle": (object_pb2.Object.kThreeWheeledVehicle, object_pb2.Object.kCategoryTricycle),
    "tricyclist": (object_pb2.Object.kThreeWheeledVehicle, object_pb2.Object.kCategoryTricycle),
    # The new schema keeps traffic-sign category fine-grained in ObjectCategory only.
    "traffic_sign": (object_pb2.Object.kSmallTrafficSign, object_pb2.Object.kCategoryTrafficSign),
    "wheel": (object_pb2.Object.kVehicleWheel, object_pb2.Object.kCategoryVehicleWheel),
    "plate": (object_pb2.Object.kVehiclePlate, object_pb2.Object.kCategoryLicensePlate),
    "face": (object_pb2.Object.kPedHead, object_pb2.Object.kCategoryHead),
    # Legacy names from the pre-2026 class table.
    "vehicle": (object_pb2.Object.kVehicle, object_pb2.Object.kCategoryNone),
    "rider": (object_pb2.Object.kCyclist, object_pb2.Object.kCategoryCyclist),
    "roadblock": (object_pb2.Object.kRoadBarrier, object_pb2.Object.kCategoryNone),
    "head": (object_pb2.Object.kPedHead, object_pb2.Object.kCategoryHead),
    "tsr": (object_pb2.Object.kSmallTrafficSign, object_pb2.Object.kCategoryTrafficSign),
    "guideboard": (object_pb2.Object.kBigTrafficSign, object_pb2.Object.kCategoryTrafficSign),
    "tl_border": (object_pb2.Object.kTrafficLight, object_pb2.Object.kCategoryNone),
    "tl_wick": (object_pb2.Object.kTrafficLightBulb, object_pb2.Object.kCategoryNone),
    "tl_num": (object_pb2.Object.kTrafficLightDigit, object_pb2.Object.kCategoryNone),
}
# Anchor name (the detection's "anchor" string) -> AnchorPtInfo enum value.
ANCHOR_MAP = dict(
    kMonocular3DRear=22,    # AnchorPtInfo.kMonocular3DRear
    kMonocular3DFront=21,   # AnchorPtInfo.kMonocular3DFront
    kMonocular3DCenter=25,  # AnchorPtInfo.kMonocular3DCenter
    kMonocular3DLeft=23,    # AnchorPtInfo.kMonocular3DLeft
    kMonocular3DRight=24,   # AnchorPtInfo.kMonocular3DRight
)
# Detection "face_cls" string -> VehiclePose enum value.
FACE_CLS_MAP = dict(
    tail=2,   # kMidTail
    head=5,   # kMidHead
    side=14,  # kSide
    none=0,   # kInvalid
)
# Measure-type value written to Mono3D objects and to their world_info.
MEASURE_MONO_3D = object_pb2.Object.kMeasureMono3D
# Hit types for which resolve_vehicle_class() derives a VehicleClass value;
# every other hit type gets None from that function.
VEHICLE_HIT_TYPES = {
    object_pb2.Object.kVehicle,
    object_pb2.Object.kThreeWheeledVehicle,
}
# Inclusive lower / upper bounds used by is_valid_vehicle_class() to
# range-check a candidate VehicleClass value.
VEHICLE_CLASS_MIN = object_pb2.Object.kNegative
VEHICLE_CLASS_MAX = object_pb2.Object.kFakeCar
# Returned by resolve_vehicle_class() when neither `sub_cls` nor the raw
# attribute payload yields a value inside the bounds above.
VEHICLE_CLASS_UNKNOWN = object_pb2.Object.kVehicleUnknown
def parse_numeric_value(value):
    """Convert a JSON scalar to ``int`` / ``float`` when possible.

    Args:
        value: ``None``, a number, or any object whose ``str()`` may spell a
            number (surrounding whitespace is ignored).

    Returns:
        ``value`` unchanged when it is already an ``int`` or ``float``; an
        ``int`` for text that denotes an integral value; a ``float`` for other
        numeric text; ``None`` for ``None``, blank text, or non-numeric text.
    """
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return value
    value_str = str(value).strip()
    if not value_str:
        return None
    # Parse integer text exactly first. Routing it through float() silently
    # rounds anything above 2**53, which corrupts long ids / timestamps
    # (e.g. "1719192946123456789" would come back as ...456768).
    try:
        return int(value_str)
    except ValueError:
        pass
    try:
        numeric = float(value_str)
    except ValueError:
        return None
    # Text such as "3.0" or "1e3" still denotes an integer value.
    if numeric.is_integer():
        return int(numeric)
    return numeric
def safe_int(value):
    """Best-effort integer conversion.

    Args:
        value: Any JSON scalar; parsed with ``parse_numeric_value`` first.

    Returns:
        The value as an ``int`` (fractional values are truncated by ``int()``),
        or ``None`` when the value is missing, non-numeric, or not finite.
    """
    numeric = parse_numeric_value(value)
    if numeric is None:
        return None
    try:
        return int(numeric)
    except (TypeError, ValueError, OverflowError):
        # int(nan) raises ValueError, but int(+/-inf) raises OverflowError;
        # a best-effort helper must swallow both rather than abort the run.
        return None
def safe_float(value):
    """Best-effort float conversion.

    Args:
        value: Any JSON scalar; parsed with ``parse_numeric_value`` first.

    Returns:
        The value as a ``float``, or ``None`` when the value is missing,
        non-numeric, or an integer too large to be represented as a float.
    """
    numeric = parse_numeric_value(value)
    if numeric is None:
        return None
    try:
        return float(numeric)
    except (TypeError, ValueError, OverflowError):
        # float() of an arbitrarily large int raises OverflowError; treat it
        # like any other unconvertible input instead of crashing.
        return None
def normalize_name_token(value):
    """Return *value* as a lowercase, underscore-separated token.

    Falsy input becomes the empty string. Every run of characters outside
    ``a-z0-9`` collapses to a single underscore, and underscores at either
    end are removed.
    """
    lowered = str(value or "").strip().lower()
    collapsed = re.sub(r"[^a-z0-9]+", "_", lowered)
    return collapsed.strip("_")
def parse_image_name(image_name):
    """Extract ``(frame_id, cam_id)`` from an image name.

    Example: "000005_camera4_000006_merged" gives frame_id=5, cam_id=4.

    Either id is 0 when the name does not contain it.
    """
    # Frame index: the leading run of digits. Names without one, such as
    # "G1M3_xxx_uuid_000000_289998", fall back to the penultimate number of
    # the trailing "_<n>_<n>[_merged]" pair.
    frame_match = re.match(r"(\d+)", image_name)
    if frame_match is None:
        frame_match = re.search(r"_(\d+)_(\d+)(?:_merged)?$", image_name)
    frame_id = int(frame_match.group(1)) if frame_match else 0

    # Camera number: "camera<N>" first, otherwise the <N> of a "G<d>M<N>" tag.
    cam_match = re.search(r"camera(\d+)", image_name) or re.search(r"G\d+M(\d+)", image_name)
    cam_id = int(cam_match.group(1)) if cam_match else 0

    return frame_id, cam_id
def get_detection_frame_id(det):
    """Return the detection's frame id from the old or the new input schema.

    ``frame_id`` is preferred; ``frameId`` is consulted only when the former
    is missing or ``None``. Returns ``None`` when neither key has a value.
    """
    for key in ("frame_id", "frameId"):
        candidate = det.get(key)
        if candidate is not None:
            return candidate
    return None
def get_frame_timestamp(frame, detections):
    """Return the frame timestamp as an int, or ``None`` when unavailable.

    The frame-level ``timestamp`` wins; otherwise the ``timestamp`` of the
    first detection is used, if there is any detection at all.
    """
    frame_level = safe_int(frame.get("timestamp"))
    if frame_level is None and detections:
        return safe_int(detections[0].get("timestamp"))
    return frame_level
def get_detection_class_id(det):
    """Return the integer class id from old/new tracking or prediction schemas.

    The keys ``class_id``, ``cls_id`` and ``type`` are tried in that order and
    the first one that converts to an int is returned; ``None`` if none does.
    """
    # Generator keeps the lookups lazy: later keys are only read when the
    # earlier ones did not yield an integer.
    converted = (safe_int(det.get(key)) for key in ("class_id", "cls_id", "type"))
    return next((cid for cid in converted if cid is not None), None)
def get_detection_class_name(det):
    """Resolve the most reliable class name for proto mapping.

    The first non-blank value among ``cls_name``, ``class_name`` and
    ``type_name`` (stripped, lowercased) is the candidate name. A candidate
    known to ``CLASS_NAME_TO_PROTO`` is returned directly. Otherwise the
    numeric class id is translated through ``CLASS_ID_TO_NAME``; when that
    also fails, the unknown candidate itself (or "" if there was none) is
    returned.
    """
    raw_values = (det.get(key) for key in ("cls_name", "class_name", "type_name"))
    cleaned = (str(raw).strip().lower() for raw in raw_values if raw is not None)
    candidate = next((text for text in cleaned if text), "")

    if candidate and candidate in CLASS_NAME_TO_PROTO:
        return candidate

    # Unknown or missing name: prefer the id-derived name, keep the candidate
    # as the last resort.
    class_id = get_detection_class_id(det)
    if class_id is None:
        return candidate
    return CLASS_ID_TO_NAME.get(class_id, candidate)
def is_detector_fake_vehicle_class(class_name):
    """Tell whether *class_name* normalizes to the detector's fake-car class."""
    token = normalize_name_token(class_name)
    return token == "car_fake"
def resolve_proto_class(det):
    """Resolve ``(ObjectType, ObjectCategory, class_name)`` for one detection.

    Names that normalize to the fake-car class always map to
    ``(kVehicle, CAR_FAKE_CATEGORY)``. Any other name is looked up in
    ``CLASS_NAME_TO_PROTO``, with ``(kNone, kCategoryNone)`` for unknown names.
    """
    class_name = get_detection_class_name(det)
    if is_detector_fake_vehicle_class(class_name):
        proto_pair = (object_pb2.Object.kVehicle, CAR_FAKE_CATEGORY)
    else:
        unmapped = (object_pb2.Object.kNone, object_pb2.Object.kCategoryNone)
        proto_pair = CLASS_NAME_TO_PROTO.get(class_name, unmapped)
    hit_type, object_category = proto_pair
    return hit_type, object_category, class_name
def is_valid_vehicle_class(value):
    """Tell whether *value* lies inside the VehicleClass enum range.

    ``None`` is never valid; the bounds ``VEHICLE_CLASS_MIN`` and
    ``VEHICLE_CLASS_MAX`` are both inclusive.
    """
    if value is None:
        return False
    candidate = int(value)
    return not (candidate < VEHICLE_CLASS_MIN or candidate > VEHICLE_CLASS_MAX)
def resolve_vehicle_class_from_attribute(det):
    """Map the raw vehicle ``attribute`` payload to a VehicleClass value.

    Returns ``None`` unless ``det["attribute"]`` is a dict whose ``task``
    normalizes to "vehicle" and whose ``attr_cls`` converts to an int.
    Otherwise: ``is_fake == 1`` gives ``kFakeCar``; ``attr_cls`` up to 11 is
    passed through; 23 gives ``kSpecialCar``; anything else is shifted by +3.
    """
    attribute = det.get("attribute")
    is_vehicle_payload = (
        isinstance(attribute, dict)
        and normalize_name_token(attribute.get("task")) == "vehicle"
    )
    if not is_vehicle_payload:
        return None

    raw_cls = safe_int(attribute.get("attr_cls"))
    if raw_cls is None:
        return None

    # The fake flag takes precedence over whatever attr_cls says.
    if safe_int(attribute.get("is_fake")) == 1:
        return object_pb2.Object.kFakeCar
    if raw_cls <= 11:
        return raw_cls
    return object_pb2.Object.kSpecialCar if raw_cls == 23 else raw_cls + 3
def resolve_vehicle_class(det, hit_type):
    """Resolve VehicleClass for vehicle-like detections.

    Hit types outside ``VEHICLE_HIT_TYPES`` yield ``None``. For the rest, the
    tracked `sub_cls` field is preferred because it already encodes the
    upstream attribute-to-subclass mapping; the raw `attribute` payload is
    re-mapped only when `sub_cls` is absent or out of range. When both fail
    the result is ``VEHICLE_CLASS_UNKNOWN``.
    """
    if hit_type not in VEHICLE_HIT_TYPES:
        return None

    tracked = safe_int(det.get("sub_cls"))
    if not is_valid_vehicle_class(tracked):
        derived = resolve_vehicle_class_from_attribute(det)
        return int(derived) if is_valid_vehicle_class(derived) else VEHICLE_CLASS_UNKNOWN
    return int(tracked)
def get_detection_bbox(det):
    """Return the detection's 2D box as four floats, or ``None``.

    ``bbox`` (tracking style) is tried before ``box2d`` (prediction style).
    A key is accepted only when it holds a list/tuple with at least four
    entries whose first four all convert to float.
    """
    for key in ("bbox", "box2d"):
        raw_box = det.get(key)
        if isinstance(raw_box, (list, tuple)) and len(raw_box) >= 4:
            corners = [safe_float(item) for item in raw_box[:4]]
            if not any(corner is None for corner in corners):
                return list(map(float, corners))
    return None
def build_mono_measure_component(det, anchor_str=None):
    """Build the Mono3D measurement component attached to a mono object.

    Returns ``None`` when ``det["object_3d_ego"]`` is missing or has fewer
    than seven entries. Otherwise the first seven entries are read as
    x, y, z, l, h, w, yaw and written to the component's ``world_info``.
    """
    ego_box = det.get("object_3d_ego")
    if not ego_box or len(ego_box) < 7:
        return None

    pos_x, pos_y, pos_z, length, height, width, heading = (
        float(item) for item in ego_box[:7]
    )

    component = object_pb2.Object()
    component.measure_type = MEASURE_MONO_3D

    world = component.world_info
    world.pos.x, world.pos.y, world.pos.z = pos_x, pos_y, pos_z
    world.size.l, world.size.h, world.size.w = length, height, width
    world.pose_angle.yaw = heading
    world.measure_type = MEASURE_MONO_3D

    # Only anchor names known to ANCHOR_MAP are written.
    anchor_value = ANCHOR_MAP.get(anchor_str) if anchor_str else None
    if anchor_value is not None:
        world.anchor = anchor_value
    return component
def populate_object_fields(det, obj, anchor_str=None, cam_id=None,
                           include_image_info=False, include_model_3d=False):
    """Populate a protobuf object with the common fields from one detection.

    Args:
        det: Detection dict read from the input JSON.
        obj: Message filled in place (build_object passes object_pb2.Object
            instances).
        anchor_str: Anchor name; read from ``det["anchor"]`` when ``None``.
        cam_id: Camera id written to ``image_info.camera_id`` when not
            ``None``.
        include_image_info: Also fill ``image_info`` (detection rectangle and
            camera id).
        include_model_3d: Also fill ``world_info.monocular_3d`` from
            ``det["object_3d"]``.

    Returns:
        None. ``obj`` is mutated in place.
    """
    if anchor_str is None:
        anchor_str = det.get("anchor")
    class_id = get_detection_class_id(det)
    hit_type, object_category, class_name = resolve_proto_class(det)
    obj.hit_type = hit_type
    # hit_id receives the same value as hit_type.
    obj.hit_id = hit_type
    obj.object_category = object_category
    if class_name:
        obj.hit_type_str = class_name
    # Optional scalar fields: each is written only when present in the input.
    track_id = det.get("track_id")
    if track_id is not None:
        obj.id = int(track_id)
    frame_id = get_detection_frame_id(det)
    if frame_id is not None:
        obj.frame_id = int(frame_id)
    timestamp = det.get("timestamp")
    if timestamp is not None:
        obj.timestamp = int(timestamp)
    lane_assignment = det.get("lane_assignment")
    if lane_assignment is not None:
        obj.lane_assignment.val = int(lane_assignment)
    if include_image_info:
        bbox = get_detection_bbox(det)
        if bbox is not None:
            # The box is unpacked as x1, y1, x2, y2 and stored as x, y,
            # width, height.
            x1, y1, x2, y2 = bbox
            obj.image_info.det_rect.x = float(x1)
            obj.image_info.det_rect.y = float(y1)
            obj.image_info.det_rect.w = float(x2 - x1)
            obj.image_info.det_rect.h = float(y2 - y1)
        # NOTE(review): the source this was recovered from had lost its
        # indentation; this check is assumed to sit under include_image_info
        # (independent of the bbox check) - confirm against the original.
        if cam_id is not None:
            obj.image_info.camera_id.id = int(cam_id)
    # Only anchor names known to ANCHOR_MAP are written.
    if anchor_str and anchor_str in ANCHOR_MAP:
        obj.world_info.anchor = ANCHOR_MAP[anchor_str]
    # world_info mirrors the object's id, hit type and category.
    obj.world_info.id = obj.id
    obj.world_info.hit_type = hit_type
    obj.world_info.object_category = object_category
    # cls / cls_ori carry the VehicleClass for vehicle-like hit types; for
    # every other hit type they carry the raw detector class id, if any.
    vehicle_class = resolve_vehicle_class(det, hit_type)
    if vehicle_class is not None:
        obj.world_info.cls.val = int(vehicle_class)
        obj.world_info.cls_ori.val = int(vehicle_class)
    elif class_id is not None:
        obj.world_info.cls.val = int(class_id)
        obj.world_info.cls_ori.val = int(class_id)
    face_cls = det.get("face_cls")
    if face_cls is not None:
        face_str = str(face_cls)
        # face_cls values missing from FACE_CLS_MAP leave the pose unset.
        if face_str in FACE_CLS_MAP:
            obj.world_info.pose.val = FACE_CLS_MAP[face_str]
    # First seven entries of object_3d_ego are read as x, y, z, l, h, w, yaw.
    obj_3d_ego = det.get("object_3d_ego")
    if obj_3d_ego and len(obj_3d_ego) >= 7:
        x, y, z, l, h, w, yaw = obj_3d_ego[:7]
        wi = obj.world_info
        wi.pos.x = float(x)
        wi.pos.y = float(y)
        wi.pos.z = float(z)
        wi.size.l = float(l)
        wi.size.h = float(h)
        wi.size.w = float(w)
        wi.pose_angle.yaw = float(yaw)
    if include_model_3d:
        # object_3d uses the same 7-entry layout; only the position and the
        # yaw are copied into model_3d_pos (l, h, w are unpacked but unused).
        obj_3d = det.get("object_3d")
        if obj_3d and len(obj_3d) >= 7:
            x, y, z, l, h, w, yaw = obj_3d[:7]
            m3d = obj.world_info.monocular_3d.model_3d_pos
            m3d.x3d = float(x)
            m3d.y3d = float(y)
            m3d.z3d = float(z)
            m3d.heading = float(yaw)
        # NOTE(review): nesting recovered from de-indented source; assumed to
        # be independent of the object_3d check above, like the
        # world_info.anchor assignment earlier - confirm against the original.
        if anchor_str and anchor_str in ANCHOR_MAP:
            obj.world_info.monocular_3d.anchor = ANCHOR_MAP[anchor_str]
def build_object(det, cam_id=None):
    """Build the final Object protobuf with source-object and Mono3D nesting.

    The returned object carries the common fields and has one key component:
    a "source" object that additionally holds the image info, the model 3D
    data and the Mono3D measure type. The source object in turn nests the
    Mono3D measurement component when the detection provides a 3D ego box.
    """
    anchor_str = det.get("anchor")

    # Inner (source) object: full detail, tagged as a Mono3D measurement.
    source = object_pb2.Object()
    populate_object_fields(
        det, source,
        anchor_str=anchor_str, cam_id=cam_id,
        include_image_info=True, include_model_3d=True,
    )
    source.measure_type = MEASURE_MONO_3D
    source.world_info.measure_type = MEASURE_MONO_3D

    measure_component = build_mono_measure_component(det, anchor_str)
    if measure_component is not None:
        source.key_components.add().CopyFrom(measure_component)

    # Outer object: common fields only, wrapping the source object.
    wrapper = object_pb2.Object()
    populate_object_fields(det, wrapper, anchor_str=anchor_str)
    wrapper.key_components.add().CopyFrom(source)
    return wrapper
def build_object_list(frame, cam_id_override=None):
    """Build an ObjectList protobuf message from one frame dict.

    Returns:
        ``(obj_list, frame_id)`` where ``frame_id`` is the value parsed from
        the frame's ``image_name``. The message's own ``frame_id`` field may
        differ: it is replaced by the first detection's frame id when that
        detection has one.
    """
    name_frame_id, name_cam_id = parse_image_name(frame.get("image_name", ""))
    cam_id = name_cam_id if cam_id_override is None else cam_id_override
    detections = frame.get("detections", [])

    obj_list = object_pb2.ObjectList()
    obj_list.frame_id = name_frame_id
    obj_list.cam_id.id = cam_id

    stamp = get_frame_timestamp(frame, detections)
    if stamp is not None:
        obj_list.timestamp = int(stamp)

    # The first detection supplies the list-level frame id and version.
    if detections:
        head = detections[0]
        head_frame_id = get_detection_frame_id(head)
        if head_frame_id is not None:
            obj_list.frame_id = int(head_frame_id)
        head_version = head.get("version")
        if head_version is not None:
            obj_list.version = str(head_version)

    for detection in detections:
        obj_list.list.append(build_object(detection, cam_id=cam_id))
    return obj_list, name_frame_id
def convert(input_path, output_dir, cam_id_override=None):
    """Convert merge_tracking.json to the three-file protobuf set.

    Args:
        input_path: Path to the JSON file holding the sequence of frame dicts.
        output_dir: Directory for the output files; created when missing.
        cam_id_override: Camera id forced onto every frame, or ``None`` to use
            the id parsed from each frame's ``image_name``.

    Returns:
        Path of the written ``ObjectPerceptionObjectList.data.json``.

    Writes three files into ``output_dir``:
        * ``ObjectPerceptionObjectList.bin`` - the serialized ObjectList
          messages, concatenated in frame order.
        * ``ObjectPerceptionObjectList.index.json`` - one
          ``[frame_id, offset, size]`` entry per message in the .bin file.
        * ``ObjectPerceptionObjectList.data.json`` - names of the two files
          above plus the frame count.
    """
    # JSON text is UTF-8; decode it explicitly instead of relying on the
    # platform's locale encoding, which differs between machines.
    with open(input_path, "r", encoding="utf-8") as f:
        frames = json.load(f)
    os.makedirs(output_dir, exist_ok=True)
    bin_path = os.path.join(output_dir, "ObjectPerceptionObjectList.bin")
    index_path = os.path.join(output_dir, "ObjectPerceptionObjectList.index.json")
    data_path = os.path.join(output_dir, "ObjectPerceptionObjectList.data.json")
    index_entries = []
    offset = 0
    with open(bin_path, "wb") as bin_file:
        for i, frame in enumerate(frames):
            obj_list, frame_idx = build_object_list(frame, cam_id_override)
            serialized = obj_list.SerializeToString()
            size = len(serialized)
            bin_file.write(serialized)
            # Messages are written back to back, so each entry records where
            # its message starts and how long it is.
            index_entries.append([frame_idx, offset, size])
            offset += size
            # "\r" keeps the progress report on a single terminal line.
            print(f"Convert frame {i} (frame_id={frame_idx}, "
                  f"detections={len(frame.get('detections', []))})",
                  end="\r", file=sys.stderr)
    print(f"\nTotal frames: {len(frames)}", file=sys.stderr)
    # Write index.json
    with open(index_path, "w", encoding="utf-8") as f:
        json.dump({"index": index_entries}, f)
    # Write data.json
    with open(data_path, "w", encoding="utf-8") as f:
        json.dump({
            "data": ["ObjectPerceptionObjectList.bin"],
            "index": ["ObjectPerceptionObjectList.index.json"],
            "elem_count": len(frames),
        }, f, indent=2)
    print(f"Output files written to: {output_dir}", file=sys.stderr)
    print(f" {data_path}", file=sys.stderr)
    print(f" {bin_path}", file=sys.stderr)
    print(f" {index_path}", file=sys.stderr)
    return data_path
def main():
    """Parse the command line and run the conversion."""
    parser = argparse.ArgumentParser(
        description="Convert merge_tracking.json to ObjectPerceptionObjectList protobuf format",
    )
    parser.add_argument("input", help="Path to merge_tracking.json")
    parser.add_argument(
        "-o", "--output-dir",
        default=".",
        help="Output directory (default: current directory)",
    )
    parser.add_argument(
        "--cam-id",
        type=int,
        default=None,
        help="Override camera ID (default: parsed from image_name)",
    )
    args = parser.parse_args()
    convert(args.input, args.output_dir, args.cam_id)


# Run the converter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()