Files
yolov26_3d/tools/feishu_project/download_issue_data.py
2026-06-24 09:35:46 +08:00

806 lines
28 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Download issue data referenced by the Feishu issue export JSON."""
from __future__ import annotations
import argparse
import json
import re
import shutil
import subprocess
import sys
from collections import OrderedDict
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Iterable
# Absolute, symlink-resolved path of this script.
FILE = Path(__file__).resolve()
# Directory two levels above the one containing this file (parents[0] is the containing directory).
ROOT = FILE.parents[2]
# Put ROOT on sys.path so the absolute `tools.` import that follows can resolve
# when this file is executed directly as a script.
if str(ROOT) not in sys.path:
    sys.path.append(str(ROOT))
from tools.feishu_project.case_calib_recovery import recover_camera4_json
# PDCL reference of the form "ADAS_<name>::<item>". <name> is at least one character and
# excludes ':', '/', '\' and whitespace; <item> excludes '/', '\' and whitespace and may be empty.
PDCL_REF_RE = re.compile(r"ADAS_[^:/\\\s]+::[^/\\\s]*")
# Captures the whitespace-free argument following "-r" in an "mdi raw ... -r <ref>" command line.
MDI_RAW_REF_ARG_RE = re.compile(r"(?:^|\s)mdi\s+raw\b.*?(?:^|\s)-r\s+([^\s]+)")
# Separators between multiple entries inside one field value: commas, newlines, semicolons.
STANDARD_PATH_SPLIT_RE = re.compile(r"[,\n;]+")
# Relative location of the calibration file that marks a usable test_data directory.
SHARED_CALIB_REL = Path("test_data") / "calibs" / "camera4.json"
# Field values treated as "no data" (compared against the stripped, lower-cased field text).
# The Chinese entries read roughly "to be filled", "to be supplemented", "to be provided".
PLACEHOLDER_TEXTS = {"待填", "待补充", "none", "null", "待提供"}
# (network share prefix, local mount prefix) pairs; the first prefix that matches is used.
NETWORK_SHARE_PREFIX_MAPPINGS = (
    ("//hfs.minieye.tech/project-D4Q2", "/mnt/D4Q2"),
    ("//192.168.2.122/project-D4Q2", "/mnt/D4Q2"),
    ("//hfs.minieye.tech/project-G1M3", "/mnt/G1M3"),
    ("//192.168.2.122/project-G1M3", "/mnt/G1M3"),
    ("//hfs.minieye.tech/G1M3", "/mnt/G1M3"),
    ("//192.168.2.122/G1M3", "/mnt/G1M3"),
)
@dataclass
class ActionResult:
    """Outcome of one planned or executed action, as stored in the manifest."""

    issue_id: int
    issue_name: str
    source_field: str
    source_kind: str
    raw_value: str | None
    normalized_ref: str | None
    output_dir: str
    status: str
    detail: str
    resolved_source_path: str | None = None
    command: list[str] | None = None
    candidate_paths: list[str] | None = None
    selected_subpath: str | None = None

    def to_dict(self) -> dict:
        """Return every field as a plain dict, keyed by field name in declaration order."""
        exported = (
            "issue_id",
            "issue_name",
            "source_field",
            "source_kind",
            "raw_value",
            "normalized_ref",
            "output_dir",
            "status",
            "detail",
            "resolved_source_path",
            "command",
            "candidate_paths",
            "selected_subpath",
        )
        # Values are returned as-is (no copying), so list fields stay shared with the instance.
        return {name: getattr(self, name) for name in exported}
@dataclass(frozen=True)
class PDCLDownloadRequest:
    """One PDCL download parsed from an issue field (immutable)."""

    # The "ADAS_...::..." reference text passed to `mdi raw -r`.
    normalized_ref: str
    # Relative path under the downloaded root that should be kept; None means no pruning.
    selected_subpath: str | None = None
    # The text this request was parsed from.
    raw_token: str | None = None
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse ``sys.argv``."""
    cli = argparse.ArgumentParser(
        description="Download or stage issue data from a Feishu issue export JSON."
    )

    # Value-carrying options.
    cli.add_argument(
        "--input-json",
        default="tools/feishu_project/dongying_g1q3_issue_list.json",
        help="Path to the issue export JSON.",
    )
    cli.add_argument(
        "--output-root",
        required=True,
        help="Directory where downloaded or copied data should be stored.",
    )
    cli.add_argument(
        "--manifest-path",
        default=None,
        help="Optional explicit path for the execution manifest JSON.",
    )
    cli.add_argument(
        "--issue-id",
        action="append",
        dest="issue_ids",
        type=int,
        help="Optional issue id filter. Can be repeated.",
    )

    # Boolean switches, registered in the order they appear in --help.
    switches = (
        ("--dry-run", "Plan actions without running mdi or copying files."),
        ("--skip-mdi", "Skip PDCL/MDI downloads and only process standard paths."),
        ("--skip-copy", "Skip standard-path copies and only process PDCL/MDI downloads."),
        (
            "--only-redownload-affected-cases",
            "Only re-copy standard-path cases affected by the historical sigmastar.1/camera4.bin "
            "copy bug. This mode skips PDCL/MDI downloads and replaces stale copied targets.",
        ),
        (
            "--skip-calib-recovery",
            "Skip recovering camera4.json from camera_config_folder.bin or mcap attachments "
            "after standard-path copies.",
        ),
    )
    for flag, help_text in switches:
        cli.add_argument(flag, action="store_true", help=help_text)

    return cli.parse_args()
