#!/usr/bin/env python3
# coding: utf-8
"""Collect 2D/3D visualization frames from all batches and pack them into fixed-size parts.

Frames (image files found under each batch's ``2D/`` and ``3D/`` directories)
are indexed across all batches, split into parts of ``--frames-per-part``
frames, and for every part one archive per modality is written to
``<output_root>/<modality>/part_NNNN.<tar|tar.gz>``.  Inside an archive the
images are stored flattened under ``part_NNNN/images/``.  A per-part manifest
(``part_NNNN.manifest.json``) and an overall manifest
(``archives_manifest.json``) are written to the output root.
"""
import argparse
import hashlib
import json
import os
import shutil
import subprocess
import tarfile
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from threading import Lock
from time import perf_counter

from count_visualization_batches import (
    DEFAULT_IMAGE_EXTS,
    DEFAULT_SHOWPATH_ROOT,
    collect_image_stats,
    normalize_image_exts,
)

DEFAULT_BATCH_GLOB = "batch_*"
DEFAULT_ARCHIVE_FORMAT = "tar.gz"
DEFAULT_ARCHIVE_BACKEND = "auto"
DEFAULT_MANIFEST_NAME = "archives_manifest.json"
DEFAULT_FRAMES_PER_PART = 2000
MODALITIES = ("2D", "3D")
# Serializes log() output coming from the scan / part / archive worker threads.
LOG_LOCK = Lock()


@dataclass(frozen=True)
class FrameEntry:
    """One frame: the same relative path found under a batch's 2D/ and 3D/ directories."""

    # Name of the batch directory the frame belongs to.
    batch_name: str
    # Path relative to the batch's 2D/ (and 3D/) directory, taken from the
    # relpath sets produced by collect_image_stats.
    relpath: str
    # Absolute-or-relative source file of the 2D rendering (batch_dir / "2D" / relpath).
    path_2d: Path
    # Source file of the 3D rendering (batch_dir / "3D" / relpath).
    path_3d: Path


def log(message=""):
    """Print *message* and flush immediately; serialized across threads by LOG_LOCK."""
    with LOG_LOCK:
        print(message, flush=True)


def default_scan_workers():
    """Default scan concurrency: the machine's CPU count clamped to [1, 8]."""
    return max(1, min(8, os.cpu_count() or 1))


def default_archive_workers():
    """Default per-part archive concurrency: one worker per modality."""
    return len(MODALITIES)


def default_part_workers():
    """Default number of parts processed concurrently."""
    return 1


def parse_args():
    """Build the command-line parser and return the parsed arguments."""
    parser = argparse.ArgumentParser(
        description="汇总所有 batch 的 2D/3D 可视化结果,并按固定帧数分片打包。"
    )
    parser.add_argument(
        "target_path",
        nargs="?",
        default=DEFAULT_SHOWPATH_ROOT,
        help="可视化 batch 根目录,或单个 batch 目录。",
    )
    parser.add_argument(
        "output_root",
        nargs="?",
        default=None,
        help="归档输出目录。默认输出到 batch 根目录的并列目录 <batch_root>_archives。",
    )
    parser.add_argument(
        "--batch-glob",
        default=DEFAULT_BATCH_GLOB,
        help="batch 根目录模式下用于查找 batch 的 glob,默认 batch_*。",
    )
    parser.add_argument(
        "--archive-format",
        choices=["tar", "tar.gz"],
        default=DEFAULT_ARCHIVE_FORMAT,
        help="归档格式,默认 tar.gz。",
    )
    parser.add_argument(
        "--frames-per-part",
        type=int,
        default=DEFAULT_FRAMES_PER_PART,
        help="每个 part 包含多少帧,默认 2000。",
    )
    parser.add_argument(
        "--scan-workers",
        type=int,
        default=default_scan_workers(),
        help="扫描 batch 的并发线程数,默认按机器核数自适应,最多 8。",
    )
    parser.add_argument(
        "--archive-workers",
        type=int,
        default=default_archive_workers(),
        help="单个 part 内归档并发数,默认 2,同时处理 2D 和 3D。",
    )
    parser.add_argument(
        "--part-workers",
        type=int,
        default=default_part_workers(),
        help="同时处理多少个 part,默认 1;每个 part 内仍会按 archive-workers 并发处理 2D/3D。",
    )
    parser.add_argument(
        "--archive-backend",
        choices=["auto", "python", "system_tar"],
        default=DEFAULT_ARCHIVE_BACKEND,
        help="归档实现后端,默认 auto;优先使用系统 GNU tar/gzip,以提升大批量打包速度。",
    )
    parser.add_argument(
        "--gzip-compresslevel",
        type=int,
        default=1,
        help="tar.gz 压缩级别,范围 0-9,默认 1 以优先速度。",
    )
    parser.add_argument(
        "--image-exts",
        default=DEFAULT_IMAGE_EXTS,
        help="参与校验统计的图片扩展名,逗号分隔,默认 .jpg,.jpeg,.png。",
    )
    parser.add_argument(
        "--check-pairs",
        action="store_true",
        default=True,
        help="打包前校验 2D/3D 相对路径是否一一对应,默认开启。",
    )
    parser.add_argument(
        "--no-check-pairs",
        dest="check_pairs",
        action="store_false",
        help="关闭 2D/3D 一一对应校验,仅打包两侧都存在的帧,未配对的文件会被跳过。",
    )
    parser.add_argument(
        "--sample-mismatch-limit",
        type=int,
        default=10,
        help="2D/3D 配对校验失败时,manifest 中最多保留多少条样例。",
    )
    parser.add_argument(
        "--skip-existing",
        action="store_true",
        help="若目标 part 归档已存在且大小大于 0,则跳过该 part。",
    )
    parser.add_argument(
        "--checksum",
        action="store_true",
        help="归档完成后计算 sha256。",
    )
    parser.add_argument(
        "--verify-archive",
        action="store_true",
        default=True,
        help="归档完成后快速校验非空且可读取,默认开启。",
    )
    parser.add_argument(
        "--no-verify-archive",
        dest="verify_archive",
        action="store_false",
        help="关闭归档完成后的快速校验,以进一步提速。",
    )
    parser.add_argument(
        "--remove-source-after-success",
        action="store_true",
        help="所有归档全部成功且不存在未打包的未配对图片时,删除源 batch 目录。默认关闭,请谨慎使用。",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="只做校验和计划输出,不实际创建归档或删除源目录。",
    )
    return parser.parse_args()


