Files
yolov26_3d/tools/feishu_project/run_issue_data_inference.py
2026-06-24 09:35:46 +08:00

722 lines
27 KiB
Python
Executable File

#!/usr/bin/env python3
"""Batch inference runner for downloaded Feishu issue data."""
from __future__ import annotations
import argparse
import json
import re
import shlex
import subprocess
import sys
from dataclasses import asdict, dataclass, replace
from datetime import datetime
from pathlib import Path
from typing import Iterable
# Repository root: this script sits two directory levels below it
# (<root>/tools/feishu_project/run_issue_data_inference.py).
ROOT = Path(__file__).resolve().parents[2]

# Default locations; every one of them can be overridden from the command line.
DEFAULT_DOWNLOAD_ROOT = Path("/data1/dongying/Mono3d/G1Q3/feishu_project/downloaded_issue_data")
DEFAULT_OUTPUT_ROOT = Path("/data1/dongying/Mono3d/G1Q3/feishu_project/inference_issue_data")
DEFAULT_INFERENCE_SCRIPT = ROOT.joinpath(
    "tools", "model_inference", "core", "run_two_roi_exported_onnx_infer.py"
)
DEFAULT_PYTHON_BIN = Path("/deeplearning_team/ydong/dongying/miniconda/envs/dev/bin/python")

# A path component named ``issue_<digits>``; group 1 is the numeric issue id.
ISSUE_DIR_RE = re.compile(r"^issue_(\d+)$")
# An optionally signed integer and nothing else.
SIGNED_INTEGER_RE = re.compile(r"^[+-]?\d+$")
# ``camera4: <id>`` anywhere in a string, any letter case; group 1 is the id.
FRAME_ID_CAMERA4_RE = re.compile(r"camera4\s*:\s*([+-]?\d+)", flags=re.IGNORECASE)
# ``camera<N>: <id>`` for any camera; group 1 is the camera name, group 2 the id.
FRAME_ID_ANY_CAMERA_RE = re.compile(r"(camera\d+)\s*:\s*([+-]?\d+)", flags=re.IGNORECASE)
@dataclass(frozen=True)
class TargetFrame:
    """Parsed form of an issue's ``问题发生frameid`` (issue-occurrence frame id) field.

    Instances are created by ``parse_target_frame``, which only builds one
    when the extracted frame id is strictly positive.
    """

    # "camera4", another lower-cased "camera<N>" name, or "any" when the field
    # was a bare integer that is not tied to a specific camera.
    camera: str
    # Frame id extracted from the text.
    frame_id: int
    # The stripped original text the id was parsed from.
    raw_text: str
@dataclass(frozen=True)
class InferenceCase:
    """One ``sigmastar.1/camera4.bin`` recording to run inference on.

    The optional fields describe the requested frame window. They stay
    ``None`` when no restriction applies; non-``None`` values are forwarded to
    the inference script by ``build_command``.
    """

    # Numeric id taken from the ``issue_<id>`` path component.
    issue_id: int
    # The issue's directory under the download root.
    issue_dir: Path
    # Directory containing the ``sigmastar.1/`` folder that holds camera4.bin.
    case_dir: Path
    # Path to the ``sigmastar.1/camera4.bin`` input file.
    camera4_bin: Path
    # ``case_dir`` relative to the download root; mirrored under the output root.
    relative_case_dir: Path
    # Directory this case's inference outputs are written to.
    output_dir: Path
    # How the frame window was chosen: "full", "manual", "issue_frame", or a
    # "full_*" fallback label when an issue frame could not be used.
    frame_window_source: str
    # Raw 问题发生frameid text the window came from, if any.
    target_frame_text: str | None = None
    # Forwarded as --target-frame-id.
    target_frame_id: int | None = None
    # Forwarded as --frame-index-start / --frame-index-end.
    requested_frame_index_start: int | None = None
    requested_frame_index_end: int | None = None
    # Forwarded as --frame-id-start / --frame-id-end.
    requested_frame_id_start: int | None = None
    requested_frame_id_end: int | None = None
