1054 lines
35 KiB
Python
Executable File
1054 lines
35 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
# coding: utf-8
|
||
|
||
import argparse
|
||
import hashlib
|
||
import json
|
||
import os
|
||
import shutil
|
||
import subprocess
|
||
import tarfile
|
||
from collections import Counter
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from dataclasses import dataclass
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from threading import Lock
|
||
from time import perf_counter
|
||
|
||
from count_visualization_batches import (
|
||
DEFAULT_IMAGE_EXTS,
|
||
DEFAULT_SHOWPATH_ROOT,
|
||
collect_image_stats,
|
||
normalize_image_exts,
|
||
)
|
||
|
||
|
||
# Glob pattern used to discover batch directories under a batch root.
DEFAULT_BATCH_GLOB = "batch_*"
# Archive format and backend used when the command line does not override them.
DEFAULT_ARCHIVE_FORMAT = "tar.gz"
DEFAULT_ARCHIVE_BACKEND = "auto"
# File name of the overall manifest written into the output root.
DEFAULT_MANIFEST_NAME = "archives_manifest.json"
# Default number of frames grouped into one archive part.
DEFAULT_FRAMES_PER_PART = 2000
# Sub-directory names looked up inside every batch directory.
MODALITIES = ("2D", "3D")
# Held by log() around print() so worker threads do not interleave output.
LOG_LOCK = Lock()
|
||
|
||
|
||
@dataclass(frozen=True)
class FrameEntry:
    """One frame selected for packaging, with its 2D and 3D image locations."""

    # Name of the batch directory the frame belongs to.
    batch_name: str
    # Relative image path as reported by collect_image_stats for the batch.
    relpath: str
    # The frame's file under the batch's "2D" directory.
    path_2d: Path
    # The frame's file under the batch's "3D" directory.
    path_3d: Path
|
||
|
||
|
||
def log(message=""):
    """Print *message* and flush at once, holding LOG_LOCK for the duration.

    The lock keeps lines printed from different worker threads from
    interleaving.
    """
    LOG_LOCK.acquire()
    try:
        print(message, flush=True)
    finally:
        LOG_LOCK.release()
|
||
|
||
|
||
def default_scan_workers():
    """Return the default scan thread count: the CPU count clamped to 1..8.

    Falls back to 1 when ``os.cpu_count()`` cannot determine the CPU count.
    """
    cpu_total = os.cpu_count()
    if not cpu_total:
        cpu_total = 1
    capped = cpu_total if cpu_total < 8 else 8
    return capped if capped > 1 else 1
|
||
|
||
|
||
def default_archive_workers():
    """Return the default per-part archive concurrency: one worker per modality."""
    modality_total = len(MODALITIES)
    return modality_total
|
||
|
||
|
||
def default_part_workers():
    """Return the default number of parts processed at the same time (one)."""
    single_part_at_a_time = 1
    return single_part_at_a_time
|
||
|
||
|
||
def parse_args():
    """Define the command-line interface and return the parsed arguments.

    Two optional positionals (``target_path``, ``output_root``) are followed
    by options for batch discovery, archive format/backend, concurrency,
    validation and clean-up. ``--check-pairs`` and ``--verify-archive``
    default to on and are switched off through their ``--no-*`` counterparts,
    which write to the same destination.

    Returns:
        argparse.Namespace: parsed values from ``sys.argv``.
    """
    parser = argparse.ArgumentParser(
        description="汇总所有 batch 的 2D/3D 可视化结果,并按固定帧数分片打包。"
    )
    add = parser.add_argument

    # Positional arguments: where to read batches from and where to write archives.
    add(
        "target_path", nargs="?", default=DEFAULT_SHOWPATH_ROOT,
        help="可视化 batch 根目录,或单个 batch 目录。",
    )
    add(
        "output_root", nargs="?", default=None,
        help="归档输出目录。默认输出到 <batch_root> 的并列目录 <name>_archives。",
    )

    # Batch discovery and archive layout.
    add(
        "--batch-glob", default=DEFAULT_BATCH_GLOB,
        help="batch 根目录模式下用于查找 batch 的 glob,默认 batch_*。",
    )
    add(
        "--archive-format", choices=["tar", "tar.gz"], default=DEFAULT_ARCHIVE_FORMAT,
        help="归档格式,默认 tar.gz。",
    )
    add(
        "--frames-per-part", type=int, default=DEFAULT_FRAMES_PER_PART,
        help="每个 part 包含多少帧,默认 2000。",
    )

    # Concurrency knobs.
    add(
        "--scan-workers", type=int, default=default_scan_workers(),
        help="扫描 batch 的并发线程数,默认按机器核数自适应,最多 8。",
    )
    add(
        "--archive-workers", type=int, default=default_archive_workers(),
        help="单个 part 内归档并发数,默认 2,同时处理 2D 和 3D。",
    )
    add(
        "--part-workers", type=int, default=default_part_workers(),
        help="同时处理多少个 part,默认 1;每个 part 内仍会按 archive-workers 并发处理 2D/3D。",
    )

    # Archive backend and compression.
    add(
        "--archive-backend", choices=["auto", "python", "system_tar"],
        default=DEFAULT_ARCHIVE_BACKEND,
        help="归档实现后端,默认 auto;优先使用系统 tar/gzip,以提升大批量打包速度。",
    )
    add(
        "--gzip-compresslevel", type=int, default=1,
        help="tar.gz 压缩级别,范围 0-9,默认 1 以优先速度。",
    )

    # Input validation.
    add(
        "--image-exts", default=DEFAULT_IMAGE_EXTS,
        help="参与校验统计的图片扩展名,逗号分隔,默认 .jpg,.jpeg,.png。",
    )
    add(
        "--check-pairs", action="store_true", default=True,
        help="打包前校验 2D/3D 相对路径是否一一对应,默认开启。",
    )
    add(
        "--no-check-pairs", dest="check_pairs", action="store_false",
        help="关闭 2D/3D 一一对应校验,仅分别收集文件。",
    )
    add(
        "--sample-mismatch-limit", type=int, default=10,
        help="2D/3D 配对校验失败时,manifest 中最多保留多少条样例。",
    )

    # Output handling and post-processing.
    add(
        "--skip-existing", action="store_true",
        help="若目标 part 归档已存在且大小大于 0,则跳过该 part。",
    )
    add(
        "--checksum", action="store_true",
        help="归档完成后计算 sha256。",
    )
    add(
        "--verify-archive", action="store_true", default=True,
        help="归档完成后快速校验非空且可读取,默认开启。",
    )
    add(
        "--no-verify-archive", dest="verify_archive", action="store_false",
        help="关闭归档完成后的快速校验,以进一步提速。",
    )
    add(
        "--remove-source-after-success", action="store_true",
        help="所有归档全部成功后删除源 batch 目录。默认关闭,请谨慎使用。",
    )
    add(
        "--dry-run", action="store_true",
        help="只做校验和计划输出,不实际创建归档或删除源目录。",
    )
    return parser.parse_args()
