Files
yolov26_3d/tools/convert_gt_to_label/package_visualization_batches.py
2026-06-24 09:35:46 +08:00

1054 lines
35 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# coding: utf-8
import argparse
import hashlib
import json
import os
import shutil
import subprocess
import tarfile
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from threading import Lock
from time import perf_counter
from count_visualization_batches import (
DEFAULT_IMAGE_EXTS,
DEFAULT_SHOWPATH_ROOT,
collect_image_stats,
normalize_image_exts,
)
# Glob pattern used to discover batch directories under a batch root.
DEFAULT_BATCH_GLOB = "batch_*"
# Default archive container format (one of the --archive-format choices).
DEFAULT_ARCHIVE_FORMAT = "tar.gz"
# Default backend selection; "auto" is resolved by resolve_archive_backend().
DEFAULT_ARCHIVE_BACKEND = "auto"
# File name of the overall manifest written into the output root.
DEFAULT_MANIFEST_NAME = "archives_manifest.json"
# Default number of frames packed into each part.
DEFAULT_FRAMES_PER_PART = 2000
# Modality sub-directory names looked up inside every batch directory.
MODALITIES = ("2D", "3D")
# Serializes log() output across worker threads.
LOG_LOCK = Lock()
@dataclass(frozen=True)
class FrameEntry:
    """Immutable record of one frame selected for packaging."""

    # Name of the batch directory the frame was found in.
    batch_name: str
    # Image path relative to the batch's modality directory.
    relpath: str
    # Location of the frame under the batch's "2D" directory.
    path_2d: Path
    # Location of the frame under the batch's "3D" directory.
    path_3d: Path
def log(message=""):
    """Print *message* to stdout while holding LOG_LOCK, flushing immediately."""
    LOG_LOCK.acquire()
    try:
        print(message, flush=True)
    finally:
        LOG_LOCK.release()
def default_scan_workers():
    """Return the default scan thread count: the CPU count clamped to [1, 8]."""
    # os.cpu_count() may return None; treat that as a single CPU.
    cpu_total = os.cpu_count() or 1
    capped = min(8, cpu_total)
    return max(1, capped)
def default_archive_workers():
    """Return the default archive concurrency: one worker per entry in MODALITIES."""
    modality_total = len(MODALITIES)
    return modality_total
def default_part_workers():
    """Return the default number of parts processed at the same time (1)."""
    single_part_at_a_time = 1
    return single_part_at_a_time
def parse_args():
    """Build the command-line parser and return the parsed arguments.

    Positional arguments are the target path (a batch root or a single batch
    directory) and an optional output root. The remaining options control
    batch discovery, archive format/backend, concurrency, validation and
    clean-up behavior.
    """
    parser = argparse.ArgumentParser(
        description="汇总所有 batch 的 2D/3D 可视化结果,并按固定帧数分片打包。"
    )
    add = parser.add_argument

    # Positional arguments.
    add(
        "target_path",
        nargs="?",
        default=DEFAULT_SHOWPATH_ROOT,
        help="可视化 batch 根目录,或单个 batch 目录。",
    )
    add(
        "output_root",
        nargs="?",
        default=None,
        help="归档输出目录。默认输出到 <batch_root> 的并列目录 <name>_archives。",
    )

    # Batch discovery and archive layout.
    add(
        "--batch-glob",
        default=DEFAULT_BATCH_GLOB,
        help="batch 根目录模式下用于查找 batch 的 glob默认 batch_*。",
    )
    add(
        "--archive-format",
        choices=["tar", "tar.gz"],
        default=DEFAULT_ARCHIVE_FORMAT,
        help="归档格式,默认 tar.gz。",
    )
    add(
        "--frames-per-part",
        type=int,
        default=DEFAULT_FRAMES_PER_PART,
        help="每个 part 包含多少帧,默认 2000。",
    )

    # Concurrency knobs.
    add(
        "--scan-workers",
        type=int,
        default=default_scan_workers(),
        help="扫描 batch 的并发线程数,默认按机器核数自适应,最多 8。",
    )
    add(
        "--archive-workers",
        type=int,
        default=default_archive_workers(),
        help="单个 part 内归档并发数,默认 2同时处理 2D 和 3D。",
    )
    add(
        "--part-workers",
        type=int,
        default=default_part_workers(),
        help="同时处理多少个 part默认 1每个 part 内仍会按 archive-workers 并发处理 2D/3D。",
    )

    # Archive backend and compression.
    add(
        "--archive-backend",
        choices=["auto", "python", "system_tar"],
        default=DEFAULT_ARCHIVE_BACKEND,
        help="归档实现后端,默认 auto优先使用系统 tar/gzip以提升大批量打包速度。",
    )
    add(
        "--gzip-compresslevel",
        type=int,
        default=1,
        help="tar.gz 压缩级别,范围 0-9默认 1 以优先速度。",
    )

    # Input validation.
    add(
        "--image-exts",
        default=DEFAULT_IMAGE_EXTS,
        help="参与校验统计的图片扩展名,逗号分隔,默认 .jpg,.jpeg,.png。",
    )
    # --check-pairs / --no-check-pairs share the "check_pairs" destination.
    add(
        "--check-pairs",
        action="store_true",
        default=True,
        help="打包前校验 2D/3D 相对路径是否一一对应,默认开启。",
    )
    add(
        "--no-check-pairs",
        dest="check_pairs",
        action="store_false",
        help="关闭 2D/3D 一一对应校验,仅分别收集文件。",
    )
    add(
        "--sample-mismatch-limit",
        type=int,
        default=10,
        help="2D/3D 配对校验失败时manifest 中最多保留多少条样例。",
    )

    # Post-processing of produced archives.
    add(
        "--skip-existing",
        action="store_true",
        help="若目标 part 归档已存在且大小大于 0则跳过该 part。",
    )
    add(
        "--checksum",
        action="store_true",
        help="归档完成后计算 sha256。",
    )
    # --verify-archive / --no-verify-archive share the "verify_archive" destination.
    add(
        "--verify-archive",
        action="store_true",
        default=True,
        help="归档完成后快速校验非空且可读取,默认开启。",
    )
    add(
        "--no-verify-archive",
        dest="verify_archive",
        action="store_false",
        help="关闭归档完成后的快速校验,以进一步提速。",
    )
    add(
        "--remove-source-after-success",
        action="store_true",
        help="所有归档全部成功后删除源 batch 目录。默认关闭,请谨慎使用。",
    )
    add(
        "--dry-run",
        action="store_true",
        help="只做校验和计划输出,不实际创建归档或删除源目录。",
    )

    parsed = parser.parse_args()
    return parsed