@dataclass
class CaseResult:
    """Outcome of one case (run, planned, or skipped) as recorded in the manifest.

    Path-like values are stored as strings so the record is JSON-serialisable.
    """

    issue_id: int
    issue_dir: str
    case_dir: str
    camera4_bin: str
    relative_case_dir: str
    output_dir: str
    # e.g. "success", "failed", "planned", "skipped_existing" or "skipped_*".
    status: str
    # One-line human-readable explanation of ``status``.
    detail: str
    # Full argv of the inference run; empty for cases skipped before a command was built.
    command: list[str]
    log_path: str | None = None
    frame_window_source: str | None = None
    target_frame_text: str | None = None
    target_frame_id: int | None = None
    requested_frame_index_start: int | None = None
    requested_frame_index_end: int | None = None
    requested_frame_id_start: int | None = None
    requested_frame_id_end: int | None = None

    def to_dict(self) -> dict:
        """Return this result as a plain dict for JSON serialisation.

        Keys appear in field-declaration order. Deriving them with
        ``dataclasses.asdict`` keeps the dict in sync with the fields instead
        of maintaining a hand-written copy. ``command`` is copied, not shared.
        """
        return asdict(self)
def parse_args() -> argparse.Namespace:
    """Parse and validate the command-line options.

    Exits via ``parser.error`` when a frame-window margin is negative, when an
    explicit start bound exceeds its end bound, or when
    ``--use-issue-frame-window`` is given without ``--issue-json``.

    Note: ``--inference-arg`` values that begin with ``-`` must be written as
    ``--inference-arg=VALUE``; otherwise argparse treats them as options.
    """
    parser = argparse.ArgumentParser(
        description="Run exported-model inference on all downloaded issue-data camera4.bin cases."
    )

    # String options with defaults and help text, registered in this order.
    location_options = (
        (
            "--download-root",
            str(DEFAULT_DOWNLOAD_ROOT),
            "Root directory produced by download_issue_data.py.",
        ),
        (
            "--output-root",
            str(DEFAULT_OUTPUT_ROOT),
            "Root directory where inference outputs will be written.",
        ),
        (
            "--manifest-path",
            "",
            "Optional explicit manifest JSON path. Defaults to <output-root>/inference_manifest.json",
        ),
        (
            "--python-bin",
            str(DEFAULT_PYTHON_BIN),
            "Python interpreter used to launch run_two_roi_exported_onnx_infer.py.",
        ),
        (
            "--inference-script",
            str(DEFAULT_INFERENCE_SCRIPT),
            "Path to run_two_roi_exported_onnx_infer.py.",
        ),
        (
            "--issue-json",
            "",
            "Optional Feishu issue export JSON used to resolve 问题发生frameid windows.",
        ),
    )
    for flag, default_value, help_text in location_options:
        parser.add_argument(flag, default=default_value, help=help_text)

    parser.add_argument(
        "--issue-id",
        action="append",
        dest="issue_ids",
        type=int,
        help="Optional issue id filter. Can be repeated.",
    )
    parser.add_argument("--video-stride", type=int, default=1)
    parser.add_argument("--max-images", type=int, default=0)
    # Manual frame-window bounds; None means "not given".
    for flag in (
        "--frame-index-start",
        "--frame-index-end",
        "--frame-id-start",
        "--frame-id-end",
        "--target-frame-id",
    ):
        parser.add_argument(flag, type=int, default=None)
    # Number of frames kept on either side of a target frame id.
    for flag in ("--frame-before", "--frame-after"):
        parser.add_argument(flag, type=int, default=100)
    parser.add_argument("--use-issue-frame-window", action="store_true")
    parser.add_argument(
        "--missing-issue-frame-policy",
        choices=("full", "skip"),
        default="full",
        help="How to handle cases without a usable 问题发生frameid when --use-issue-frame-window is enabled.",
    )
    for flag in ("--exported-model", "--device"):
        parser.add_argument(flag, type=str, default="")
    parser.add_argument("--providers", nargs="*", default=None)
    for flag in ("--enable-attr", "--enable-cross-class-merge-prior", "--save-aggregate-predictions"):
        parser.add_argument(flag, action="store_true")
    parser.add_argument(
        "--inference-arg",
        action="append",
        default=[],
        help="Extra argument forwarded to run_two_roi_exported_onnx_infer.py. Can be repeated.",
    )
    for flag in ("--skip-existing", "--dry-run"):
        parser.add_argument(flag, action="store_true")

    args = parser.parse_args()

    def _inverted(lower, upper) -> bool:
        # Only explicit bounds on both sides can be inconsistent.
        return lower is not None and upper is not None and lower > upper

    # Checked in order; parser.error exits on the first violation.
    validation_rules = (
        (args.frame_before < 0, "--frame-before must be greater than or equal to 0"),
        (args.frame_after < 0, "--frame-after must be greater than or equal to 0"),
        (
            _inverted(args.frame_index_start, args.frame_index_end),
            "--frame-index-start must be less than or equal to --frame-index-end",
        ),
        (
            _inverted(args.frame_id_start, args.frame_id_end),
            "--frame-id-start must be less than or equal to --frame-id-end",
        ),
        (
            args.use_issue_frame_window and not args.issue_json,
            "--issue-json is required when --use-issue-frame-window is enabled",
        ),
    )
    for violated, message in validation_rules:
        if violated:
            parser.error(message)
    return args