def load_issue_items(path: Path) -> list[dict]:
    """Parse the UTF-8 JSON file at *path* and return the value stored under "items"."""
    with path.open(encoding="utf-8") as handle:
        document = json.load(handle)
    return document["items"]
def ensure_dir(path: Path, dry_run: bool) -> None:
    """Create *path* and any missing parents; do nothing when *dry_run* is true."""
    if not dry_run:
        path.mkdir(parents=True, exist_ok=True)
def log_progress(message: str) -> None:
    """Print *message* to stdout behind a local-time stamp, flushing immediately."""
    now = datetime.now().astimezone()
    stamp = f"{now:%Y-%m-%d %H:%M:%S}"
    print(f"[download_issue_data {stamp}] {message}", flush=True)
def compact_text(value: object, max_len: int = 96) -> str:
    """Return *value* as single-line text no longer than *max_len* characters.

    ``None`` becomes an empty string. Surrounding whitespace is stripped and
    every internal whitespace run is collapsed to one space. Text that is
    still too long is cut and suffixed with ``"..."``.

    Args:
        value: Any object; converted with ``str()`` unless it is ``None``.
        max_len: Maximum length of the returned text.

    Returns:
        The compacted text, at most ``max(max_len, 0)`` characters long.
    """
    text = "" if value is None else str(value).strip()
    text = re.sub(r"\s+", " ", text)
    if len(text) <= max_len:
        return text
    if max_len < 3:
        # No room for the ellipsis: a negative slice bound would keep almost the
        # whole text, so hard-truncate instead.
        return text[: max(max_len, 0)]
    return f"{text[: max_len - 3]}..."
def summarize_issue_results(results: list[ActionResult]) -> str:
    """Return "status=count" pairs sorted by status, or "no actions" for an empty list."""
    if not results:
        return "no actions"
    statuses = [entry.status for entry in results]
    return ", ".join(f"{name}={statuses.count(name)}" for name in sorted(set(statuses)))
def normalize_issue_dirname(issue_id: int) -> str:
    """Return the per-issue directory name, e.g. ``issue_42``."""
    return "issue_" + str(issue_id)
def iter_issue_fields(item: dict) -> Iterable[tuple[str, object]]:
    """Yield ``(field name, value)`` for the two data-address fields; missing fields yield None."""
    for field_name in ("问题数据地址", "问题数据地址_PDCL"):
        yield field_name, item.get(field_name)