def now_str():
    """Return the current local time as an ISO-8601 string with second precision."""
    return datetime.now().isoformat(timespec="seconds")


def is_relative_to(path, other):
    """Return True if *path* lies inside *other* (equivalent of ``Path.is_relative_to``)."""
    try:
        path.relative_to(other)
    except ValueError:
        return False
    return True


def is_batch_dir(path):
    """A batch directory is any directory that contains a ``2D`` or a ``3D`` subdirectory."""
    return path.is_dir() and ((path / "2D").is_dir() or (path / "3D").is_dir())


def resolve_batch_dirs(target_path, batch_glob):
    """Resolve *target_path* into ``(batch_root, batch_dirs, resolve_mode)``.

    If *target_path* is itself a batch directory, its parent is the batch root
    and the mode is ``"single_batch"``.  Otherwise *target_path* is the batch
    root and every child matching *batch_glob* that is a batch directory is
    returned (sorted), with mode ``"batch_root"``.

    Raises:
        FileNotFoundError: *target_path* is not a directory, or no batch
            directories match *batch_glob*.
    """
    if is_batch_dir(target_path):
        return target_path.parent, [target_path], "single_batch"
    if not target_path.is_dir():
        raise FileNotFoundError(f"target path does not exist or is not a directory: {target_path}")
    batch_dirs = sorted(path for path in target_path.glob(batch_glob) if is_batch_dir(path))
    if not batch_dirs:
        raise FileNotFoundError(
            f"no batch directories found under {target_path} with batch_glob={batch_glob}"
        )
    return target_path, batch_dirs, "batch_root"


def default_output_root(batch_root):
    """Default output directory: a sibling of *batch_root* named ``<name>_archives``."""
    return batch_root.parent / f"{batch_root.name}_archives"


def archive_suffix(archive_format):
    """File suffix (without leading dot) for *archive_format*."""
    return "tar.gz" if archive_format == "tar.gz" else "tar"


def archive_basename(archive_path):
    """Archive file name with its ``.tar.gz`` / ``.tar`` suffix removed."""
    name = archive_path.name
    if name.endswith(".tar.gz"):
        return name[: -len(".tar.gz")]
    if name.endswith(".tar"):
        return name[: -len(".tar")]
    return archive_path.stem


def build_archive_path(output_root, modality, part_index, archive_format):
    """Path of the archive for *modality* of part *part_index*, e.g. ``<out>/2D/part_0001.tar.gz``."""
    return output_root / modality / f"part_{part_index:04d}.{archive_suffix(archive_format)}"