def now_str():
    """Return the current local time as an ISO-8601 string with second precision."""
    current = datetime.now()
    return current.isoformat(timespec="seconds")
def is_relative_to(path, other):
    """Return True when ``path.relative_to(other)`` succeeds, False on ValueError."""
    try:
        path.relative_to(other)
    except ValueError:
        return False
    return True
def is_batch_dir(path):
    """Return True when *path* is a directory holding a "2D" or "3D" sub-directory."""
    if not path.is_dir():
        return False
    # Check "2D" first, then "3D"; either one is enough.
    return any((path / modality_dir).is_dir() for modality_dir in ("2D", "3D"))
def resolve_batch_dirs(target_path, batch_glob):
    """Work out which batch directories *target_path* refers to.

    Returns:
        A ``(batch_root, batch_dirs, mode)`` tuple. When *target_path* itself
        is a batch directory, the root is its parent and mode is
        ``"single_batch"``; otherwise the sorted batch directories matching
        *batch_glob* under *target_path* are returned with mode
        ``"batch_root"``.

    Raises:
        FileNotFoundError: *target_path* is not a directory, or no batch
            directory matches *batch_glob*.
    """
    if is_batch_dir(target_path):
        return target_path.parent, [target_path], "single_batch"

    if not target_path.is_dir():
        raise FileNotFoundError(f"target path does not exist or is not a directory: {target_path}")

    matched = [candidate for candidate in target_path.glob(batch_glob) if is_batch_dir(candidate)]
    matched.sort()
    if not matched:
        raise FileNotFoundError(
            f"no batch directories found under {target_path} with batch_glob={batch_glob}"
        )
    return target_path, matched, "batch_root"
def default_output_root(batch_root):
    """Return the sibling directory ``<batch_root.name>_archives`` next to *batch_root*."""
    sibling_name = f"{batch_root.name}_archives"
    return batch_root.parent / sibling_name
def archive_suffix(archive_format):
    """Return the file suffix (without a leading dot) for *archive_format*.

    Anything other than ``"tar.gz"`` maps to ``"tar"``.
    """
    if archive_format == "tar.gz":
        return "tar.gz"
    return "tar"
def archive_basename(archive_path):
    """Return the archive file name with its ``.tar.gz`` / ``.tar`` suffix removed.

    Falls back to ``archive_path.stem`` for any other suffix.
    """
    file_name = archive_path.name
    # Test the longer suffix first so ".tar.gz" is stripped as a whole.
    for known_suffix in (".tar.gz", ".tar"):
        if file_name.endswith(known_suffix):
            return file_name[: -len(known_suffix)]
    return archive_path.stem
def build_archive_path(output_root, modality, part_index, archive_format):
    """Return ``<output_root>/<modality>/part_<NNNN>.<suffix>`` for one part archive."""
    suffix = archive_suffix(archive_format)
    file_name = f"part_{part_index:04d}.{suffix}"
    return output_root / modality / file_name
def write_json(path, data):
    """Write *data* to *path* as indented UTF-8 JSON, creating parent directories."""
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as handle:
        # ensure_ascii=False keeps non-ASCII text readable in the output file.
        json.dump(data, handle, ensure_ascii=False, indent=2)
def ensure_safe_output_root(root_dir, output_root):
    """Reject an output root that lies inside (or equals) the source root.

    Both paths are resolved before the comparison.

    Raises:
        ValueError: the resolved *output_root* is under the resolved *root_dir*.
    """
    source_resolved = root_dir.resolve()
    output_resolved = output_root.resolve()
    if not is_relative_to(output_resolved, source_resolved):
        return
    raise ValueError(
        "output_root 不允许位于源目录内部,"
        f"source_root={source_resolved}, output_root={output_resolved}"
    )
def supports_system_tar_backend(archive_format, gzip_compresslevel):
    """Report whether the system ``tar`` (and ``gzip``) backend can be used.

    Requires ``tar`` on PATH. For ``tar.gz`` it additionally requires a
    non-zero compression level and ``gzip`` on PATH.
    """
    if shutil.which("tar") is None:
        return False
    if archive_format != "tar.gz":
        return True
    # Level 0 is not handled by this backend, so only levels other than 0 qualify.
    return gzip_compresslevel != 0 and shutil.which("gzip") is not None