|
||
|
||
|
||
def now_str():
    """Return the current local time as an ISO-8601 string with second precision."""
    current_time = datetime.now()
    return current_time.isoformat(timespec="seconds")
|
||
|
||
|
||
def is_relative_to(path, other):
    """Return True when ``path.relative_to(other)`` succeeds, False on ValueError."""
    try:
        path.relative_to(other)
    except ValueError:
        return False
    else:
        return True
|
||
|
||
|
||
def is_batch_dir(path):
    """Return True when *path* is a directory containing a "2D" or "3D" sub-directory."""
    if not path.is_dir():
        return False
    return any((path / modality_dir).is_dir() for modality_dir in ("2D", "3D"))
|
||
|
||
|
||
def resolve_batch_dirs(target_path, batch_glob):
    """Work out which batch directories *target_path* refers to.

    Args:
        target_path: Either a single batch directory or a directory that
            contains batch directories.
        batch_glob: Glob applied to *target_path* when it is a batch root.

    Returns:
        A ``(batch_root, batch_dirs, mode)`` tuple. For a single batch the
        root is the batch's parent and mode is ``"single_batch"``; otherwise
        the root is *target_path*, the directories are the sorted glob matches
        that are batch directories, and mode is ``"batch_root"``.

    Raises:
        FileNotFoundError: If *target_path* is not a directory or no batch
            directory matches *batch_glob*.
    """
    target_is_batch = is_batch_dir(target_path)
    if target_is_batch:
        return target_path.parent, [target_path], "single_batch"

    if not target_path.is_dir():
        raise FileNotFoundError(f"target path does not exist or is not a directory: {target_path}")

    matches = [candidate for candidate in target_path.glob(batch_glob) if is_batch_dir(candidate)]
    matches.sort()
    if matches:
        return target_path, matches, "batch_root"

    raise FileNotFoundError(
        f"no batch directories found under {target_path} with batch_glob={batch_glob}"
    )
|
||
|
||
|
||
def default_output_root(batch_root):
    """Return the sibling directory of *batch_root* named ``<name>_archives``."""
    archive_dir_name = batch_root.name + "_archives"
    return batch_root.parent / archive_dir_name
|
||
|
||
|
||
def archive_suffix(archive_format):
    """Return the file suffix for *archive_format*: "tar.gz" for that format, else "tar"."""
    if archive_format == "tar.gz":
        return "tar.gz"
    return "tar"
|
||
|
||
|
||
def archive_basename(archive_path):
    """Return the archive's file name without its ``.tar.gz`` or ``.tar`` suffix.

    Names with neither suffix fall back to ``archive_path.stem``.
    """
    file_name = archive_path.name
    # The longer suffix is tested first, matching the original check order.
    for known_suffix in (".tar.gz", ".tar"):
        if file_name.endswith(known_suffix):
            return file_name[: -len(known_suffix)]
    return archive_path.stem
|
||
|
||
|
||
def build_archive_path(output_root, modality, part_index, archive_format):
    """Return ``<output_root>/<modality>/part_NNNN.<suffix>`` for one part.

    The part index is zero-padded to four digits; the suffix comes from
    ``archive_suffix``.
    """
    file_name = "part_{:04d}.{}".format(part_index, archive_suffix(archive_format))
    return output_root / modality / file_name