def ensure_dir(path: Path, dry_run: bool) -> None:
    """Create ``path`` and any missing parents, unless ``dry_run`` is set.

    An already existing directory is accepted silently.
    """
    if not dry_run:
        path.mkdir(parents=True, exist_ok=True)
def log_progress(message: str) -> None:
    """Print ``message`` prefixed with the script name and a local-time timestamp.

    Output is flushed right away so progress lines appear immediately even
    when stdout is redirected.
    """
    now_local = datetime.now().astimezone()
    prefix = "[run_issue_data_inference " + now_local.strftime("%Y-%m-%d %H:%M:%S") + "]"
    print(f"{prefix} {message}", flush=True)
def compact_text(value: object, max_len: int = 96) -> str:
    """Render ``value`` as a single short line for log output.

    Args:
        value: Any object. ``None`` renders as an empty string.
        max_len: Maximum length of the returned string.

    Returns:
        ``str(value)`` stripped, with each whitespace run collapsed to one
        space. Text longer than ``max_len`` is cut and ends with ``"..."``.
        When ``max_len`` is too small to hold the ellipsis (< 3), the text is
        simply cut, so the result never exceeds ``max_len`` characters.
    """
    text = "" if value is None else str(value).strip()
    text = re.sub(r"\s+", " ", text)
    if len(text) <= max_len:
        return text
    if max_len < 3:
        # No room for "..."; slicing with max_len - 3 would go negative and
        # return something longer than max_len.
        return text[: max(max_len, 0)]
    return f"{text[: max_len - 3]}..."
def parse_issue_id_from_path(path: Path) -> int | None:
    """Return the id from the first ``issue_<digits>`` component of ``path``, or None."""
    candidates = (ISSUE_DIR_RE.match(component) for component in path.parts)
    first_match = next((hit for hit in candidates if hit), None)
    return int(first_match.group(1)) if first_match is not None else None
def load_issue_items(path: Path) -> list[dict]:
    """Read a Feishu issue export JSON file and return its ``"items"`` list.

    Raises:
        KeyError: if the top-level object has no ``"items"`` key.
    """
    with path.open(encoding="utf-8") as handle:
        document = json.load(handle)
    return document["items"]
def parse_target_frame(frame_text: object) -> tuple[TargetFrame | None, str]:
    """Parse an issue's 问题发生frameid value into a TargetFrame.

    Forms are tried in this order:

    1. ``camera4: <id>`` anywhere in the text (case-insensitive);
    2. a bare signed integer, which applies to any camera;
    3. the first ``camera<N>: <id>`` occurrence for any other camera.

    Returns:
        ``(TargetFrame, "")`` on success. Otherwise ``(None, reason)``, where
        the reason starts with "missing", "non-positive" or "unparseable".
    """
    text = str(frame_text).strip() if frame_text is not None else ""
    if not text:
        return None, "missing 问题发生frameid"

    def _frame_from(camera: str, digits: str) -> tuple[TargetFrame | None, str]:
        # Only strictly positive frame ids are accepted.
        frame_id = int(digits)
        if frame_id > 0:
            return TargetFrame(camera=camera, frame_id=frame_id, raw_text=text), ""
        return None, f"non-positive 问题发生frameid: {text!r}"

    camera4_hit = FRAME_ID_CAMERA4_RE.search(text)
    if camera4_hit is not None:
        return _frame_from("camera4", camera4_hit.group(1))
    if SIGNED_INTEGER_RE.fullmatch(text):
        return _frame_from("any", text)
    other_camera_hit = FRAME_ID_ANY_CAMERA_RE.search(text)
    if other_camera_hit is not None:
        return _frame_from(other_camera_hit.group(1).lower(), other_camera_hit.group(2))
    return None, f"unparseable 问题发生frameid: {text!r}"