def resolve_archive_backend(requested_backend, archive_format, gzip_compresslevel):
    """Turn the requested backend into the concrete backend to use.

    ``"auto"`` picks ``"system_tar"`` when supports_system_tar_backend()
    allows it and ``"python"`` otherwise. ``"system_tar"`` is validated
    strictly; every other value resolves to ``"python"``.

    Raises:
        RuntimeError: ``system_tar`` was requested but ``tar`` (or, for
            tar.gz, ``gzip``) is not on PATH.
        ValueError: ``system_tar`` was requested for tar.gz with compression
            level 0.
    """
    if requested_backend == "auto":
        if supports_system_tar_backend(archive_format, gzip_compresslevel):
            return "system_tar"
        return "python"

    if requested_backend != "system_tar":
        return "python"

    wants_gzip = archive_format == "tar.gz"
    if shutil.which("tar") is None:
        raise RuntimeError("archive-backend=system_tar 需要系统 tar 命令。")
    if wants_gzip and shutil.which("gzip") is None:
        raise RuntimeError("archive-backend=system_tar 在 tar.gz 模式下需要 gzip 命令。")
    if wants_gzip and gzip_compresslevel == 0:
        raise ValueError(
            "archive-backend=system_tar 当前不支持 gzip-compresslevel=0"
            "请改用 python 后端或将压缩级别设为 1-9。"
        )
    return "system_tar"
def open_tarfile(archive_path, archive_format, gzip_compresslevel):
    """Open *archive_path* for writing as a plain or gzip-compressed tar file.

    *gzip_compresslevel* is only used when *archive_format* is ``"tar.gz"``.
    """
    if archive_format != "tar.gz":
        return tarfile.open(archive_path, mode="w")
    return tarfile.open(archive_path, mode="w:gz", compresslevel=gzip_compresslevel)
def build_archive_member_dir(archive_path):
    """Return the in-archive directory for members: ``<archive basename>/images``."""
    top_level = archive_basename(archive_path)
    return f"{top_level}/images"
def build_archive_entry_name(archive_path, source_path):
    """Return the in-archive name of *source_path*.

    Only the file name of *source_path* is kept; it is placed directly under
    the archive's member directory.
    """
    member_dir = build_archive_member_dir(archive_path)
    return f"{member_dir}/{source_path.name}"
def ensure_unique_archive_names(source_paths, archive_path):
    """Verify that no two source paths map to the same in-archive entry name.

    Raises:
        ValueError: two source paths produce the same entry name; the message
            names the entry, the first path and the duplicate.
    """
    first_owner = {}
    for candidate in source_paths:
        entry_name = build_archive_entry_name(archive_path, candidate)
        if entry_name not in first_owner:
            first_owner[entry_name] = candidate
            continue
        raise ValueError(
            "archive entry name duplicated: "
            f"{entry_name}, first={first_owner[entry_name]}, duplicate={candidate}"
        )
def build_relative_source_paths(source_paths, source_root):
    """Express every source path relative to *source_root* as a POSIX string.

    *source_root* is resolved first. Each source path is compared lexically
    against the resolved root; when that fails (for example because the
    caller reached the root through a symlink), the comparison is retried
    with the source path's parent directory resolved. The file name itself is
    never resolved, so the last component of the result always equals
    ``source_path.name``.

    Args:
        source_paths: Iterable of ``Path`` objects.
        source_root: ``Path`` of the directory the results are relative to.

    Returns:
        List of relative POSIX-style path strings, in input order.

    Raises:
        ValueError: a source path is not located under *source_root*.
    """
    resolved_root = source_root.resolve()
    relative_paths = []
    for source_path in source_paths:
        try:
            relative_path = source_path.relative_to(resolved_root)
        except ValueError:
            # Only the root was normalized above, so a path spelled through a
            # symlinked (or otherwise unresolved) directory fails the lexical
            # test even though it is inside the root. Normalize its directory
            # the same way and retry before giving up.
            normalized = source_path.parent.resolve() / source_path.name
            try:
                relative_path = normalized.relative_to(resolved_root)
            except ValueError as exc:
                raise ValueError(
                    f"source path is outside source_root: source_root={resolved_root}, path={source_path}"
                ) from exc
        relative_paths.append(relative_path.as_posix())
    return relative_paths
def decode_stderr(stderr_bytes):
    """Decode captured stderr bytes as UTF-8 and strip surrounding whitespace.

    Undecodable bytes are replaced; empty or missing input yields ``""``.
    """
    if stderr_bytes:
        text = stderr_bytes.decode("utf-8", errors="replace")
        return text.strip()
    return ""
def escape_tar_transform_replacement(value):
    """Backslash-escape ``\\``, ``&`` and ``|`` in *value*.

    The result is used as the replacement part of a ``s|...|...|`` transform
    expression, where ``|`` is the delimiter.
    """
    escaped = value
    # Backslash must be handled first so later escapes are not doubled.
    for raw, quoted in (("\\", "\\\\"), ("&", "\\&"), ("|", "\\|")):
        escaped = escaped.replace(raw, quoted)
    return escaped
def create_archive_with_python_tar(
    source_paths,
    archive_path,
    archive_format,
    gzip_compresslevel,
):
    """Create *archive_path* from *source_paths* using the ``tarfile`` module.

    The archive is written to ``<archive name>.partial`` first and renamed
    onto *archive_path* only after every member was added; a leftover partial
    file is removed on the way out.

    Raises:
        ValueError: two source paths map to the same in-archive name.
    """
    archive_path.parent.mkdir(parents=True, exist_ok=True)
    staging_path = archive_path.with_name(f"{archive_path.name}.partial")
    # Drop a stale partial file left behind by an earlier interrupted run.
    if staging_path.exists():
        staging_path.unlink()
    ensure_unique_archive_names(source_paths, archive_path)

    try:
        tar_obj = open_tarfile(staging_path, archive_format, gzip_compresslevel)
        with tar_obj:
            for member_path in source_paths:
                member_name = build_archive_entry_name(archive_path, member_path)
                tar_obj.add(member_path, arcname=member_name, recursive=False)
        staging_path.replace(archive_path)
    finally:
        # After a successful rename the partial file no longer exists.
        if staging_path.exists():
            staging_path.unlink()
