#!/usr/bin/env python3 from __future__ import annotations import argparse import logging import re import shutil from collections import Counter from pathlib import Path from typing import Iterable from PIL import Image DEFAULT_SRC_ROOT = Path("/mnt/nfs/mono3d/ydong_data/Detection/2Ddetection_20260402") DEFAULT_DST_IMAGES_DIR = Path( "/mnt/nfs/mono3d/xdzhu_data/Mono3d/Mono3d_4face_2m_g1m3/driving_png_20260202/Detection2D/D4Q_2D/images" ) DEFAULT_DST_LABELS_DIR = Path( "/mnt/nfs/mono3d/xdzhu_data/Mono3d/Mono3d_4face_2m_g1m3/driving_png_20260320/Detection2D/D4Q/labels" ) IMAGE_SUFFIXES = {".jpg", ".jpeg"} LABEL_SUFFIXES = {".txt"} DEFAULT_NAME_TOKENS = ("D01", "D4Q", "D4Q2", "D01P") DEFAULT_SOURCE_SIZE = (3840, 2160) DEFAULT_TARGET_SIZE = (1920, 1080) try: RESAMPLE_LANCZOS = Image.Resampling.LANCZOS except AttributeError: # pragma: no cover - Pillow < 9.1 compatibility RESAMPLE_LANCZOS = Image.LANCZOS def parse_size(value: str) -> tuple[int, int]: match = re.fullmatch(r"\s*(\d+)\s*[xX]\s*(\d+)\s*", value) if match is None: raise argparse.ArgumentTypeError(f"Invalid size '{value}'. Expected WIDTHxHEIGHT, e.g. 3840x2160.") width, height = (int(group) for group in match.groups()) if width <= 0 or height <= 0: raise argparse.ArgumentTypeError(f"Image size must be positive, got {value}.") return width, height def parse_name_tokens(raw_tokens: Iterable[str] | None) -> tuple[str, ...]: if raw_tokens is None: return DEFAULT_NAME_TOKENS tokens: list[str] = [] for raw_token in raw_tokens: tokens.extend(piece.strip() for piece in raw_token.split(",")) normalized = tuple(dict.fromkeys(token for token in tokens if token)) if not normalized: raise ValueError("At least one non-empty --name-token value is required.") return normalized def build_name_pattern(name_tokens: tuple[str, ...]) -> re.Pattern[str]: alternatives = "|".join(re.escape(token.lower()) for token in sorted(name_tokens, key=len, reverse=True)) # Match dataset tokens as standalone parts of the file name so strings like "...17b78d01..." # are not treated as a valid D01 sample. return re.compile(rf"(^|[^0-9a-z])(?:{alternatives})(?=[^0-9a-z]|$)", re.IGNORECASE) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description=( "Copy Detection2D images whose names match the requested tokens, resize the image, " "and copy the paired labels." ) ) parser.add_argument("--src-root", type=Path, default=DEFAULT_SRC_ROOT, help="Source Detection2D dataset root.") parser.add_argument( "--dst-images-dir", type=Path, default=DEFAULT_DST_IMAGES_DIR, help="Destination directory for resized image files.", ) parser.add_argument( "--dst-labels-dir", type=Path, default=DEFAULT_DST_LABELS_DIR, help="Destination directory for paired label files.", ) parser.add_argument( "--name-token", action="append", dest="name_tokens", default=None, help=( "Case-insensitive dataset token to match in the file name. Repeat the flag or provide " "a comma-separated list. Defaults to D01, D4Q, D4Q2, D01P." ), ) parser.add_argument( "--source-size", type=parse_size, default=DEFAULT_SOURCE_SIZE, help="Only process images with this source size (WIDTHxHEIGHT).", ) parser.add_argument( "--target-size", type=parse_size, default=DEFAULT_TARGET_SIZE, help="Resize matching images to this size (WIDTHxHEIGHT).", ) parser.add_argument( "--overwrite", action="store_true", help="Overwrite destination files if they already exist.", ) parser.add_argument( "--limit", type=int, default=None, help="Only inspect the first N matching image names.", ) parser.add_argument( "--log-every", type=int, default=1000, help="Log progress every N inspected image files.", ) parser.add_argument( "--dry-run", action="store_true", help="Print planned work without writing files.", ) return parser.parse_args() def iter_matching_images( src_images_dir: Path, suffixes: set[str], name_pattern: re.Pattern[str], limit: int | None = None, ) -> list[Path]: matches: list[Path] = [] for path in src_images_dir.rglob("*"): if not path.is_file() or path.suffix.lower() not in suffixes: continue if not name_pattern.search(path.stem.lower()): continue matches.append(path) if limit is not None and len(matches) >= limit: break return matches def read_image_size(image_path: Path) -> tuple[int, int]: with Image.open(image_path) as image: return image.size def resize_one_image( src_path: Path, dst_dir: Path, target_size: tuple[int, int], overwrite: bool, dry_run: bool, ) -> tuple[str, Path, str]: dst_path = dst_dir / src_path.name if dst_path.exists() and not overwrite: return "skipped", dst_path, "" if dry_run: return "planned", dst_path, "" dst_path.parent.mkdir(parents=True, exist_ok=True) try: with Image.open(src_path) as image: if image.mode not in {"RGB", "L"}: image = image.convert("RGB") resized = image.resize(target_size, RESAMPLE_LANCZOS) save_kwargs = {"quality": 95} if dst_path.suffix.lower() in {".jpg", ".jpeg"} else {} resized.save(dst_path, **save_kwargs) except OSError as exc: return "failed", dst_path, str(exc) return "copied", dst_path, "" def copy_one_label(src_path: Path, dst_dir: Path, overwrite: bool, dry_run: bool) -> tuple[str, Path, str]: dst_path = dst_dir / src_path.name if dst_path.exists() and not overwrite: return "skipped", dst_path, "" if dry_run: return "planned", dst_path, "" dst_path.parent.mkdir(parents=True, exist_ok=True) try: shutil.copy2(src_path, dst_path) except OSError as exc: return "failed", dst_path, str(exc) return "copied", dst_path, "" def maybe_log_plan(image_path: Path, label_path: Path, dst_images_dir: Path, dst_labels_dir: Path, index: int) -> None: if index >= 5: return logging.info("plan image: %s -> %s", image_path, dst_images_dir / image_path.name) logging.info("plan label: %s -> %s", label_path, dst_labels_dir / label_path.name) def log_progress(index: int, total: int, counts: Counter) -> None: logging.info( ( "progress %d/%d | matched=%d wrong_size=%d missing_label=%d " "images(copied=%d skipped=%d planned=%d failed=%d) " "labels(copied=%d skipped=%d planned=%d failed=%d)" ), index, total, counts["pairs_matched"], counts["images_wrong_size"], counts["labels_missing"], counts["images_copied"], counts["images_skipped"], counts["images_planned"], counts["images_failed"], counts["labels_copied"], counts["labels_skipped"], counts["labels_planned"], counts["labels_failed"], ) def process_dataset( image_paths: list[Path], src_labels_dir: Path, dst_images_dir: Path, dst_labels_dir: Path, source_size: tuple[int, int], target_size: tuple[int, int], overwrite: bool, dry_run: bool, log_every: int, ) -> Counter: counts: Counter[str] = Counter() total = len(image_paths) for index, image_path in enumerate(image_paths, start=1): label_path = src_labels_dir / f"{image_path.stem}.txt" if label_path.suffix.lower() not in LABEL_SUFFIXES: counts["labels_invalid_suffix"] += 1 elif not label_path.exists(): counts["labels_missing"] += 1 logging.warning("missing label for image: %s", image_path) else: try: image_size = read_image_size(image_path) except OSError as exc: counts["images_failed"] += 1 logging.error("failed to read image size: %s | %s", image_path, exc) else: if image_size != source_size: counts["images_wrong_size"] += 1 if counts["images_wrong_size"] <= 10: logging.info("skip image with unexpected size %s: %s", image_size, image_path) else: counts["pairs_matched"] += 1 if dry_run: maybe_log_plan(image_path, label_path, dst_images_dir, dst_labels_dir, counts["pairs_matched"] - 1) image_status, dst_image_path, image_message = resize_one_image( src_path=image_path, dst_dir=dst_images_dir, target_size=target_size, overwrite=overwrite, dry_run=dry_run, ) counts[f"images_{image_status}"] += 1 if image_status == "failed": logging.error("image write failed: %s -> %s | %s", image_path, dst_image_path, image_message) else: label_status, dst_label_path, label_message = copy_one_label( src_path=label_path, dst_dir=dst_labels_dir, overwrite=overwrite, dry_run=dry_run, ) counts[f"labels_{label_status}"] += 1 if label_status == "failed": logging.error("label copy failed: %s -> %s | %s", label_path, dst_label_path, label_message) if index % log_every == 0 or index == total: log_progress(index, total, counts) return counts def main() -> int: args = parse_args() logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s") src_root = args.src_root.resolve() src_images_dir = src_root / "images" src_labels_dir = src_root / "labels" dst_images_dir = args.dst_images_dir.resolve() dst_labels_dir = args.dst_labels_dir.resolve() name_tokens = parse_name_tokens(args.name_tokens) name_pattern = build_name_pattern(name_tokens) if not src_root.exists(): raise FileNotFoundError(f"Source root does not exist: {src_root}") if not src_images_dir.exists(): raise FileNotFoundError(f"Source images directory does not exist: {src_images_dir}") if not src_labels_dir.exists(): raise FileNotFoundError(f"Source labels directory does not exist: {src_labels_dir}") if args.log_every <= 0: raise ValueError("--log-every must be a positive integer.") if args.limit is not None and args.limit <= 0: raise ValueError("--limit must be a positive integer when provided.") image_paths = iter_matching_images(src_images_dir, IMAGE_SUFFIXES, name_pattern, limit=args.limit) logging.info("source root: %s", src_root) logging.info("destination images dir: %s", dst_images_dir) logging.info("destination labels dir: %s", dst_labels_dir) logging.info("file name tokens: %s", ", ".join(name_tokens)) logging.info("required source size: %sx%s", *args.source_size) logging.info("target resize: %sx%s", *args.target_size) logging.info( "labels are copied without rewriting because the Detection2D labels are normalized; " "a full-image resize keeps normalized coordinates unchanged." ) logging.info("found %d candidate image(s) by name", len(image_paths)) counts = process_dataset( image_paths=image_paths, src_labels_dir=src_labels_dir, dst_images_dir=dst_images_dir, dst_labels_dir=dst_labels_dir, source_size=args.source_size, target_size=args.target_size, overwrite=args.overwrite, dry_run=args.dry_run, log_every=args.log_every, ) if counts["images_failed"] or counts["labels_failed"]: logging.error( "copy finished with failures | image_failed=%d label_failed=%d", counts["images_failed"], counts["labels_failed"], ) return 1 logging.info( ( "copy finished successfully | matched=%d wrong_size=%d missing_label=%d " "images(copied=%d skipped=%d planned=%d) labels(copied=%d skipped=%d planned=%d)" ), counts["pairs_matched"], counts["images_wrong_size"], counts["labels_missing"], counts["images_copied"], counts["images_skipped"], counts["images_planned"], counts["labels_copied"], counts["labels_skipped"], counts["labels_planned"], ) return 0 if __name__ == "__main__": raise SystemExit(main())