def build_issue_item_lookup(issue_json: Path) -> dict[int, dict]:
    """Index the items of an issue export JSON by their integer ``"id"``.

    If two items share an id, the later one wins.
    """
    lookup: dict[int, dict] = {}
    for record in load_issue_items(issue_json):
        lookup[int(record["id"])] = record
    return lookup
def has_manual_frame_window(args: argparse.Namespace) -> bool:
    """Return True when any manual frame-window option was given on the CLI."""
    manual_options = (
        args.frame_index_start,
        args.frame_index_end,
        args.frame_id_start,
        args.frame_id_end,
        args.target_frame_id,
    )
    return not all(option is None for option in manual_options)
def build_case_result(
    case: InferenceCase,
    *,
    status: str,
    detail: str,
    command: list[str] | None = None,
    log_path: str | None = None,
) -> CaseResult:
    """Create the manifest record for ``case`` with the given outcome.

    Path fields are converted to strings. The frame-window fields are copied
    as-is. A missing or empty ``command`` is recorded as an empty list.
    """
    path_fields = ("issue_dir", "case_dir", "camera4_bin", "relative_case_dir", "output_dir")
    window_fields = (
        "frame_window_source",
        "target_frame_text",
        "target_frame_id",
        "requested_frame_index_start",
        "requested_frame_index_end",
        "requested_frame_id_start",
        "requested_frame_id_end",
    )
    stringified_paths = {name: str(getattr(case, name)) for name in path_fields}
    window_values = {name: getattr(case, name) for name in window_fields}
    return CaseResult(
        issue_id=case.issue_id,
        status=status,
        detail=detail,
        command=command if command else [],
        log_path=log_path,
        **stringified_paths,
        **window_values,
    )
def discover_cases(download_root: Path, output_root: Path, issue_filter: set[int] | None) -> list[InferenceCase]:
    """Find every ``*/sigmastar.1/camera4.bin`` recording that lies inside an issue directory.

    Args:
        download_root: Directory searched recursively for ``camera4.bin`` files.
        output_root: Root under which each case's relative directory is mirrored
            to form its output directory.
        issue_filter: When non-empty, only cases for these issue ids are kept.

    Returns:
        Cases in sorted path order, each with ``frame_window_source="full"``.
        ``issue_dir`` is the actual ``issue_<id>`` directory containing the case
        (the first such path component below ``download_root``).
    """
    cases: list[InferenceCase] = []
    for camera4_bin in sorted(download_root.rglob("camera4.bin")):
        if camera4_bin.parent.name != "sigmastar.1":
            continue
        try:
            relative_camera = camera4_bin.relative_to(download_root)
        except ValueError:
            continue
        issue_id = None
        issue_dir = None
        for depth, part in enumerate(relative_camera.parts):
            match = ISSUE_DIR_RE.match(part)
            if match:
                issue_id = int(match.group(1))
                # Use the directory that actually exists on disk. Rebuilding
                # download_root / f"issue_{id}" is wrong for nested layouts
                # (batch/issue_5/...) and zero-padded names (issue_007).
                issue_dir = download_root.joinpath(*relative_camera.parts[: depth + 1])
                break
        if issue_id is None or issue_dir is None:
            continue
        if issue_filter and issue_id not in issue_filter:
            continue
        case_dir = camera4_bin.parent.parent
        relative_case_dir = case_dir.relative_to(download_root)
        cases.append(
            InferenceCase(
                issue_id=issue_id,
                issue_dir=issue_dir,
                case_dir=case_dir,
                camera4_bin=camera4_bin,
                relative_case_dir=relative_case_dir,
                output_dir=output_root / relative_case_dir,
                frame_window_source="full",
            )
        )
    return cases