def create_archive_with_system_tar(
    source_paths,
    archive_path,
    source_root,
    archive_format,
    gzip_compresslevel,
):
    """Create *archive_path* from *source_paths* with the system ``tar`` command.

    Member paths are passed to ``tar`` on stdin as a NUL-separated list of
    paths relative to *source_root*. A ``--transform`` rule strips each
    member's directory part and prefixes the archive's member directory. For
    ``tar.gz`` the tar stream is piped through ``gzip``. Output goes to
    ``<archive name>.partial`` and is renamed onto *archive_path* on success.

    Raises:
        ValueError: duplicate in-archive names, or a source path outside
            *source_root*.
        RuntimeError: ``tar`` or ``gzip`` exited with a non-zero status.
    """
    archive_path.parent.mkdir(parents=True, exist_ok=True)
    staging_path = archive_path.with_name(f"{archive_path.name}.partial")
    # Drop a stale partial file left behind by an earlier interrupted run.
    if staging_path.exists():
        staging_path.unlink()
    ensure_unique_archive_names(source_paths, archive_path)

    relative_members = build_relative_source_paths(source_paths, source_root)
    member_list_bytes = b"".join(os.fsencode(member) + b"\0" for member in relative_members)
    member_dir = escape_tar_transform_replacement(build_archive_member_dir(archive_path))
    transform_rule = f"s|.*/||;s|^|{member_dir}/|"

    def tar_command(output_target):
        # Same invocation for both formats; only the output target differs
        # (a file path for plain tar, "-" when streaming into gzip).
        return [
            "tar",
            "-cf",
            output_target,
            "--directory",
            str(source_root),
            "--transform",
            transform_rule,
            "--no-recursion",
            "--null",
            "--files-from",
            "-",
        ]

    try:
        if archive_format == "tar":
            completed = subprocess.run(
                tar_command(str(staging_path)),
                input=member_list_bytes,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                check=False,
            )
            if completed.returncode != 0:
                raise RuntimeError(
                    "system tar failed: "
                    f"{decode_stderr(completed.stderr) or 'unknown error'}"
                )
        else:
            gzip_cmd = ["gzip", f"-{gzip_compresslevel}", "-c"]
            with open(staging_path, "wb") as archive_file:
                tar_proc = subprocess.Popen(
                    tar_command("-"),
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                )
                try:
                    gzip_proc = subprocess.Popen(
                        gzip_cmd,
                        stdin=tar_proc.stdout,
                        stdout=archive_file,
                        stderr=subprocess.PIPE,
                    )
                except Exception:
                    # gzip could not be started: do not leave tar running.
                    tar_proc.kill()
                    tar_proc.wait()
                    raise
                # The parent's copy of the pipe is closed so gzip holds the
                # only read end of tar's stdout.
                tar_proc.stdout.close()
                _, tar_stderr = tar_proc.communicate(input=member_list_bytes)
                _, gzip_stderr = gzip_proc.communicate()
            if tar_proc.returncode != 0:
                raise RuntimeError(
                    "system tar failed: "
                    f"{decode_stderr(tar_stderr) or 'unknown error'}"
                )
            if gzip_proc.returncode != 0:
                raise RuntimeError(
                    "gzip failed: "
                    f"{decode_stderr(gzip_stderr) or 'unknown error'}"
                )
        staging_path.replace(archive_path)
    finally:
        # After a successful rename the partial file no longer exists.
        if staging_path.exists():
            staging_path.unlink()
def create_archive(
    source_paths,
    archive_path,
    source_root,
    archive_backend,
    archive_format,
    gzip_compresslevel,
):
    """Create one archive with the selected backend.

    ``"system_tar"`` dispatches to create_archive_with_system_tar(); any
    other backend value uses create_archive_with_python_tar(), which does not
    need *source_root*.
    """
    if archive_backend == "system_tar":
        create_archive_with_system_tar(
            source_paths,
            archive_path,
            source_root,
            archive_format,
            gzip_compresslevel,
        )
    else:
        create_archive_with_python_tar(
            source_paths,
            archive_path,
            archive_format,
            gzip_compresslevel,
        )
def verify_archive(archive_path):
    """Run a quick sanity check on a finished archive.

    Only the first member header is read; the archive is not fully scanned.

    Raises:
        FileNotFoundError: *archive_path* does not exist.
        ValueError: the file has size 0, or the archive has no members.
    """
    if not archive_path.exists():
        raise FileNotFoundError(f"archive not found: {archive_path}")

    size_bytes = archive_path.stat().st_size
    if size_bytes <= 0:
        raise ValueError(f"archive size is 0: {archive_path}")

    with tarfile.open(archive_path, mode="r:*") as tar_obj:
        if tar_obj.next() is None:
            raise ValueError(f"archive is empty: {archive_path}")
def compute_sha256(file_path):
    """Return the hex SHA-256 digest of *file_path*, read in 1 MiB chunks."""
    chunk_size = 1024 * 1024
    hasher = hashlib.sha256()
    with open(file_path, "rb") as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: stream.read(chunk_size), b""):
            hasher.update(block)
    return hasher.hexdigest()
