732 lines
28 KiB
Python
Executable File
732 lines
28 KiB
Python
Executable File
# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
|
|
|
|
"""Ground 3D detection data utilities.
|
|
|
|
Functions for on-the-fly label parsing, calibration reading, and virtual camera augmentation
|
|
for joint 2D+3D ground detection training.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
from functools import lru_cache
|
|
from pathlib import Path
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
|
|
def parse_ground_3d_label_file(lb_file, class_map, difficulty_weights, face_3d_classes, complete_3d_classes, min_wh=2.0):
    """Parse a ground 3D label file on-the-fly. Returns 2D labels dict + 3D array.

    Reads label files with variable column counts:
    - 6-col: [class_name, x, y, w, h, difficulty] — 2D only
    - 7-col: [class_name, x, y, w, h, diff1, diff2] — 2D only, only diff1 is used
    - 19-col: [class_name, x, y, w, h, ...3D(13cols)..., difficulty] — complete 3D (no face)
    - 51-col: [class_name, x, y, w, h, ...3D+faces(45cols)..., difficulty] — face 3D

    The 48-dim internal format per object:
    [0]: class_id, [1-4]: 2D bbox (xywh norm)
    [5-7]: 3D center (x3d, y3d, z3d), [8-10]: dims (l, h, w)
    [11]: rot_y, [12-13]: 3D box center projection (xc, yc), [14]: alpha
    [15-22]: front face, [23-30]: rear face, [31-38]: left face, [39-46]: right face
    [47]: difficulty level

    Lines whose class name is not in `class_map` are skipped. Exact duplicate rows are removed; rows that
    carry NaN placeholders (2D-only and 19-col objects) are compared NaN-aware so their duplicates are
    removed as well. When duplicates are dropped the remaining rows come back in `np.unique` sort order.

    Args:
        lb_file (str): Path to label file.
        class_map (dict): Mapping from class names to class IDs.
        difficulty_weights (list): Loss weights for difficulty levels [easy, normal, medium, hard].
        face_3d_classes (set): Class IDs with 4-face annotations (51-col).
        complete_3d_classes (set): Class IDs with whole-box 3D only (19-col).
        min_wh (float): Accepted for interface compatibility; this function does not read it.

    Returns:
        lb_2d (dict): Dict with cls (n,1), bboxes (n,4), difficulties (n,1), difficulty_levels (n,1),
            segments, keypoints, normalized, bbox_format.
        lb_3d (np.ndarray): 3D portion shape (n, 42). Objects without 3D GT keep NaN values;
            images without labels return an empty `(0, 42)` array.

    Raises:
        ValueError: If a line of a known class has a column count that matches none of the layouts
            above (including a 19/51-col line whose class is not in the corresponding class set).
    """
    empty_2d = {
        "cls": np.zeros((0, 1), dtype=np.float32),
        "bboxes": np.zeros((0, 4), dtype=np.float32),
        "difficulties": np.zeros((0, 1), dtype=np.float32),
        "difficulty_levels": np.zeros((0, 1), dtype=np.int64),
        "segments": [],
        "keypoints": None,
        "normalized": True,
        "bbox_format": "xywh",
    }
    empty_3d = np.full((0, 42), np.nan, dtype=np.float32)

    if not os.path.isfile(lb_file):
        return empty_2d, empty_3d

    with open(lb_file, encoding="utf-8") as f:
        lines = f.read().strip().splitlines()
    if not lines:
        return empty_2d, empty_3d

    # Source columns feeding internal dims 1-14; file columns 14, 15 and 17 are not used.
    box_cols = (*range(1, 14), 16)

    rows = []
    for line in lines:
        parts = line.split()
        if not parts:
            continue

        cls_id = class_map.get(parts[0])
        if cls_id is None:
            continue  # skip unknown classes

        ncols = len(parts)
        row = np.full(48, np.nan, dtype=np.float32)
        row[0] = cls_id
        if ncols in (6, 7):
            # 2D only. In the 7-col layout the trailing column is truncation, so only the
            # first difficulty column is used for difficulty supervision.
            row[1:5] = [float(v) for v in parts[1:5]]
            row[47] = float(parts[5])
        elif ncols == 19 and cls_id in complete_3d_classes:
            # Whole-box 3D without faces; difficulty is the last column.
            row[1:15] = [float(parts[i]) for i in box_cols]
            row[47] = float(parts[18])
        elif ncols == 51 and cls_id in face_3d_classes:
            # Whole-box 3D plus four 8-value faces (file columns 18-49); difficulty is last.
            row[1:15] = [float(parts[i]) for i in box_cols]
            row[15:47] = [float(v) for v in parts[18:50]]
            row[47] = float(parts[50])
        else:
            raise ValueError(f"Unexpected number of columns ({ncols}) in label file {lb_file}: '{line}'")
        rows.append(row)

    if not rows:
        return empty_2d, empty_3d

    lb = np.stack(rows, axis=0)  # (n, 48)

    # Remove duplicates. NaN never compares equal to itself, so np.unique on the raw rows would keep
    # every duplicated row that contains NaN placeholders. Compare on a NaN-free key instead:
    # the values with NaN zeroed, plus the NaN mask so a real 0 and a NaN stay distinct.
    nan_mask = np.isnan(lb)
    key = np.concatenate([np.where(nan_mask, np.float32(0), lb), nan_mask.astype(np.float32)], axis=1)
    _, idx = np.unique(key, axis=0, return_index=True)
    if len(idx) < len(lb):
        lb = lb[idx]

    # Difficulty level -> loss weight; levels are clamped into the valid index range of the weights.
    dw = difficulty_weights
    raw_diff = lb[:, 47].astype(int).clip(0, len(dw) - 1)
    difficulties = np.array([dw[d] for d in raw_diff], dtype=np.float32).reshape(-1, 1)
    difficulty_levels = raw_diff.astype(np.int64).reshape(-1, 1)

    lb_2d = {
        "cls": lb[:, 0:1],  # (n, 1)
        "bboxes": lb[:, 1:5],  # (n, 4) xywh normalized
        "difficulties": difficulties,
        "difficulty_levels": difficulty_levels,
        "segments": [],
        "keypoints": None,
        "normalized": True,
        "bbox_format": "xywh",
    }

    # 3D portion: columns 5-46 (42 dims)
    # [x3d, y3d, z3d, l, h, w, rot_y, xc, yc, alpha, front(8), rear(8), left(8), right(8)]
    return lb_2d, lb[:, 5:47]
|
|
|
|
|
|
@lru_cache(maxsize=256)
def _read_label_root_camera4_cached(calib_path_str):
    """Load one camera4.json calibration file, memoised per path string.

    The JSON payload is copied as-is, then the four intrinsics are coerced to float,
    `distort_coeffs` is coerced to a list (empty when absent), and any of `pitch`, `roll`,
    `yaw` that are present are converted with `math.radians`.

    Args:
        calib_path_str (str): Path to the JSON file; also the cache key.

    Returns:
        dict | None: Normalised calibration, or None when any of focal_u, focal_v, cu, cv is missing.
            The same object is handed out on every cache hit.
    """
    from math import radians

    with open(calib_path_str, encoding="utf-8") as fh:
        raw = json.load(fh)

    intrinsic_keys = ("focal_u", "focal_v", "cu", "cv")
    if not all(name in raw for name in intrinsic_keys):
        return None

    result = dict(raw)
    for name in intrinsic_keys:
        result[name] = float(raw[name])
    result["distort_coeffs"] = list(raw.get("distort_coeffs", []))
    for name in ("pitch", "roll", "yaw"):
        if name in raw:
            result[name] = radians(float(raw[name]))
    return result
|
|
|
|
|
|
def read_calib_from_path(img_path, image_root=None, extra_calib_candidates=None):
    """Read clip-level camera4.json from the label-root calibration folder.

    Candidates are tried in order. A candidate named `camera4.json` is used directly; any other
    candidate is mapped to its sibling `L2_calib/camera4.json`. The first such file that exists
    decides the result, even if it turns out to lack the required intrinsics.

    Args:
        img_path (str): Path to the image file. Not read by this function.
        image_root (str | Path | None): Unused compatibility arg kept for existing call sites.
        extra_calib_candidates (Iterable[str | Path] | None): Label-root per-frame calibration candidates. The loader
            resolves each candidate's sibling `L2_calib/camera4.json` and ignores all other layouts.

    Returns:
        dict | None: Calibration dict with keys: focal_u, focal_v, cu, cv, pitch (radians), distort_coeffs.
            Returns None if calibration file not found. The dict (and its `distort_coeffs` list) is a
            fresh copy, so callers may modify it without affecting the cached parse result.
    """
    for candidate in extra_calib_candidates or ():
        candidate = Path(candidate).resolve()
        camera4_path = candidate if candidate.name == "camera4.json" else candidate.parent / "L2_calib" / "camera4.json"
        if not camera4_path.exists():
            continue

        cached = _read_label_root_camera4_cached(str(camera4_path))
        if cached is None:
            return None
        # The helper is lru_cache'd and returns one shared dict per file; hand out a copy so that
        # a caller mutating its calibration cannot corrupt later reads for the same clip.
        calib = dict(cached)
        calib["distort_coeffs"] = list(cached.get("distort_coeffs", []))
        return calib

    return None
|
|
|
|
|
|
def compute_vanishing_point_x(raw_calib, ori_w):
    """Return the vanishing point X: the calibration's `cu`, or the image centre as fallback.

    Args:
        raw_calib (dict | None): Calibration dict; None means no calibration is available.
        ori_w (int | float): Original image width.

    Returns:
        `raw_calib["cu"]` when present, otherwise `ori_w / 2`.
    """
    centre_x = ori_w / 2
    return centre_x if raw_calib is None else raw_calib.get("cu", centre_x)
|
|
|
|
|
|
def compute_vanishing_point_y(raw_calib, ori_h):
    """Return the vanishing point Y derived from `cv`, `pitch` and `focal_v`.

    Without calibration the image centre row is returned. With calibration the result is
    `cv - focal_v * tan(pitch)`; a pitch equal to zero short-circuits to `cv`. Missing keys fall
    back to `ori_h / 2` (cv), `0.0` (pitch) and `ori_h` (focal_v).

    Args:
        raw_calib (dict | None): Calibration dict; None means no calibration is available.
        ori_h (int | float): Original image height.
    """
    centre_y = ori_h / 2
    if raw_calib is None:
        return centre_y

    principal_v = raw_calib.get("cv", centre_y)
    pitch_rad = raw_calib.get("pitch", 0.0)
    if pitch_rad == 0:
        return principal_v
    return principal_v - raw_calib.get("focal_v", ori_h) * np.tan(pitch_rad)
|
|
|
|
|
|
def compute_centered_roi_bounds(ori_w, ori_h, roi_w, roi_h, center_x, center_y):
    """Place a `roi_w` x `roi_h` window around a centre point, shifted to stay inside the image.

    Each origin is `center - roi / 2`, capped at `ori - roi`, floored at 0, then truncated to int.

    Returns:
        tuple: `(x1, y1, x1 + roi_w, y1 + roi_h)`.
    """

    def _origin(center, roi_len, img_len):
        # Upper cap first, then the zero floor, matching max(0, min(...)).
        return int(max(0, min(center - roi_len / 2, img_len - roi_len)))

    left = _origin(center_x, roi_w, ori_w)
    top = _origin(center_y, roi_h, ori_h)
    return left, top, left + roi_w, top + roi_h
|
|
|
|
|
|
def adjust_calib_for_roi_crop(raw_calib, ori_w, ori_h, crop_bounds=None):
    """Express the intrinsics in the coordinate frame of an ROI crop (before any resize).

    Focal lengths and distortion coefficients pass through unchanged; the principal point is
    shifted by the crop origin. A falsy `raw_calib` or a missing key falls back to
    `cu = ori_w / 2`, `cv = ori_h / 2`, `focal_u = ori_w`, `focal_v = ori_h`, `distort_coeffs = []`.

    Args:
        raw_calib (dict | None): Source calibration.
        ori_w (int | float): Original image width.
        ori_h (int | float): Original image height.
        crop_bounds (tuple | None): `(x1, y1, x2, y2)`; a falsy value means the full image.

    Returns:
        dict: focal_u, focal_v, cu, cv, src_w, src_h, distort_coeffs.
    """
    left, top, right, bottom = crop_bounds or (0, 0, ori_w, ori_h)
    source = raw_calib if raw_calib else {}

    shifted = {
        "focal_u": source.get("focal_u", ori_w),
        "focal_v": source.get("focal_v", ori_h),
        "cu": source.get("cu", ori_w / 2) - left,
        "cv": source.get("cv", ori_h / 2) - top,
        "src_w": right - left,
        "src_h": bottom - top,
    }
    shifted["distort_coeffs"] = source.get("distort_coeffs", [])
    return shifted
|
|
|
|
|
|
def build_final_resized_calib(focal_u, focal_v, cu, cv, src_w, src_h, target_w, target_h, virtual_fx, distort_coeffs=None):
    """Scale crop-space intrinsics to the resized output and report the depth scale.

    The horizontal and vertical resize factors are `target / src`; focal lengths and principal
    point are multiplied by the matching factor.

    Returns:
        dict: fx, fy, cx, cy, distort_coeffs (empty list when None was passed) and
            depth_scale (`fx / virtual_fx`).
    """
    ratio_x = target_w / src_w
    ratio_y = target_h / src_h
    fx_out = focal_u * ratio_x

    calib = {"fx": fx_out, "fy": focal_v * ratio_y, "cx": cu * ratio_x, "cy": cv * ratio_y}
    if distort_coeffs is None:
        calib["distort_coeffs"] = []
    else:
        calib["distort_coeffs"] = distort_coeffs
    calib["depth_scale"] = fx_out / virtual_fx
    return calib
|
|
|
|
|
|
def pack_labels_to_48(lb_2d, lb_3d):
    """Merge a 2D label dict and a 42-dim 3D array into one float32 row per object.

    The output has 49 columns: [0] cls, [1:5] bboxes, [5:47] 3D block (NaN when `lb_3d` is None
    or empty), [47] difficulty weight, [48] difficulty level (0 when the dict has no
    `difficulty_levels` entry).

    Args:
        lb_2d (dict): Needs `cls`, `bboxes`, `difficulties`; `difficulty_levels` is optional.
        lb_3d (np.ndarray | None): Array assignable to an (n, 42) slice, or None.

    Returns:
        np.ndarray: Shape (n, 49) float32; (0, 49) zeros when there are no boxes.
    """
    boxes = lb_2d["bboxes"]
    count = len(boxes)
    if not count:
        return np.zeros((0, 49), dtype=np.float32)

    packed = np.full((count, 49), np.nan, dtype=np.float32)
    packed[:, 0] = lb_2d["cls"].reshape(-1)
    packed[:, 1:5] = boxes
    packed[:, 47] = lb_2d["difficulties"].reshape(-1)

    levels = lb_2d.get("difficulty_levels", np.zeros((count, 1), dtype=np.int64))
    packed[:, 48] = levels.reshape(-1)

    has_3d = lb_3d is not None and len(lb_3d) > 0
    if has_3d:
        packed[:, 5:47] = lb_3d
    return packed
|
|
|
|
|
|
def unpack_labels_from_48(labels_48):
    """Split packed label rows back into the 2D label dict and the 42-dim 3D block.

    Column 48, when present, is cast to int64 difficulty levels; arrays with only 48 columns
    get all-zero levels. All other outputs are slices of the input array.

    Args:
        labels_48 (np.ndarray): Packed labels with at least 48 columns.

    Returns:
        lb_2d (dict): cls, bboxes, difficulties, difficulty_levels, segments, keypoints,
            normalized, bbox_format.
        lb_3d (np.ndarray | None): Columns 5..46, or None when the input has no rows.
    """
    row_count = len(labels_48)
    if row_count == 0:
        no_labels = {
            "cls": np.zeros((0, 1), dtype=np.float32),
            "bboxes": np.zeros((0, 4), dtype=np.float32),
            "difficulties": np.zeros((0, 1), dtype=np.float32),
            "difficulty_levels": np.zeros((0, 1), dtype=np.int64),
            "segments": [],
            "keypoints": None,
            "normalized": True,
            "bbox_format": "xywh",
        }
        return no_labels, None

    if labels_48.shape[1] > 48:
        levels = labels_48[:, 48:49].astype(np.int64)
    else:
        levels = np.zeros((row_count, 1), dtype=np.int64)

    lb_2d = {
        "cls": labels_48[:, 0:1],
        "bboxes": labels_48[:, 1:5],
        "difficulties": labels_48[:, 47:48],
        "difficulty_levels": levels,
        "segments": [],
        "keypoints": None,
        "normalized": True,
        "bbox_format": "xywh",
    }
    return lb_2d, labels_48[:, 5:47]
|
|
|
|
|
|
def _handle_cut_labels_42(labels, outside_mask, still_inside_mask):
    """Update face blocks in place for rows that are neither fully inside nor fully outside.

    Works on the 42-dim layout, where rot_y is column 6 and the four 8-value face blocks start
    at columns 10 (front), 18 (rear), 26 (left) and 34 (right).

    - rot_y in [-pi, 0] ("cut-in"): rear/left/right are invalidated, front's last two values set to 1.
    - anything else, including NaN rot_y ("cut-out"): front/left/right are invalidated, rear's
      last two values set to 1.

    Invalidating a face writes -1 into its first six values and 0 into its last two.

    Args:
        labels (np.ndarray): (N, 42) array, modified in place.
        outside_mask (np.ndarray): Boolean (N,), True for rows treated as fully outside.
        still_inside_mask (np.ndarray): Boolean (N,), True for rows treated as fully inside.
    """
    if len(labels) == 0:
        return

    partial_rows = np.nonzero(~(still_inside_mask | outside_mask))[0]
    if partial_rows.size == 0:
        return

    yaw = labels[partial_rows, 6]
    cut_in = (yaw >= -np.pi) & (yaw <= 0)

    front, rear, left, right = 10, 18, 26, 34
    plans = (
        (partial_rows[cut_in], front, (rear, left, right)),
        (partial_rows[~cut_in], rear, (front, left, right)),
    )
    for rows, kept_face, dropped_faces in plans:
        if rows.size == 0:
            continue
        for start in dropped_faces:
            labels[rows, start : start + 6] = -1
            labels[rows, start + 6 : start + 8] = 0
        labels[rows, kept_face + 6 : kept_face + 8] = 1
|
|
|
|
|
|
def remap_labels_to_roi(lb_2d, lb_3d, ori_w, ori_h, crop_bounds):
    """Re-express normalized boxes and projected UV points relative to an ROI crop.

    Boxes are converted to pixel corners, shifted by the crop origin, clipped to
    `[0, roi - 1]`, and re-normalized by the ROI size. Boxes whose x-extent or y-extent lies
    entirely on one side of the ROI are dropped from every per-object array. Partially visible
    rows of `lb_3d` are first passed through `_handle_cut_labels_42`; afterwards the UV column
    pairs (7, 8), (14, 15), (22, 23), (30, 31), (38, 39) are shifted and re-normalized wherever
    the x value is neither NaN nor -1.

    Args:
        lb_2d (dict): 2D labels with `bboxes` (xywh, normalized to the original image), `cls`,
            `difficulties`, `difficulty_levels`. Other keys are carried over unchanged.
        lb_3d (np.ndarray | None): (n, 42) 3D labels, or None.
        ori_w (int | float): Original image width.
        ori_h (int | float): Original image height.
        crop_bounds (tuple): `(x1, y1, x2, y2)` of the ROI in original pixels.

    Returns:
        tuple: `(lb_2d, lb_3d)`. The inputs are returned untouched when there are no boxes;
            otherwise new containers are returned and the inputs are not modified.
    """
    src_boxes = lb_2d["bboxes"]
    if len(src_boxes) == 0:
        return lb_2d, lb_3d

    left, top, right, bottom = crop_bounds
    roi_w = right - left
    roi_h = bottom - top

    boxes = src_boxes.copy()

    # Pixel-space corners in the original image, then shifted into the ROI frame.
    xa = (boxes[:, 0] - boxes[:, 2] / 2) * ori_w - left
    ya = (boxes[:, 1] - boxes[:, 3] / 2) * ori_h - top
    xb = (boxes[:, 0] + boxes[:, 2] / 2) * ori_w - left
    yb = (boxes[:, 1] + boxes[:, 3] / 2) * ori_h - top

    fully_inside = (xa >= 0) & (ya >= 0) & (xb < roi_w) & (yb < roi_h)
    off_left = (xa < 0) & (xb < 0)
    off_right = (xa >= roi_w) & (xb >= roi_w)
    off_top = (ya < 0) & (yb < 0)
    off_bottom = (ya >= roi_h) & (yb >= roi_h)
    fully_outside = off_left | off_right | off_top | off_bottom

    has_3d = lb_3d is not None and len(lb_3d) > 0
    if has_3d:
        lb_3d = lb_3d.copy()
        _handle_cut_labels_42(lb_3d, fully_outside, fully_inside)

    xa, xb = np.clip(xa, 0, roi_w - 1), np.clip(xb, 0, roi_w - 1)
    ya, yb = np.clip(ya, 0, roi_h - 1), np.clip(yb, 0, roi_h - 1)

    boxes[:, 0] = (xa + xb) * 0.5 / roi_w
    boxes[:, 1] = (ya + yb) * 0.5 / roi_h
    boxes[:, 2] = (xb - xa) / roi_w
    boxes[:, 3] = (yb - ya) / roi_h

    survivors = ~fully_outside
    remapped_2d = dict(lb_2d)
    remapped_2d["bboxes"] = boxes[survivors]
    for field in ("cls", "difficulties", "difficulty_levels"):
        remapped_2d[field] = lb_2d[field][survivors]

    if has_3d:
        lb_3d = lb_3d[survivors]
        # (u, v) column pairs: whole-box centre projection followed by the four face blocks.
        for u_col, v_col in ((7, 8), (14, 15), (22, 23), (30, 31), (38, 39)):
            u_vals = lb_3d[:, u_col]
            usable = ~np.isnan(u_vals) & (u_vals != -1)
            if not usable.any():
                continue
            lb_3d[usable, u_col] = (lb_3d[usable, u_col] * ori_w - left) / roi_w
            lb_3d[usable, v_col] = (lb_3d[usable, v_col] * ori_h - top) / roi_h

    return remapped_2d, lb_3d
|
|
|
|
|
|
def normalize_roi_depth(lb_3d, fx_final, virtual_fx):
    """Rescale positive depth values by `virtual_fx / fx_final` on a copy of the labels.

    Affects column 2 (whole-box z3d) and columns 12, 20, 28, 36 (per-face depth). Only strictly
    positive entries are scaled, so NaN placeholders, -1 markers and zeros are left as they are.

    Args:
        lb_3d (np.ndarray | None): (n, 42) 3D labels.
        fx_final (float): Focal length of the final image.
        virtual_fx (float): Canonical virtual focal length.

    Returns:
        np.ndarray | None: A scaled copy, or the input itself when it is None or empty.
    """
    if lb_3d is None or len(lb_3d) == 0:
        return lb_3d

    scaled = lb_3d.copy()
    factor = virtual_fx / fx_final
    for depth_col in (2, 12, 20, 28, 36):
        # NaN > 0 and -1 > 0 are both False, so "> 0" alone selects exactly the valid depths.
        positive = scaled[:, depth_col] > 0
        scaled[positive, depth_col] *= factor
    return scaled
|
|
|
|
|
|
def compute_simul_calib(calib_params, ori_img_size, target_size, crop_center_x, crop_center_y, target_fx, augment=False):
    """Derive the virtual (undistorted, cropped, resized) camera from a fisheye calibration.

    An undistorted camera matrix is estimated with OpenCV's fisheye model (balance=0.0), the
    requested crop centre is mapped into that undistorted image, and the largest crop with the
    exact target aspect ratio that fits symmetrically around the centre is chosen. The crop size
    is an integer multiple `k` of the reduced aspect ratio; with `augment=True` and enough room,
    `k` is drawn uniformly between the target-size multiple and the maximum.

    Port from yolov5-3d/utils/dataloaders3d_ground.py:698-824.

    Args:
        calib_params (dict): Original calibration with focal_u, focal_v, cu, cv, distort_coeffs.
        ori_img_size (tuple): Original image size (width, height).
        target_size (tuple): Target size (width, height) e.g., (960, 480).
        crop_center_x (float): Crop center X in distorted image (typically image center).
        crop_center_y (float): Crop center Y in distorted image (typically vanishing point Y).
        target_fx (float): Target focal length x for virtual camera.
        augment (bool): If True, randomly select crop size between min and max.

    Returns:
        dict: Virtual camera calibration with fx, fy, cx, cy, distort_coeffs (always empty),
            depth_scale, crop_bounds, scale, K_undistorted, K_orig, D, fx_to_target_scale.
    """
    import math
    import random

    focal_u = calib_params["focal_u"]
    focal_v = calib_params["focal_v"]
    principal_u = calib_params["cu"]
    principal_v = calib_params["cv"]
    coeffs = calib_params.get("distort_coeffs", [])

    ori_w, ori_h = ori_img_size
    target_w, target_h = target_size

    K_orig = np.array([[focal_u, 0, principal_u], [0, focal_v, principal_v], [0, 0, 1]], dtype=np.float64)
    # Only the first four coefficients are used; fewer than four means "no distortion".
    if len(coeffs) >= 4:
        D = np.array(coeffs[:4], dtype=np.float64)
    else:
        D = np.zeros(4, dtype=np.float64)

    # Optimal new camera matrix (no black margins)
    K_new = cv2.fisheye.estimateNewCameraMatrixForUndistortRectify(K_orig, D, (ori_w, ori_h), np.eye(3), balance=0.0)

    # Where the requested crop centre lands in the undistorted image
    centre_distorted = np.array([[[crop_center_x, crop_center_y]]], dtype=np.float32)
    centre_undistorted = cv2.fisheye.undistortPoints(centre_distorted, K_orig, D, P=K_new)
    centre_u = centre_undistorted[0, 0, 0]
    centre_v = centre_undistorted[0, 0, 1]

    # Largest extent that stays symmetric around the centre
    max_w = min(centre_u * 2, (ori_w - centre_u) * 2)
    max_h = min(centre_v * 2, (ori_h - centre_v) * 2)

    # Reduce the target aspect ratio so every integer multiple is an exact-ratio crop
    divisor = math.gcd(target_w, target_h)
    unit_w = target_w // divisor
    unit_h = target_h // divisor

    k_max = min(int(max_w / unit_w), int(max_h / unit_h))
    k_min = max(target_w // unit_w, target_h // unit_h)
    k = random.randint(k_min, k_max) if augment and k_max > k_min else k_max

    crop_w = k * unit_w
    crop_h = k * unit_h
    crop_x1 = int(centre_u - crop_w / 2)
    crop_y1 = int(centre_v - crop_h / 2)
    crop_bounds = (crop_x1, crop_y1, crop_x1 + crop_w, crop_y1 + crop_h)

    scale_x = target_w / crop_w
    scale_y = target_h / crop_h
    scaled_fx = scale_x * K_new[0, 0]

    return {
        "fx": scaled_fx,
        "fy": scale_y * K_new[1, 1],
        "cx": (K_new[0, 2] - crop_x1) * scale_x,
        "cy": (K_new[1, 2] - crop_y1) * scale_y,
        "distort_coeffs": [],
        "depth_scale": scaled_fx / target_fx,
        "crop_bounds": crop_bounds,
        "scale": (scale_x, scale_y),
        "K_undistorted": K_new,
        "K_orig": K_orig,
        "D": D,
        "fx_to_target_scale": target_fx / scaled_fx,
    }
|
|
|
|
|
|
def apply_simul_transform(img, labels_48, simul_calib, calib_params, target_size, augment=False):
    """Apply fisheye undistortion + crop + resize to image and 48-dim labels.

    The image is undistorted with the matrices stored in `simul_calib`, cropped to
    `simul_calib["crop_bounds"]` and resized to `target_size`. Labels follow the same mapping:
    each 2D box is remapped through its top-left and bottom-right corners only, and the
    whole-box / per-face UV points are remapped individually. Boxes that collapse to zero or
    negative size after clipping are dropped; boxes that are only partly visible are passed to
    `_handle_cut_labels`. Depth values are scaled by `simul_calib["fx_to_target_scale"]`.

    Port from yolov5-3d/utils/dataloaders3d_ground.py:826-1039.

    Args:
        img (np.ndarray): Input image (H, W, 3) BGR — distorted fisheye image.
        labels_48 (np.ndarray): Label array (N, 48) in 48-dim format. The input is copied, not modified.
        simul_calib (dict): Pre-computed virtual camera calibration from compute_simul_calib().
            Keys read here: K_orig, D, K_undistorted, crop_bounds, scale, fx_to_target_scale.
        calib_params (dict): Original calibration dict. Not read by this function.
        target_size (tuple): Target size (width, height).
        augment (bool): If True, use random interpolation for resize.

    Returns:
        img_transformed (np.ndarray): Transformed image.
        labels_transformed (np.ndarray): Transformed labels (M, 48), M <= N.
    """
    import random

    h_orig, w_orig = img.shape[:2]
    target_w, target_h = target_size

    K_orig = simul_calib["K_orig"]
    D = simul_calib["D"]
    K_new = simul_calib["K_undistorted"]

    # Step 1: Undistort full image
    img_undistorted = cv2.fisheye.undistortImage(img, K_orig, D, Knew=K_new)

    # Step 2: Crop
    crop_x1, crop_y1, crop_x2, crop_y2 = simul_calib["crop_bounds"]
    img_cropped = img_undistorted[crop_y1:crop_y2, crop_x1:crop_x2]

    # Step 3: Resize (interpolation mode is randomized only when augmenting)
    if augment:
        interp = random.choice([cv2.INTER_NEAREST, cv2.INTER_LINEAR, cv2.INTER_AREA, cv2.INTER_CUBIC, cv2.INTER_LANCZOS4])
    else:
        interp = cv2.INTER_LINEAR
    img_transformed = cv2.resize(img_cropped, target_size, interpolation=interp)

    # Step 4: Transform labels
    labels_transformed = labels_48.copy()
    if len(labels_transformed) == 0:
        return img_transformed, labels_transformed

    scale_x, scale_y = simul_calib["scale"]
    fx_to_target_scale = simul_calib["fx_to_target_scale"]

    # Collect all 2D points to undistort in batch
    all_points = []
    point_map = []  # one (label_idx, field_name) tuple per entry of all_points, in the same order

    for i in range(len(labels_transformed)):
        # Bbox corners in original-image pixels, from normalized xywh
        xc = labels_transformed[i, 1] * w_orig
        yc = labels_transformed[i, 2] * h_orig
        bw = labels_transformed[i, 3] * w_orig
        bh = labels_transformed[i, 4] * h_orig
        x1, y1 = xc - bw / 2, yc - bh / 2
        x2, y2 = xc + bw / 2, yc + bh / 2
        # The top-left entry must directly precede the bottom-right entry: the write-back
        # loop below looks up the top-left point at `idx - 1`.
        all_points.extend([[x1, y1], [x2, y2]])
        point_map.extend([(i, "bbox_tl"), (i, "bbox_br")])

        # Whole box UV (dims 12-13 in 48-dim = xc, yc normalized); skipped when NaN
        if not np.isnan(labels_transformed[i, 12]):
            ux = labels_transformed[i, 12] * w_orig
            uy = labels_transformed[i, 13] * h_orig
            all_points.append([ux, uy])
            point_map.append((i, "whole_uv"))

        # Face UVs: front(19,20), rear(27,28), left(35,36), right(43,44) in 48-dim.
        # A face is skipped when its u value is NaN or the -1 invalid marker.
        for face_name, uv_cols in [("front", (19, 20)), ("rear", (27, 28)), ("left", (35, 36)), ("right", (43, 44))]:
            if not np.isnan(labels_transformed[i, uv_cols[0]]) and labels_transformed[i, uv_cols[0]] != -1:
                fu = labels_transformed[i, uv_cols[0]] * w_orig
                fv = labels_transformed[i, uv_cols[1]] * h_orig
                all_points.append([fu, fv])
                point_map.append((i, f"{face_name}_uv"))

    if not all_points:
        return img_transformed, labels_transformed

    # Batch undistort all points
    pts_dist = np.array(all_points, dtype=np.float32).reshape(-1, 1, 2)
    pts_undist = cv2.fisheye.undistortPoints(pts_dist, K_orig, D, P=K_new).reshape(-1, 2)

    # Apply crop + resize to undistorted points (result is in target-image pixels)
    pts_transformed = np.zeros_like(pts_undist)
    pts_transformed[:, 0] = (pts_undist[:, 0] - crop_x1) * scale_x
    pts_transformed[:, 1] = (pts_undist[:, 1] - crop_y1) * scale_y

    # Write back transformed coordinates
    outside = np.zeros(len(labels_transformed), dtype=bool)
    still_inside = np.ones(len(labels_transformed), dtype=bool)
    for idx, (label_i, field) in enumerate(point_map):
        px, py = pts_transformed[idx]

        if field == "bbox_tl":
            # Column 1 temporarily holds the pixel-space x1 until the matching "bbox_br" entry
            # (always the next one) overwrites it with the normalized centre.
            labels_transformed[label_i, 1] = px  # temp store x1
        elif field == "bbox_br":
            x1_t = labels_transformed[label_i, 1]
            # Get the tl point from previous entry
            tl_idx = idx - 1
            y1_t = pts_transformed[tl_idx, 1]

            # Check if fully inside before clipping
            if x1_t < 0 or y1_t < 0 or px > target_w or py > target_h:
                still_inside[label_i] = False

            # Clip to image bounds
            x1_c = np.clip(x1_t, 0, target_w)
            x2_c = np.clip(px, 0, target_w)
            y1_c = np.clip(y1_t, 0, target_h)
            y2_c = np.clip(py, 0, target_h)

            bw_new = x2_c - x1_c
            bh_new = y2_c - y1_c
            if bw_new <= 0 or bh_new <= 0:
                # Nothing of the box is left after clipping: flag the row for removal. Its bbox
                # columns keep stale values (column 1 still holds the temporary x1), which is
                # harmless because the row is dropped before returning.
                outside[label_i] = True
                continue
            # Convert back to xywhn
            labels_transformed[label_i, 1] = (x1_c + x2_c) / 2 / target_w
            labels_transformed[label_i, 2] = (y1_c + y2_c) / 2 / target_h
            labels_transformed[label_i, 3] = bw_new / target_w
            labels_transformed[label_i, 4] = bh_new / target_h
        elif field == "whole_uv":
            # UV points are re-normalized but not clipped to the image.
            labels_transformed[label_i, 12] = px / target_w
            labels_transformed[label_i, 13] = py / target_h
        elif field.endswith("_uv"):
            face = field.split("_")[0]
            uv_map = {"front": (19, 20), "rear": (27, 28), "left": (35, 36), "right": (43, 44)}
            cols = uv_map[face]
            labels_transformed[label_i, cols[0]] = px / target_w
            labels_transformed[label_i, cols[1]] = py / target_h

    # Scale z3d by fx_to_target_scale
    labels_transformed = _scale_z3d(labels_transformed, fx_to_target_scale)

    # Handle partial visibility (cut-in/cut-out); modifies labels_transformed in place
    _handle_cut_labels(labels_transformed, outside, still_inside)

    # Remove outside boxes
    labels_transformed = labels_transformed[~outside]

    return img_transformed, labels_transformed
|
|
|
|
|
|
def _scale_z3d(labels, scale):
    """Multiply the depth columns of 48-dim labels by `scale`.

    Column 7 (whole-box z3d) is always scaled. The per-face depth columns 17, 25, 33 and 41 are
    scaled only where the value is neither NaN nor the -1 invalid marker, so those indicators
    survive.

    Port from yolov5-3d/utils/dataloaders3d_ground.py:1041-1080.

    Args:
        labels (np.ndarray): (N, 48) label array.
        scale (float): Depth multiplier.

    Returns:
        np.ndarray: A scaled copy; the input object itself when it is empty or `scale == 1.0`.
    """
    if len(labels) == 0 or scale == 1.0:
        return labels

    scaled = labels.copy()
    scaled[:, 7] *= scale
    for face_z_col in (17, 25, 33, 41):
        face_z = scaled[:, face_z_col]
        usable = ~(np.isnan(face_z) | (face_z == -1.0))
        scaled[usable, face_z_col] *= scale
    return scaled
|
|
|
|
|
|
def _handle_cut_labels(labels, outside_mask, still_inside_mask):
    """Adjust face annotations in place for objects that are only partly inside the image.

    Rows that are neither fully inside nor fully outside are classified by rot_y (column 11):
    - Cut-in (approaching, rot_y in [-pi, 0]): keep the front face only.
    - Cut-out (leaving, any other rot_y, including NaN): keep the rear face only.

    In the 48-dim layout the 8-value face blocks start at columns 15 (front), 23 (rear),
    31 (left) and 39 (right). A discarded face gets -1 in its first six values and 0 in its last
    two; the kept face gets 1 in its last two values.

    Port from yolov5-3d/utils/dataloaders3d_ground.py:1000-1037.

    Args:
        labels (np.ndarray): Label array (N, 48) in 48-dim format, modified in place.
        outside_mask (np.ndarray): Boolean mask (N,) — True for fully outside boxes.
        still_inside_mask (np.ndarray): Boolean mask (N,) — True for fully inside boxes.
    """
    if len(labels) == 0:
        return

    partial_rows = np.nonzero(~(still_inside_mask | outside_mask))[0]
    if partial_rows.size == 0:
        return

    yaw = labels[partial_rows, 11]
    cut_in = (yaw >= -np.pi) & (yaw <= 0)

    front, rear, left, right = 15, 23, 31, 39
    plans = (
        (partial_rows[cut_in], front, (rear, left, right)),
        (partial_rows[~cut_in], rear, (front, left, right)),
    )
    for rows, kept_face, dropped_faces in plans:
        if rows.size == 0:
            continue
        for start in dropped_faces:
            labels[rows, start : start + 6] = -1
            labels[rows, start + 6 : start + 8] = 0
        labels[rows, kept_face + 6 : kept_face + 8] = 1
|