def write_json(path, data):
    """Write *data* as pretty-printed UTF-8 JSON to *path*, creating parent directories."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def ensure_safe_output_root(root_dir, output_root):
    """Refuse an *output_root* located inside the source directory *root_dir*.

    Raises:
        ValueError: *output_root* resolves to a path inside *root_dir*.
    """
    source_root_resolved = root_dir.resolve()
    output_root_resolved = output_root.resolve()
    if is_relative_to(output_root_resolved, source_root_resolved):
        raise ValueError(
            "output_root 不允许位于源目录内部,"
            f"source_root={source_root_resolved}, output_root={output_root_resolved}"
        )


def _is_gnu_tar():
    """Return True when the ``tar`` found on PATH identifies itself as GNU tar.

    The system_tar backend relies on the GNU-only ``--transform`` option, so
    BSD tar (macOS, Windows) or BusyBox tar cannot be used for it.
    """
    try:
        completed = subprocess.run(
            ["tar", "--version"],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            check=False,
            timeout=30,
        )
    except (OSError, subprocess.SubprocessError):
        return False
    return completed.returncode == 0 and b"GNU tar" in completed.stdout


def supports_system_tar_backend(archive_format, gzip_compresslevel):
    """Return True if the system_tar backend can produce *archive_format* here.

    Requires GNU tar on PATH; for ``tar.gz`` also requires ``gzip`` on PATH and
    a non-zero compression level (the gzip CLI is invoked as ``gzip -<level>``).
    """
    if shutil.which("tar") is None or not _is_gnu_tar():
        return False
    if archive_format != "tar.gz":
        return True
    if gzip_compresslevel == 0:
        return False
    return shutil.which("gzip") is not None


def resolve_archive_backend(requested_backend, archive_format, gzip_compresslevel):
    """Map the requested backend (``auto`` / ``python`` / ``system_tar``) to a concrete one.

    ``auto`` prefers ``system_tar`` when supported and otherwise falls back to
    ``python``.  An explicit ``system_tar`` request is validated eagerly.

    Raises:
        RuntimeError: ``system_tar`` requested but ``tar`` is missing or not GNU
            tar, or ``gzip`` is missing in ``tar.gz`` mode.
        ValueError: ``system_tar`` requested with ``tar.gz`` and compression level 0.
    """
    if requested_backend == "auto":
        if supports_system_tar_backend(archive_format, gzip_compresslevel):
            return "system_tar"
        return "python"
    if requested_backend == "system_tar":
        if shutil.which("tar") is None:
            raise RuntimeError("archive-backend=system_tar 需要系统 tar 命令。")
        if not _is_gnu_tar():
            raise RuntimeError("archive-backend=system_tar 需要 GNU tar(依赖 --transform 选项)。")
        if archive_format == "tar.gz" and shutil.which("gzip") is None:
            raise RuntimeError("archive-backend=system_tar 在 tar.gz 模式下需要 gzip 命令。")
        if archive_format == "tar.gz" and gzip_compresslevel == 0:
            raise ValueError(
                "archive-backend=system_tar 当前不支持 gzip-compresslevel=0,"
                "请改用 python 后端或将压缩级别设为 1-9。"
            )
        return "system_tar"
    return "python"


def open_tarfile(archive_path, archive_format, gzip_compresslevel):
    """Open *archive_path* for writing with :mod:`tarfile` (gzip-compressed for ``tar.gz``)."""
    if archive_format == "tar.gz":
        return tarfile.open(archive_path, mode="w:gz", compresslevel=gzip_compresslevel)
    return tarfile.open(archive_path, mode="w")


def build_archive_member_dir(archive_path):
    """Directory inside the archive that holds the images, e.g. ``part_0001/images``."""
    return f"{archive_basename(archive_path)}/images"


def build_archive_entry_name(archive_path, source_path):
    """Archive member name for *source_path*: the member dir plus the file's base name."""
    return f"{build_archive_member_dir(archive_path)}/{source_path.name}"


def ensure_unique_archive_names(source_paths, archive_path):
    """Fail if two source files would map to the same (flattened) archive member name.

    Raises:
        ValueError: two entries of *source_paths* share a base name.
    """
    seen_names = {}
    for source_path in source_paths:
        archive_name = build_archive_entry_name(archive_path, source_path)
        if archive_name in seen_names:
            raise ValueError(
                "archive entry name duplicated: "
                f"{archive_name}, first={seen_names[archive_name]}, duplicate={source_path}"
            )
        seen_names[archive_name] = source_path


def build_relative_source_paths(source_paths, source_root):
    """Return each of *source_paths* as a POSIX path relative to *source_root* (resolved).

    Raises:
        ValueError: a path does not lie under *source_root*.
    """
    source_root = source_root.resolve()
    relative_paths = []
    for source_path in source_paths:
        try:
            relative_path = source_path.relative_to(source_root)
        except ValueError as exc:
            raise ValueError(
                f"source path is outside source_root: source_root={source_root}, path={source_path}"
            ) from exc
        relative_paths.append(relative_path.as_posix())
    return relative_paths


def decode_stderr(stderr_bytes):
    """Decode captured stderr bytes as UTF-8 (replacing bad bytes) and strip whitespace."""
    if not stderr_bytes:
        return ""
    return stderr_bytes.decode("utf-8", errors="replace").strip()


def escape_tar_transform_replacement(value):
    """Escape ``\\``, ``&`` and the ``|`` delimiter for use in a tar ``--transform`` replacement."""
    return value.replace("\\", "\\\\").replace("&", r"\&").replace("|", r"\|")


def create_archive_with_python_tar(
    source_paths,
    archive_path,
    archive_format,
    gzip_compresslevel,
):
    """Create *archive_path* from *source_paths* using :mod:`tarfile`.

    The archive is written to ``<archive>.partial`` first and renamed on
    success; the partial file is removed on any failure.
    """
    archive_path.parent.mkdir(parents=True, exist_ok=True)
    partial_archive = archive_path.with_name(f"{archive_path.name}.partial")
    if partial_archive.exists():
        partial_archive.unlink()

    ensure_unique_archive_names(source_paths, archive_path)

    try:
        with open_tarfile(partial_archive, archive_format, gzip_compresslevel) as tar_obj:
            for source_path in source_paths:
                tar_obj.add(
                    source_path,
                    arcname=build_archive_entry_name(archive_path, source_path),
                    recursive=False,
                )
        partial_archive.replace(archive_path)
    finally:
        if partial_archive.exists():
            partial_archive.unlink()