def apply_frame_windows(
    cases: list[InferenceCase],
    args: argparse.Namespace,
    issue_item_lookup: dict[int, dict],
) -> tuple[list[InferenceCase], list[CaseResult]]:
    """Attach a frame window to each case, or set aside cases that cannot be windowed.

    Each case first loses any previous window settings. Then, in order of
    precedence:

    1. If any manual window option was given, the same CLI window is used for
       every case (``frame_window_source="manual"``). ``--target-frame-id``
       fills in whichever frame-id bound was not given explicitly, using
       ``--frame-before`` / ``--frame-after``; the start is clamped at 0.
    2. Without ``--use-issue-frame-window`` the case runs on the full recording.
    3. Otherwise the issue's 问题发生frameid is looked up and parsed. If it is
       usable (camera4 or camera-agnostic), the window is
       ``[max(0, id - frame_before), id + frame_after]``. If the issue record
       or frame is missing, invalid, or refers to another camera, the case
       either runs in full with a ``full_*`` source label or, with
       ``--missing-issue-frame-policy skip``, becomes a ``skipped_*`` result.

    Returns:
        ``(prepared_cases, skipped_results)``, each in input order.
    """
    prepared_cases: list[InferenceCase] = []
    skipped_results: list[CaseResult] = []
    # The manual window depends only on args, so compute it once.
    manual_overrides = _manual_window_overrides(args) if has_manual_frame_window(args) else None
    skip_unusable = args.missing_issue_frame_policy == "skip"

    for case in cases:
        base_case = _reset_case_window(case)

        if manual_overrides is not None:
            prepared_cases.append(replace(base_case, **manual_overrides))
            continue
        if not args.use_issue_frame_window:
            prepared_cases.append(base_case)
            continue

        item = issue_item_lookup.get(case.issue_id)
        if item is None:
            if skip_unusable:
                skipped_results.append(
                    build_case_result(
                        base_case,
                        status="skipped_missing_issue_record",
                        detail="issue id not found in issue_json",
                    )
                )
            else:
                prepared_cases.append(replace(base_case, frame_window_source="full_missing_issue_record"))
            continue

        target_frame, target_frame_error = parse_target_frame(item.get("问题发生frameid"))
        if target_frame is None:
            if skip_unusable:
                skipped_results.append(
                    build_case_result(
                        base_case,
                        status="skipped_missing_issue_frame",
                        detail=target_frame_error,
                    )
                )
            else:
                # parse_target_frame's reason starts with "missing " only when
                # the field is empty; anything else is present but invalid.
                fallback_source = (
                    "full_missing_issue_frame"
                    if target_frame_error.startswith("missing ")
                    else "full_invalid_issue_frame"
                )
                prepared_cases.append(replace(base_case, frame_window_source=fallback_source))
            continue

        if target_frame.camera not in {"camera4", "any"}:
            # Keep the raw frame text on both the skip and the fallback path so
            # the manifest shows which camera the issue actually referred to.
            unsupported_case = replace(base_case, target_frame_text=target_frame.raw_text)
            if skip_unusable:
                skipped_results.append(
                    build_case_result(
                        unsupported_case,
                        status="skipped_unsupported_issue_camera",
                        detail=f"unsupported target camera for window inference: {target_frame.camera}",
                    )
                )
            else:
                prepared_cases.append(
                    replace(unsupported_case, frame_window_source="full_unsupported_issue_camera")
                )
            continue

        prepared_cases.append(
            replace(
                base_case,
                frame_window_source="issue_frame",
                target_frame_text=target_frame.raw_text,
                target_frame_id=target_frame.frame_id,
                requested_frame_id_start=max(0, target_frame.frame_id - args.frame_before),
                requested_frame_id_end=target_frame.frame_id + args.frame_after,
            )
        )
    return prepared_cases, skipped_results


def _reset_case_window(case: InferenceCase) -> InferenceCase:
    """Return a copy of ``case`` that keeps only its identity/path fields and runs in full."""
    return InferenceCase(
        issue_id=case.issue_id,
        issue_dir=case.issue_dir,
        case_dir=case.case_dir,
        camera4_bin=case.camera4_bin,
        relative_case_dir=case.relative_case_dir,
        output_dir=case.output_dir,
        frame_window_source="full",
    )


def _manual_window_overrides(args: argparse.Namespace) -> dict:
    """Return the InferenceCase field overrides for the window given on the command line."""
    frame_id_start = args.frame_id_start
    frame_id_end = args.frame_id_end
    if args.target_frame_id is not None:
        # Explicit bounds win; the target id only fills in the missing ones.
        if frame_id_start is None:
            frame_id_start = max(0, args.target_frame_id - args.frame_before)
        if frame_id_end is None:
            frame_id_end = args.target_frame_id + args.frame_after
    return {
        "frame_window_source": "manual",
        "target_frame_id": args.target_frame_id,
        "requested_frame_index_start": args.frame_index_start,
        "requested_frame_index_end": args.frame_index_end,
        "requested_frame_id_start": frame_id_start,
        "requested_frame_id_end": frame_id_end,
    }