def _normalize_pdcl_selected_subpath(raw_subpath: str) -> str | None:
cleaned = raw_subpath.strip().strip("/")
if not cleaned:
return None
candidate = Path(cleaned)
if candidate.is_absolute():
return None
if any(part in {"", ".", ".."} for part in candidate.parts):
return None
return str(candidate)
def _build_pdcl_request_from_token(token: str) -> PDCLDownloadRequest | None:
    """Parse one token (such as an ``-r`` argument) into a PDCL request.

    Surrounding whitespace and quote characters are removed first. Returns
    ``None`` unless the token starts with a PDCL reference.
    """
    bare_token = token.strip().strip("\"'`")
    ref_match = PDCL_REF_RE.match(bare_token)
    if ref_match is None:
        return None

    remainder = bare_token[ref_match.end():]
    # Text after "<ref>/" names a sub-path, unless it is empty or looks like another reference.
    tail = remainder[1:] if remainder.startswith("/") else ""
    subpath = None
    if tail and not tail.startswith("ADAS_"):
        subpath = _normalize_pdcl_selected_subpath(tail)

    return PDCLDownloadRequest(
        normalized_ref=ref_match.group(0),
        selected_subpath=subpath,
        raw_token=bare_token,
    )
def extract_pdcl_requests(raw_value: object) -> list[PDCLDownloadRequest]:
    """Collect the PDCL download requests mentioned in a field value.

    The value is split on commas, newlines and semicolons. Each segment is
    first treated as an ``mdi raw ... -r <ref>`` command line; if that does not
    produce a request, the segment is scanned for bare references. Requests
    are returned in first-seen order, de-duplicated by (reference, sub-path).
    """
    text = "" if raw_value is None else str(raw_value).strip()
    if not text:
        return []

    found: list[PDCLDownloadRequest] = []
    for chunk in STANDARD_PATH_SPLIT_RE.split(text):
        segment = chunk.strip()
        if not segment:
            continue

        command_match = MDI_RAW_REF_ARG_RE.search(segment)
        if command_match is not None:
            parsed = _build_pdcl_request_from_token(command_match.group(1))
            if parsed is not None:
                found.append(parsed)
                continue

        cursor = 0
        while (ref_match := PDCL_REF_RE.search(segment, cursor)) is not None:
            ref = ref_match.group(0)
            tail = segment[ref_match.end():]
            if tail.startswith("/") and not tail[1:].startswith("ADAS_"):
                # Everything after "<ref>/" is taken as the sub-path, so the scan of
                # this segment ends here.
                found.append(
                    PDCLDownloadRequest(
                        normalized_ref=ref,
                        selected_subpath=_normalize_pdcl_selected_subpath(tail[1:]),
                        raw_token=segment,
                    )
                )
                break
            found.append(
                PDCLDownloadRequest(
                    normalized_ref=ref,
                    selected_subpath=None,
                    raw_token=ref,
                )
            )
            cursor = ref_match.end()

    # Keep the first request for each (reference, sub-path) pair, preserving order.
    unique: dict[tuple[str, str | None], PDCLDownloadRequest] = {}
    for request in found:
        unique.setdefault((request.normalized_ref, request.selected_subpath), request)
    return list(unique.values())
def extract_standard_paths(raw_value: object) -> list[str]:
    """Return the filesystem-style paths listed in a field value.

    Yields an empty list for empty or placeholder text, for text that
    contains any PDCL reference, and for text without a slash or backslash.
    Otherwise the text is split on commas, newlines and semicolons.
    """
    text = "" if raw_value is None else str(raw_value).strip()
    if not text or text.lower() in PLACEHOLDER_TEXTS:
        return []
    if extract_pdcl_requests(text):
        return []
    if not ("/" in text or "\\" in text):
        return []
    pieces = (piece.strip() for piece in STANDARD_PATH_SPLIT_RE.split(text))
    return [piece for piece in pieces if piece]
def normalize_standard_source_path(path: Path) -> Path:
    """Lift a path that points into ``sigmastar.1`` up to the directory containing it.

    ``<dir>/sigmastar.1`` and ``<dir>/sigmastar.1/camera4.bin`` both map to
    ``<dir>``; any other path is returned unchanged.
    """
    leaf = path.name
    holder = path.parent
    if leaf == "sigmastar.1":
        return holder
    if leaf == "camera4.bin" and holder.name == "sigmastar.1":
        return holder.parent
    return path
def is_affected_standard_path(raw_path: str) -> bool:
    """Return True when normalizing the stripped *raw_path* changes it."""
    original = Path(raw_path.strip())
    return original != normalize_standard_source_path(original)