def _build_system_tar_cmd(output, source_root, transform_rule):
    """GNU tar command writing to *output* (a path or ``-``) from NUL-separated paths on stdin."""
    return [
        "tar",
        "-cf",
        output,
        "--directory",
        str(source_root),
        "--transform",
        transform_rule,
        "--no-recursion",
        "--null",
        "--files-from",
        "-",
    ]


def create_archive_with_system_tar(
    source_paths,
    archive_path,
    source_root,
    archive_format,
    gzip_compresslevel,
):
    """Create *archive_path* from *source_paths* with GNU tar (piped into gzip for ``tar.gz``).

    Paths are passed to tar relative to *source_root*, NUL-separated on stdin.
    The ``--transform`` rule drops every directory component and prefixes the
    archive member directory, matching the layout of the python backend.  The
    archive is written to ``<archive>.partial`` and renamed on success.

    Raises:
        RuntimeError: tar or gzip exited with a non-zero status.
    """
    archive_path.parent.mkdir(parents=True, exist_ok=True)
    partial_archive = archive_path.with_name(f"{archive_path.name}.partial")
    if partial_archive.exists():
        partial_archive.unlink()

    ensure_unique_archive_names(source_paths, archive_path)
    relative_paths = build_relative_source_paths(source_paths, source_root)
    relative_paths_input = b"".join(os.fsencode(path) + b"\0" for path in relative_paths)
    archive_member_dir = escape_tar_transform_replacement(build_archive_member_dir(archive_path))
    # First expression strips everything up to the last "/", second prepends the member dir.
    transform_rule = f"s|.*/||;s|^|{archive_member_dir}/|"

    try:
        if archive_format == "tar":
            completed = subprocess.run(
                _build_system_tar_cmd(str(partial_archive), source_root, transform_rule),
                input=relative_paths_input,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                check=False,
            )
            if completed.returncode != 0:
                raise RuntimeError(
                    "system tar failed: "
                    f"{decode_stderr(completed.stderr) or 'unknown error'}"
                )
        else:
            tar_cmd = _build_system_tar_cmd("-", source_root, transform_rule)
            gzip_cmd = ["gzip", f"-{gzip_compresslevel}", "-c"]
            with open(partial_archive, "wb") as archive_file:
                tar_proc = subprocess.Popen(
                    tar_cmd,
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                )
                try:
                    gzip_proc = subprocess.Popen(
                        gzip_cmd,
                        stdin=tar_proc.stdout,
                        stdout=archive_file,
                        stderr=subprocess.PIPE,
                    )
                except Exception:
                    tar_proc.kill()
                    tar_proc.wait()
                    raise
                # Drop the parent's copy of the pipe's read end so gzip alone owns
                # it and tar receives SIGPIPE if gzip exits early.
                tar_proc.stdout.close()
                try:
                    _, tar_stderr = tar_proc.communicate(input=relative_paths_input)
                    _, gzip_stderr = gzip_proc.communicate()
                except BaseException:
                    # Never leave orphaned tar/gzip children behind (e.g. on Ctrl-C).
                    for proc in (tar_proc, gzip_proc):
                        if proc.poll() is None:
                            proc.kill()
                        proc.wait()
                    raise
            if tar_proc.returncode != 0:
                raise RuntimeError(
                    "system tar failed: "
                    f"{decode_stderr(tar_stderr) or 'unknown error'}"
                )
            if gzip_proc.returncode != 0:
                raise RuntimeError(
                    "gzip failed: "
                    f"{decode_stderr(gzip_stderr) or 'unknown error'}"
                )
        partial_archive.replace(archive_path)
    finally:
        if partial_archive.exists():
            partial_archive.unlink()


def create_archive(
    source_paths,
    archive_path,
    source_root,
    archive_backend,
    archive_format,
    gzip_compresslevel,
):
    """Dispatch archive creation to the resolved *archive_backend* (``system_tar`` or python)."""
    if archive_backend == "system_tar":
        create_archive_with_system_tar(
            source_paths,
            archive_path,
            source_root,
            archive_format,
            gzip_compresslevel,
        )
        return
    create_archive_with_python_tar(
        source_paths,
        archive_path,
        archive_format,
        gzip_compresslevel,
    )


def verify_archive(archive_path):
    """Quick sanity check: the archive exists, is non-empty and its first member is readable.

    Raises:
        FileNotFoundError: the archive does not exist.
        ValueError: the archive file has size 0 or contains no members.
    """
    if not archive_path.exists():
        raise FileNotFoundError(f"archive not found: {archive_path}")
    if archive_path.stat().st_size <= 0:
        raise ValueError(f"archive size is 0: {archive_path}")
    with tarfile.open(archive_path, mode="r:*") as tar_obj:
        first_member = tar_obj.next()
        if first_member is None:
            raise ValueError(f"archive is empty: {archive_path}")


def compute_sha256(file_path):
    """Return the hex SHA-256 digest of *file_path*, read in 1 MiB chunks."""
    digest = hashlib.sha256()
    with open(file_path, "rb") as f:
        while True:
            chunk = f.read(1024 * 1024)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()


