Files
yolov26_3d/tools/copy_detection2d_g1m3_files.py
2026-06-24 09:35:46 +08:00

377 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import logging
import re
import shutil
from collections import Counter
from pathlib import Path
from typing import Iterable
from PIL import Image
DEFAULT_SRC_ROOT = Path("/mnt/nfs/mono3d/ydong_data/Detection/2Ddetection_20260402")
DEFAULT_DST_IMAGES_DIR = Path(
"/mnt/nfs/mono3d/xdzhu_data/Mono3d/Mono3d_4face_2m_g1m3/driving_png_20260202/Detection2D/D4Q_2D/images"
)
DEFAULT_DST_LABELS_DIR = Path(
"/mnt/nfs/mono3d/xdzhu_data/Mono3d/Mono3d_4face_2m_g1m3/driving_png_20260320/Detection2D/D4Q/labels"
)
IMAGE_SUFFIXES = {".jpg", ".jpeg"}
LABEL_SUFFIXES = {".txt"}
DEFAULT_NAME_TOKENS = ("D01", "D4Q", "D4Q2", "D01P")
DEFAULT_SOURCE_SIZE = (3840, 2160)
DEFAULT_TARGET_SIZE = (1920, 1080)
try:
RESAMPLE_LANCZOS = Image.Resampling.LANCZOS
except AttributeError: # pragma: no cover - Pillow < 9.1 compatibility
RESAMPLE_LANCZOS = Image.LANCZOS
def parse_size(value: str) -> tuple[int, int]:
match = re.fullmatch(r"\s*(\d+)\s*[xX]\s*(\d+)\s*", value)
if match is None:
raise argparse.ArgumentTypeError(f"Invalid size '{value}'. Expected WIDTHxHEIGHT, e.g. 3840x2160.")
width, height = (int(group) for group in match.groups())
if width <= 0 or height <= 0:
raise argparse.ArgumentTypeError(f"Image size must be positive, got {value}.")
return width, height
def parse_name_tokens(raw_tokens: Iterable[str] | None) -> tuple[str, ...]:
if raw_tokens is None:
return DEFAULT_NAME_TOKENS
tokens: list[str] = []
for raw_token in raw_tokens:
tokens.extend(piece.strip() for piece in raw_token.split(","))
normalized = tuple(dict.fromkeys(token for token in tokens if token))
if not normalized:
raise ValueError("At least one non-empty --name-token value is required.")
return normalized
def build_name_pattern(name_tokens: tuple[str, ...]) -> re.Pattern[str]:
alternatives = "|".join(re.escape(token.lower()) for token in sorted(name_tokens, key=len, reverse=True))
# Match dataset tokens as standalone parts of the file name so strings like "...17b78d01..."
# are not treated as a valid D01 sample.
return re.compile(rf"(^|[^0-9a-z])(?:{alternatives})(?=[^0-9a-z]|$)", re.IGNORECASE)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description=(
"Copy Detection2D images whose names match the requested tokens, resize the image, "
"and copy the paired labels."
)
)
parser.add_argument("--src-root", type=Path, default=DEFAULT_SRC_ROOT, help="Source Detection2D dataset root.")
parser.add_argument(
"--dst-images-dir",
type=Path,
default=DEFAULT_DST_IMAGES_DIR,
help="Destination directory for resized image files.",
)
parser.add_argument(
"--dst-labels-dir",
type=Path,
default=DEFAULT_DST_LABELS_DIR,
help="Destination directory for paired label files.",
)
parser.add_argument(
"--name-token",
action="append",
dest="name_tokens",
default=None,
help=(
"Case-insensitive dataset token to match in the file name. Repeat the flag or provide "
"a comma-separated list. Defaults to D01, D4Q, D4Q2, D01P."
),
)
parser.add_argument(
"--source-size",
type=parse_size,
default=DEFAULT_SOURCE_SIZE,
help="Only process images with this source size (WIDTHxHEIGHT).",
)
parser.add_argument(
"--target-size",
type=parse_size,
default=DEFAULT_TARGET_SIZE,
help="Resize matching images to this size (WIDTHxHEIGHT).",
)
parser.add_argument(
"--overwrite",
action="store_true",
help="Overwrite destination files if they already exist.",
)
parser.add_argument(
"--limit",
type=int,
default=None,
help="Only inspect the first N matching image names.",
)
parser.add_argument(
"--log-every",
type=int,
default=1000,
help="Log progress every N inspected image files.",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Print planned work without writing files.",
)
return parser.parse_args()
def iter_matching_images(
src_images_dir: Path,
suffixes: set[str],
name_pattern: re.Pattern[str],
limit: int | None = None,
) -> list[Path]:
matches: list[Path] = []
for path in src_images_dir.rglob("*"):
if not path.is_file() or path.suffix.lower() not in suffixes:
continue
if not name_pattern.search(path.stem.lower()):
continue
matches.append(path)
if limit is not None and len(matches) >= limit:
break
return matches
def read_image_size(image_path: Path) -> tuple[int, int]:
with Image.open(image_path) as image:
return image.size
def resize_one_image(
src_path: Path,
dst_dir: Path,
target_size: tuple[int, int],
overwrite: bool,
dry_run: bool,
) -> tuple[str, Path, str]:
dst_path = dst_dir / src_path.name
if dst_path.exists() and not overwrite:
return "skipped", dst_path, ""
if dry_run:
return "planned", dst_path, ""
dst_path.parent.mkdir(parents=True, exist_ok=True)
try:
with Image.open(src_path) as image:
if image.mode not in {"RGB", "L"}:
image = image.convert("RGB")
resized = image.resize(target_size, RESAMPLE_LANCZOS)
save_kwargs = {"quality": 95} if dst_path.suffix.lower() in {".jpg", ".jpeg"} else {}
resized.save(dst_path, **save_kwargs)
except OSError as exc:
return "failed", dst_path, str(exc)
return "copied", dst_path, ""
def copy_one_label(src_path: Path, dst_dir: Path, overwrite: bool, dry_run: bool) -> tuple[str, Path, str]:
dst_path = dst_dir / src_path.name
if dst_path.exists() and not overwrite:
return "skipped", dst_path, ""
if dry_run:
return "planned", dst_path, ""
dst_path.parent.mkdir(parents=True, exist_ok=True)
try:
shutil.copy2(src_path, dst_path)
except OSError as exc:
return "failed", dst_path, str(exc)
return "copied", dst_path, ""
def maybe_log_plan(image_path: Path, label_path: Path, dst_images_dir: Path, dst_labels_dir: Path, index: int) -> None:
if index >= 5:
return
logging.info("plan image: %s -> %s", image_path, dst_images_dir / image_path.name)
logging.info("plan label: %s -> %s", label_path, dst_labels_dir / label_path.name)
def log_progress(index: int, total: int, counts: Counter) -> None:
logging.info(
(
"progress %d/%d | matched=%d wrong_size=%d missing_label=%d "
"images(copied=%d skipped=%d planned=%d failed=%d) "
"labels(copied=%d skipped=%d planned=%d failed=%d)"
),
index,
total,
counts["pairs_matched"],
counts["images_wrong_size"],
counts["labels_missing"],
counts["images_copied"],
counts["images_skipped"],
counts["images_planned"],
counts["images_failed"],
counts["labels_copied"],
counts["labels_skipped"],
counts["labels_planned"],
counts["labels_failed"],
)
def process_dataset(
image_paths: list[Path],
src_labels_dir: Path,
dst_images_dir: Path,
dst_labels_dir: Path,
source_size: tuple[int, int],
target_size: tuple[int, int],
overwrite: bool,
dry_run: bool,
log_every: int,
) -> Counter:
counts: Counter[str] = Counter()
total = len(image_paths)
for index, image_path in enumerate(image_paths, start=1):
label_path = src_labels_dir / f"{image_path.stem}.txt"
if label_path.suffix.lower() not in LABEL_SUFFIXES:
counts["labels_invalid_suffix"] += 1
elif not label_path.exists():
counts["labels_missing"] += 1
logging.warning("missing label for image: %s", image_path)
else:
try:
image_size = read_image_size(image_path)
except OSError as exc:
counts["images_failed"] += 1
logging.error("failed to read image size: %s | %s", image_path, exc)
else:
if image_size != source_size:
counts["images_wrong_size"] += 1
if counts["images_wrong_size"] <= 10:
logging.info("skip image with unexpected size %s: %s", image_size, image_path)
else:
counts["pairs_matched"] += 1
if dry_run:
maybe_log_plan(image_path, label_path, dst_images_dir, dst_labels_dir, counts["pairs_matched"] - 1)
image_status, dst_image_path, image_message = resize_one_image(
src_path=image_path,
dst_dir=dst_images_dir,
target_size=target_size,
overwrite=overwrite,
dry_run=dry_run,
)
counts[f"images_{image_status}"] += 1
if image_status == "failed":
logging.error("image write failed: %s -> %s | %s", image_path, dst_image_path, image_message)
else:
label_status, dst_label_path, label_message = copy_one_label(
src_path=label_path,
dst_dir=dst_labels_dir,
overwrite=overwrite,
dry_run=dry_run,
)
counts[f"labels_{label_status}"] += 1
if label_status == "failed":
logging.error("label copy failed: %s -> %s | %s", label_path, dst_label_path, label_message)
if index % log_every == 0 or index == total:
log_progress(index, total, counts)
return counts
def main() -> int:
args = parse_args()
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
src_root = args.src_root.resolve()
src_images_dir = src_root / "images"
src_labels_dir = src_root / "labels"
dst_images_dir = args.dst_images_dir.resolve()
dst_labels_dir = args.dst_labels_dir.resolve()
name_tokens = parse_name_tokens(args.name_tokens)
name_pattern = build_name_pattern(name_tokens)
if not src_root.exists():
raise FileNotFoundError(f"Source root does not exist: {src_root}")
if not src_images_dir.exists():
raise FileNotFoundError(f"Source images directory does not exist: {src_images_dir}")
if not src_labels_dir.exists():
raise FileNotFoundError(f"Source labels directory does not exist: {src_labels_dir}")
if args.log_every <= 0:
raise ValueError("--log-every must be a positive integer.")
if args.limit is not None and args.limit <= 0:
raise ValueError("--limit must be a positive integer when provided.")
image_paths = iter_matching_images(src_images_dir, IMAGE_SUFFIXES, name_pattern, limit=args.limit)
logging.info("source root: %s", src_root)
logging.info("destination images dir: %s", dst_images_dir)
logging.info("destination labels dir: %s", dst_labels_dir)
logging.info("file name tokens: %s", ", ".join(name_tokens))
logging.info("required source size: %sx%s", *args.source_size)
logging.info("target resize: %sx%s", *args.target_size)
logging.info(
"labels are copied without rewriting because the Detection2D labels are normalized; "
"a full-image resize keeps normalized coordinates unchanged."
)
logging.info("found %d candidate image(s) by name", len(image_paths))
counts = process_dataset(
image_paths=image_paths,
src_labels_dir=src_labels_dir,
dst_images_dir=dst_images_dir,
dst_labels_dir=dst_labels_dir,
source_size=args.source_size,
target_size=args.target_size,
overwrite=args.overwrite,
dry_run=args.dry_run,
log_every=args.log_every,
)
if counts["images_failed"] or counts["labels_failed"]:
logging.error(
"copy finished with failures | image_failed=%d label_failed=%d",
counts["images_failed"],
counts["labels_failed"],
)
return 1
logging.info(
(
"copy finished successfully | matched=%d wrong_size=%d missing_label=%d "
"images(copied=%d skipped=%d planned=%d) labels(copied=%d skipped=%d planned=%d)"
),
counts["pairs_matched"],
counts["images_wrong_size"],
counts["labels_missing"],
counts["images_copied"],
counts["images_skipped"],
counts["images_planned"],
counts["labels_copied"],
counts["labels_skipped"],
counts["labels_planned"],
)
return 0
if __name__ == "__main__":
raise SystemExit(main())