def normalize_share_path_separators(path_str: str) -> str:
    """Strip *path_str*, turn backslashes into slashes and shrink runs of 3+ slashes to "//"."""
    forward_slashed = path_str.strip().replace("\\", "/")
    return re.sub(r"/{3,}", "//", forward_slashed)
def rewrite_network_share_path(path_str: str) -> str | None:
    """Swap a known network-share prefix for its local mount prefix.

    Returns ``None`` when the separator-normalized path starts with none of
    the prefixes in ``NETWORK_SHARE_PREFIX_MAPPINGS``.
    """
    unified = normalize_share_path_separators(path_str)
    rewritten = (
        f"{local_prefix}{unified[len(share_prefix):]}"
        for share_prefix, local_prefix in NETWORK_SHARE_PREFIX_MAPPINGS
        if unified.startswith(share_prefix)
    )
    # Only the first matching mapping is used.
    return next(rewritten, None)
def build_path_candidates(raw_path: str) -> list[Path]:
    """List the paths to try for *raw_path*, in order and without duplicates.

    Variants are the raw text, its separator-normalized form, its
    share-to-mount rewrite, and copies of each with the "project-G1M3"
    spellings replaced by "G1M3". Every variant is then passed through
    ``normalize_standard_source_path``.
    """
    variants: list[str] = [raw_path]

    slash_form = normalize_share_path_separators(raw_path)
    if slash_form != raw_path:
        variants.append(slash_form)

    mounted = rewrite_network_share_path(raw_path)
    if mounted:
        variants.append(mounted)

    for needle in ("hfs/project-G1M3", "project-G1M3"):
        # The list comprehension is built in full before extending, so the variants
        # added for this needle are not themselves rescanned for it.
        variants.extend(
            [variant.replace(needle, "G1M3") for variant in variants if needle in variant]
        )

    normalized_texts = (str(normalize_standard_source_path(Path(variant))) for variant in variants)
    return [Path(text) for text in dict.fromkeys(normalized_texts)]
def resolve_existing_path(raw_path: str) -> tuple[Path | None, list[Path]]:
    """Return the first candidate for *raw_path* that exists (or None), plus all candidates."""
    candidates = build_path_candidates(raw_path)
    first_hit = next((candidate for candidate in candidates if candidate.exists()), None)
    return first_hit, candidates
def remove_existing_target(path: Path) -> None:
    """Delete *path*, whether it is a directory tree, a file or a symlink.

    A symlink is unlinked even when it points at a directory: only the link is
    removed and its target is left alone.

    Raises:
        OSError: If the underlying removal fails (for example the path does
            not exist).
    """
    # is_dir() follows symlinks, but shutil.rmtree refuses to operate on a
    # symlink, so links must take the unlink branch.
    if path.is_dir() and not path.is_symlink():
        shutil.rmtree(path)
    else:
        path.unlink()
def copy_source_path(
    source_path: Path,
    output_dir: Path,
    dry_run: bool,
    replace_existing: bool = False,
    legacy_target_names: Iterable[str] | None = None,
) -> tuple[str, str]:
    """Copy *source_path* into *output_dir* under its own name.

    Args:
        source_path: File or directory to copy.
        output_dir: Directory that receives the copy.
        dry_run: Report what would happen without touching the filesystem.
        replace_existing: Delete an existing target (and legacy targets) first
            instead of returning ``"exists"``.
        legacy_target_names: Extra names inside *output_dir* that count as
            earlier copies of the same source.

    Returns:
        ``(status, detail)`` where status is one of ``exists``, ``planned``,
        ``planned_redownload``, ``copied`` or ``redownloaded``.
    """
    target = output_dir / source_path.name
    stale_candidates: list[Path] = [target]
    stale_candidates.extend(output_dir / name for name in (legacy_target_names or ()))
    present = [
        Path(text)
        for text in dict.fromkeys(str(candidate) for candidate in stale_candidates)
        if Path(text).exists()
    ]

    if present and not replace_existing:
        return "exists", f"target already exists: {target}"

    # From here on, a non-empty `present` implies replace_existing is set.
    present_text = ", ".join(str(path) for path in present)
    if dry_run:
        if present:
            return "planned_redownload", f"would replace {present_text} with {source_path} -> {target}"
        return "planned", f"would copy {source_path} -> {target}"

    ensure_dir(output_dir, dry_run=False)
    for stale in present:
        remove_existing_target(stale)

    copier = shutil.copytree if source_path.is_dir() else shutil.copy2
    copier(source_path, target)

    if present:
        return "redownloaded", f"replaced {present_text} with {target}"
    return "copied", f"copied to {target}"