def part_manifest_path(output_root, part_index):
    """Path of the per-part manifest JSON, e.g. ``<out>/part_0001.manifest.json``."""
    return output_root / f"part_{part_index:04d}.manifest.json"


def build_part_manifest(
    args,
    part_index,
    archive_paths,
    entries,
    status,
    error=None,
    archive_sha256=None,
):
    """Build the JSON-serializable manifest dict for one part.

    Args:
        args: parsed CLI arguments (with ``archive_backend_resolved`` set).
        part_index: 1-based part number.
        archive_paths: ``{modality: archive Path}``.
        entries: the part's FrameEntry list.
        status: ``packaged`` / ``skipped_existing`` / ``dry_run`` / ``failed``.
        error: error message for failed parts.
        archive_sha256: optional ``{modality: hex digest or None}``.
    """
    archive_details = {}
    for modality, archive_path in archive_paths.items():
        archive_exists = archive_path.exists()
        archive_details[modality] = {
            "archive_path": str(archive_path.resolve()),
            "archive_exists": archive_exists,
            "archive_size_bytes": archive_path.stat().st_size if archive_exists else 0,
            "archive_sha256": archive_sha256.get(modality) if archive_sha256 else None,
        }
    batch_distribution = Counter(entry.batch_name for entry in entries)
    return {
        "created_at": now_str(),
        "part_name": f"part_{part_index:04d}",
        "part_index": part_index,
        "archive_format": args.archive_format,
        "archive_backend": args.archive_backend_resolved,
        "frames_per_part": args.frames_per_part,
        "frame_count": len(entries),
        "archive_details": archive_details,
        "status": status,
        "dry_run": args.dry_run,
        "check_pairs": args.check_pairs,
        "verify_archive": args.verify_archive,
        "batch_distribution": dict(sorted(batch_distribution.items())),
        "frame_samples": [entry.relpath for entry in entries[:5]],
        "error": error,
    }


def summarize_results(results):
    """Summarize per-part manifests into a total part count and a per-status count."""
    status_counter = Counter(item["status"] for item in results)
    return {
        "total_parts": len(results),
        "status_counts": dict(status_counter),
    }


def collect_batch_relpaths(batch_dir, image_exts):
    """Collect image relpaths and counts under ``batch_dir/<modality>`` for every modality.

    Returns:
        ``({modality: set of relpaths}, {modality: image count})``; a missing
        relpath collection is normalized to an empty set.
    """
    modality_relpaths = {}
    modality_counts = {}
    for modality in MODALITIES:
        image_dir = batch_dir / modality
        count, relpaths = collect_image_stats(image_dir, image_exts, collect_relpaths=True)
        modality_counts[modality] = count
        modality_relpaths[modality] = relpaths or set()
    return modality_relpaths, modality_counts


def collect_frame_entries(batch_dir, image_exts, check_pairs, sample_mismatch_limit):
    """Build the FrameEntry list and a summary dict for one batch.

    With *check_pairs*, the 2D and 3D relpath sets must match exactly.
    Without it, only relpaths whose 2D and 3D files both exist are kept;
    unpaired images are skipped (and counted in the summary).

    Raises:
        ValueError: *check_pairs* is set and the 2D/3D relpath sets differ.
    """
    modality_relpaths, modality_counts = collect_batch_relpaths(batch_dir, image_exts)
    dir_2d = batch_dir / "2D"
    dir_3d = batch_dir / "3D"
    relpaths_2d = modality_relpaths["2D"]
    relpaths_3d = modality_relpaths["3D"]

    only_in_2d = sorted(relpaths_2d - relpaths_3d)
    only_in_3d = sorted(relpaths_3d - relpaths_2d)
    paired_relpaths = sorted(relpaths_2d & relpaths_3d)

    if check_pairs and (only_in_2d or only_in_3d):
        raise ValueError(
            "2D/3D 配对校验失败: "
            f"batch={batch_dir.name}, only_in_2d={len(only_in_2d)}, only_in_3d={len(only_in_3d)}, "
            f"only_in_2d_samples={only_in_2d[:sample_mismatch_limit]}, "
            f"only_in_3d_samples={only_in_3d[:sample_mismatch_limit]}"
        )

    if check_pairs:
        entries = [
            FrameEntry(
                batch_name=batch_dir.name,
                relpath=relpath,
                path_2d=dir_2d / relpath,
                path_3d=dir_3d / relpath,
            )
            for relpath in paired_relpaths
        ]
    else:
        entries = []
        for relpath in sorted(relpaths_2d | relpaths_3d):
            path_2d = dir_2d / relpath
            path_3d = dir_3d / relpath
            # Only frames present in both modalities can be packaged.
            if not path_2d.is_file() or not path_3d.is_file():
                continue
            entries.append(
                FrameEntry(
                    batch_name=batch_dir.name,
                    relpath=relpath,
                    path_2d=path_2d,
                    path_3d=path_3d,
                )
            )

    batch_summary = {
        "batch_name": batch_dir.name,
        "count_2d": modality_counts["2D"],
        "count_3d": modality_counts["3D"],
        "paired_count": len(paired_relpaths),
        "selected_count": len(entries),
        "only_in_2d_count": len(only_in_2d),
        "only_in_3d_count": len(only_in_3d),
        "only_in_2d_samples": only_in_2d[:sample_mismatch_limit],
        "only_in_3d_samples": only_in_3d[:sample_mismatch_limit],
    }
    return entries, batch_summary