|
||
|
||
|
||
def write_json(path, data):
    """Serialize *data* as indented UTF-8 JSON to *path*, creating parent directories.

    Non-ASCII characters are written as-is rather than escaped.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        json.dump(data, handle, ensure_ascii=False, indent=2)
|
||
|
||
|
||
def ensure_safe_output_root(root_dir, output_root):
    """Reject an output directory that is, or lies inside, the source directory.

    Both paths are resolved before comparison.

    Raises:
        ValueError: If the resolved *output_root* is relative to the resolved
            *root_dir*.
    """
    source_resolved = root_dir.resolve()
    target_resolved = output_root.resolve()
    if not is_relative_to(target_resolved, source_resolved):
        return
    raise ValueError(
        "output_root 不允许位于源目录内部,"
        f"source_root={source_resolved}, output_root={target_resolved}"
    )
|
||
|
||
|
||
def supports_system_tar_backend(archive_format, gzip_compresslevel):
    """Return True when the system tar/gzip tools can produce the requested archive.

    The system backend drives ``tar`` with ``--transform``, a GNU tar
    name-transformation option. A ``tar`` on PATH is therefore not enough: it
    must identify itself as GNU tar, otherwise the automatic backend
    selection would pick a tool that rejects the command line.

    Args:
        archive_format: ``"tar"`` or ``"tar.gz"``.
        gzip_compresslevel: Requested gzip level. Level 0 is reported as
            unsupported for ``tar.gz``.

    Returns:
        bool: False when ``tar`` is missing or is not GNU tar. For
        ``tar.gz``, also False when the level is 0 or ``gzip`` is missing.
    """
    tar_executable = shutil.which("tar")
    if tar_executable is None:
        return False
    if not _system_tar_is_gnu(tar_executable):
        return False
    if archive_format != "tar.gz":
        return True
    if gzip_compresslevel == 0:
        return False
    return shutil.which("gzip") is not None


def _system_tar_is_gnu(tar_executable):
    """Return True when ``<tar_executable> --version`` reports GNU tar."""
    try:
        completed = subprocess.run(
            [tar_executable, "--version"],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            check=False,
            timeout=10,
        )
    except (OSError, subprocess.SubprocessError):
        # A tar that cannot even be started or answer in time is unusable.
        return False
    return completed.returncode == 0 and b"GNU tar" in completed.stdout
|
||
|
||
|
||
def resolve_archive_backend(requested_backend, archive_format, gzip_compresslevel):
    """Translate the requested backend into the backend that will be used.

    Args:
        requested_backend: ``"auto"``, ``"python"`` or ``"system_tar"``.
        archive_format: ``"tar"`` or ``"tar.gz"``.
        gzip_compresslevel: Requested gzip level.

    Returns:
        ``"system_tar"`` or ``"python"``. ``"auto"`` selects ``"system_tar"``
        only when ``supports_system_tar_backend`` approves it.

    Raises:
        RuntimeError: ``system_tar`` was requested but ``tar`` (or ``gzip``
            for ``tar.gz``) is not on PATH.
        ValueError: ``system_tar`` was requested for ``tar.gz`` with level 0.
    """
    system_tar_supported = supports_system_tar_backend(archive_format, gzip_compresslevel)

    if requested_backend == "auto":
        if system_tar_supported:
            return "system_tar"
        return "python"

    if requested_backend != "system_tar":
        return "python"

    # An explicit system_tar request fails loudly instead of falling back.
    wants_gzip = archive_format == "tar.gz"
    if shutil.which("tar") is None:
        raise RuntimeError("archive-backend=system_tar 需要系统 tar 命令。")
    if wants_gzip and shutil.which("gzip") is None:
        raise RuntimeError("archive-backend=system_tar 在 tar.gz 模式下需要 gzip 命令。")
    if wants_gzip and gzip_compresslevel == 0:
        raise ValueError(
            "archive-backend=system_tar 当前不支持 gzip-compresslevel=0,"
            "请改用 python 后端或将压缩级别设为 1-9。"
        )
    return "system_tar"
|
||
|
||
|
||
def open_tarfile(archive_path, archive_format, gzip_compresslevel):
    """Open *archive_path* for writing and return the ``tarfile.TarFile``.

    ``"tar.gz"`` is opened gzip-compressed at *gzip_compresslevel*; any other
    format is opened as a plain tar, ignoring the level.
    """
    if archive_format != "tar.gz":
        return tarfile.open(archive_path, mode="w")
    return tarfile.open(archive_path, mode="w:gz", compresslevel=gzip_compresslevel)
|
||
|
||
|
||
def build_archive_member_dir(archive_path):
    """Return the in-archive directory for members: ``<archive basename>/images``."""
    return archive_basename(archive_path) + "/images"
|
||
|
||
|
||
def build_archive_entry_name(archive_path, source_path):
    """Return the in-archive name for *source_path*.

    Only the file name is kept and placed under the archive's member
    directory, so files from different source directories share one folder.
    """
    member_dir = build_archive_member_dir(archive_path)
    return "/".join((member_dir, source_path.name))
|
||
|
||
|
||
def ensure_unique_archive_names(source_paths, archive_path):
    """Verify that no two source files map to the same in-archive name.

    Raises:
        ValueError: On the first collision, naming both the earlier and the
            duplicate source path.
    """
    first_source_by_name = {}
    for candidate in source_paths:
        entry_name = build_archive_entry_name(archive_path, candidate)
        if entry_name not in first_source_by_name:
            first_source_by_name[entry_name] = candidate
            continue
        raise ValueError(
            "archive entry name duplicated: "
            f"{entry_name}, first={first_source_by_name[entry_name]}, duplicate={candidate}"
        )
|
||
|
||
|
||
def build_relative_source_paths(source_paths, source_root):
    """Return each source path as a POSIX-style string relative to *source_root*.

    *source_root* is resolved first; the source paths are used as given.

    Raises:
        ValueError: If a source path is not located under the resolved root.
    """
    resolved_root = source_root.resolve()

    def to_relative(candidate):
        try:
            relative = candidate.relative_to(resolved_root)
        except ValueError as exc:
            raise ValueError(
                f"source path is outside source_root: source_root={resolved_root}, path={candidate}"
            ) from exc
        return relative.as_posix()

    return [to_relative(candidate) for candidate in source_paths]
|
||
|
||
|
||
def decode_stderr(stderr_bytes):
    """Decode captured stderr bytes to stripped UTF-8 text.

    Undecodable bytes are replaced; empty or missing input yields "".
    """
    if stderr_bytes:
        return stderr_bytes.decode("utf-8", errors="replace").strip()
    return ""
|
||
|
||
|
||
def escape_tar_transform_replacement(value):
    """Backslash-escape ``\\``, ``&`` and ``|`` in *value*.

    These are the characters the caller must protect when embedding *value*
    as the replacement text of a ``|``-delimited tar transform expression.
    """
    # One pass over the input; each special character maps to its escaped form.
    escape_table = str.maketrans({"\\": "\\\\", "&": "\\&", "|": "\\|"})
    return value.translate(escape_table)
|
||
|
||
|
||
def create_archive_with_python_tar(
    source_paths,
    archive_path,
    archive_format,
    gzip_compresslevel,
):
    """Write *source_paths* into *archive_path* using the ``tarfile`` module.

    The archive is built as ``<archive name>.partial`` and renamed onto
    *archive_path* only after it has been written completely. A leftover
    partial file is removed both before starting and on the way out.

    Raises:
        ValueError: If two sources would get the same in-archive name.
    """
    archive_path.parent.mkdir(parents=True, exist_ok=True)
    staging_path = archive_path.with_name(archive_path.name + ".partial")
    if staging_path.exists():
        staging_path.unlink()

    ensure_unique_archive_names(source_paths, archive_path)
    try:
        tar_writer = open_tarfile(staging_path, archive_format, gzip_compresslevel)
        with tar_writer:
            for member_source in source_paths:
                member_name = build_archive_entry_name(archive_path, member_source)
                tar_writer.add(member_source, arcname=member_name, recursive=False)

        # The writer is closed here, so the rename publishes a complete archive.
        staging_path.replace(archive_path)
    finally:
        if staging_path.exists():
            staging_path.unlink()
|
||
|
||
|
||
def create_archive_with_system_tar(
    source_paths,
    archive_path,
    source_root,
    archive_format,
    gzip_compresslevel,
):
    """Write *source_paths* into *archive_path* using the system ``tar`` (and ``gzip``).

    Member paths are handed to tar NUL-separated on stdin, relative to
    *source_root*. A ``--transform`` rule strips each member's directory part
    and prefixes the archive's member directory. For ``tar.gz`` the tar
    stream is piped through a separate ``gzip`` process. The archive is built
    as ``<archive name>.partial`` and renamed into place on success.

    Raises:
        ValueError: On duplicate in-archive names or sources outside
            *source_root*.
        RuntimeError: If ``tar`` or ``gzip`` exits with a non-zero status.
    """
    archive_path.parent.mkdir(parents=True, exist_ok=True)
    staging_path = archive_path.with_name(archive_path.name + ".partial")
    if staging_path.exists():
        staging_path.unlink()

    ensure_unique_archive_names(source_paths, archive_path)
    member_paths = build_relative_source_paths(source_paths, source_root)
    stdin_payload = b"".join(os.fsencode(member) + b"\0" for member in member_paths)
    member_dir = escape_tar_transform_replacement(build_archive_member_dir(archive_path))
    rename_rule = f"s|.*/||;s|^|{member_dir}/|"

    def tar_command(destination):
        # Same argument list for both formats; only the output target differs.
        return [
            "tar",
            "-cf",
            destination,
            "--directory",
            str(source_root),
            "--transform",
            rename_rule,
            "--no-recursion",
            "--null",
            "--files-from",
            "-",
        ]

    try:
        if archive_format == "tar":
            completed = subprocess.run(
                tar_command(str(staging_path)),
                input=stdin_payload,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                check=False,
            )
            if completed.returncode != 0:
                raise RuntimeError(
                    "system tar failed: "
                    f"{decode_stderr(completed.stderr) or 'unknown error'}"
                )
        else:
            gzip_command = ["gzip", f"-{gzip_compresslevel}", "-c"]
            with open(staging_path, "wb") as archive_file:
                tar_proc = subprocess.Popen(
                    tar_command("-"),
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                )
                try:
                    gzip_proc = subprocess.Popen(
                        gzip_command,
                        stdin=tar_proc.stdout,
                        stdout=archive_file,
                        stderr=subprocess.PIPE,
                    )
                except Exception:
                    # gzip could not be started: stop tar before re-raising.
                    tar_proc.kill()
                    tar_proc.wait()
                    raise

                # gzip holds its own copy of the pipe; drop the parent's end.
                tar_proc.stdout.close()
                _, tar_stderr = tar_proc.communicate(input=stdin_payload)
                _, gzip_stderr = gzip_proc.communicate()

                if tar_proc.returncode != 0:
                    raise RuntimeError(
                        "system tar failed: "
                        f"{decode_stderr(tar_stderr) or 'unknown error'}"
                    )
                if gzip_proc.returncode != 0:
                    raise RuntimeError(
                        "gzip failed: "
                        f"{decode_stderr(gzip_stderr) or 'unknown error'}"
                    )

        staging_path.replace(archive_path)
    finally:
        if staging_path.exists():
            staging_path.unlink()
|
||
|
||
|
||
def create_archive(
    source_paths,
    archive_path,
    source_root,
    archive_backend,
    archive_format,
    gzip_compresslevel,
):
    """Create one archive with the selected backend.

    ``"system_tar"`` dispatches to the system-tar implementation, which also
    needs *source_root*; any other backend value uses the ``tarfile``
    implementation.
    """
    if archive_backend == "system_tar":
        create_archive_with_system_tar(
            source_paths,
            archive_path,
            source_root,
            archive_format,
            gzip_compresslevel,
        )
    else:
        create_archive_with_python_tar(
            source_paths,
            archive_path,
            archive_format,
            gzip_compresslevel,
        )
|
||
|
||
|
||
def verify_archive(archive_path):
|
||
if not archive_path.exists():
|
||
raise FileNotFoundError(f"archive not found: {archive_path}")
|
||
if archive_path.stat().st_size <= 0:
|
||
raise ValueError(f"archive size is 0: {archive_path}")
|
||
|
||
with tarfile.open(archive_path, mode="r:*") as tar_obj:
|
||
first_member = tar_obj.next()
|
||
if first_member is None:
|
||
raise ValueError(f"archive is empty: {archive_path}")
|
||
|
||
|
||
def compute_sha256(file_path):
    """Return the hex SHA-256 digest of the file, read in 1 MiB chunks."""
    chunk_size = 1024 * 1024
    hasher = hashlib.sha256()
    with open(file_path, "rb") as stream:
        # iter() with a sentinel stops at the first empty read, i.e. end of file.
        for block in iter(lambda: stream.read(chunk_size), b""):
            hasher.update(block)
    return hasher.hexdigest()
|
||
|
||
|
||
def part_manifest_path(output_root, part_index):
    """Return ``<output_root>/part_NNNN.manifest.json`` (index zero-padded to 4 digits)."""
    manifest_name = "part_{:04d}.manifest.json".format(part_index)
    return output_root / manifest_name
|
||
|
||
|
||
def build_part_manifest(
    args,
    part_index,
    archive_paths,
    entries,
    status,
    error=None,
    archive_sha256=None,
):
    """Assemble the JSON-serializable manifest describing one part.

    Args:
        args: Parsed arguments; ``archive_backend_resolved`` must already be set.
        part_index: Part number used in ``part_name``.
        archive_paths: Mapping of modality to archive path.
        entries: Frame entries belonging to the part.
        status: Outcome label stored under ``"status"``.
        error: Optional error text.
        archive_sha256: Optional mapping of modality to digest.

    Returns:
        dict: Manifest with per-modality archive details (path, existence,
        size, digest), the per-batch frame distribution sorted by batch name,
        and the relative paths of up to five leading frames as samples.
    """
    def describe_archive(modality, archive_path):
        present = archive_path.exists()
        resolved_text = str(archive_path.resolve())
        size_in_bytes = archive_path.stat().st_size if present else 0
        digest = archive_sha256.get(modality) if archive_sha256 else None
        return {
            "archive_path": resolved_text,
            "archive_exists": present,
            "archive_size_bytes": size_in_bytes,
            "archive_sha256": digest,
        }

    archive_details = {
        modality: describe_archive(modality, archive_path)
        for modality, archive_path in archive_paths.items()
    }

    frames_per_batch = Counter(entry.batch_name for entry in entries)
    sorted_distribution = {name: frames_per_batch[name] for name in sorted(frames_per_batch)}
    leading_samples = [entry.relpath for entry in entries[:5]]

    return {
        "created_at": now_str(),
        "part_name": f"part_{part_index:04d}",
        "part_index": part_index,
        "archive_format": args.archive_format,
        "archive_backend": args.archive_backend_resolved,
        "frames_per_part": args.frames_per_part,
        "frame_count": len(entries),
        "archive_details": archive_details,
        "status": status,
        "dry_run": args.dry_run,
        "check_pairs": args.check_pairs,
        "verify_archive": args.verify_archive,
        "batch_distribution": sorted_distribution,
        "frame_samples": leading_samples,
        "error": error,
    }
|
||
|
||
|
||
def summarize_results(results):
    """Return the number of part manifests and how many carry each status."""
    status_tally = Counter()
    for part_manifest in results:
        status_tally[part_manifest["status"]] += 1
    summary = {"total_parts": len(results)}
    summary["status_counts"] = dict(status_tally)
    return summary
|
||
|
||
|
||
def collect_batch_relpaths(batch_dir, image_exts):
    """Gather image counts and relative paths for every modality of one batch.

    Returns:
        A ``(relpaths_by_modality, counts_by_modality)`` pair of dicts keyed
        by modality name. A falsy relpath result from ``collect_image_stats``
        is replaced with an empty set.
    """
    relpaths_by_modality = {}
    counts_by_modality = {}
    for modality in MODALITIES:
        stats = collect_image_stats(batch_dir / modality, image_exts, collect_relpaths=True)
        image_count, relpaths = stats
        counts_by_modality[modality] = image_count
        relpaths_by_modality[modality] = relpaths if relpaths else set()
    return relpaths_by_modality, counts_by_modality
|
||
|
||
|
||
def collect_frame_entries(batch_dir, image_exts, check_pairs, sample_mismatch_limit):
    """Build the frame entries and a scan summary for one batch.

    Args:
        batch_dir: Batch directory holding the "2D" and "3D" sub-directories.
        image_exts: Extensions forwarded to ``collect_batch_relpaths``.
        check_pairs: When true, any frame present in only one modality aborts
            the scan. When false, such frames are skipped: a frame is kept
            only if both of its files exist on disk.
        sample_mismatch_limit: Maximum number of unpaired relative paths
            included as samples in the error text and in the summary.

    Returns:
        ``(entries, batch_summary)``: the selected ``FrameEntry`` list in
        sorted relative-path order, and a dict of counts and mismatch samples.

    Raises:
        ValueError: If *check_pairs* is true and the two modalities differ.
    """
    modality_relpaths, modality_counts = collect_batch_relpaths(batch_dir, image_exts)
    batch_name = batch_dir.name
    dir_2d = batch_dir / "2D"
    dir_3d = batch_dir / "3D"
    relpaths_2d = modality_relpaths["2D"]
    relpaths_3d = modality_relpaths["3D"]

    only_in_2d = sorted(relpaths_2d - relpaths_3d)
    only_in_3d = sorted(relpaths_3d - relpaths_2d)
    paired_relpaths = sorted(relpaths_2d & relpaths_3d)

    if check_pairs:
        if only_in_2d or only_in_3d:
            raise ValueError(
                "2D/3D 配对校验失败: "
                f"batch={batch_name}, only_in_2d={len(only_in_2d)}, only_in_3d={len(only_in_3d)}, "
                f"only_in_2d_samples={only_in_2d[:sample_mismatch_limit]}, "
                f"only_in_3d_samples={only_in_3d[:sample_mismatch_limit]}"
            )
        entries = [
            FrameEntry(
                batch_name=batch_name,
                relpath=relpath,
                path_2d=dir_2d / relpath,
                path_3d=dir_3d / relpath,
            )
            for relpath in paired_relpaths
        ]
    else:
        entries = []
        for relpath in sorted(relpaths_2d | relpaths_3d):
            candidate_2d = dir_2d / relpath
            candidate_3d = dir_3d / relpath
            # Without pair checking, a frame still needs both files to be packaged.
            if candidate_2d.is_file() and candidate_3d.is_file():
                entries.append(
                    FrameEntry(
                        batch_name=batch_name,
                        relpath=relpath,
                        path_2d=candidate_2d,
                        path_3d=candidate_3d,
                    )
                )

    batch_summary = {
        "batch_name": batch_name,
        "count_2d": modality_counts["2D"],
        "count_3d": modality_counts["3D"],
        "paired_count": len(paired_relpaths),
        "selected_count": len(entries),
        "only_in_2d_count": len(only_in_2d),
        "only_in_3d_count": len(only_in_3d),
        "only_in_2d_samples": only_in_2d[:sample_mismatch_limit],
        "only_in_3d_samples": only_in_3d[:sample_mismatch_limit],
    }
    return entries, batch_summary
|
||
|
||
|
||
def scan_single_batch(
    batch_index,
    total_batches,
    batch_dir,
    image_exts,
    check_pairs,
    sample_mismatch_limit,
):
    """Scan one batch, logging its start, and time the scan.

    Returns:
        ``(batch_index, entries, batch_summary, elapsed_seconds)``.
    """
    started_at = perf_counter()
    log(f"[SCAN] ({batch_index}/{total_batches}) {batch_dir.name}")
    scan_result = collect_frame_entries(
        batch_dir,
        image_exts,
        check_pairs=check_pairs,
        sample_mismatch_limit=sample_mismatch_limit,
    )
    frame_entries, summary = scan_result
    elapsed_seconds = perf_counter() - started_at
    return batch_index, frame_entries, summary, elapsed_seconds
|
||
|
||
|
||
def build_global_frame_index(batch_dirs, image_exts, check_pairs, sample_mismatch_limit, scan_workers):
    """Scan every batch and concatenate the frame entries in batch order.

    Batches are scanned one after another when *scan_workers* is at most 1 or
    there is at most one batch; otherwise a thread pool of up to
    *scan_workers* threads is used. Results are stored by batch position, so
    the returned order does not depend on which scan finishes first.

    Returns:
        ``(entries, batch_summaries)``: all frame entries, and one summary
        per batch, both in the order of *batch_dirs*.
    """
    total_batches = len(batch_dirs)
    scan_started = perf_counter()
    entries_by_batch = [None] * total_batches
    summaries_by_batch = [None] * total_batches
    frames_seen = 0

    def record(batch_index, batch_dir, batch_entries, batch_summary, elapsed):
        # Store one finished scan in its slot and report progress.
        nonlocal frames_seen
        entries_by_batch[batch_index - 1] = batch_entries
        summaries_by_batch[batch_index - 1] = batch_summary
        frames_seen += len(batch_entries)
        log(
            f"[SCAN DONE] ({batch_index}/{total_batches}) {batch_dir.name} "
            f"2D={batch_summary['count_2d']} 3D={batch_summary['count_3d']} "
            f"paired={batch_summary['paired_count']} selected={batch_summary['selected_count']} "
            f"elapsed={elapsed:.1f}s cumulative={frames_seen}"
        )

    shared_scan_args = (image_exts, check_pairs, sample_mismatch_limit)
    numbered_batches = list(enumerate(batch_dirs, start=1))

    if scan_workers <= 1 or total_batches <= 1:
        for batch_index, batch_dir in numbered_batches:
            batch_index, batch_entries, batch_summary, elapsed = scan_single_batch(
                batch_index,
                total_batches,
                batch_dir,
                *shared_scan_args,
            )
            record(batch_index, batch_dir, batch_entries, batch_summary, elapsed)
    else:
        worker_total = min(scan_workers, total_batches)
        with ThreadPoolExecutor(max_workers=worker_total) as executor:
            pending = {}
            for batch_index, batch_dir in numbered_batches:
                future = executor.submit(
                    scan_single_batch,
                    batch_index,
                    total_batches,
                    batch_dir,
                    *shared_scan_args,
                )
                pending[future] = (batch_index, batch_dir)
            for future in as_completed(pending):
                batch_index, batch_dir = pending[future]
                _, batch_entries, batch_summary, elapsed = future.result()
                record(batch_index, batch_dir, batch_entries, batch_summary, elapsed)

    entries = [entry for batch_entries in entries_by_batch for entry in batch_entries]
    batch_summaries = list(summaries_by_batch)

    log(
        f"[SCAN SUMMARY] batches={total_batches} total_frames={len(entries)} "
        f"elapsed={perf_counter() - scan_started:.1f}s"
    )
    return entries, batch_summaries
|
||
|
||
|
||
def count_parts(total_entries, frames_per_part):
    """Return how many parts are needed for *total_entries* frames (ceiling division)."""
    # Adding (divisor - 1) before floor division rounds the quotient up.
    padded_total = total_entries + frames_per_part - 1
    return padded_total // frames_per_part
|
||
|
||
|
||
def iter_entry_chunks(entries, frames_per_part):
    """Yield consecutive slices of *entries*, each at most *frames_per_part* long."""
    chunk_starts = range(0, len(entries), frames_per_part)
    yield from (entries[start : start + frames_per_part] for start in chunk_starts)
|
||
|
||
|
||
def package_modality_archive(modality, source_paths, archive_path, source_root, args):
    """Create, optionally verify and optionally hash the archive of one modality.

    Returns:
        ``(modality, sha256_hex_or_None)``. The digest is computed only when
        ``args.checksum`` is set.
    """
    create_archive(
        source_paths,
        archive_path,
        source_root,
        args.archive_backend_resolved,
        args.archive_format,
        args.gzip_compresslevel,
    )
    if args.verify_archive:
        verify_archive(archive_path)
    if args.checksum:
        return modality, compute_sha256(archive_path)
    return modality, None
|
||
|
||
|
||
def process_part(part_index, entries, source_root, output_root, args):
    """Package one part (one archive per modality) and write its manifest.

    The outcome is one of:
      * ``skipped_existing``: ``--skip-existing`` is set and every modality
        archive already exists with a size above zero;
      * ``dry_run``: nothing is archived, only the manifest is written;
      * ``packaged``: all modality archives were created;
      * ``failed``: an exception occurred while packaging. The error text is
        stored in the manifest and the exception is not re-raised.

    Returns:
        dict: The part manifest that was also written to disk.
    """
    part_label = f"part_{part_index:04d}"
    archive_paths = {}
    for modality in MODALITIES:
        archive_paths[modality] = build_archive_path(
            output_root, modality, part_index, args.archive_format
        )

    part_started = perf_counter()
    manifest_file = part_manifest_path(output_root, part_index)

    separator = "=========================================="
    header_lines = ["", separator, f"Part : {part_label}", f"Frame cnt : {len(entries)}"]
    header_lines.extend(
        f"{modality} archive: {archive_paths[modality]}" for modality in MODALITIES
    )
    header_lines.append(separator)
    log("\n".join(header_lines))

    def elapsed_text():
        return f"elapsed={perf_counter() - part_started:.1f}s"

    def emit_manifest(status, **details):
        # Build the manifest for this part, persist it and hand it back.
        manifest = build_part_manifest(
            args, part_index, archive_paths, entries, status=status, **details
        )
        write_json(manifest_file, manifest)
        return manifest

    if args.skip_existing:
        archives_present = all(
            existing.exists() and existing.stat().st_size > 0
            for existing in archive_paths.values()
        )
    else:
        archives_present = False

    if archives_present:
        existing_digests = {}
        for modality, existing in archive_paths.items():
            existing_digests[modality] = compute_sha256(existing) if args.checksum else None
        manifest = emit_manifest("skipped_existing", archive_sha256=existing_digests)
        log(f"[SKIP] archives already exist for {part_label} {elapsed_text()}")
        return manifest

    if args.dry_run:
        manifest = emit_manifest("dry_run")
        log(f"[DRY RUN] would create archives for {part_label} {elapsed_text()}")
        return manifest

    archive_sha256 = {}
    try:
        source_paths = {
            "2D": [entry.path_2d for entry in entries],
            "3D": [entry.path_3d for entry in entries],
        }
        worker_total = min(max(1, args.archive_workers), len(MODALITIES))
        if worker_total > 1:
            with ThreadPoolExecutor(max_workers=worker_total) as executor:
                pending = [
                    executor.submit(
                        package_modality_archive,
                        modality,
                        source_paths[modality],
                        archive_paths[modality],
                        source_root,
                        args,
                    )
                    for modality in MODALITIES
                ]
                for future in as_completed(pending):
                    finished_modality, digest = future.result()
                    archive_sha256[finished_modality] = digest
        else:
            for modality in MODALITIES:
                _, digest = package_modality_archive(
                    modality,
                    source_paths[modality],
                    archive_paths[modality],
                    source_root,
                    args,
                )
                archive_sha256[modality] = digest

        manifest = emit_manifest("packaged", archive_sha256=archive_sha256)
        log(f"[OK] packaged {part_label} {elapsed_text()}")
        return manifest
    except Exception as exc:
        # A failing part is recorded rather than aborting the remaining parts.
        manifest = emit_manifest("failed", error=str(exc))
        log(f"[FAIL] {part_label}: {exc}")
        return manifest
|
||
|
||
|
||
def package_parts(entries, source_root, output_root, args):
    """Split *entries* into parts and process them, returning manifests in part order.

    Parts are numbered from 1 and hold ``args.frames_per_part`` entries each
    (the last may hold fewer). With ``args.part_workers`` at most 1, or at
    most one part, parts run sequentially; otherwise they run in a thread
    pool and the results are placed back into part order.
    """
    step = args.frames_per_part
    chunk_starts = range(0, len(entries), step)
    part_specs = [
        (part_number, entries[start : start + step])
        for part_number, start in enumerate(chunk_starts, start=1)
    ]
    total_parts = len(part_specs)

    if args.part_workers <= 1 or total_parts <= 1:
        sequential_results = []
        for part_number, part_entries in part_specs:
            sequential_results.append(
                process_part(part_number, part_entries, source_root, output_root, args)
            )
        return sequential_results

    ordered_results = [None] * total_parts
    worker_total = min(args.part_workers, total_parts)
    log(f"[PACKAGE] part_workers={worker_total} archive_workers={args.archive_workers}")
    with ThreadPoolExecutor(max_workers=worker_total) as executor:
        pending = {}
        for part_number, part_entries in part_specs:
            future = executor.submit(
                process_part,
                part_number,
                part_entries,
                source_root,
                output_root,
                args,
            )
            pending[future] = part_number
        for future in as_completed(pending):
            ordered_results[pending[future] - 1] = future.result()

    return ordered_results
|
||
|
||
|
||
def build_overall_manifest(
    args,
    target_path,
    batch_root,
    output_root,
    resolve_mode,
    image_exts,
    batch_summaries,
    total_frames,
    results,
):
    """Assemble the run-level manifest.

    The manifest records the resolved paths, the effective options, the
    per-batch scan summaries, a status summary and every part manifest.

    Returns:
        dict: JSON-serializable manifest with keys in a fixed order.
    """
    # Where the run read from and wrote to.
    manifest = {
        "generated_at": now_str(),
        "target_path": str(target_path.resolve()),
        "batch_root": str(batch_root.resolve()),
        "output_root": str(output_root.resolve()),
        "resolve_mode": resolve_mode,
        "batch_glob": args.batch_glob,
    }
    # Options the run was executed with.
    manifest.update(
        {
            "archive_format": args.archive_format,
            "archive_backend_requested": args.archive_backend,
            "archive_backend_resolved": args.archive_backend_resolved,
            "frames_per_part": args.frames_per_part,
            "image_exts": list(image_exts),
            "check_pairs": args.check_pairs,
            "sample_mismatch_limit": args.sample_mismatch_limit,
            "part_workers": args.part_workers,
            "archive_workers": args.archive_workers,
            "skip_existing": args.skip_existing,
            "checksum": args.checksum,
            "verify_archive": args.verify_archive,
            "remove_source_after_success": args.remove_source_after_success,
            "dry_run": args.dry_run,
        }
    )
    # What was found and what happened to it.
    manifest.update(
        {
            "total_frames": total_frames,
            "total_batches": len(batch_summaries),
            "batch_summaries": batch_summaries,
            "summary": summarize_results(results),
            "parts": results,
        }
    )
    return manifest
|
||
|
||
|
||
def main():
    """Run the packaging workflow and return the process exit code.

    Steps: parse and validate the arguments, resolve the archive backend and
    the batch directories, scan all batches into one frame index, package
    the parts, write the overall manifest, and optionally remove the source
    batch directories.

    Source directories are removed only when ``--remove-source-after-success``
    is set, the run is not a dry run, no part failed, and no batch reported
    frames that exist in only one modality. Such unpaired frames are never
    archived (possible with ``--no-check-pairs``), so deleting the batch
    would lose them; in that case a warning is logged and the sources stay.

    Returns:
        int: 0 on success, 1 when any part failed or an error was raised.
    """
    try:
        args = parse_args()
        if args.frames_per_part <= 0:
            raise ValueError("frames-per-part 必须大于 0")
        if args.scan_workers <= 0:
            raise ValueError("scan-workers 必须大于 0")
        if args.archive_workers <= 0:
            raise ValueError("archive-workers 必须大于 0")
        if args.part_workers <= 0:
            raise ValueError("part-workers 必须大于 0")
        if not 0 <= args.gzip_compresslevel <= 9:
            raise ValueError("gzip-compresslevel 必须在 0 到 9 之间")

        image_exts = normalize_image_exts(args.image_exts)
        # Stored on args so the packaging helpers and manifests can read it.
        args.archive_backend_resolved = resolve_archive_backend(
            args.archive_backend,
            args.archive_format,
            args.gzip_compresslevel,
        )

        target_path = Path(args.target_path).resolve()
        batch_root, batch_dirs, resolve_mode = resolve_batch_dirs(target_path, args.batch_glob)
        output_root = Path(args.output_root).resolve() if args.output_root else default_output_root(batch_root)
        ensure_safe_output_root(batch_root, output_root)
        output_root.mkdir(parents=True, exist_ok=True)
        for modality in MODALITIES:
            (output_root / modality).mkdir(parents=True, exist_ok=True)

        log("")
        log("######################################################################")
        log("# Package visualization batches")
        log("######################################################################")
        log(f"Mode : {resolve_mode}")
        log(f"Target : {target_path}")
        log(f"Batch root : {batch_root}")
        log(f"Output root : {output_root}")
        log(f"Archive format : {args.archive_format}")
        log(
            f"Archive backend: {args.archive_backend_resolved} "
            f"(requested={args.archive_backend})"
        )
        log(f"Frames/part : {args.frames_per_part}")
        log(f"Scan workers : {args.scan_workers}")
        log(f"Archive workers: {args.archive_workers}")
        log(f"Part workers : {args.part_workers}")
        log(f"Gzip level : {args.gzip_compresslevel}")
        log(f"Check pairs : {args.check_pairs}")
        log(f"Skip existing : {args.skip_existing}")
        log(f"Checksum : {args.checksum}")
        log(f"Verify archive : {args.verify_archive}")
        log(f"Remove source : {args.remove_source_after_success}")
        log(f"Dry run : {args.dry_run}")
        log(f"Total batches : {len(batch_dirs)}")

        entries, batch_summaries = build_global_frame_index(
            batch_dirs,
            image_exts,
            check_pairs=args.check_pairs,
            sample_mismatch_limit=args.sample_mismatch_limit,
            scan_workers=args.scan_workers,
        )
        if not entries:
            raise ValueError("未找到可用于打包的 2D/3D 配对帧")

        total_parts = count_parts(len(entries), args.frames_per_part)
        log(f"Total frames : {len(entries)}")
        log(f"Total parts : {total_parts}")

        results = package_parts(entries, batch_root, output_root, args)

        overall_manifest = build_overall_manifest(
            args,
            target_path,
            batch_root,
            output_root,
            resolve_mode,
            image_exts,
            batch_summaries,
            len(entries),
            results,
        )
        overall_manifest_path = output_root / DEFAULT_MANIFEST_NAME
        write_json(overall_manifest_path, overall_manifest)

        summary = overall_manifest["summary"]
        log("")
        log(
            "[DONE] total_parts={total} status_counts={status_counts}".format(
                total=summary["total_parts"],
                status_counts=summary["status_counts"],
            )
        )
        log(f"Manifest: {overall_manifest_path}")

        failed_statuses = {"failed"}
        has_failure = any(item["status"] in failed_statuses for item in results)

        if args.remove_source_after_success and not args.dry_run and not has_failure:
            # Frames found in only one modality are not part of any archive;
            # removing their batch directory would destroy the only copy.
            unarchived_frames = sum(
                batch_summary["only_in_2d_count"] + batch_summary["only_in_3d_count"]
                for batch_summary in batch_summaries
            )
            if unarchived_frames:
                log(
                    "[WARN] source directories kept: "
                    f"{unarchived_frames} unpaired frame(s) were not archived"
                )
            else:
                for batch_dir in batch_dirs:
                    if batch_dir.exists():
                        shutil.rmtree(batch_dir)

        return 1 if has_failure else 0
    except Exception as exc:
        # Top-level boundary: report the error and signal failure via exit code.
        log(f"[ERROR] {exc}")
        return 1
|
||
|
||
|
||
# When run as a script, call main() and exit with the status code it returns.
if __name__ == "__main__":
    raise SystemExit(main())
|