def build_copied_target_root(output_dir: Path, source_path: Path) -> Path:
    """Return where a copy of *source_path* lives inside *output_dir*."""
    return output_dir.joinpath(source_path.name)
def find_shared_test_data_dir(source_path: Path, max_parent_levels: int = 4) -> Path | None:
    """Find a ``test_data`` directory shared by an ancestor of *source_path*.

    Returns ``None`` when *source_path* already contains the calibration file
    itself. Otherwise up to *max_parent_levels* ancestors are checked, nearest
    first, and the first ``test_data`` directory holding
    ``calibs/camera4.json`` is returned (``None`` if there is none).
    """
    if (source_path / SHARED_CALIB_REL).is_file():
        return None

    ancestor = source_path.parent
    levels_left = max_parent_levels
    while levels_left > 0:
        shared_dir = ancestor / "test_data"
        if (shared_dir / "calibs" / "camera4.json").is_file():
            return shared_dir
        parent = ancestor.parent
        if parent == ancestor:
            # The top of the path has been reached.
            return None
        ancestor = parent
        levels_left -= 1
    return None
def sync_shared_test_data(
    source_path: Path,
    target_root: Path,
    dry_run: bool,
) -> tuple[str | None, str | None, str | None]:
    """Copy a shared ``test_data`` directory next to the copied case when needed.

    Returns ``(None, None, None)`` when no shared directory is found or the
    target already has ``test_data/calibs/camera4.json``. Otherwise returns
    ``(status, detail, shared source directory)``.
    """
    nothing_to_do = (None, None, None)

    shared_dir = find_shared_test_data_dir(source_path)
    if shared_dir is None:
        return nothing_to_do

    destination = target_root / "test_data"
    if (destination / "calibs" / "camera4.json").is_file():
        return nothing_to_do

    if not dry_run:
        # dirs_exist_ok lets the copy merge into a test_data directory that already exists.
        shutil.copytree(shared_dir, destination, dirs_exist_ok=True)
        return (
            "synced_shared_calib",
            f"copied shared test_data {shared_dir} -> {destination}",
            str(shared_dir),
        )
    return (
        "planned_shared_calib_sync",
        f"would copy shared test_data {shared_dir} -> {destination}",
        str(shared_dir),
    )
def recover_target_root_calib(
    source_root: Path,
    target_root: Path,
    dry_run: bool,
) -> tuple[str, str, str | None]:
    """Run ``recover_camera4_json`` and return ``(status, detail, source path or None)``."""
    outcome = recover_camera4_json(
        source_root=source_root,
        target_root=target_root,
        dry_run=dry_run,
    )
    status = outcome.status
    detail = outcome.detail
    recovered_from = outcome.source_path
    source_text = str(recovered_from) if recovered_from is not None else None
    return status, detail, source_text