def part_manifest_path(output_root, part_index):
    """Return ``<output_root>/part_<NNNN>.manifest.json`` for the given part."""
    manifest_name = f"part_{part_index:04d}.manifest.json"
    return output_root / manifest_name
def build_part_manifest(
    args,
    part_index,
    archive_paths,
    entries,
    status,
    error=None,
    archive_sha256=None,
):
    """Assemble the manifest dictionary describing one part.

    Args:
        args: Parsed arguments; ``archive_backend_resolved`` must already be set.
        part_index: Index of the part, used in the part name.
        archive_paths: Mapping of modality to archive ``Path``.
        entries: Frame entries that belong to this part.
        status: Status string recorded for the part.
        error: Optional error text.
        archive_sha256: Optional mapping of modality to hex digest.

    Returns:
        A dict with per-modality archive details (path, existence, size,
        digest), the per-batch frame distribution and up to five sample
        relative paths.
    """

    def describe_archive(modality, archive_path):
        present = archive_path.exists()
        size_bytes = archive_path.stat().st_size if present else 0
        digest = archive_sha256.get(modality) if archive_sha256 else None
        return {
            "archive_path": str(archive_path.resolve()),
            "archive_exists": present,
            "archive_size_bytes": size_bytes,
            "archive_sha256": digest,
        }

    archive_details = {
        modality: describe_archive(modality, archive_path)
        for modality, archive_path in archive_paths.items()
    }

    frames_per_batch = Counter(entry.batch_name for entry in entries)
    sorted_distribution = dict(sorted(frames_per_batch.items()))
    sample_relpaths = [entry.relpath for entry in entries[:5]]

    return {
        "created_at": now_str(),
        "part_name": f"part_{part_index:04d}",
        "part_index": part_index,
        "archive_format": args.archive_format,
        "archive_backend": args.archive_backend_resolved,
        "frames_per_part": args.frames_per_part,
        "frame_count": len(entries),
        "archive_details": archive_details,
        "status": status,
        "dry_run": args.dry_run,
        "check_pairs": args.check_pairs,
        "verify_archive": args.verify_archive,
        "batch_distribution": sorted_distribution,
        "frame_samples": sample_relpaths,
        "error": error,
    }
def summarize_results(results):
    """Summarize part manifests as a total count plus a count per status."""
    per_status = Counter()
    for manifest in results:
        per_status[manifest["status"]] += 1
    return {
        "total_parts": len(results),
        "status_counts": dict(per_status),
    }
def collect_batch_relpaths(batch_dir, image_exts):
    """Collect image counts and relative paths for each modality of a batch.

    Returns:
        ``(modality_relpaths, modality_counts)``, both keyed by modality. A
        falsy relpath result from collect_image_stats() is replaced with an
        empty set.
    """
    modality_relpaths = {}
    modality_counts = {}
    for modality in MODALITIES:
        image_total, found_relpaths = collect_image_stats(
            batch_dir / modality,
            image_exts,
            collect_relpaths=True,
        )
        modality_counts[modality] = image_total
        modality_relpaths[modality] = found_relpaths if found_relpaths else set()
    return modality_relpaths, modality_counts
def collect_frame_entries(batch_dir, image_exts, check_pairs, sample_mismatch_limit):
    """Build the frame entries and a summary for one batch directory.

    With *check_pairs* enabled, any relative path present in only one
    modality raises. With it disabled, the union of both modalities is
    walked and only relative paths whose 2D and 3D files both exist are kept.

    Returns:
        ``(entries, batch_summary)`` where *entries* is a list of FrameEntry
        sorted by relative path and *batch_summary* holds the counts and up
        to *sample_mismatch_limit* unpaired samples per modality.

    Raises:
        ValueError: *check_pairs* is set and the modalities do not match.
    """
    modality_relpaths, modality_counts = collect_batch_relpaths(batch_dir, image_exts)
    dir_2d = batch_dir / "2D"
    dir_3d = batch_dir / "3D"
    relpaths_2d = modality_relpaths["2D"]
    relpaths_3d = modality_relpaths["3D"]

    only_in_2d = sorted(relpaths_2d - relpaths_3d)
    only_in_3d = sorted(relpaths_3d - relpaths_2d)
    paired_relpaths = sorted(relpaths_2d & relpaths_3d)
    samples_2d = only_in_2d[:sample_mismatch_limit]
    samples_3d = only_in_3d[:sample_mismatch_limit]

    if check_pairs and (only_in_2d or only_in_3d):
        raise ValueError(
            "2D/3D 配对校验失败: "
            f"batch={batch_dir.name}, only_in_2d={len(only_in_2d)}, only_in_3d={len(only_in_3d)}, "
            f"only_in_2d_samples={samples_2d}, "
            f"only_in_3d_samples={samples_3d}"
        )

    def make_entry(relpath):
        return FrameEntry(
            batch_name=batch_dir.name,
            relpath=relpath,
            path_2d=dir_2d / relpath,
            path_3d=dir_3d / relpath,
        )

    if check_pairs:
        # The check above guarantees both modalities list the same paths.
        entries = [make_entry(relpath) for relpath in paired_relpaths]
    else:
        entries = []
        for relpath in sorted(relpaths_2d | relpaths_3d):
            candidate = make_entry(relpath)
            # Keep the frame only when both image files are actually present.
            if candidate.path_2d.is_file() and candidate.path_3d.is_file():
                entries.append(candidate)

    batch_summary = {
        "batch_name": batch_dir.name,
        "count_2d": modality_counts["2D"],
        "count_3d": modality_counts["3D"],
        "paired_count": len(paired_relpaths),
        "selected_count": len(entries),
        "only_in_2d_count": len(only_in_2d),
        "only_in_3d_count": len(only_in_3d),
        "only_in_2d_samples": samples_2d,
        "only_in_3d_samples": samples_3d,
    }
    return entries, batch_summary