def scan_single_batch(
    batch_index,
    total_batches,
    batch_dir,
    image_exts,
    check_pairs,
    sample_mismatch_limit,
):
    """Scan one batch; return ``(batch_index, entries, summary, elapsed_seconds)``."""
    batch_started = perf_counter()
    log(f"[SCAN] ({batch_index}/{total_batches}) {batch_dir.name}")
    batch_entries, batch_summary = collect_frame_entries(
        batch_dir,
        image_exts,
        check_pairs=check_pairs,
        sample_mismatch_limit=sample_mismatch_limit,
    )
    return batch_index, batch_entries, batch_summary, perf_counter() - batch_started


def _log_scan_done(batch_index, total_batches, batch_dir, batch_summary, elapsed, cumulative_entries):
    """Log the per-batch ``[SCAN DONE]`` progress line."""
    log(
        f"[SCAN DONE] ({batch_index}/{total_batches}) {batch_dir.name} "
        f"2D={batch_summary['count_2d']} 3D={batch_summary['count_3d']} "
        f"paired={batch_summary['paired_count']} selected={batch_summary['selected_count']} "
        f"elapsed={elapsed:.1f}s cumulative={cumulative_entries}"
    )


def build_global_frame_index(batch_dirs, image_exts, check_pairs, sample_mismatch_limit, scan_workers):
    """Scan all batches (optionally in parallel) and concatenate their entries.

    Results are reassembled in *batch_dirs* order regardless of completion
    order, so the global frame index (and hence part contents) is deterministic.

    Returns:
        ``(entries, batch_summaries)``.
    """
    total_batches = len(batch_dirs)
    scan_started = perf_counter()
    cumulative_entries = 0

    if scan_workers <= 1 or total_batches <= 1:
        ordered_entries = []
        ordered_summaries = []
        for batch_index, batch_dir in enumerate(batch_dirs, start=1):
            batch_index, batch_entries, batch_summary, elapsed = scan_single_batch(
                batch_index,
                total_batches,
                batch_dir,
                image_exts,
                check_pairs,
                sample_mismatch_limit,
            )
            ordered_entries.append(batch_entries)
            ordered_summaries.append(batch_summary)
            cumulative_entries += len(batch_entries)
            _log_scan_done(batch_index, total_batches, batch_dir, batch_summary, elapsed, cumulative_entries)
    else:
        ordered_entries = [None] * total_batches
        ordered_summaries = [None] * total_batches
        max_workers = min(scan_workers, total_batches)
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_map = {
                executor.submit(
                    scan_single_batch,
                    batch_index,
                    total_batches,
                    batch_dir,
                    image_exts,
                    check_pairs,
                    sample_mismatch_limit,
                ): (batch_index, batch_dir)
                for batch_index, batch_dir in enumerate(batch_dirs, start=1)
            }
            for future in as_completed(future_map):
                batch_index, batch_dir = future_map[future]
                _, batch_entries, batch_summary, elapsed = future.result()
                ordered_entries[batch_index - 1] = batch_entries
                ordered_summaries[batch_index - 1] = batch_summary
                cumulative_entries += len(batch_entries)
                _log_scan_done(batch_index, total_batches, batch_dir, batch_summary, elapsed, cumulative_entries)

    entries = []
    for batch_entries in ordered_entries:
        entries.extend(batch_entries)
    batch_summaries = list(ordered_summaries)

    log(
        f"[SCAN SUMMARY] batches={total_batches} total_frames={len(entries)} "
        f"elapsed={perf_counter() - scan_started:.1f}s"
    )
    return entries, batch_summaries


def count_parts(total_entries, frames_per_part):
    """Number of parts needed for *total_entries* frames (ceiling division)."""
    return (total_entries + frames_per_part - 1) // frames_per_part


def iter_entry_chunks(entries, frames_per_part):
    """Yield consecutive slices of *entries* of at most *frames_per_part* items."""
    for index in range(0, len(entries), frames_per_part):
        yield entries[index : index + frames_per_part]


def package_modality_archive(modality, source_paths, archive_path, source_root, args):
    """Create (and optionally verify / checksum) one modality's archive.

    Returns:
        ``(modality, sha256 hex digest or None)``.
    """
    create_archive(
        source_paths,
        archive_path,
        source_root,
        args.archive_backend_resolved,
        args.archive_format,
        args.gzip_compresslevel,
    )
    if args.verify_archive:
        verify_archive(archive_path)
    archive_sha256 = compute_sha256(archive_path) if args.checksum else None
    return modality, archive_sha256