def _expected_pdcl_root_dir(output_dir: Path, ref: str) -> Path:
if "::" not in ref:
raise ValueError(f"Unexpected PDCL ref without '::': {ref}")
return output_dir / ref.split("::", 1)[1]
def _prune_pdcl_download_to_selected_subpath(
root_dir: Path,
selected_subpath: str,
dry_run: bool,
) -> tuple[str, str]:
selected_rel = Path(selected_subpath)
selected_path = root_dir / selected_rel
if dry_run:
return (
"planned_selected_subpath",
f"would keep {selected_path} and shared test_data under {root_dir}",
)
if not selected_path.exists():
return (
"failed_selected_subpath_missing",
f"selected subpath not found after mdi download: {selected_path}",
)
keep_names = {selected_rel.parts[0], "test_data"}
removed_children: list[str] = []
for child in root_dir.iterdir():
if child.name in keep_names:
continue
removed_children.append(child.name)
remove_existing_target(child)
detail = f"kept selected subpath {selected_path}"
if removed_children:
detail += f"; removed siblings: {', '.join(sorted(removed_children))}"
return "downloaded_selected_subpath", detail
def run_mdi_download(request: PDCLDownloadRequest, output_dir: Path, dry_run: bool) -> tuple[str, str, list[str]]:
    """Download one PDCL reference with ``mdi raw`` into *output_dir*.

    Args:
        request: Reference to download and, optionally, the sub-path to keep.
        output_dir: Directory passed to ``mdi`` via ``-s``; created if missing.
        dry_run: Describe the command instead of running it.

    Returns:
        ``(status, detail, command)``. A failure to start ``mdi`` or a
        non-zero exit is reported as status ``"failed"`` rather than raised,
        so that one bad download does not abort the whole run.
    """
    command = ["mdi", "raw", "-r", request.normalized_ref, "-s", str(output_dir)]
    if dry_run:
        if request.selected_subpath:
            root_dir = _expected_pdcl_root_dir(output_dir, request.normalized_ref)
            status, detail = _prune_pdcl_download_to_selected_subpath(root_dir, request.selected_subpath, dry_run=True)
            return status, f"would run {' '.join(command)}; {detail}", command
        return "planned", f"would run {' '.join(command)}", command

    ensure_dir(output_dir, dry_run=False)
    try:
        completed = subprocess.run(
            command,
            check=False,
            capture_output=True,
            text=True,
            encoding="utf-8",
            # Undecodable bytes in the tool's output must not raise UnicodeDecodeError.
            errors="replace",
        )
    except OSError as exc:
        # Raised when the mdi executable is missing or cannot be executed.
        return "failed", f"could not run mdi: {exc}", command

    if completed.returncode != 0:
        detail = completed.stderr.strip() or completed.stdout.strip() or "mdi raw failed"
        return "failed", detail, command

    base_detail = completed.stdout.strip() or "mdi raw completed"
    if request.selected_subpath:
        root_dir = _expected_pdcl_root_dir(output_dir, request.normalized_ref)
        status, detail = _prune_pdcl_download_to_selected_subpath(root_dir, request.selected_subpath, dry_run=False)
        return status, f"{base_detail}\n{detail}", command
    return "downloaded", base_detail, command