def has_existing_outputs(output_dir: Path) -> bool:
    """Return True if ``output_dir`` already holds inference results.

    Results count as present when ``predictions_merged.json`` exists, or when
    ``predictions/merge/`` contains at least one regular file.
    """
    merged_json = output_dir / "predictions_merged.json"
    if merged_json.is_file():
        return True
    merge_json_dir = output_dir / "predictions" / "merge"
    if not merge_json_dir.is_dir():
        return False
    return any(entry.is_file() for entry in merge_json_dir.iterdir())
def build_command(case: InferenceCase, args: argparse.Namespace) -> list[str]:
    """Build the argv that runs the inference script on one case.

    Optional arguments are added only when set: per-case window bounds that are
    not ``None``, ``--max-images`` when positive, and non-empty model, device
    and provider options. ``--inference-arg`` values are appended verbatim at
    the end.
    """
    python_bin = args.python_bin
    # A bare executable name ("python3") is left for PATH lookup. Anything with
    # a directory component is made absolute WITHOUT following symlinks.
    # resolve() would turn "python3" into "<cwd>/python3", and would replace a
    # virtualenv's bin/python symlink with the base interpreter it points to,
    # dropping the environment's packages.
    if Path(python_bin).name != python_bin:
        python_bin = str(Path(python_bin).absolute())
    command = [
        python_bin,
        str(Path(args.inference_script).resolve()),
        "--video-case-dir",
        str(case.camera4_bin),
        "--output-dir",
        str(case.output_dir),
        "--video-stride",
        str(args.video_stride),
    ]
    if args.max_images > 0:
        command.extend(["--max-images", str(args.max_images)])
    optional_window_args = (
        ("--target-frame-id", case.target_frame_id),
        ("--frame-index-start", case.requested_frame_index_start),
        ("--frame-index-end", case.requested_frame_index_end),
        ("--frame-id-start", case.requested_frame_id_start),
        ("--frame-id-end", case.requested_frame_id_end),
    )
    for flag, value in optional_window_args:
        if value is not None:
            command.extend([flag, str(value)])
    if args.exported_model:
        command.extend(["--exported-model", args.exported_model])
    if args.device:
        command.extend(["--device", args.device])
    if args.providers:
        command.extend(["--providers", *args.providers])
    if args.enable_attr:
        command.append("--enable-attr")
    if args.enable_cross_class_merge_prior:
        command.append("--enable-cross-class-merge-prior")
    if args.save_aggregate_predictions:
        command.append("--save-aggregate-predictions")
    command.extend(args.inference_arg)
    return command
def format_command_line(command: list[str]) -> str:
    """Render ``command`` as one shell-quoted line that can be pasted back into a shell."""
    return " ".join(shlex.quote(part) for part in command)


def write_case_status_files(case: InferenceCase, command: list[str], log_text: str, dry_run: bool) -> str:
    """Write ``_status/command.txt`` and ``_status/inference.log`` for ``case``.

    The command is shell-quoted, so arguments containing spaces or non-ASCII
    text survive a copy-paste rerun. In dry-run mode nothing is written.

    Returns:
        Path of the (possibly unwritten) ``inference.log`` as a string.
    """
    status_dir = case.output_dir / "_status"
    log_path = status_dir / "inference.log"
    if dry_run:
        return str(log_path)
    ensure_dir(status_dir, dry_run=False)
    (status_dir / "command.txt").write_text(format_command_line(command) + "\n", encoding="utf-8")
    log_path.write_text(log_text, encoding="utf-8")
    return str(log_path)
def summarize_process_output(completed: subprocess.CompletedProcess[str]) -> str:
    """Return a one-line summary of a finished inference process.

    On success this is the last stdout line. On failure it is the last stderr
    line, or the last stdout line if stderr is blank. A generic message is
    used when the relevant streams are empty.
    """
    out_text = completed.stdout.strip()
    err_text = completed.stderr.strip()
    succeeded = completed.returncode == 0
    preferred_streams = (out_text,) if succeeded else (err_text, out_text)
    for stream_text in preferred_streams:
        if stream_text:
            return stream_text.splitlines()[-1]
    if succeeded:
        return "inference completed"
    return f"inference failed with return code {completed.returncode}"
