"""
ROI (Region of Interest) processor for ground truth labels.

This module handles ROI computation and ground truth filtering/clipping
to match the training-time ROI processing logic.
"""

import json
from pathlib import Path

import numpy as np


class ROIProcessor:
    """Process ground truth labels with ROI filtering and clipping."""

    def __init__(
        self,
        calib_root=None,
        roi_config=None,
        ori_img_size=(1920, 1080),
        roi_bottom_offset=0,
        roi_right_offset=0,
        roi_use_true_vp_x=False,
    ):
        """
        Initialize ROI processor.

        Args:
            calib_root: str or Path, root directory containing calibration files
            roi_config: dict or list, ROI configuration
                - If dict: {'mode': 'size', 'width': 1920, 'height': 960} or
                  {'mode': 'bounds', 'x1': 0, 'y1': 120, 'x2': 1920, 'y2': 1080}
                - If list of 2 values [width, height]: ROI size mode
                - If list of 4 values [x1, y1, x2, y2]: ROI bounds mode
            ori_img_size: tuple, original image size (width, height)
            roi_bottom_offset: int, pixels to trim from the bottom edge of the
                ROI (shifts y2 upward); applied in size mode only
            roi_right_offset: int, pixels to trim from the right edge of the
                ROI (shifts x2 leftward); applied in size mode only
            roi_use_true_vp_x: bool, use geometric vanishing point X as crop
                center for ROI1-style crop

        Raises:
            ValueError: if roi_config is a list/tuple of unsupported length or
                of an unsupported type.
        """
        self.calib_root = Path(calib_root) if calib_root else None
        self.roi_config = self._parse_roi_config(roi_config)
        self.ori_img_size = ori_img_size
        self.roi_bottom_offset = roi_bottom_offset
        self.roi_right_offset = roi_right_offset
        self.roi_use_true_vp_x = roi_use_true_vp_x
        self.calib_cache = {}  # Cache calibration parameters, keyed by case

    def _parse_roi_config(self, roi_config):
        """
        Parse ROI configuration into standardized dict format.

        Args:
            roi_config: None, dict, or list/tuple of 2 or 4 values

        Returns:
            None (ROI disabled), the dict unchanged, or a dict with a 'mode'
            key ('size' or 'bounds') built from the list/tuple.

        Raises:
            ValueError: for any other input.
        """
        if roi_config is None:
            return None

        if isinstance(roi_config, dict):
            return roi_config

        if isinstance(roi_config, (list, tuple)):
            if len(roi_config) == 2:
                width, height = roi_config
                return {'mode': 'size', 'width': width, 'height': height}
            if len(roi_config) == 4:
                x1, y1, x2, y2 = roi_config
                return {'mode': 'bounds', 'x1': x1, 'y1': y1, 'x2': x2, 'y2': y2}

        raise ValueError(f"Invalid ROI config: {roi_config}")

    def load_calibration(self, case_name, frame_name=None, level1_name=None):
        """
        Load calibration parameters for a case.

        Successful loads are cached per case; failures are not cached, so a
        missing or broken file is re-checked on every call.

        Args:
            case_name: str, case identifier
            frame_name: str, optional frame name (currently unused; calibration
                is looked up per case)
            level1_name: str, optional level1 directory name for 2-level path structure

        Returns:
            dict with calibration parameters: focal_u, focal_v, cu, cv, yaw,
            pitch, distort_coeffs. Intrinsics are None when the file provides
            neither the primary key nor its fx/fy/cx/cy alias. Returns None
            when no calibration root is set, no file is found, or the file
            cannot be read/parsed.
        """
        if self.calib_root is None:
            return None

        # Try case-level calibration first
        cache_key = f"{level1_name}/{case_name}" if level1_name else f"{case_name}"
        if cache_key in self.calib_cache:
            return self.calib_cache[cache_key]

        # Look for calibration file.
        # Supported layouts:
        # - calib_root/level1/case/calib/L2_calib/camera4.json
        # - calib_root/level1/case/calib/camera4.json
        # - calib_root/level1/case/calibration.json
        # - calib_root/case/calib/L2_calib/camera4.json
        # - calib_root/case/calib/camera4.json
        # - calib_root/case/calibration.json
        if level1_name:
            case_root = self.calib_root / level1_name / case_name
        else:
            case_root = self.calib_root / case_name
        calib_candidates = [
            case_root / "calib/L2_calib/camera4.json",
            case_root / "calib/camera4.json",
            case_root / "calibration.json",
        ]
        case_calib_path = next((path for path in calib_candidates if path.exists()), None)

        if case_calib_path is None:
            print(f"Warning: Calibration file not found for case {case_name}")
            return None

        try:
            with open(case_calib_path, 'r', encoding='utf-8') as f:
                calib_data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"Error loading calibration for {case_name}: {e}")
            return None

        if not isinstance(calib_data, dict):
            print(
                f"Error loading calibration for {case_name}: "
                f"expected a JSON object, got {type(calib_data).__name__}"
            )
            return None

        # Extract relevant parameters (accepting fx/fy/cx/cy as aliases)
        calib_params = {
            'focal_u': calib_data.get('focal_u', calib_data.get('fx')),
            'focal_v': calib_data.get('focal_v', calib_data.get('fy')),
            'cu': calib_data.get('cu', calib_data.get('cx')),
            'cv': calib_data.get('cv', calib_data.get('cy')),
            'yaw': calib_data.get('yaw', 0.0),
            'pitch': calib_data.get('pitch', 0.0),
            'distort_coeffs': calib_data.get('distort_coeffs', []),
        }

        self.calib_cache[cache_key] = calib_params
        return calib_params

    def _crop_center(self, calib_params, img_w):
        """
        Return the (x, y) crop center for a size-mode ROI, or None when the
        calibration does not provide the values needed to compute it.
        """
        if not calib_params:
            return None

        fy = calib_params.get('focal_v')
        cy = calib_params.get('cv')
        c_pitch = calib_params.get('pitch', 0.0)
        if fy is None or cy is None or c_pitch is None:
            return None
        # Vanishing point Y (angles are in degrees)
        crop_center_y = cy - fy * np.tan(c_pitch * np.pi / 180)

        # ROI0 uses image center X; ROI1 uses the true geometric vanishing point X.
        if not self.roi_use_true_vp_x:
            return img_w // 2, crop_center_y

        fx = calib_params.get('focal_u')
        cx = calib_params.get('cu')
        c_yaw = calib_params.get('yaw', 0.0)
        if fx is None or cx is None or c_yaw is None:
            return None
        return cx + fx * np.tan(c_yaw * np.pi / 180), crop_center_y

    def compute_roi(self, calib_params):
        """
        Compute ROI bounds based on calibration and configuration.

        Intended to match the logic in LoadImages3D / LoadImagesAndLabels3D.

        Args:
            calib_params: dict, calibration parameters. Only consulted in
                'size' mode; may be None in 'bounds' mode, where the ROI is
                fully determined by the configuration.

        Returns:
            tuple: (roi_x1, roi_y1, roi_x2, roi_y2) clipped to the image, or
            None if ROI is disabled, the mode is unknown, or (size mode) the
            calibration is missing values needed for the crop center.
        """
        if self.roi_config is None:
            return None

        oriW, oriH = self.ori_img_size
        mode = self.roi_config['mode']

        if mode == 'size':
            # ROI defined by [width, height], centered on the vanishing point
            center = self._crop_center(calib_params, oriW)
            if center is None:
                if calib_params is not None:
                    print("Warning: Calibration is missing parameters required for ROI computation")
                return None
            crop_center_x, crop_center_y = center

            roi_width = self.roi_config['width']
            roi_height = self.roi_config['height']

            roi_x1 = int(crop_center_x - roi_width / 2.0)
            roi_y1 = int(crop_center_y - roi_height / 2.0)
            roi_x2 = roi_x1 + roi_width - self.roi_right_offset
            roi_y2 = roi_y1 + roi_height - self.roi_bottom_offset

        elif mode == 'bounds':
            # ROI defined by [x1, y1, x2, y2]; no calibration involved
            roi_x1 = self.roi_config['x1']
            roi_y1 = self.roi_config['y1']
            roi_x2 = self.roi_config['x2']
            roi_y2 = self.roi_config['y2']
        else:
            return None

        # Clip to image bounds
        roi_x1 = max(0, roi_x1)
        roi_y1 = max(0, roi_y1)
        roi_x2 = min(oriW, roi_x2)
        roi_y2 = min(oriH, roi_y2)

        return (roi_x1, roi_y1, roi_x2, roi_y2)

    @staticmethod
    def _as_box_array(boxes):
        """Coerce boxes to a 2-D array; empty input becomes shape (0, 4)."""
        boxes = np.asarray(boxes)
        if boxes.size == 0:
            return np.zeros((0, 4), dtype=float)
        # A single flat box [a, b, c, d] is treated as one row.
        return np.atleast_2d(boxes)

    def xywhn2xyxy(self, boxes, img_w, img_h):
        """
        Convert normalized [x_center, y_center, width, height] to [x1, y1, x2, y2].

        Args:
            boxes: array-like of shape (N, 4), normalized boxes; an empty
                sequence or a single flat box of 4 values is also accepted
            img_w: int, image width
            img_h: int, image height

        Returns:
            np.array of shape (N, 4), absolute pixel coordinates
        """
        boxes = self._as_box_array(boxes)

        x_center = boxes[:, 0] * img_w
        y_center = boxes[:, 1] * img_h
        width = boxes[:, 2] * img_w
        height = boxes[:, 3] * img_h

        x1 = x_center - width / 2
        y1 = y_center - height / 2
        x2 = x_center + width / 2
        y2 = y_center + height / 2

        return np.stack([x1, y1, x2, y2], axis=1)

    def xyxy2xywhn(self, boxes, img_w, img_h):
        """
        Convert [x1, y1, x2, y2] to normalized [x_center, y_center, width, height].

        Args:
            boxes: array-like of shape (N, 4), absolute pixel coordinates; an
                empty sequence or a single flat box of 4 values is also accepted
            img_w: int, image width
            img_h: int, image height

        Returns:
            np.array of shape (N, 4), normalized boxes
        """
        boxes = self._as_box_array(boxes)
        x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]

        x_center = (x1 + x2) / 2 / img_w
        y_center = (y1 + y2) / 2 / img_h
        width = (x2 - x1) / img_w
        height = (y2 - y1) / img_h

        return np.stack([x_center, y_center, width, height], axis=1)

    @staticmethod
    def _clamp(value, lo, hi):
        """Clamp value to [lo, hi]; same result as np.clip but keeps native numbers."""
        return min(hi, max(lo, value))

    def process_annotations_with_roi(self, annotations, roi_bounds):
        """
        Process annotations with ROI filtering and clipping.

        Intended to match the logic in post_process_labels_to_roi from
        dataloaders3d.py. Input annotations are never modified: each kept
        annotation is copied, and its '3d_info' dict is copied as well before
        being marked as partially visible.

        Args:
            annotations: list of annotation dicts from GroundTruthParser; each
                must provide 'bbox_2d' as [x1, y1, x2, y2] in pixels
            roi_bounds: tuple (roi_x1, roi_y1, roi_x2, roi_y2)

        Returns:
            list of processed annotations (some may be filtered out). Kept
            annotations have 'bbox_2d' clipped to the ROI (still in original
            image coordinates) plus 'roi_filtered', 'roi_bounds' and
            'was_clipped' keys. If roi_bounds is None or annotations is empty,
            the input is returned unchanged.
        """
        if roi_bounds is None or len(annotations) == 0:
            return annotations

        roi_x1, roi_y1, roi_x2, roi_y2 = roi_bounds
        roi_width = roi_x2 - roi_x1
        roi_height = roi_y2 - roi_y1

        processed_annotations = []

        for ann in annotations:
            # Get original bbox in pixel coordinates [x1, y1, x2, y2]
            x1, y1, x2, y2 = ann['bbox_2d']

            # Shift to ROI-relative coordinates
            new_x1 = x1 - roi_x1
            new_y1 = y1 - roi_y1
            new_x2 = x2 - roi_x1
            new_y2 = y2 - roi_y1

            # Box is completely outside the ROI: skip it
            if ((new_x1 < 0 and new_x2 < 0) or
                    (new_x1 >= roi_width and new_x2 >= roi_width) or
                    (new_y1 < 0 and new_y2 < 0) or
                    (new_y1 >= roi_height and new_y2 >= roi_height)):
                continue

            # Check if box is completely inside (before clipping)
            still_inside = (new_x1 >= 0 and new_y1 >= 0 and
                            new_x2 < roi_width and new_y2 < roi_height)

            # Clip to ROI bounds (last valid pixel index is size - 1)
            new_x1 = self._clamp(new_x1, 0, roi_width - 1)
            new_y1 = self._clamp(new_y1, 0, roi_height - 1)
            new_x2 = self._clamp(new_x2, 0, roi_width - 1)
            new_y2 = self._clamp(new_y2, 0, roi_height - 1)

            # Drop boxes that collapsed to zero width/height after clipping
            if new_x2 <= new_x1 or new_y2 <= new_y1:
                continue

            # Convert back to original image coordinates (to match detection
            # results, which are saved in original image coordinates after
            # ROI processing)
            new_ann = ann.copy()
            new_ann['bbox_2d'] = [
                new_x1 + roi_x1,
                new_y1 + roi_y1,
                new_x2 + roi_x1,
                new_y2 + roi_y1,
            ]
            new_ann['roi_filtered'] = True  # Indicates GT has been filtered by ROI
            new_ann['roi_bounds'] = roi_bounds
            new_ann['was_clipped'] = not still_inside

            # For partially visible objects, the 3D center may be less
            # reliable (mirrors the cut-in/cut-out logic in training), so
            # flag them for 3D evaluation.
            info_3d = new_ann.get('3d_info')
            if new_ann.get('has_3d') and not still_inside and info_3d:
                # ann.copy() is shallow: copy the nested dict so the caller's
                # original annotation is not modified.
                info_3d = dict(info_3d)
                info_3d['partially_visible'] = True
                new_ann['3d_info'] = info_3d

            processed_annotations.append(new_ann)

        return processed_annotations

    def process_case_frame(self, case_name, frame_name, annotations, level1_name=None):
        """
        Process annotations for a specific case and frame.

        Calibration is only required for 'size' mode, where the ROI is
        centered on the vanishing point. A 'bounds' ROI is applied directly
        from the configuration, even when no calibration is available.

        Args:
            case_name: str, case identifier
            frame_name: str, frame identifier
            annotations: list, annotations from GroundTruthParser
            level1_name: str, optional level1 directory name for 2-level path structure

        Returns:
            tuple: (processed_annotations, roi_bounds) or (annotations, None) if no ROI
        """
        if self.roi_config is None:
            return annotations, None

        calib_params = None
        if self.roi_config.get('mode') == 'size':
            # Load calibration
            calib_params = self.load_calibration(case_name, frame_name, level1_name)
            if calib_params is None:
                print(f"Warning: Cannot compute ROI without calibration for {case_name}/{frame_name}")
                return annotations, None

        # Compute ROI bounds
        roi_bounds = self.compute_roi(calib_params)
        if roi_bounds is None:
            return annotations, None

        # Process annotations with ROI
        processed = self.process_annotations_with_roi(annotations, roi_bounds)

        return processed, roi_bounds