344 lines
13 KiB
Python
Executable File
344 lines
13 KiB
Python
Executable File
"""
ROI (Region of Interest) processor for ground truth labels.

This module handles ROI computation and ground truth filtering/clipping
to match the training-time ROI processing logic.
"""
|
|
import numpy as np
|
|
import json
|
|
from pathlib import Path
|
|
|
|
|
|
class ROIProcessor:
    """Process ground truth labels with ROI filtering and clipping.

    The processor turns an ROI configuration (a fixed window, or a window of a
    given size centred on the camera vanishing point) into pixel bounds, then
    drops ground-truth boxes outside those bounds and clips the ones that
    straddle them.
    """

    def __init__(
        self,
        calib_root=None,
        roi_config=None,
        ori_img_size=(1920, 1080),
        roi_bottom_offset=0,
        roi_right_offset=0,
        roi_use_true_vp_x=False,
    ):
        """
        Initialize ROI processor.

        Args:
            calib_root: str or Path, root directory containing calibration files
            roi_config: dict or list, ROI configuration
                - If dict: {'mode': 'size', 'width': 1920, 'height': 960} or
                  {'mode': 'bounds', 'x1': 0, 'y1': 120, 'x2': 1920, 'y2': 1080}
                - If list of 2 values [width, height]: ROI size mode
                - If list of 4 values [x1, y1, x2, y2]: ROI bounds mode
            ori_img_size: tuple, original image size (width, height)
            roi_bottom_offset: int, pixels to trim from the bottom edge of the ROI
                (shifts y2 upward); applied in 'size' mode
            roi_right_offset: int, pixels to trim from the right edge of the ROI
                (shifts x2 leftward); applied in 'size' mode
            roi_use_true_vp_x: bool, use geometric vanishing point X as crop center
                for ROI1-style crop (otherwise the image centre X is used)

        Raises:
            ValueError: if roi_config is neither None, a dict, nor a 2/4-element
                list or tuple.
        """
        self.calib_root = Path(calib_root) if calib_root else None
        self.roi_config = self._parse_roi_config(roi_config)
        self.ori_img_size = ori_img_size
        self.roi_bottom_offset = roi_bottom_offset
        self.roi_right_offset = roi_right_offset
        self.roi_use_true_vp_x = roi_use_true_vp_x
        self.calib_cache = {}  # cache key -> parsed calibration parameters

    def _parse_roi_config(self, roi_config):
        """Parse ROI configuration into standardized dict format.

        Returns None when no ROI is configured, the dict itself when a dict is
        given, or a freshly built dict for the 2- and 4-element sequence forms.

        Raises:
            ValueError: for any other input.
        """
        if roi_config is None:
            return None

        if isinstance(roi_config, dict):
            return roi_config

        if isinstance(roi_config, (list, tuple)):
            if len(roi_config) == 2:
                width, height = roi_config
                return {'mode': 'size', 'width': width, 'height': height}
            if len(roi_config) == 4:
                x1, y1, x2, y2 = roi_config
                return {'mode': 'bounds', 'x1': x1, 'y1': y1, 'x2': x2, 'y2': y2}

        raise ValueError(f"Invalid ROI config: {roi_config}")

    def load_calibration(self, case_name, frame_name=None, level1_name=None):
        """
        Load calibration parameters for a case.

        Results are cached per case (and per level1 directory when given), so
        each calibration file is read at most once per successful load.

        Args:
            case_name: str, case identifier
            frame_name: str, optional frame name (currently unused; calibration
                is looked up per case)
            level1_name: str, optional level1 directory name for 2-level path structure

        Returns:
            dict with calibration parameters (focal_u, focal_v, cu, cv, yaw,
            pitch, distort_coeffs), or None when no calibration root is set, no
            calibration file exists, or the file cannot be read/parsed.
        """
        if self.calib_root is None:
            return None

        cache_key = f"{level1_name}/{case_name}" if level1_name else f"{case_name}"
        if cache_key in self.calib_cache:
            return self.calib_cache[cache_key]

        # Supported layouts (case_root is calib_root/level1/case or calib_root/case):
        #   - case_root/calib/L2_calib/camera4.json
        #   - case_root/calib/camera4.json
        #   - case_root/calibration.json
        if level1_name:
            case_root = self.calib_root / level1_name / case_name
        else:
            case_root = self.calib_root / case_name
        calib_candidates = [
            case_root / "calib/L2_calib/camera4.json",
            case_root / "calib/camera4.json",
            case_root / "calibration.json",
        ]
        case_calib_path = next((path for path in calib_candidates if path.exists()), None)
        if case_calib_path is None:
            print(f"Warning: Calibration file not found for case {case_name}")
            return None

        # Only the file read/parse is guarded: unreadable or malformed files are
        # reported and treated as "no calibration" rather than aborting the run.
        try:
            with open(case_calib_path, 'r', encoding='utf-8') as f:
                calib_data = json.load(f)
        except (OSError, ValueError) as e:  # ValueError covers JSON/Unicode decode errors
            print(f"Error loading calibration for {case_name}: {e}")
            return None

        if not isinstance(calib_data, dict):
            print(
                f"Error loading calibration for {case_name}: "
                f"expected a JSON object, got {type(calib_data).__name__}"
            )
            return None

        # Accept both naming schemes (focal_u/cu/... and fx/cx/...).
        calib_params = {
            'focal_u': calib_data.get('focal_u', calib_data.get('fx')),
            'focal_v': calib_data.get('focal_v', calib_data.get('fy')),
            'cu': calib_data.get('cu', calib_data.get('cx')),
            'cv': calib_data.get('cv', calib_data.get('cy')),
            'yaw': calib_data.get('yaw', 0.0),
            'pitch': calib_data.get('pitch', 0.0),
            'distort_coeffs': calib_data.get('distort_coeffs', []),
        }

        self.calib_cache[cache_key] = calib_params
        return calib_params

    def _crop_center(self, calib_params, img_width):
        """Return the (x, y) crop centre used by 'size' mode.

        Y is the vanishing-point row derived from pitch (degrees). X is the
        vanishing-point column derived from yaw (degrees) when
        roi_use_true_vp_x is set, otherwise the integer image centre.
        """
        fy = calib_params['focal_v']
        cy = calib_params['cv']
        pitch_deg = calib_params.get('pitch', 0.0)
        center_y = cy - fy * np.tan(pitch_deg * np.pi / 180)

        if self.roi_use_true_vp_x:
            # ROI1: true geometric vanishing point X.
            fx = calib_params['focal_u']
            cx = calib_params['cu']
            yaw_deg = calib_params.get('yaw', 0.0)
            center_x = cx + fx * np.tan(yaw_deg * np.pi / 180)
        else:
            # ROI0: image centre X.
            center_x = img_width // 2

        return center_x, center_y

    def compute_roi(self, calib_params):
        """
        Compute ROI bounds based on calibration and configuration.

        Intended to match the logic in LoadImages3D / LoadImagesAndLabels3D
        (not visible from this module).

        Args:
            calib_params: dict, calibration parameters. Only read in 'size'
                mode; 'bounds' mode is fully determined by the configuration.

        Returns:
            tuple: (roi_x1, roi_y1, roi_x2, roi_y2) clipped to the image, or
            None if ROI is disabled or the configured mode is not recognised.
        """
        if self.roi_config is None:
            return None

        oriW, oriH = self.ori_img_size
        mode = self.roi_config.get('mode')

        if mode == 'size':
            # ROI defined by [width, height], centred on the crop centre.
            roi_width = self.roi_config['width']
            roi_height = self.roi_config['height']
            crop_center_x, crop_center_y = self._crop_center(calib_params, oriW)

            roi_x1 = int(crop_center_x - roi_width / 2.0)
            roi_y1 = int(crop_center_y - roi_height / 2.0)
            roi_x2 = roi_x1 + roi_width - self.roi_right_offset
            roi_y2 = roi_y1 + roi_height - self.roi_bottom_offset
        elif mode == 'bounds':
            # ROI defined by explicit [x1, y1, x2, y2].
            # NOTE(review): right/bottom offsets are not applied here, as in the
            # original implementation - confirm against the training loader.
            roi_x1 = self.roi_config['x1']
            roi_y1 = self.roi_config['y1']
            roi_x2 = self.roi_config['x2']
            roi_y2 = self.roi_config['y2']
        else:
            return None

        # Clip to image bounds
        roi_x1 = max(0, roi_x1)
        roi_y1 = max(0, roi_y1)
        roi_x2 = min(oriW, roi_x2)
        roi_y2 = min(oriH, roi_y2)

        return (roi_x1, roi_y1, roi_x2, roi_y2)

    def xywhn2xyxy(self, boxes, img_w, img_h):
        """
        Convert normalized [x_center, y_center, width, height] to [x1, y1, x2, y2].

        Args:
            boxes: array-like of shape (N, 4), normalized boxes (an empty input
                yields an empty (0, 4) result)
            img_w: int, image width
            img_h: int, image height

        Returns:
            np.array of shape (N, 4), absolute pixel coordinates
        """
        boxes = np.asarray(boxes)
        if boxes.size == 0:
            boxes = boxes.reshape(0, 4)

        x_center = boxes[:, 0] * img_w
        y_center = boxes[:, 1] * img_h
        half_w = boxes[:, 2] * img_w / 2
        half_h = boxes[:, 3] * img_h / 2

        return np.stack(
            [x_center - half_w, y_center - half_h, x_center + half_w, y_center + half_h],
            axis=1,
        )

    def xyxy2xywhn(self, boxes, img_w, img_h):
        """
        Convert [x1, y1, x2, y2] to normalized [x_center, y_center, width, height].

        Args:
            boxes: array-like of shape (N, 4), absolute pixel coordinates (an
                empty input yields an empty (0, 4) result)
            img_w: int, image width
            img_h: int, image height

        Returns:
            np.array of shape (N, 4), normalized boxes
        """
        boxes = np.asarray(boxes)
        if boxes.size == 0:
            boxes = boxes.reshape(0, 4)

        x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]

        x_center = (x1 + x2) / 2 / img_w
        y_center = (y1 + y2) / 2 / img_h
        width = (x2 - x1) / img_w
        height = (y2 - y1) / img_h

        return np.stack([x_center, y_center, width, height], axis=1)

    @staticmethod
    def _clamp(value, lower, upper):
        """Clamp a scalar into [lower, upper] without converting it to a NumPy type."""
        return max(lower, min(value, upper))

    def process_annotations_with_roi(self, annotations, roi_bounds):
        """
        Process annotations with ROI filtering and clipping.

        Intended to match post_process_labels_to_roi from dataloaders3d.py
        (not visible from this module).

        Boxes entirely outside the ROI, or with no area left after clipping,
        are dropped. Surviving boxes are clipped to the ROI and reported in
        original-image coordinates. Input annotations are never modified:
        each result is a copy, and '3d_info' is copied before it is flagged.

        Args:
            annotations: list of annotation dicts from GroundTruthParser; each
                must provide 'bbox_2d' as [x1, y1, x2, y2] in pixels
            roi_bounds: tuple (roi_x1, roi_y1, roi_x2, roi_y2)

        Returns:
            list of processed annotations (some may be filtered out). When
            roi_bounds is None or annotations is empty, the input is returned
            unchanged.
        """
        if roi_bounds is None or len(annotations) == 0:
            return annotations

        roi_x1, roi_y1, roi_x2, roi_y2 = roi_bounds
        roi_width = roi_x2 - roi_x1
        roi_height = roi_y2 - roi_y1
        # Last valid pixel index along each axis in ROI-relative coordinates.
        max_x = roi_width - 1
        max_y = roi_height - 1

        processed_annotations = []

        for ann in annotations:
            x1, y1, x2, y2 = ann['bbox_2d']

            # Shift to ROI-relative coordinates
            rel_x1 = x1 - roi_x1
            rel_y1 = y1 - roi_y1
            rel_x2 = x2 - roi_x1
            rel_y2 = y2 - roi_y1

            # Box completely outside the ROI on any side: drop it.
            if ((rel_x1 < 0 and rel_x2 < 0) or
                    (rel_x1 >= roi_width and rel_x2 >= roi_width) or
                    (rel_y1 < 0 and rel_y2 < 0) or
                    (rel_y1 >= roi_height and rel_y2 >= roi_height)):
                continue

            # Evaluated before clipping so that clipped boxes can be flagged.
            still_inside = (rel_x1 >= 0 and rel_y1 >= 0 and
                            rel_x2 < roi_width and rel_y2 < roi_height)

            # Clip to ROI bounds
            rel_x1 = self._clamp(rel_x1, 0, max_x)
            rel_y1 = self._clamp(rel_y1, 0, max_y)
            rel_x2 = self._clamp(rel_x2, 0, max_x)
            rel_y2 = self._clamp(rel_y2, 0, max_y)

            # Degenerate after clipping: drop it.
            if rel_x2 <= rel_x1 or rel_y2 <= rel_y1:
                continue

            # Report in original image coordinates, per the original comment
            # that detection results are saved in that frame.
            new_ann = ann.copy()
            new_ann['bbox_2d'] = [
                rel_x1 + roi_x1,
                rel_y1 + roi_y1,
                rel_x2 + roi_x1,
                rel_y2 + roi_y1,
            ]
            new_ann['roi_filtered'] = True  # Indicates GT has been filtered by ROI
            new_ann['roi_bounds'] = roi_bounds
            new_ann['was_clipped'] = not still_inside

            # Flag partially visible 3D objects (may need special handling in
            # 3D evaluation). ann.copy() is shallow, so copy '3d_info' before
            # writing to avoid mutating the caller's annotation.
            if new_ann.get('has_3d') and not still_inside:
                info_3d = new_ann.get('3d_info')
                if info_3d:
                    info_3d = dict(info_3d)
                    info_3d['partially_visible'] = True
                    new_ann['3d_info'] = info_3d

            processed_annotations.append(new_ann)

        return processed_annotations

    def process_case_frame(self, case_name, frame_name, annotations, level1_name=None):
        """
        Process annotations for a specific case and frame.

        Args:
            case_name: str, case identifier
            frame_name: str, frame identifier
            annotations: list, annotations from GroundTruthParser
            level1_name: str, optional level1 directory name for 2-level path structure

        Returns:
            tuple: (processed_annotations, roi_bounds), or (annotations, None)
            when no ROI is configured, calibration cannot be loaded, or no ROI
            bounds can be computed.
        """
        if self.roi_config is None:
            return annotations, None

        calib_params = self.load_calibration(case_name, frame_name, level1_name)
        if calib_params is None:
            print(f"Warning: Cannot compute ROI without calibration for {case_name}/{frame_name}")
            return annotations, None

        roi_bounds = self.compute_roi(calib_params)
        if roi_bounds is None:
            return annotations, None

        processed = self.process_annotations_with_roi(annotations, roi_bounds)
        return processed, roi_bounds
|