def run_case(case: InferenceCase, args: argparse.Namespace) -> CaseResult:
    """Run inference for one case, or report why it was not run.

    The returned CaseResult has status:

    * ``"skipped_existing"`` when ``--skip-existing`` is set and outputs exist;
    * ``"planned"`` in dry-run mode (nothing is executed or written);
    * ``"success"`` / ``"failed"`` from the subprocess return code. A process
      that cannot be launched at all is also reported as ``"failed"``.

    Real runs write ``_status/command.txt`` and ``_status/inference.log``
    under the case output directory.
    """
    command = build_command(case, args)
    if args.skip_existing and has_existing_outputs(case.output_dir):
        return build_case_result(
            case,
            status="skipped_existing",
            detail="existing inference outputs found",
            command=command,
            log_path=None,
        )
    if args.dry_run:
        return build_case_result(
            case,
            status="planned",
            detail="would run inference",
            command=command,
            log_path=str(case.output_dir / "_status" / "inference.log"),
        )
    ensure_dir(case.output_dir, dry_run=False)
    try:
        completed = subprocess.run(
            command,
            cwd=str(ROOT),
            check=False,
            capture_output=True,
            text=True,
            encoding="utf-8",
            # Undecodable bytes in the child's output must not raise
            # UnicodeDecodeError here and abort the whole batch.
            errors="replace",
        )
    except OSError as exc:
        # The interpreter or script could not be started (missing file, no
        # exec permission, ...). Record a failed case instead of crashing,
        # which would lose the manifest for every case processed so far.
        launch_error = f"failed to launch inference: {exc}"
        log_path = write_case_status_files(case, command, launch_error + "\n", dry_run=False)
        return build_case_result(
            case,
            status="failed",
            detail=launch_error,
            command=command,
            log_path=log_path,
        )
    combined_log = "\n".join(
        part for part in (completed.stdout.strip(), completed.stderr.strip()) if part
    )
    log_path = write_case_status_files(case, command, combined_log + ("\n" if combined_log else ""), dry_run=False)
    status = "success" if completed.returncode == 0 else "failed"
    return build_case_result(
        case,
        status=status,
        detail=summarize_process_output(completed),
        command=command,
        log_path=log_path,
    )
def build_manifest(
    args: argparse.Namespace,
    download_root: Path,
    output_root: Path,
    discovered_cases: list[InferenceCase],
    results: list[CaseResult],
) -> dict:
    """Assemble the JSON-serialisable run manifest.

    Args:
        args: Parsed CLI options, echoed into the manifest.
        download_root: Download root that was scanned.
        output_root: Output root.
        discovered_cases: All cases found on disk; only their count is recorded.
        results: Per-case results, skipped cases included.

    Returns:
        A dict with the run configuration, ``total_cases``, a per-status
        ``summary`` count (in first-seen order) and the per-case records.
    """
    summary: dict[str, int] = {}
    for result in results:
        summary[result.status] = summary.get(result.status, 0) + 1
    # Record a bare interpreter name as given (such a name is looked up on PATH
    # when executed). Make path-like values absolute without following
    # symlinks: resolve() would turn "python3" into "<cwd>/python3" and would
    # report a virtualenv's bin/python symlink as its base interpreter.
    python_bin = args.python_bin
    if Path(python_bin).name != python_bin:
        python_bin = str(Path(python_bin).absolute())
    return {
        "generated_at": datetime.now().astimezone().isoformat(timespec="seconds"),
        "download_root": str(download_root),
        "output_root": str(output_root),
        "python_bin": python_bin,
        "inference_script": str(Path(args.inference_script).resolve()),
        "dry_run": args.dry_run,
        "skip_existing": args.skip_existing,
        "issue_filter": args.issue_ids or [],
        "issue_json": args.issue_json,
        "use_issue_frame_window": args.use_issue_frame_window,
        "missing_issue_frame_policy": args.missing_issue_frame_policy,
        "video_stride": args.video_stride,
        "max_images": args.max_images,
        "frame_index_start": args.frame_index_start,
        "frame_index_end": args.frame_index_end,
        "frame_id_start": args.frame_id_start,
        "frame_id_end": args.frame_id_end,
        "target_frame_id": args.target_frame_id,
        "frame_before": args.frame_before,
        "frame_after": args.frame_after,
        "exported_model": args.exported_model,
        "device": args.device,
        "providers": args.providers or [],
        "enable_attr": args.enable_attr,
        "enable_cross_class_merge_prior": args.enable_cross_class_merge_prior,
        "save_aggregate_predictions": args.save_aggregate_predictions,
        "inference_args": args.inference_arg,
        "total_cases": len(discovered_cases),
        "summary": summary,
        "cases": [result.to_dict() for result in results],
    }