def scan_single_batch(
    batch_index,
    total_batches,
    batch_dir,
    image_exts,
    check_pairs,
    sample_mismatch_limit,
):
    """Scan one batch directory and time the scan.

    Returns:
        ``(batch_index, batch_entries, batch_summary, elapsed_seconds)``.
    """
    started_at = perf_counter()
    log(f"[SCAN] ({batch_index}/{total_batches}) {batch_dir.name}")
    scanned = collect_frame_entries(
        batch_dir,
        image_exts,
        check_pairs=check_pairs,
        sample_mismatch_limit=sample_mismatch_limit,
    )
    batch_entries, batch_summary = scanned
    elapsed = perf_counter() - started_at
    return batch_index, batch_entries, batch_summary, elapsed
def build_global_frame_index(batch_dirs, image_exts, check_pairs, sample_mismatch_limit, scan_workers):
    """Scan every batch and merge the results in batch order.

    Batches are scanned one after another when *scan_workers* is at most 1 or
    there is at most one batch; otherwise a thread pool is used. Either way
    the returned entries and summaries follow the order of *batch_dirs*.

    Returns:
        ``(entries, batch_summaries)`` as two lists.
    """
    total_batches = len(batch_dirs)
    scan_started = perf_counter()
    # Results are stored by batch position so completion order does not matter.
    entries_by_batch = [None] * total_batches
    summaries_by_batch = [None] * total_batches
    cumulative_entries = 0

    def record(batch_index, batch_dir, batch_entries, batch_summary, elapsed):
        nonlocal cumulative_entries
        entries_by_batch[batch_index - 1] = batch_entries
        summaries_by_batch[batch_index - 1] = batch_summary
        cumulative_entries += len(batch_entries)
        log(
            f"[SCAN DONE] ({batch_index}/{total_batches}) {batch_dir.name} "
            f"2D={batch_summary['count_2d']} 3D={batch_summary['count_3d']} "
            f"paired={batch_summary['paired_count']} selected={batch_summary['selected_count']} "
            f"elapsed={elapsed:.1f}s cumulative={cumulative_entries}"
        )

    if scan_workers <= 1 or total_batches <= 1:
        for batch_index, batch_dir in enumerate(batch_dirs, start=1):
            _, batch_entries, batch_summary, elapsed = scan_single_batch(
                batch_index,
                total_batches,
                batch_dir,
                image_exts,
                check_pairs,
                sample_mismatch_limit,
            )
            record(batch_index, batch_dir, batch_entries, batch_summary, elapsed)
    else:
        pool_size = min(scan_workers, total_batches)
        with ThreadPoolExecutor(max_workers=pool_size) as executor:
            pending = {}
            for batch_index, batch_dir in enumerate(batch_dirs, start=1):
                future = executor.submit(
                    scan_single_batch,
                    batch_index,
                    total_batches,
                    batch_dir,
                    image_exts,
                    check_pairs,
                    sample_mismatch_limit,
                )
                pending[future] = (batch_index, batch_dir)
            for future in as_completed(pending):
                batch_index, batch_dir = pending[future]
                _, batch_entries, batch_summary, elapsed = future.result()
                record(batch_index, batch_dir, batch_entries, batch_summary, elapsed)

    entries = [entry for batch_entries in entries_by_batch for entry in batch_entries]
    batch_summaries = list(summaries_by_batch)
    log(
        f"[SCAN SUMMARY] batches={total_batches} total_frames={len(entries)} "
        f"elapsed={perf_counter() - scan_started:.1f}s"
    )
    return entries, batch_summaries
def count_parts(total_entries, frames_per_part):
    """Return how many parts are needed to hold *total_entries* frames.

    This is *total_entries* divided by *frames_per_part*, rounded up.
    """
    padded_total = total_entries + frames_per_part - 1
    return padded_total // frames_per_part
def iter_entry_chunks(entries, frames_per_part):
    """Yield consecutive slices of *entries*, each at most *frames_per_part* long."""
    total = len(entries)
    for start in range(0, total, frames_per_part):
        stop = start + frames_per_part
        yield entries[start:stop]
def package_modality_archive(modality, source_paths, archive_path, source_root, args):
    """Create, optionally verify and optionally checksum one modality archive.

    Returns:
        ``(modality, sha256_hex)``; the digest is ``None`` unless
        ``args.checksum`` is set.
    """
    create_archive(
        source_paths,
        archive_path,
        source_root,
        args.archive_backend_resolved,
        args.archive_format,
        args.gzip_compresslevel,
    )
    if args.verify_archive:
        verify_archive(archive_path)
    if args.checksum:
        return modality, compute_sha256(archive_path)
    return modality, None