def process_part(part_index, entries, source_root, output_root, args):
    """Package one part (all modalities), write its manifest and return it.

    The returned manifest's status is ``skipped_existing`` (with
    ``--skip-existing`` and all archives already non-empty), ``dry_run``,
    ``packaged``, or ``failed``.  Errors raised while creating, verifying or
    checksumming archives are caught and reported as ``failed``.
    """
    archive_paths = {
        modality: build_archive_path(output_root, modality, part_index, args.archive_format)
        for modality in MODALITIES
    }
    part_started = perf_counter()

    header_lines = [
        "",
        "==========================================",
        f"Part : part_{part_index:04d}",
        f"Frame cnt : {len(entries)}",
    ]
    for modality in MODALITIES:
        header_lines.append(f"{modality} archive: {archive_paths[modality]}")
    header_lines.append("==========================================")
    log("\n".join(header_lines))

    if args.skip_existing and all(
        archive_path.exists() and archive_path.stat().st_size > 0
        for archive_path in archive_paths.values()
    ):
        archive_sha256 = {
            modality: compute_sha256(archive_path) if args.checksum else None
            for modality, archive_path in archive_paths.items()
        }
        manifest = build_part_manifest(
            args,
            part_index,
            archive_paths,
            entries,
            status="skipped_existing",
            archive_sha256=archive_sha256,
        )
        write_json(part_manifest_path(output_root, part_index), manifest)
        log(
            f"[SKIP] archives already exist for part_{part_index:04d} "
            f"elapsed={perf_counter() - part_started:.1f}s"
        )
        return manifest

    if args.dry_run:
        manifest = build_part_manifest(
            args,
            part_index,
            archive_paths,
            entries,
            status="dry_run",
        )
        write_json(part_manifest_path(output_root, part_index), manifest)
        log(
            f"[DRY RUN] would create archives for part_{part_index:04d} "
            f"elapsed={perf_counter() - part_started:.1f}s"
        )
        return manifest

    archive_sha256 = {}
    try:
        source_paths = {
            "2D": [entry.path_2d for entry in entries],
            "3D": [entry.path_3d for entry in entries],
        }
        max_workers = min(max(1, args.archive_workers), len(MODALITIES))
        if max_workers <= 1:
            for modality in MODALITIES:
                _, archive_sha = package_modality_archive(
                    modality,
                    source_paths[modality],
                    archive_paths[modality],
                    source_root,
                    args,
                )
                archive_sha256[modality] = archive_sha
        else:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                future_map = {
                    executor.submit(
                        package_modality_archive,
                        modality,
                        source_paths[modality],
                        archive_paths[modality],
                        source_root,
                        args,
                    ): modality
                    for modality in MODALITIES
                }
                for future in as_completed(future_map):
                    modality, archive_sha = future.result()
                    archive_sha256[modality] = archive_sha

        manifest = build_part_manifest(
            args,
            part_index,
            archive_paths,
            entries,
            status="packaged",
            archive_sha256=archive_sha256,
        )
        write_json(part_manifest_path(output_root, part_index), manifest)
        log(f"[OK] packaged part_{part_index:04d} elapsed={perf_counter() - part_started:.1f}s")
        return manifest
    except Exception as exc:
        manifest = build_part_manifest(
            args,
            part_index,
            archive_paths,
            entries,
            status="failed",
            error=str(exc),
        )
        write_json(part_manifest_path(output_root, part_index), manifest)
        log(f"[FAIL] part_{part_index:04d}: {exc}")
        return manifest


def package_parts(entries, source_root, output_root, args):
    """Split *entries* into parts and process them, returning manifests in part order."""
    part_specs = list(enumerate(iter_entry_chunks(entries, args.frames_per_part), start=1))
    total_parts = len(part_specs)
    if args.part_workers <= 1 or total_parts <= 1:
        return [
            process_part(part_index, part_entries, source_root, output_root, args)
            for part_index, part_entries in part_specs
        ]

    ordered_results = [None] * total_parts
    max_workers = min(args.part_workers, total_parts)
    log(f"[PACKAGE] part_workers={max_workers} archive_workers={args.archive_workers}")
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_map = {
            executor.submit(
                process_part,
                part_index,
                part_entries,
                source_root,
                output_root,
                args,
            ): part_index
            for part_index, part_entries in part_specs
        }
        for future in as_completed(future_map):
            part_index = future_map[future]
            ordered_results[part_index - 1] = future.result()
    return ordered_results


def build_overall_manifest(
    args,
    target_path,
    batch_root,
    output_root,
    resolve_mode,
    image_exts,
    batch_summaries,
    total_frames,
    results,
):
    """Build the run-level manifest: settings, per-batch summaries and all part manifests."""
    return {
        "generated_at": now_str(),
        "target_path": str(target_path.resolve()),
        "batch_root": str(batch_root.resolve()),
        "output_root": str(output_root.resolve()),
        "resolve_mode": resolve_mode,
        "batch_glob": args.batch_glob,
        "archive_format": args.archive_format,
        "archive_backend_requested": args.archive_backend,
        "archive_backend_resolved": args.archive_backend_resolved,
        "frames_per_part": args.frames_per_part,
        "image_exts": list(image_exts),
        "check_pairs": args.check_pairs,
        "sample_mismatch_limit": args.sample_mismatch_limit,
        "part_workers": args.part_workers,
        "archive_workers": args.archive_workers,
        "skip_existing": args.skip_existing,
        "checksum": args.checksum,
        "verify_archive": args.verify_archive,
        "remove_source_after_success": args.remove_source_after_success,
        "dry_run": args.dry_run,
        "total_frames": total_frames,
        "total_batches": len(batch_summaries),
        "batch_summaries": batch_summaries,
        "summary": summarize_results(results),
        "parts": results,
    }