def process_issue(
    item: dict,
    output_root: Path,
    dry_run: bool,
    skip_mdi: bool,
    skip_copy: bool,
    only_redownload_affected_cases: bool,
    skip_calib_recovery: bool,
) -> list[ActionResult]:
    """Plan or execute every data action for one exported issue.

    For each data-address field, PDCL references are downloaded into
    ``issue_<id>/pdcl_NN`` and standard paths are copied into
    ``issue_<id>/path_NN``. A copy is followed by shared ``test_data``
    syncing and, unless skipped, calibration recovery.

    Args:
        item: Issue record; must contain "id" and "name".
        output_root: Root directory under which ``issue_<id>`` is placed.
        dry_run: Passed to every action so nothing is changed on disk.
        skip_mdi: Do not process PDCL references.
        skip_copy: Do not process standard paths.
        only_redownload_affected_cases: Skip PDCL references and process only
            standard paths that normalization changes, replacing existing copies.
        skip_calib_recovery: Do not run calibration recovery after a copy.

    Returns:
        One ``ActionResult`` per action considered, in processing order.
    """
    issue_id = int(item["id"])
    issue_name = str(item["name"])
    issue_dir = output_root / normalize_issue_dirname(issue_id)
    results: list[ActionResult] = []

    def record(source_field: str, source_kind: str, **fields: object) -> None:
        """Append an ActionResult for this issue to the result list."""
        results.append(
            ActionResult(
                issue_id=issue_id,
                issue_name=issue_name,
                source_field=source_field,
                source_kind=source_kind,
                **fields,
            )
        )

    run_pdcl = not (skip_mdi or only_redownload_affected_cases)
    handled_requests: set[tuple[str, str | None]] = set()
    handled_paths: set[str] = set()
    pdcl_count = 0
    path_count = 0

    for field_name, raw_value in iter_issue_fields(item):
        if run_pdcl:
            for request in extract_pdcl_requests(raw_value):
                raw_text = None if raw_value is None else str(raw_value)
                request_key = (request.normalized_ref, request.selected_subpath)

                if request_key in handled_requests:
                    if request.selected_subpath is None:
                        duplicate_detail = f"duplicate PDCL ref: {request.normalized_ref}"
                    else:
                        duplicate_detail = (
                            f"duplicate PDCL ref+subpath: {request.normalized_ref} / {request.selected_subpath}"
                        )
                    record(
                        field_name,
                        "pdcl_mdi_download",
                        raw_value=raw_text,
                        normalized_ref=request.normalized_ref,
                        output_dir=str(issue_dir),
                        status="skipped_duplicate",
                        detail=duplicate_detail,
                        selected_subpath=request.selected_subpath,
                    )
                    continue

                handled_requests.add(request_key)
                pdcl_count += 1
                pdcl_label = f"pdcl_{pdcl_count:02d}"
                download_dir = issue_dir / pdcl_label
                request_desc = request.normalized_ref
                if request.selected_subpath:
                    request_desc += f"/{request.selected_subpath}"
                log_progress(f"issue_{issue_id} [download] {pdcl_label}: {compact_text(request_desc)}")

                status, detail, command = run_mdi_download(request, download_dir, dry_run=dry_run)
                record(
                    field_name,
                    "pdcl_mdi_download",
                    raw_value=raw_text,
                    normalized_ref=request.normalized_ref,
                    output_dir=str(download_dir),
                    status=status,
                    detail=detail,
                    command=command,
                    selected_subpath=request.selected_subpath,
                )

        if skip_copy:
            continue

        for raw_path in extract_standard_paths(raw_value):
            if raw_path in handled_paths:
                record(
                    field_name,
                    "standard_path",
                    raw_value=raw_path,
                    normalized_ref=None,
                    output_dir=str(issue_dir),
                    status="skipped_duplicate",
                    detail=f"duplicate standard path: {raw_path}",
                )
                continue

            handled_paths.add(raw_path)
            # The index advances before the filter below, so path_NN directory
            # names do not depend on only_redownload_affected_cases.
            path_count += 1
            path_label = f"path_{path_count:02d}"
            copy_dir = issue_dir / path_label
            affected = is_affected_standard_path(raw_path)
            if only_redownload_affected_cases and not affected:
                continue

            log_progress(f"issue_{issue_id} [download] {path_label}: {compact_text(raw_path)}")

            resolved, candidates = resolve_existing_path(raw_path)
            if resolved is None:
                record(
                    field_name,
                    "standard_path",
                    raw_value=raw_path,
                    normalized_ref=None,
                    output_dir=str(copy_dir),
                    status="skipped_missing",
                    detail="source path not found after rewrite attempts",
                    candidate_paths=[str(candidate) for candidate in candidates],
                )
                continue

            # A stale copy may exist under the un-normalized name of the raw path.
            legacy_names = [Path(raw_path.strip()).name] if affected else []
            status, detail = copy_source_path(
                resolved,
                copy_dir,
                dry_run=dry_run,
                replace_existing=only_redownload_affected_cases and affected,
                legacy_target_names=legacy_names,
            )
            target_root = build_copied_target_root(copy_dir, resolved)
            record(
                field_name,
                "standard_path",
                raw_value=raw_path,
                normalized_ref=None,
                output_dir=str(copy_dir),
                status=status,
                detail=detail,
                resolved_source_path=str(resolved),
                candidate_paths=[str(candidate) for candidate in candidates],
            )

            sync_status, sync_detail, shared_source_dir = sync_shared_test_data(
                resolved,
                target_root,
                dry_run=dry_run,
            )
            if sync_status is not None:
                record(
                    field_name,
                    "shared_test_data",
                    raw_value=raw_path,
                    normalized_ref=None,
                    output_dir=str(target_root / "test_data"),
                    status=sync_status,
                    detail=sync_detail or "",
                    resolved_source_path=shared_source_dir,
                    candidate_paths=[str(candidate) for candidate in candidates],
                )

            if skip_calib_recovery:
                continue

            calib_status, calib_detail, calib_source_path = recover_target_root_calib(
                source_root=resolved,
                target_root=target_root,
                dry_run=dry_run,
            )
            record(
                field_name,
                "case_calib_recovery",
                raw_value=raw_path,
                normalized_ref=None,
                output_dir=str(target_root / "test_data" / "calibs"),
                status=calib_status,
                detail=calib_detail,
                resolved_source_path=calib_source_path,
                candidate_paths=[str(candidate) for candidate in candidates],
            )

    return results