def process_part(part_index, entries, source_root, output_root, args):
    """Package one part (a 2D and a 3D archive) and write its manifest.

    Order of handling:
      1. With ``args.skip_existing``, if every target archive already exists
         and is non-empty, the part is recorded as ``"skipped_existing"``.
      2. With ``args.dry_run``, the part is recorded as ``"dry_run"``.
      3. Otherwise the archives are created, sequentially or in a thread
         pool, and the part is recorded as ``"packaged"``.

    Any exception raised while packaging is caught, recorded as status
    ``"failed"`` with the error text, and not re-raised.

    Returns:
        The manifest dictionary that was written for this part.
    """
    part_name = f"part_{part_index:04d}"
    archive_paths = {}
    for modality in MODALITIES:
        archive_paths[modality] = build_archive_path(
            output_root, modality, part_index, args.archive_format
        )
    part_started = perf_counter()

    separator = "=========================================="
    header_lines = [
        "",
        separator,
        f"Part : {part_name}",
        f"Frame cnt : {len(entries)}",
    ]
    header_lines.extend(
        f"{modality} archive: {archive_paths[modality]}" for modality in MODALITIES
    )
    header_lines.append(separator)
    log("\n".join(header_lines))

    def record(status, **details):
        # Build the part manifest, persist it next to the archives, hand it back.
        manifest = build_part_manifest(
            args,
            part_index,
            archive_paths,
            entries,
            status=status,
            **details,
        )
        write_json(part_manifest_path(output_root, part_index), manifest)
        return manifest

    if args.skip_existing and all(
        existing.exists() and existing.stat().st_size > 0
        for existing in archive_paths.values()
    ):
        existing_digests = {
            modality: compute_sha256(existing) if args.checksum else None
            for modality, existing in archive_paths.items()
        }
        manifest = record("skipped_existing", archive_sha256=existing_digests)
        log(
            f"[SKIP] archives already exist for {part_name} "
            f"elapsed={perf_counter() - part_started:.1f}s"
        )
        return manifest

    if args.dry_run:
        manifest = record("dry_run")
        log(
            f"[DRY RUN] would create archives for {part_name} "
            f"elapsed={perf_counter() - part_started:.1f}s"
        )
        return manifest

    archive_sha256 = {}
    try:
        source_paths = {
            "2D": [entry.path_2d for entry in entries],
            "3D": [entry.path_3d for entry in entries],
        }
        pool_size = min(max(1, args.archive_workers), len(MODALITIES))
        if pool_size <= 1:
            for modality in MODALITIES:
                _, digest = package_modality_archive(
                    modality,
                    source_paths[modality],
                    archive_paths[modality],
                    source_root,
                    args,
                )
                archive_sha256[modality] = digest
        else:
            with ThreadPoolExecutor(max_workers=pool_size) as executor:
                pending = [
                    executor.submit(
                        package_modality_archive,
                        modality,
                        source_paths[modality],
                        archive_paths[modality],
                        source_root,
                        args,
                    )
                    for modality in MODALITIES
                ]
                for future in as_completed(pending):
                    finished_modality, digest = future.result()
                    archive_sha256[finished_modality] = digest
        manifest = record("packaged", archive_sha256=archive_sha256)
        log(f"[OK] packaged {part_name} elapsed={perf_counter() - part_started:.1f}s")
        return manifest
    except Exception as exc:
        manifest = record("failed", error=str(exc))
        log(f"[FAIL] {part_name}: {exc}")
        return manifest
def package_parts(entries, source_root, output_root, args):
    """Split *entries* into parts and package each one.

    Parts are numbered from 1 and each holds up to ``args.frames_per_part``
    consecutive entries. They are processed one by one unless
    ``args.part_workers`` is above 1 and there is more than one part, in
    which case a thread pool is used.

    Returns:
        The part manifests, ordered by part index.
    """
    step = args.frames_per_part
    part_specs = []
    for part_index, offset in enumerate(range(0, len(entries), step), start=1):
        part_specs.append((part_index, entries[offset : offset + step]))
    total_parts = len(part_specs)

    if args.part_workers <= 1 or total_parts <= 1:
        manifests = []
        for part_index, part_entries in part_specs:
            manifests.append(
                process_part(part_index, part_entries, source_root, output_root, args)
            )
        return manifests

    pool_size = min(args.part_workers, total_parts)
    log(f"[PACKAGE] part_workers={pool_size} archive_workers={args.archive_workers}")
    # Slots are filled by part index so the result order is independent of
    # the order in which the workers finish.
    ordered_results = [None] * total_parts
    with ThreadPoolExecutor(max_workers=pool_size) as executor:
        pending = {}
        for part_index, part_entries in part_specs:
            future = executor.submit(
                process_part,
                part_index,
                part_entries,
                source_root,
                output_root,
                args,
            )
            pending[future] = part_index
        for future in as_completed(pending):
            ordered_results[pending[future] - 1] = future.result()
    return ordered_results
def build_overall_manifest(
    args,
    target_path,
    batch_root,
    output_root,
    resolve_mode,
    image_exts,
    batch_summaries,
    total_frames,
    results,
):
    """Assemble the run-level manifest dictionary.

    It records the resolved paths, the effective command-line settings, the
    per-batch scan summaries, a status summary and every part manifest.
    """
    manifest = {
        "generated_at": now_str(),
        "target_path": str(target_path.resolve()),
        "batch_root": str(batch_root.resolve()),
        "output_root": str(output_root.resolve()),
        "resolve_mode": resolve_mode,
    }
    # Effective settings of this run.
    manifest.update(
        {
            "batch_glob": args.batch_glob,
            "archive_format": args.archive_format,
            "archive_backend_requested": args.archive_backend,
            "archive_backend_resolved": args.archive_backend_resolved,
            "frames_per_part": args.frames_per_part,
            "image_exts": list(image_exts),
            "check_pairs": args.check_pairs,
            "sample_mismatch_limit": args.sample_mismatch_limit,
            "part_workers": args.part_workers,
            "archive_workers": args.archive_workers,
            "skip_existing": args.skip_existing,
            "checksum": args.checksum,
            "verify_archive": args.verify_archive,
            "remove_source_after_success": args.remove_source_after_success,
            "dry_run": args.dry_run,
        }
    )
    # Scan and packaging outcome.
    manifest.update(
        {
            "total_frames": total_frames,
            "total_batches": len(batch_summaries),
            "batch_summaries": batch_summaries,
            "summary": summarize_results(results),
            "parts": results,
        }
    )
    return manifest