def _count_unpaired_images(batch_summaries):
    """Number of scanned images with no counterpart in the other modality.

    Only frames present in both modalities are packaged, so these images are
    not in any archive and deleting the source batches would lose them.
    """
    return sum(
        summary["only_in_2d_count"] + summary["only_in_3d_count"]
        for summary in batch_summaries
    )


def main():
    """CLI entry point; returns the process exit code (0 on success, 1 on any failure)."""
    try:
        args = parse_args()
        if args.frames_per_part <= 0:
            raise ValueError("frames-per-part 必须大于 0")
        if args.scan_workers <= 0:
            raise ValueError("scan-workers 必须大于 0")
        if args.archive_workers <= 0:
            raise ValueError("archive-workers 必须大于 0")
        if args.part_workers <= 0:
            raise ValueError("part-workers 必须大于 0")
        if not 0 <= args.gzip_compresslevel <= 9:
            raise ValueError("gzip-compresslevel 必须在 0 到 9 之间")

        image_exts = normalize_image_exts(args.image_exts)
        args.archive_backend_resolved = resolve_archive_backend(
            args.archive_backend,
            args.archive_format,
            args.gzip_compresslevel,
        )
        target_path = Path(args.target_path).resolve()
        batch_root, batch_dirs, resolve_mode = resolve_batch_dirs(target_path, args.batch_glob)
        output_root = Path(args.output_root).resolve() if args.output_root else default_output_root(batch_root)
        ensure_safe_output_root(batch_root, output_root)
        output_root.mkdir(parents=True, exist_ok=True)
        for modality in MODALITIES:
            (output_root / modality).mkdir(parents=True, exist_ok=True)

        log("")
        log("######################################################################")
        log("# Package visualization batches")
        log("######################################################################")
        log(f"Mode : {resolve_mode}")
        log(f"Target : {target_path}")
        log(f"Batch root : {batch_root}")
        log(f"Output root : {output_root}")
        log(f"Archive format : {args.archive_format}")
        log(
            f"Archive backend: {args.archive_backend_resolved} "
            f"(requested={args.archive_backend})"
        )
        log(f"Frames/part : {args.frames_per_part}")
        log(f"Scan workers : {args.scan_workers}")
        log(f"Archive workers: {args.archive_workers}")
        log(f"Part workers : {args.part_workers}")
        log(f"Gzip level : {args.gzip_compresslevel}")
        log(f"Check pairs : {args.check_pairs}")
        log(f"Skip existing : {args.skip_existing}")
        log(f"Checksum : {args.checksum}")
        log(f"Verify archive : {args.verify_archive}")
        log(f"Remove source : {args.remove_source_after_success}")
        log(f"Dry run : {args.dry_run}")
        log(f"Total batches : {len(batch_dirs)}")

        entries, batch_summaries = build_global_frame_index(
            batch_dirs,
            image_exts,
            check_pairs=args.check_pairs,
            sample_mismatch_limit=args.sample_mismatch_limit,
            scan_workers=args.scan_workers,
        )
        if not entries:
            raise ValueError("未找到可用于打包的 2D/3D 配对帧")

        total_parts = count_parts(len(entries), args.frames_per_part)
        log(f"Total frames : {len(entries)}")
        log(f"Total parts : {total_parts}")

        results = package_parts(entries, batch_root, output_root, args)
        overall_manifest = build_overall_manifest(
            args,
            target_path,
            batch_root,
            output_root,
            resolve_mode,
            image_exts,
            batch_summaries,
            len(entries),
            results,
        )
        overall_manifest_path = output_root / DEFAULT_MANIFEST_NAME
        write_json(overall_manifest_path, overall_manifest)

        summary = overall_manifest["summary"]
        log("")
        log(
            "[DONE] total_parts={total} status_counts={status_counts}".format(
                total=summary["total_parts"],
                status_counts=summary["status_counts"],
            )
        )
        log(f"Manifest: {overall_manifest_path}")

        has_failure = any(item["status"] == "failed" for item in results)
        if args.remove_source_after_success and not args.dry_run and not has_failure:
            unpaired_images = _count_unpaired_images(batch_summaries)
            if unpaired_images:
                # Unpaired images were never archived; deleting them would lose data.
                log(
                    f"[KEEP SOURCE] {unpaired_images} 张未配对图片未被打包,"
                    "为避免数据丢失,跳过删除源 batch 目录。"
                )
            else:
                for batch_dir in batch_dirs:
                    if batch_dir.exists():
                        shutil.rmtree(batch_dir)

        return 1 if has_failure else 0
    except Exception as exc:
        log(f"[ERROR] {exc}")
        return 1


if __name__ == "__main__":
    raise SystemExit(main())