def print_summary(manifest: dict) -> None:
    """Print a human-readable summary of a manifest dict to stdout.

    The frame-window line describes the issue-frame window when it is enabled,
    otherwise the manual window if any bound was given. Unset manual bounds are
    shown as -inf/+inf. Status counts are printed sorted by status name.
    """

    def _bound(key: str, fallback: str) -> object:
        value = manifest[key]
        return fallback if value is None else value

    print(f"download_root: {manifest['download_root']}")
    print(f"output_root: {manifest['output_root']}")
    print(f"dry_run: {manifest['dry_run']}")
    manual_keys = ("target_frame_id", "frame_id_start", "frame_id_end", "frame_index_start", "frame_index_end")
    if manifest["use_issue_frame_window"]:
        print(
            f"issue_frame_window: enabled issue_json={manifest['issue_json']} "
            f"before={manifest['frame_before']} after={manifest['frame_after']} "
            f"missing_policy={manifest['missing_issue_frame_policy']}"
        )
    elif any(manifest[key] is not None for key in manual_keys):
        frame_id_range = f"[{_bound('frame_id_start', '-inf')}, {_bound('frame_id_end', '+inf')}]"
        frame_index_range = f"[{_bound('frame_index_start', '-inf')}, {_bound('frame_index_end', '+inf')}]"
        print(
            f"manual_frame_window: target_frame_id={manifest['target_frame_id']} "
            f"frame_id={frame_id_range} frame_index={frame_index_range}"
        )
    print(f"total_cases: {manifest['total_cases']}")
    status_counts = manifest["summary"]
    for status in sorted(status_counts):
        print(f"{status}: {status_counts[status]}")
def main() -> int:
    """Discover cases, apply frame windows, run inference, and write the manifest.

    The manifest goes to ``--manifest-path`` or
    ``<output-root>/inference_manifest.json``; it is not written in dry-run mode.

    Returns:
        0 after every case has been processed. Per-case failures are reported
        in the manifest and summary, not through the exit status.

    Raises:
        FileNotFoundError: if no ``*/sigmastar.1/camera4.bin`` case is found
            under the download root.
    """
    args = parse_args()
    download_root = Path(args.download_root).resolve()
    output_root = Path(args.output_root).resolve()
    if args.manifest_path:
        manifest_path = Path(args.manifest_path).resolve()
    else:
        manifest_path = output_root / "inference_manifest.json"
    issue_filter = set(args.issue_ids) if args.issue_ids else None

    discovered = discover_cases(download_root, output_root, issue_filter)
    if not discovered:
        raise FileNotFoundError(f"No */sigmastar.1/camera4.bin cases found under {download_root}")

    issue_item_lookup: dict[int, dict] = (
        build_issue_item_lookup(Path(args.issue_json).resolve()) if args.use_issue_frame_window else {}
    )
    runnable_cases, skipped_results = apply_frame_windows(discovered, args, issue_item_lookup)
    log_progress(
        f"cases discovered: total={len(discovered)} runnable={len(runnable_cases)} "
        f"skipped_precheck={len(skipped_results)}"
    )
    for skipped in skipped_results:
        log_progress(
            f"skip issue_{skipped.issue_id} {compact_text(skipped.relative_case_dir)}: "
            f"{skipped.status} ({compact_text(skipped.detail, max_len=72)})"
        )
    ensure_dir(output_root, dry_run=args.dry_run)

    results = list(skipped_results)
    total = len(runnable_cases)
    for position, case in enumerate(runnable_cases, start=1):
        label = f"[{position}/{total}] issue_{case.issue_id} {compact_text(case.relative_case_dir)}"
        log_progress(f"{label}: start (window={case.frame_window_source})")
        outcome = run_case(case, args)
        results.append(outcome)
        log_progress(f"{label}: {outcome.status} ({compact_text(outcome.detail, max_len=72)})")

    manifest = build_manifest(args, download_root, output_root, discovered, results)
    if not args.dry_run:
        ensure_dir(manifest_path.parent, dry_run=False)
        manifest_path.write_text(
            json.dumps(manifest, ensure_ascii=False, indent=2) + "\n",
            encoding="utf-8",
        )
    print_summary(manifest)
    manifest_label = "manifest (not written in dry-run)" if args.dry_run else "manifest"
    print(f"{manifest_label}: {manifest_path}")
    return 0
if __name__ == "__main__":
    # Raising SystemExit with main()'s return value makes it the process exit status.
    raise SystemExit(main())