def main():
    """Run the packaging workflow from the command line.

    Steps: parse and validate the arguments, resolve the archive backend and
    the batch directories, scan all batches, package the frames part by part,
    write the overall manifest and, when requested, remove source batches.

    Source removal (``--remove-source-after-success``) only happens outside
    dry-run mode and when no part failed. A batch whose scan summary reports
    unpaired frames is kept, because those frames are not part of any archive
    and deleting the batch directory would lose them.

    Returns:
        0 when no part ended with status ``"failed"``, otherwise 1. Any
        exception raised along the way is logged as ``[ERROR] ...`` and also
        yields 1.
    """
    try:
        args = parse_args()
        if args.frames_per_part <= 0:
            raise ValueError("frames-per-part 必须大于 0")
        if args.scan_workers <= 0:
            raise ValueError("scan-workers 必须大于 0")
        if args.archive_workers <= 0:
            raise ValueError("archive-workers 必须大于 0")
        if args.part_workers <= 0:
            raise ValueError("part-workers 必须大于 0")
        if not 0 <= args.gzip_compresslevel <= 9:
            raise ValueError("gzip-compresslevel 必须在 0 到 9 之间")
        if args.sample_mismatch_limit < 0:
            # A negative limit would turn the sample slices into
            # "everything except the last N" instead of "at most N".
            raise ValueError("sample-mismatch-limit 必须大于等于 0")

        image_exts = normalize_image_exts(args.image_exts)
        args.archive_backend_resolved = resolve_archive_backend(
            args.archive_backend,
            args.archive_format,
            args.gzip_compresslevel,
        )
        target_path = Path(args.target_path).resolve()
        batch_root, batch_dirs, resolve_mode = resolve_batch_dirs(target_path, args.batch_glob)
        if args.output_root:
            output_root = Path(args.output_root).resolve()
        else:
            output_root = default_output_root(batch_root)
        ensure_safe_output_root(batch_root, output_root)
        output_root.mkdir(parents=True, exist_ok=True)
        for modality in MODALITIES:
            (output_root / modality).mkdir(parents=True, exist_ok=True)

        log("")
        log("######################################################################")
        log("# Package visualization batches")
        log("######################################################################")
        log(f"Mode : {resolve_mode}")
        log(f"Target : {target_path}")
        log(f"Batch root : {batch_root}")
        log(f"Output root : {output_root}")
        log(f"Archive format : {args.archive_format}")
        log(
            f"Archive backend: {args.archive_backend_resolved} "
            f"(requested={args.archive_backend})"
        )
        log(f"Frames/part : {args.frames_per_part}")
        log(f"Scan workers : {args.scan_workers}")
        log(f"Archive workers: {args.archive_workers}")
        log(f"Part workers : {args.part_workers}")
        log(f"Gzip level : {args.gzip_compresslevel}")
        log(f"Check pairs : {args.check_pairs}")
        log(f"Skip existing : {args.skip_existing}")
        log(f"Checksum : {args.checksum}")
        log(f"Verify archive : {args.verify_archive}")
        log(f"Remove source : {args.remove_source_after_success}")
        log(f"Dry run : {args.dry_run}")
        log(f"Total batches : {len(batch_dirs)}")

        entries, batch_summaries = build_global_frame_index(
            batch_dirs,
            image_exts,
            check_pairs=args.check_pairs,
            sample_mismatch_limit=args.sample_mismatch_limit,
            scan_workers=args.scan_workers,
        )
        if not entries:
            raise ValueError("未找到可用于打包的 2D/3D 配对帧")

        total_parts = count_parts(len(entries), args.frames_per_part)
        log(f"Total frames : {len(entries)}")
        log(f"Total parts : {total_parts}")

        results = package_parts(entries, batch_root, output_root, args)
        overall_manifest = build_overall_manifest(
            args,
            target_path,
            batch_root,
            output_root,
            resolve_mode,
            image_exts,
            batch_summaries,
            len(entries),
            results,
        )
        overall_manifest_path = output_root / DEFAULT_MANIFEST_NAME
        write_json(overall_manifest_path, overall_manifest)

        summary = overall_manifest["summary"]
        log("")
        log(
            "[DONE] total_parts={total} status_counts={status_counts}".format(
                total=summary["total_parts"],
                status_counts=summary["status_counts"],
            )
        )
        log(f"Manifest: {overall_manifest_path}")

        has_failure = any(item["status"] == "failed" for item in results)
        if args.remove_source_after_success and not args.dry_run and not has_failure:
            # batch_summaries is returned in the same order as batch_dirs.
            for batch_dir, batch_summary in zip(batch_dirs, batch_summaries):
                unpaired = (
                    batch_summary["only_in_2d_count"] + batch_summary["only_in_3d_count"]
                )
                if unpaired:
                    # Unpaired frames never reach an archive (possible with
                    # --no-check-pairs); removing the batch would destroy them.
                    log(
                        f"[WARN] keep source batch {batch_dir.name}: "
                        f"{unpaired} unpaired frame(s) were not archived"
                    )
                    continue
                if batch_dir.exists():
                    shutil.rmtree(batch_dir)
        return 1 if has_failure else 0
    except Exception as exc:
        # Top-level boundary: report the problem and signal failure via the
        # exit code instead of printing a traceback.
        log(f"[ERROR] {exc}")
        return 1
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())