def build_manifest(
    args: argparse.Namespace,
    input_json: Path,
    output_root: Path,
    action_results: list[ActionResult],
) -> dict:
    """Assemble the JSON-serializable manifest describing this run.

    The manifest holds a timestamp, the inputs and option flags, a count of
    actions per status, and every action as a dict.
    """
    status_counts: dict[str, int] = {}
    for action in action_results:
        status_counts.setdefault(action.status, 0)
        status_counts[action.status] += 1

    manifest: dict = {}
    manifest["generated_at"] = datetime.now().astimezone().isoformat(timespec="seconds")
    manifest["input_json"] = str(input_json)
    manifest["output_root"] = str(output_root)
    manifest["dry_run"] = args.dry_run
    manifest["skip_mdi"] = args.skip_mdi
    manifest["skip_copy"] = args.skip_copy
    manifest["skip_calib_recovery"] = args.skip_calib_recovery
    manifest["only_redownload_affected_cases"] = args.only_redownload_affected_cases
    manifest["issue_filter"] = args.issue_ids or []
    manifest["summary"] = status_counts
    manifest["actions"] = [action.to_dict() for action in action_results]
    return manifest
def print_summary(manifest: dict) -> None:
    """Print the manifest's inputs, then one ``status: count`` line per status, sorted."""
    for key in ("input_json", "output_root", "dry_run"):
        print(f"{key}: {manifest[key]}")
    for status, count in sorted(manifest["summary"].items()):
        print(f"{status}: {count}")
def main() -> int:
    """Run the downloader end to end and return the process exit code (always 0).

    Loads the issue export, applies the optional issue-id filter, processes
    each issue in order, then prints a summary. The manifest is written only
    when this is not a dry run.
    """
    args = parse_args()
    input_json = Path(args.input_json).resolve()
    output_root = Path(args.output_root).resolve()
    if args.manifest_path:
        manifest_path = Path(args.manifest_path).resolve()
    else:
        manifest_path = output_root / "download_manifest.json"

    items = load_issue_items(input_json)
    if args.issue_ids:
        wanted_ids = set(args.issue_ids)
        items = [entry for entry in items if int(entry["id"]) in wanted_ids]

    ensure_dir(output_root, dry_run=args.dry_run)

    action_results: list[ActionResult] = []
    total_items = len(items)
    log_progress(f"issues_to_process: {total_items}")
    for position, entry in enumerate(items, start=1):
        issue_id = int(entry["id"])
        issue_name = compact_text(entry.get("name"), max_len=64)
        progress_tag = f"[{position}/{total_items}] issue_{issue_id}"
        log_progress(f"{progress_tag} start: {issue_name}")
        issue_results = process_issue(
            item=entry,
            output_root=output_root,
            dry_run=args.dry_run,
            skip_mdi=args.skip_mdi,
            skip_copy=args.skip_copy,
            only_redownload_affected_cases=args.only_redownload_affected_cases,
            skip_calib_recovery=args.skip_calib_recovery,
        )
        action_results.extend(issue_results)
        log_progress(f"{progress_tag} done: {summarize_issue_results(issue_results)}")

    manifest = build_manifest(args, input_json, output_root, action_results)
    ensure_dir(manifest_path.parent, dry_run=args.dry_run)
    if args.dry_run:
        closing_line = f"manifest (not written in dry-run): {manifest_path}"
    else:
        manifest_path.write_text(
            json.dumps(manifest, ensure_ascii=False, indent=2) + "\n",
            encoding="utf-8",
        )
        closing_line = f"manifest: {manifest_path}"

    print_summary(manifest)
    print(closing_line)
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())