Files
yolov26_3d/tools/feishu_project/sync_issue_data.py
2026-06-24 09:35:46 +08:00

1583 lines
60 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Refresh a Feishu issue view and optionally process only incremental issue data."""
from __future__ import annotations
import argparse
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any, Iterable
# Repository root: two directory levels above the directory that contains this file.
ROOT = Path(__file__).resolve().parents[2]
# Default interpreter used to launch the helper scripts (see --python-bin).
# NOTE(review): absolute, machine-specific path — confirm it exists on the target host.
DEFAULT_PYTHON_BIN = Path("/deeplearning_team/ydong/dongying/miniconda/envs/dev/bin/python")
# Default locations of the helper scripts, all resolved relative to ROOT.
DEFAULT_EXPORT_SCRIPT = ROOT / "tools" / "feishu_project" / "export_feishu_view_issues.py"
DEFAULT_DOWNLOAD_SCRIPT = ROOT / "tools" / "feishu_project" / "download_issue_data.py"
DEFAULT_INFERENCE_SCRIPT = ROOT / "tools" / "feishu_project" / "run_issue_data_inference.py"
DEFAULT_CLIP_FALLBACK_INFERENCE_SCRIPT = ROOT / "tools" / "model_inference" / "core" / "run_two_roi_exported_onnx_infer.py"
DEFAULT_ISSUE_TRACKING_SCRIPT = ROOT / "tools" / "feishu_project" / "run_issue_data_tracking.sh"
# Issue fields that are inspected for data references.
DATA_FIELDS = ("问题数据地址", "问题数据地址_PDCL")
# Field values (compared after strip + lower-casing) that count as "no data provided".
PLACEHOLDER_TEXTS = {"", "待填", "待补充", "none", "null", "待提供"}
# Matches references shaped like "ADAS_<token>::<suffix>"; the token excludes
# ':', '/', '\\' and whitespace, and the suffix (possibly empty) excludes '/', '\\' and whitespace.
PDCL_REF_RE = re.compile(r"ADAS_[^:/\\\s]+::[^/\\\s]*")
# Separators between several paths in one field: runs of commas, newlines or semicolons.
STANDARD_PATH_SPLIT_RE = re.compile(r"[,\n;]+")
# Display labels for known raw status keys; unknown keys are displayed unchanged.
STATUS_LABELS = {
    "OPEN": "待处理",
    "Fi7of4O9X": "分析中(未解决)",
    "IN PROGRESS": "处理中(已解决)",
}
def parse_args() -> argparse.Namespace:
    """Declare the command-line interface and parse the process arguments.

    Returns:
        The populated ``argparse.Namespace``. Path-like options default to
        the empty string when not supplied; repeatable options default to
        an empty list.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    add = cli.add_argument

    # Which Feishu view to export.
    add("--project-key", required=True)
    add("--user-key", required=True)
    add("--view-name", required=True)
    add("--work-item-type", default="issue")

    # Where the exported JSON, manifest and snapshots go.
    add("--output-json", required=True, help="Path to the latest exported issue JSON.")
    add(
        "--sync-manifest-path",
        default="",
        help="Optional manifest path. Defaults to <output-json>.sync_manifest.json",
    )
    add(
        "--snapshot-dir",
        default="",
        help="Optional directory for timestamped exported JSON snapshots.",
    )

    # Interpreter and helper scripts.
    add(
        "--python-bin",
        default=str(DEFAULT_PYTHON_BIN),
        help="Python interpreter used to launch repo scripts.",
    )
    add(
        "--export-script",
        default=str(DEFAULT_EXPORT_SCRIPT),
        help="Path to export_feishu_view_issues.py.",
    )
    add(
        "--download-script",
        default=str(DEFAULT_DOWNLOAD_SCRIPT),
        help="Path to download_issue_data.py.",
    )
    add(
        "--inference-script",
        default=str(DEFAULT_INFERENCE_SCRIPT),
        help="Path to run_issue_data_inference.py.",
    )

    # Download / inference roots and manifests.
    add(
        "--download-root",
        default="",
        help="Optional download root. Required when --run-download is used.",
    )
    add(
        "--download-manifest-path",
        default="",
        help="Optional explicit download manifest path.",
    )
    add(
        "--inference-root",
        default="",
        help="Optional inference root. Required when --run-inference is used.",
    )
    add(
        "--inference-manifest-path",
        default="",
        help="Optional explicit inference manifest path.",
    )
    add(
        "--exported-model",
        default="",
        help="Optional exported model path forwarded to run_issue_data_inference.py.",
    )

    # Issue-frame window options forwarded to the inference script.
    add(
        "--use-issue-frame-window",
        action="store_true",
        help="Forward issue-frame window inference options to run_issue_data_inference.py.",
    )
    add("--frame-before", type=int, default=100)
    add("--frame-after", type=int, default=100)
    add(
        "--missing-issue-frame-policy",
        choices=("full", "skip"),
        default="full",
        help="How to handle cases without usable 问题发生frameid when issue-frame window inference is enabled.",
    )

    # Tracking.
    add(
        "--issue-tracking-script",
        default=str(DEFAULT_ISSUE_TRACKING_SCRIPT),
        help="Path to run_issue_data_tracking.sh.",
    )
    add(
        "--tracking-model-version",
        default="",
        help="Optional model version override forwarded to run_issue_data_tracking.sh.",
    )

    # Stage switches.
    add(
        "--run-download",
        action="store_true",
        help="Download issue data for newly actionable issues.",
    )
    add(
        "--run-inference",
        action="store_true",
        help="Run inference for downloaded cases belonging to actionable issues.",
    )
    add(
        "--run-tracking",
        action="store_true",
        help="Run issue-data tracking after inference.",
    )
    add(
        "--refresh-changed-issues",
        action="store_true",
        help=(
            "Allow destructive refresh for issues whose data-address fields changed after already "
            "having data. This removes issue-specific download/inference outputs before reprocessing."
        ),
    )
    add(
        "--skip-existing-inference",
        action="store_true",
        help="Forward --skip-existing to run_issue_data_inference.py.",
    )
    add(
        "--save-snapshot",
        action="store_true",
        help="Write a timestamped exported JSON snapshot under --snapshot-dir.",
    )
    add(
        "--dry-run",
        action="store_true",
        help="Refresh the live view and plan actions without mutating local outputs.",
    )

    # Issue filters.
    add(
        "--issue-id",
        action="append",
        type=int,
        default=[],
        help="Only process the specified issue id. Can be repeated.",
    )
    add(
        "--issue-id-min",
        type=int,
        default=None,
        help="Only process issue ids greater than or equal to this value.",
    )
    add(
        "--issue-id-max",
        type=int,
        default=None,
        help="Only process issue ids less than or equal to this value.",
    )
    add(
        "--issue-name-keyword",
        action="append",
        default=[],
        help=(
            "Optional issue-name keyword filter. Matches issues whose name contains any provided "
            "keyword. Can be repeated."
        ),
    )

    # Pass-through arguments for the helper scripts.
    add(
        "--download-arg",
        action="append",
        default=[],
        help="Extra argument forwarded to download_issue_data.py. Can be repeated.",
    )
    add(
        "--inference-arg",
        action="append",
        default=[],
        help="Extra argument forwarded to run_issue_data_inference.py. Can be repeated.",
    )
    return cli.parse_args()
def canonicalize(value: Any) -> str:
    """Return a deterministic JSON rendering of *value*.

    Object keys are sorted and non-ASCII characters are kept verbatim, so
    two values compare equal as strings exactly when their JSON content is
    the same regardless of key order.
    """
    dump_options = {"ensure_ascii": False, "sort_keys": True}
    return json.dumps(value, **dump_options)
def load_json(path: Path) -> dict:
    """Parse the UTF-8 encoded JSON document stored at *path*."""
    with path.open("r", encoding="utf-8") as handle:
        return json.load(handle)
def save_json(path: Path, payload: dict) -> None:
    """Write *payload* to *path* as indented UTF-8 JSON with a trailing newline.

    Missing parent directories are created first.
    """
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    rendered = json.dumps(payload, ensure_ascii=False, indent=2)
    path.write_text(rendered + "\n", encoding="utf-8")
def log_progress(message: str) -> None:
    """Print *message* to stdout with a local-time prefix and flush immediately."""
    local_now = datetime.now().astimezone()
    timestamp = format(local_now, "%Y-%m-%d %H:%M:%S")
    print(f"[sync_issue_data {timestamp}] {message}", flush=True)
def compact_text(value: Any, max_len: int = 80) -> str:
    """Collapse whitespace in *value* and truncate it to at most *max_len* characters.

    Args:
        value: Any object; ``None`` is treated as the empty string, everything
            else is converted with ``str()``.
        max_len: Maximum length of the returned text.

    Returns:
        The stripped text with every whitespace run replaced by one space.
        Text longer than *max_len* is cut and ends with ``"..."``. When
        *max_len* is below 3 there is no room for a prefix plus the ellipsis,
        so the text is hard-cut to *max_len* characters instead.
    """
    text = "" if value is None else str(value).strip()
    text = re.sub(r"\s+", " ", text)
    if len(text) <= max_len:
        return text
    if max_len < 3:
        # A negative slice bound here would keep most of the text and yield a
        # result longer than max_len, so cut without an ellipsis.
        return text[: max(max_len, 0)]
    return f"{text[: max_len - 3]}..."
def normalize_issue_name_keyword(value: Any) -> str:
    """Return *value* whitespace-compacted (10 000-char cap) and case-folded."""
    return compact_text(value, max_len=10_000).casefold()
def sanitize_issue_name_keywords(keywords: Iterable[object]) -> list[str]:
    """Drop blank and duplicate keywords, keeping first-seen order.

    Duplicates are detected on the case-folded, whitespace-compacted form;
    the returned entries keep the compacted spelling of the first occurrence.
    """
    first_spelling: dict[str, str] = {}
    for raw_keyword in keywords:
        shown = compact_text(raw_keyword, max_len=10_000)
        folded = shown.casefold()
        if folded and folded not in first_spelling:
            first_spelling[folded] = shown
    return list(first_spelling.values())
def build_issue_lookup(payload: dict) -> dict[int, dict]:
    """Index the entries of ``payload["items"]`` by their integer ``id``.

    A missing ``items`` key yields an empty mapping; when two entries share
    an id the later one wins.
    """
    lookup: dict[int, dict] = {}
    for entry in payload.get("items", []):
        lookup[int(entry["id"])] = entry
    return lookup
def issue_matches_name_keywords(item: dict, keywords: list[str]) -> bool:
    """Tell whether the issue name contains at least one of *keywords*.

    Both sides are normalized with ``normalize_issue_name_keyword`` before
    the substring test. An empty keyword list matches every issue.
    """
    if not keywords:
        return True
    normalized_name = normalize_issue_name_keyword(item.get("name"))
    for keyword in keywords:
        if normalize_issue_name_keyword(keyword) in normalized_name:
            return True
    return False
def collect_issue_ids_by_name_keywords(payload: dict, keywords: list[str]) -> list[int]:
    """Return the sorted ids of issues whose name matches any of *keywords*.

    With no keywords the result is an empty list (not "all issues").
    """
    if not keywords:
        return []
    matched_ids = [
        int(entry["id"])
        for entry in payload.get("items", [])
        if issue_matches_name_keywords(entry, keywords)
    ]
    matched_ids.sort()
    return matched_ids
def issue_matches_id_range(issue_id: int, issue_id_min: int | None, issue_id_max: int | None) -> bool:
    """Check *issue_id* against an inclusive range whose bounds may be ``None`` (unbounded)."""
    below_minimum = issue_id_min is not None and issue_id < issue_id_min
    above_maximum = issue_id_max is not None and issue_id > issue_id_max
    return not (below_minimum or above_maximum)
def collect_issue_ids_by_id_range(
    payload: dict,
    issue_id_min: int | None,
    issue_id_max: int | None,
) -> list[int]:
    """Return the sorted ids of issues that fall inside the inclusive id range.

    When neither bound is given the result is an empty list (not "all issues").
    """
    if issue_id_min is None and issue_id_max is None:
        return []
    in_range: list[int] = []
    for entry in payload.get("items", []):
        candidate = int(entry["id"])
        if issue_matches_id_range(candidate, issue_id_min, issue_id_max):
            in_range.append(candidate)
    return sorted(in_range)
def format_issue_label(item: dict) -> str:
    """Build a short log label: ``issue_<id>(<status label>): <name>``.

    The status part is omitted when the issue has no ``status``; the name
    part is omitted when the compacted name (max 48 chars) is empty.
    """
    pieces = [f"issue_{int(item['id'])}"]
    short_name = compact_text(item.get("name"), max_len=48)
    if item.get("status") is not None:
        pieces.append(f"({status_label(item.get('status'))})")
    if short_name:
        pieces.append(f": {short_name}")
    return "".join(pieces)
def describe_issue_ids(payload: dict, issue_ids: Iterable[int], limit: int = 5) -> str:
    """Summarize a set of issue ids for log output.

    The ids are de-duplicated and sorted; the first *limit* are rendered with
    ``format_issue_label`` (or as bare ``issue_<id>`` when the id is not in
    *payload*) and the remainder is reported as a count.
    """
    ordered_ids = sorted(set(issue_ids))
    total = len(ordered_ids)
    if total == 0:
        return "0 issues"
    lookup = build_issue_lookup(payload)
    shown: list[str] = []
    for issue_id in ordered_ids[:limit]:
        record = lookup.get(issue_id)
        shown.append(f"issue_{issue_id}" if record is None else format_issue_label(record))
    hidden = total - len(shown)
    tail = f"; ... (+{hidden} more)" if hidden > 0 else ""
    return f"{total} issues -> " + "; ".join(shown) + tail
def summarize_status_counts(summary: dict[str, Any]) -> str:
    """Render a status-to-count mapping as ``"a=1, b=2"`` with keys in sorted order.

    An empty mapping is rendered as ``"no summary"``.
    """
    if not summary:
        return "no summary"
    rendered: list[str] = []
    for status in sorted(summary):
        rendered.append(f"{status}={summary[status]}")
    return ", ".join(rendered)
def run_command(
    command: list[str],
    cwd: Path | None = None,
    env: dict[str, str] | None = None,
    stream_output: bool = False,
) -> subprocess.CompletedProcess[str]:
    """Run *command* and return its completed process, raising on failure.

    Args:
        command: Argument vector (executed without a shell).
        cwd: Optional working directory for the child.
        env: Optional environment. In streaming mode it is merged on top of
            the current environment; in captured mode it is passed to
            ``subprocess.run`` unchanged, i.e. it replaces the environment.
        stream_output: When true, stderr is merged into stdout, every line is
            echoed to this process's stdout as it arrives, and
            ``PYTHONUNBUFFERED=1`` is set for the child unless already set.

    Returns:
        A ``CompletedProcess`` with text ``stdout``. In streaming mode
        ``stderr`` is always the empty string.

    Raises:
        RuntimeError: The child exited with a non-zero status. The message
            carries the exit code, the command, and captured output.
    """
    working_dir = None if cwd is None else str(cwd)

    if not stream_output:
        completed = subprocess.run(
            command,
            cwd=working_dir,
            check=False,
            capture_output=True,
            text=True,
            encoding="utf-8",
            # Invalid bytes in tool output must not abort the sync.
            errors="replace",
            env=env,
        )
        if completed.returncode != 0:
            detail = completed.stderr.strip() or completed.stdout.strip() or "unknown error"
            raise RuntimeError(f"command failed ({completed.returncode}): {' '.join(command)}\n{detail}")
        return completed

    effective_env = os.environ.copy()
    if env is not None:
        effective_env.update(env)
    # Ask Python children to flush per line so progress appears live.
    effective_env.setdefault("PYTHONUNBUFFERED", "1")
    process = subprocess.Popen(
        command,
        cwd=working_dir,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        encoding="utf-8",
        errors="replace",
        env=effective_env,
        bufsize=1,
    )
    assert process.stdout is not None  # guaranteed by stdout=PIPE
    stdout_chunks: list[str] = []
    try:
        for line in process.stdout:
            print(line, end="", flush=True)
            stdout_chunks.append(line)
    except BaseException:
        # Do not leave the child running if echoing fails or we are interrupted.
        process.kill()
        raise
    finally:
        process.stdout.close()
        returncode = process.wait()
    completed = subprocess.CompletedProcess(
        command,
        returncode,
        stdout="".join(stdout_chunks),
        stderr="",
    )
    if completed.returncode != 0:
        detail = completed.stdout.strip() or "unknown error"
        raise RuntimeError(f"command failed ({completed.returncode}): {' '.join(command)}\n{detail}")
    return completed
def export_latest_view(args: argparse.Namespace) -> tuple[dict, Path]:
    """Export the configured Feishu view to a temp file and publish the result.

    The export script is run with its output directed to a fresh temporary
    JSON file. Unless ``args.dry_run`` is set, that file is copied to
    ``args.output_json`` and, when ``args.save_snapshot`` and
    ``args.snapshot_dir`` are both set, to a timestamped snapshot.

    Returns:
        ``(payload, tmp_output)``: the parsed export and the path of the
        temporary file, which is left on disk for the caller.

    Raises:
        RuntimeError: Propagated from ``run_command`` when the export fails.
        Any error raised while loading or copying the export. In every
        failure case the temporary file is removed before re-raising.
    """
    output_path = Path(args.output_json).resolve()
    # NOTE(review): resolve() follows symlinks and anchors bare names at the
    # cwd; confirm --python-bin is always a real absolute interpreter path.
    python_bin = str(Path(args.python_bin).resolve())
    export_script = str(Path(args.export_script).resolve())
    log_progress(f"[stage=export] refreshing Feishu view '{args.view_name}'")
    # delete=False: the file must outlive the handle so the child can write it.
    with tempfile.NamedTemporaryFile(
        prefix="sync_issue_data_",
        suffix=".json",
        delete=False,
    ) as tmp_file:
        tmp_output = Path(tmp_file.name)
    try:
        command = [
            python_bin,
            export_script,
            "--project-key",
            args.project_key,
            "--user-key",
            args.user_key,
            "--view-name",
            args.view_name,
            "--work-item-type",
            args.work_item_type,
            "--output",
            str(tmp_output),
        ]
        run_command(command, cwd=ROOT)
        payload = load_json(tmp_output)
        log_progress(
            f"[stage=export] fetched {len(payload.get('items', []))} issues from '{args.view_name}'"
        )
        # NOTE(review): snapshot writing is kept inside the non-dry-run branch,
        # matching the --dry-run promise not to mutate local outputs — confirm.
        if not args.dry_run:
            output_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(tmp_output, output_path)
            if args.save_snapshot and args.snapshot_dir:
                snapshot_dir = Path(args.snapshot_dir).resolve()
                snapshot_name = build_snapshot_name(payload)
                snapshot_path = snapshot_dir / snapshot_name
                snapshot_dir.mkdir(parents=True, exist_ok=True)
                shutil.copy2(tmp_output, snapshot_path)
    except BaseException:
        # No caller will receive the path on failure, so remove the temp file.
        tmp_output.unlink(missing_ok=True)
        raise
    return payload, tmp_output
def build_snapshot_name(payload: dict) -> str:
    """Derive a ``.json`` snapshot file name from ``payload["exported_at"]``.

    Colons and dashes are dropped and ``+``, ``T`` and ``/`` become
    underscores. If nothing remains (or the field is missing/empty), the
    current local time formatted as ``YYYYmmdd_HHMMSS`` is used instead.
    """
    exported_at = str(payload.get("exported_at") or "")
    cleanup = str.maketrans({":": None, "-": None, "+": "_", "T": "_", "/": "_"})
    stem = exported_at.translate(cleanup)
    if not stem:
        stem = datetime.now().astimezone().strftime("%Y%m%d_%H%M%S")
    return stem + ".json"
def meaningful_text(value: Any) -> bool:
    """Tell whether *value* carries real content.

    ``None``, blank text, and any text whose stripped lower-cased form is in
    ``PLACEHOLDER_TEXTS`` are not meaningful.
    """
    stripped = "" if value is None else str(value).strip()
    return bool(stripped) and stripped.lower() not in PLACEHOLDER_TEXTS
def has_downloadable_data(item: dict) -> bool:
    """Tell whether any of the ``DATA_FIELDS`` of *item* holds meaningful text."""
    for field_name in DATA_FIELDS:
        if meaningful_text(item.get(field_name)):
            return True
    return False
def iter_issue_fields(item: dict) -> Iterable[tuple[str, object]]:
    """Lazily yield ``(field_name, value)`` for each of ``DATA_FIELDS``, in order.

    Fields missing from *item* are yielded with ``None``.
    """
    yield from ((field_name, item.get(field_name)) for field_name in DATA_FIELDS)
def extract_pdcl_refs(raw_value: object) -> list[str]:
    """Return the distinct ``PDCL_REF_RE`` matches in *raw_value*, first-seen order.

    ``None`` and blank values produce an empty list.
    """
    text = "" if raw_value is None else str(raw_value).strip()
    if not text:
        return []
    seen: set[str] = set()
    unique_refs: list[str] = []
    for match in PDCL_REF_RE.finditer(text):
        ref = match.group(0)
        if ref not in seen:
            seen.add(ref)
            unique_refs.append(ref)
    return unique_refs
def extract_standard_paths(raw_value: object) -> list[str]:
    """Split *raw_value* into filesystem-style paths.

    Returns an empty list when the value is ``None``, blank, a placeholder,
    contains any PDCL reference, or contains neither ``/`` nor ``\\``.
    Otherwise the text is split on ``STANDARD_PATH_SPLIT_RE`` and the
    non-empty, stripped pieces are returned in order.
    """
    text = "" if raw_value is None else str(raw_value).strip()
    if not text or text.lower() in PLACEHOLDER_TEXTS:
        return []
    if extract_pdcl_refs(text):
        return []
    if not ("/" in text or "\\" in text):
        return []
    stripped_pieces = (piece.strip() for piece in STANDARD_PATH_SPLIT_RE.split(text))
    return [piece for piece in stripped_pieces if piece]
def expected_download_refs(item: dict) -> list[dict]:
    """List the distinct data references found in the issue's data fields.

    Each entry records the originating field (``source_field``), the kind
    (``"pdcl_mdi_download"`` or ``"standard_path"``) and the reference text
    (``normalized_ref``). Duplicates of the same kind and text across fields
    are reported once, attributed to the first field that contains them.
    """
    refs: list[dict] = []
    seen: set[tuple[str, str]] = set()

    def register(field_name: str, kind: str, value: str) -> None:
        # Record one reference unless the same (kind, value) was already seen.
        key = (kind, value)
        if key in seen:
            return
        seen.add(key)
        refs.append(
            {
                "source_field": field_name,
                "source_kind": kind,
                "normalized_ref": value,
            }
        )

    for field_name, raw_value in iter_issue_fields(item):
        for pdcl_ref in extract_pdcl_refs(raw_value):
            register(field_name, "pdcl_mdi_download", pdcl_ref)
        for raw_path in extract_standard_paths(raw_value):
            register(field_name, "standard_path", raw_path.strip())
    return refs
def data_signature(item: dict) -> str:
    """Return a canonical string over the issue's ``DATA_FIELDS`` values.

    Two issues have equal signatures exactly when all their data fields are equal.
    """
    relevant: dict[str, Any] = {}
    for field_name in DATA_FIELDS:
        relevant[field_name] = item.get(field_name)
    return canonicalize(relevant)
def top_level_changed_fields(previous: dict, current: dict) -> list[str]:
    """Return the sorted top-level keys whose canonical values differ.

    A key present on only one side is compared against ``None``.
    """
    every_key = sorted(set(previous).union(current))
    return [
        key
        for key in every_key
        if canonicalize(previous.get(key)) != canonicalize(current.get(key))
    ]
def status_label(status: Any) -> str:
    """Map a raw status to its ``STATUS_LABELS`` entry.

    Unknown statuses are returned as their string form; ``None`` becomes ``""``.
    """
    raw_key = str(status) if status is not None else ""
    return STATUS_LABELS.get(raw_key, raw_key)
def summarize_issue(item: dict) -> dict:
    """Condense an issue record into the fields used in diff reports."""
    raw_status = item.get("status")
    return dict(
        id=int(item["id"]),
        name=item.get("name"),
        status=raw_status,
        status_label=status_label(raw_status),
        created_at=item.get("created_at"),
        updated_at=item.get("updated_at"),
        has_downloadable_data=has_downloadable_data(item),
    )
def summarize_removed_issue(issue_id: int, item: dict) -> dict:
    """Summarize *item* like ``summarize_issue`` but report *issue_id* as its id."""
    return {**summarize_issue(item), "id": issue_id}
def format_issue_set(items_by_id: dict[int, dict], issue_ids: Iterable[int]) -> list[dict]:
    """Summarize the given issues, de-duplicated and ordered by id.

    Raises:
        KeyError: An id in *issue_ids* is not present in *items_by_id*.
    """
    summaries: list[dict] = []
    for issue_id in sorted(set(issue_ids)):
        summaries.append(summarize_issue(items_by_id[issue_id]))
    return summaries
def build_diff(previous_payload: dict | None, current_payload: dict, refresh_changed_issues: bool) -> dict:
    """Compare two exported payloads and classify every difference.

    Args:
        previous_payload: Earlier export, or ``None``/empty for a first run.
        current_payload: Freshly exported payload.
        refresh_changed_issues: When true, issues whose data fields changed
            while data was already present are scheduled for refresh and
            download; otherwise they are flagged for manual review.

    Returns:
        A report dict with totals, new/removed issue summaries, per-category
        change lists, and the sorted ``download_issue_ids``,
        ``refresh_issue_ids`` and ``manual_review_issue_ids``.
    """
    previous_items = previous_payload.get("items", []) if previous_payload else []
    current_items = current_payload.get("items", [])
    previous_by_id = {int(entry["id"]): entry for entry in previous_items}
    current_by_id = {int(entry["id"]): entry for entry in current_items}

    def data_entry(issue_id: int, before: dict, after: dict) -> dict:
        # Before/after view of the data fields for one issue.
        return {
            "id": issue_id,
            "name": after.get("name"),
            "previous_data": {field_name: before.get(field_name) for field_name in DATA_FIELDS},
            "current_data": {field_name: after.get(field_name) for field_name in DATA_FIELDS},
        }

    def update_stamps(before: dict, after: dict) -> dict:
        # Before/after ``updated_at`` pair shared by several entry kinds.
        return {
            "previous_updated_at": before.get("updated_at"),
            "current_updated_at": after.get("updated_at"),
        }

    # Fields that have their own category and are not "metadata" changes.
    categorized_fields = {"updated_at", "status", *DATA_FIELDS}

    new_issue_ids: list[int] = []
    status_changed: list[dict] = []
    data_added: list[dict] = []
    data_changed_requires_refresh: list[dict] = []
    data_removed: list[dict] = []
    metadata_changed: list[dict] = []
    updated_only: list[dict] = []
    download_issue_ids: set[int] = set()
    refresh_issue_ids: set[int] = set()
    manual_review_issue_ids: set[int] = set()

    for issue_id in sorted(current_by_id):
        after = current_by_id[issue_id]
        before = previous_by_id.get(issue_id)

        if before is None:
            # Brand-new issue: download right away when it already has data.
            new_issue_ids.append(issue_id)
            if has_downloadable_data(after):
                download_issue_ids.add(issue_id)
            continue

        changed_fields = top_level_changed_fields(before, after)
        if not changed_fields:
            continue

        had_data = has_downloadable_data(before)
        has_data = has_downloadable_data(after)
        data_differs = data_signature(before) != data_signature(after)

        if before.get("status") != after.get("status"):
            status_changed.append(
                {
                    "id": issue_id,
                    "name": after.get("name"),
                    "previous_status": before.get("status"),
                    "previous_status_label": status_label(before.get("status")),
                    "current_status": after.get("status"),
                    "current_status_label": status_label(after.get("status")),
                    **update_stamps(before, after),
                }
            )

        if data_differs:
            if has_data and not had_data:
                data_added.append(data_entry(issue_id, before, after))
                download_issue_ids.add(issue_id)
            elif has_data and had_data:
                data_changed_requires_refresh.append(data_entry(issue_id, before, after))
                if refresh_changed_issues:
                    refresh_issue_ids.add(issue_id)
                    download_issue_ids.add(issue_id)
                else:
                    manual_review_issue_ids.add(issue_id)
            elif had_data:
                # Data was present before and is gone now.
                data_removed.append(data_entry(issue_id, before, after))
                manual_review_issue_ids.add(issue_id)

        remaining_fields = [name for name in changed_fields if name not in categorized_fields]
        if remaining_fields:
            metadata_changed.append(
                {
                    "id": issue_id,
                    "name": after.get("name"),
                    "changed_fields": remaining_fields,
                    **update_stamps(before, after),
                }
            )
        elif changed_fields == ["updated_at"]:
            updated_only.append(
                {
                    "id": issue_id,
                    "name": after.get("name"),
                    **update_stamps(before, after),
                }
            )

    removed_issue_ids = sorted(issue_id for issue_id in previous_by_id if issue_id not in current_by_id)

    return {
        "previous_total": len(previous_by_id),
        "current_total": len(current_by_id),
        "new_issues": format_issue_set(current_by_id, new_issue_ids),
        "removed_issues": [
            summarize_removed_issue(issue_id, previous_by_id[issue_id]) for issue_id in removed_issue_ids
        ],
        "status_changed_issues": status_changed,
        "data_added_issues": data_added,
        "data_changed_requires_refresh": data_changed_requires_refresh,
        "data_removed_issues": data_removed,
        "metadata_changed_issues": metadata_changed,
        "updated_only_issues": updated_only,
        "download_issue_ids": sorted(download_issue_ids),
        "refresh_issue_ids": sorted(refresh_issue_ids),
        "manual_review_issue_ids": sorted(manual_review_issue_ids),
    }
def remove_path(path: Path, dry_run: bool) -> dict:
    """Delete *path* (file, directory tree, or symlink) and report what happened.

    Symlinks are removed as links: the link target is never deleted, and a
    dangling symlink is removed rather than reported as missing.

    Args:
        path: Filesystem entry to remove.
        dry_run: When true, nothing is deleted.

    Returns:
        ``{"path": str(path), "status": ...}`` where status is ``"missing"``,
        ``"planned_remove"`` (dry run) or ``"removed"``.
    """
    is_link = path.is_symlink()
    # exists() follows symlinks, so a dangling link must be detected separately.
    if not is_link and not path.exists():
        return {"path": str(path), "status": "missing"}
    if dry_run:
        return {"path": str(path), "status": "planned_remove"}
    if path.is_dir() and not is_link:
        shutil.rmtree(path)
    else:
        # shutil.rmtree refuses symlinks, so links (even to directories) are unlinked.
        path.unlink()
    return {"path": str(path), "status": "removed"}
def issue_root(path_root: Path, issue_id: int) -> Path:
    """Return the per-issue directory ``<path_root>/issue_<issue_id>``."""
    directory_name = "issue_{}".format(issue_id)
    return path_root.joinpath(directory_name)
def count_local_video_cases(download_root: Path, issue_id: int) -> int:
    """Count ``camera4.bin`` files located directly inside a ``sigmastar.1`` directory.

    The issue directory is searched recursively; a missing issue directory counts as 0.
    """
    issue_dir = issue_root(download_root, issue_id)
    if not issue_dir.is_dir():
        return 0
    video_files = [
        candidate
        for candidate in issue_dir.rglob("camera4.bin")
        if candidate.parent.name == "sigmastar.1"
    ]
    return len(video_files)
def count_local_clip_export_cases(download_root: Path, issue_id: int) -> int:
    """Count ``clip_export_*`` directories directly under the issue directory.

    A directory counts only when it contains an ``images`` directory and the
    file ``calib/L2_calib/camera4.json``. A missing issue directory counts as 0.
    """
    issue_dir = issue_root(download_root, issue_id)
    if not issue_dir.is_dir():
        return 0

    def is_clip_export(candidate: Path) -> bool:
        # Checks are ordered exactly as in the combined condition they replace.
        return (
            candidate.is_dir()
            and candidate.name.startswith("clip_export_")
            and (candidate / "images").is_dir()
            and (candidate / "calib" / "L2_calib" / "camera4.json").is_file()
        )

    return len([candidate for candidate in issue_dir.iterdir() if is_clip_export(candidate)])
def count_local_downloaded_cases(download_root: Path, issue_id: int) -> int:
    """Return the number of local video cases plus clip-export cases for the issue."""
    video_cases = count_local_video_cases(download_root, issue_id)
    clip_export_cases = count_local_clip_export_cases(download_root, issue_id)
    return video_cases + clip_export_cases
def build_local_download_check(current_payload: dict, download_root: Path | None) -> dict:
    """Compare expected data references with what is present under *download_root*.

    For every issue with downloadable data (in id order) the number of
    expected references is compared with the number of local cases:

    * no parsable reference -> ``unparsed_issue_ids``
    * no local case         -> ``missing_issue_ids``
    * fewer local cases     -> ``incomplete_issue_ids``

    ``actionable_issue_ids`` is the sorted union of missing and incomplete.
    With ``download_root=None`` a disabled, empty report is returned.
    """
    report: dict[str, Any] = {
        "enabled": False,
        "download_root": "",
        "download_root_exists": False,
        "issues_with_downloadable_data": [],
        "missing_issue_ids": [],
        "incomplete_issue_ids": [],
        "unparsed_issue_ids": [],
        "actionable_issue_ids": [],
    }
    if download_root is None:
        return report

    ordered_items = sorted(current_payload.get("items", []), key=lambda issue: int(issue["id"]))
    report["enabled"] = True
    report["download_root"] = str(download_root)
    report["download_root_exists"] = download_root.exists()

    for item in ordered_items:
        if not has_downloadable_data(item):
            continue
        issue_id = int(item["id"])
        expected_refs = expected_download_refs(item)
        expected_count = len(expected_refs)
        local_count = count_local_downloaded_cases(download_root, issue_id)

        issue_summary = summarize_issue(item)
        issue_summary["expected_case_count"] = expected_count
        issue_summary["local_case_count"] = local_count
        issue_summary["expected_refs"] = expected_refs
        report["issues_with_downloadable_data"].append(issue_summary)

        if expected_count == 0:
            report["unparsed_issue_ids"].append(issue_id)
        elif local_count == 0:
            report["missing_issue_ids"].append(issue_id)
        elif local_count < expected_count:
            report["incomplete_issue_ids"].append(issue_id)

    report["actionable_issue_ids"] = sorted(
        set(report["missing_issue_ids"]) | set(report["incomplete_issue_ids"])
    )
    return report
def prepare_refresh_outputs(
    download_root: Path,
    inference_root: Path | None,
    refresh_issue_ids: Iterable[int],
    dry_run: bool,
) -> dict:
    """Remove (or plan to remove) per-issue download and inference outputs.

    Issues are handled in ascending id order; for each one the download
    directory is processed first, then the inference directory when
    *inference_root* is given.

    Returns:
        ``{"download_cleanup": [...], "inference_cleanup": [...]}`` holding
        the ``remove_path`` result for every directory considered.
    """
    report: dict[str, list] = {"download_cleanup": [], "inference_cleanup": []}
    for issue_id in sorted(set(refresh_issue_ids)):
        download_dir = issue_root(download_root, issue_id)
        report["download_cleanup"].append(remove_path(download_dir, dry_run=dry_run))
        if inference_root is None:
            continue
        inference_dir = issue_root(inference_root, issue_id)
        report["inference_cleanup"].append(remove_path(inference_dir, dry_run=dry_run))
    return report
def run_download(
    args: argparse.Namespace,
    issue_ids: list[int],
    output_json: Path,
) -> dict:
    """Invoke the download script for *issue_ids* and collect its manifest summary.

    Args:
        args: Parsed CLI options (download root, manifest path, interpreter,
            script path, dry-run flag and pass-through arguments are read).
        issue_ids: Issues to download; an empty list is a no-op.
        output_json: Exported issue JSON handed to the script as ``--input-json``.

    Returns:
        A result dict with ``ran``, ``issue_ids``, ``manifest_path`` and
        ``summary``; ``stdout`` is included only when the script was run.
        ``summary`` stays empty in dry-run mode or when no manifest file exists.

    Raises:
        ValueError: ``--download-root`` was not provided.
        RuntimeError: Propagated from ``run_command`` on a non-zero exit.
    """
    if not args.download_root:
        raise ValueError("--download-root is required when --run-download is used")
    if not issue_ids:
        return {"ran": False, "issue_ids": [], "manifest_path": None, "summary": {}}

    if args.download_manifest_path:
        manifest_path = Path(args.download_manifest_path).resolve()
    else:
        manifest_path = Path(args.download_root).resolve() / "download_manifest.json"

    command = [
        str(Path(args.python_bin).resolve()),
        str(Path(args.download_script).resolve()),
        "--input-json",
        str(output_json),
        "--output-root",
        str(Path(args.download_root).resolve()),
        "--manifest-path",
        str(manifest_path),
    ]
    if args.dry_run:
        command.append("--dry-run")
    for issue_id in issue_ids:
        command += ["--issue-id", str(issue_id)]
    # User-supplied pass-through arguments go last.
    command.extend(args.download_arg)

    completed = run_command(command, cwd=ROOT, stream_output=True)

    summary: dict = {}
    if not args.dry_run and manifest_path.is_file():
        summary = load_json(manifest_path).get("summary", {})
    return {
        "ran": True,
        "issue_ids": issue_ids,
        "manifest_path": str(manifest_path),
        "summary": summary,
        "stdout": completed.stdout.strip(),
    }
def issue_has_downloaded_cases(download_root: Path, issue_id: int) -> bool:
    """Tell whether at least one local case (video or clip export) exists for the issue."""
    local_cases = count_local_downloaded_cases(download_root, issue_id)
    return local_cases > 0
def issue_has_video_cases(download_root: Path, issue_id: int) -> bool:
    """Tell whether at least one local video case exists for the issue."""
    video_cases = count_local_video_cases(download_root, issue_id)
    return video_cases > 0
def resolve_pdcl_auth_env() -> dict[str, str]:
    """Resolve ``STS_UID`` and ``STS_SECRET_KEY`` for PDCL access.

    Non-empty values from the process environment win. Any key still missing
    is extracted with a regular expression from the source text of
    ``tools/pdcl_inference/get_clips_of_aeb.py`` under ``ROOT``.

    Raises:
        ValueError: A missing key could not be found in that file.
        OSError: The file could not be read.
    """
    wanted_keys = ("STS_UID", "STS_SECRET_KEY")
    resolved: dict[str, str] = {}
    for key in wanted_keys:
        from_env = os.environ.get(key)
        if from_env:
            resolved[key] = from_env
    if len(resolved) == 2:
        return resolved

    # NOTE(review): the fallback scrapes credentials embedded in another
    # source file; consider requiring the environment variables instead.
    auth_source = ROOT / "tools" / "pdcl_inference" / "get_clips_of_aeb.py"
    text = auth_source.read_text(encoding="utf-8")
    patterns = {
        "STS_UID": re.compile(r'STS_UID"?,\s*"([^"]+)"|STS_UID\']\s*=\s*\'([^\']+)\''),
        "STS_SECRET_KEY": re.compile(r'STS_SECRET_KEY"?,\s*"([^"]+)"|STS_SECRET_KEY\']\s*=\s*\'([^\']+)\''),
    }
    for key, pattern in patterns.items():
        if key in resolved:
            continue
        match = pattern.search(text)
        if match is None:
            raise ValueError(f"Failed to resolve {key} from {auth_source}")
        # Exactly one alternative of the pattern captured a value.
        resolved[key] = next(group for group in match.groups() if group)
    return resolved
def extract_pdcl_raw_ids(item: dict) -> list[str]:
    """Return the distinct raw ids (text before ``::``) of the issue's PDCL references.

    Order follows first appearance across the data fields; empty ids are skipped.
    """
    ordered_ids: dict[str, None] = {}
    for _, raw_value in iter_issue_fields(item):
        for ref in extract_pdcl_refs(raw_value):
            raw_id = ref.partition("::")[0].strip()
            if raw_id:
                ordered_ids.setdefault(raw_id, None)
    return list(ordered_ids)
def resolve_clip_ids_for_issue(item: dict, pdcl_auth_env: dict[str, str]) -> tuple[list[str], list[str]]:
    """Resolve the clip ids behind the PDCL raw ids referenced by *item*.

    Args:
        item: Issue record whose data fields are scanned for PDCL references.
        pdcl_auth_env: Environment variables applied to ``os.environ`` for
            the duration of the call.

    Returns:
        ``(raw_ids, clip_ids)``: the raw ids found in the issue and the
        distinct, non-empty clip ids returned for them, in first-seen order.
    """
    # Make the repository importable so the ``tools`` package can be found.
    if str(ROOT) not in sys.path:
        sys.path.append(str(ROOT))
    # Remember the current values so the environment can be restored afterwards.
    previous_env = {key: os.environ.get(key) for key in pdcl_auth_env}
    os.environ.update(pdcl_auth_env)
    try:
        # Imported only after the credentials are in the environment.
        # NOTE(review): presumably the module reads them at import or call time — confirm.
        from tools.pdcl_inference.get_clips_of_aeb import get_clip_ukeys_from_raw
        raw_ids = extract_pdcl_raw_ids(item)
        clip_ids: list[str] = []
        seen: set[str] = set()
        for raw_id in raw_ids:
            for clip_id in get_clip_ukeys_from_raw(raw_id):
                clip_id = str(clip_id).strip()
                if not clip_id or clip_id in seen:
                    continue
                seen.add(clip_id)
                clip_ids.append(clip_id)
        return raw_ids, clip_ids
    finally:
        # Restore the environment: drop keys that were unset, reset the others.
        for key, value in previous_env.items():
            if value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = value
def run_clip_fallback_inference(
    args: argparse.Namespace,
    issue_ids: list[int],
    issue_json_path: Path,
) -> dict:
    """Run clip-based fallback inference for the given issues.

    For each issue the PDCL raw ids are read from the exported issue JSON,
    resolved to clip ids, written to a clip-list file under
    ``<inference_root>/_clip_fallback`` and handed to the clip fallback
    inference script. Each issue is handled independently: any exception
    raised while processing one issue is recorded as ``clip_fallback_failed``
    and the loop moves on.

    In dry-run mode nothing is resolved or executed; issues with raw ids are
    recorded as ``planned_clip_fallback`` and counted as successful.

    Returns:
        A dict with ``ran``, the successful ``issue_ids``, a per-status
        ``summary`` count, a ``detail`` message and the per-issue
        ``fallback_results``.
    """
    if not issue_ids:
        return {
            "ran": False,
            "issue_ids": [],
            "summary": {},
            "detail": "no issues require clip fallback inference",
            "fallback_results": [],
        }

    issue_lookup = build_issue_lookup(load_json(issue_json_path))
    pdcl_auth_env = resolve_pdcl_auth_env()
    clip_list_dir = Path(args.inference_root).resolve() / "_clip_fallback"
    if not args.dry_run:
        clip_list_dir.mkdir(parents=True, exist_ok=True)

    summary: dict[str, int] = {}
    fallback_results: list[dict[str, Any]] = []
    successful_issue_ids: list[int] = []

    def record(issue_id: int, status: str, detail: str, **extra: Any) -> None:
        # Count the outcome and store the per-issue entry in a fixed key order.
        summary[status] = summary.get(status, 0) + 1
        fallback_results.append({"issue_id": issue_id, "status": status, "detail": detail, **extra})

    for issue_id in issue_ids:
        item = issue_lookup.get(issue_id)
        if item is None:
            record(issue_id, "skipped_missing_issue_record", "issue id not found in exported issue json")
            continue

        raw_ids = extract_pdcl_raw_ids(item)
        if not raw_ids:
            record(issue_id, "skipped_no_pdcl_raw", "no PDCL raw ids found in issue data fields")
            continue

        if args.dry_run:
            record(
                issue_id,
                "planned_clip_fallback",
                f"would resolve clips from raw ids: {raw_ids}",
                raw_ids=raw_ids,
            )
            successful_issue_ids.append(issue_id)
            continue

        try:
            raw_ids, clip_ids = resolve_clip_ids_for_issue(item, pdcl_auth_env)
            if not clip_ids:
                record(
                    issue_id,
                    "failed_no_clip_ids",
                    f"resolved raw ids but found no clip ids: {raw_ids}",
                    raw_ids=raw_ids,
                )
                continue

            clip_list_path = clip_list_dir / f"issue_{issue_id}.clip_fallback.txt"
            clip_list_path.write_text("\n".join(clip_ids) + "\n", encoding="utf-8")

            command = [
                str(Path(args.python_bin).resolve()),
                str(DEFAULT_CLIP_FALLBACK_INFERENCE_SCRIPT.resolve()),
                "--clip-list-file",
                str(clip_list_path),
                "--export-root",
                str(issue_root(Path(args.download_root).resolve(), issue_id)),
                "--output-dir",
                str(issue_root(Path(args.inference_root).resolve(), issue_id)),
                "--output-prefix",
                "clip_export",
                "--camera-topic",
                "camera4",
                "--save-aggregate-predictions",
            ]
            if args.exported_model:
                command += ["--exported-model", args.exported_model]
            if args.skip_existing_inference:
                command.append("--skip-done")
            command.extend(args.inference_arg)

            child_env = os.environ.copy()
            child_env.update(pdcl_auth_env)
            completed = run_command(command, cwd=ROOT, env=child_env, stream_output=True)
            record(
                issue_id,
                "clip_fallback_success",
                f"resolved {len(clip_ids)} clips via {len(raw_ids)} raw ids",
                raw_ids=raw_ids,
                clip_ids=clip_ids,
                clip_list_file=str(clip_list_path),
                stdout=completed.stdout.strip(),
            )
            successful_issue_ids.append(issue_id)
        except Exception as exc:
            # Deliberate per-issue isolation: one failing issue must not stop the rest.
            record(
                issue_id,
                "clip_fallback_failed",
                f"{type(exc).__name__}: {exc}",
                raw_ids=raw_ids,
            )

    return {
        "ran": bool(fallback_results),
        "issue_ids": successful_issue_ids,
        "summary": summary,
        "detail": "" if successful_issue_ids else "clip fallback produced no runnable issues",
        "fallback_results": fallback_results,
    }
def resolve_tracking_model_version(tracking_model_version: str, exported_model: str) -> str:
    """Pick the model version to use for tracking.

    An explicit *tracking_model_version* wins. Otherwise the first run of
    eight consecutive digits in *exported_model* is used, and the empty
    string is returned when there is none.
    """
    if tracking_model_version:
        return tracking_model_version
    digits_match = re.search(r"([0-9]{8})", exported_model) if exported_model else None
    return digits_match.group(1) if digits_match else ""
def run_inference(
    args: argparse.Namespace,
    issue_ids: list[int],
    issue_json_path: Path,
) -> dict:
    """Run issue inference, delegating issues without local cases to the clip fallback.

    Issues for which ``issue_has_video_cases(download_root, issue_id)`` is truthy
    are passed to ``args.inference_script`` in a single subprocess call. Every
    other requested issue is handed to ``run_clip_fallback_inference``.

    Args:
        args: Parsed CLI options. Reads ``inference_root``, ``download_root``,
            ``inference_manifest_path``, ``python_bin``, ``inference_script``,
            the issue-frame-window options, ``exported_model``,
            ``skip_existing_inference``, ``dry_run`` and ``inference_arg``.
        issue_ids: Issue ids selected for inference.
        issue_json_path: Exported issue JSON; forwarded to the inference script
            when the frame window is enabled, and to the fallback.

    Returns:
        A result dict with ``ran``, ``issue_ids``, ``manifest_path``,
        ``summary`` and ``fallback``. It carries ``detail`` when nothing ran
        and ``stdout`` otherwise.

    Raises:
        ValueError: If ``args.inference_root`` or ``args.download_root`` is empty.
    """
    if not args.inference_root:
        raise ValueError("--inference-root is required when --run-inference is used")
    if not args.download_root:
        raise ValueError("--download-root is required when --run-inference is used")

    # Placeholder reported under "fallback" until the clip fallback actually runs.
    fallback_result = {
        "ran": False,
        "issue_ids": [],
        "summary": {},
        "detail": "",
        "fallback_results": [],
    }
    if not issue_ids:
        return {
            "ran": False,
            "issue_ids": [],
            "manifest_path": None,
            "summary": {},
            "detail": "no issue ids selected for inference",
            "fallback": fallback_result,
        }

    download_root = Path(args.download_root).resolve()
    runnable_issue_ids = [
        issue_id for issue_id in issue_ids if issue_has_video_cases(download_root, issue_id)
    ]
    runnable_lookup = set(runnable_issue_ids)
    fallback_issue_ids = [issue_id for issue_id in issue_ids if issue_id not in runnable_lookup]

    if args.inference_manifest_path:
        manifest_path = Path(args.inference_manifest_path).resolve()
    else:
        manifest_path = Path(args.inference_root).resolve() / "inference_manifest.json"

    summary: dict[str, int] = {}
    successful_issue_ids: list[int] = []
    primary_stdout = ""

    if runnable_issue_ids:
        command = [
            str(Path(args.python_bin).resolve()),
            str(Path(args.inference_script).resolve()),
            "--download-root",
            str(download_root),
            "--output-root",
            str(Path(args.inference_root).resolve()),
            "--manifest-path",
            str(manifest_path),
        ]
        if args.use_issue_frame_window:
            command += [
                "--issue-json",
                str(issue_json_path),
                "--use-issue-frame-window",
                "--frame-before",
                str(args.frame_before),
                "--frame-after",
                str(args.frame_after),
                "--missing-issue-frame-policy",
                args.missing_issue_frame_policy,
            ]
        if args.exported_model:
            command += ["--exported-model", args.exported_model]
        if args.skip_existing_inference:
            command.append("--skip-existing")
        if args.dry_run:
            command.append("--dry-run")
        for issue_id in runnable_issue_ids:
            command += ["--issue-id", str(issue_id)]
        # Extra arguments are wrapped so the inference script can forward them on.
        command += [f"--inference-arg={extra_arg}" for extra_arg in args.inference_arg]

        completed = run_command(command, cwd=ROOT, stream_output=True)
        # The manifest summary is only read back for real (non dry-run) runs.
        if not args.dry_run and manifest_path.is_file():
            summary.update(load_json(manifest_path).get("summary", {}))
        successful_issue_ids.extend(runnable_issue_ids)
        primary_stdout = completed.stdout.strip()

    if fallback_issue_ids:
        log_progress(
            "[stage=inference] clip fallback required: "
            + describe_issue_ids(load_json(issue_json_path), fallback_issue_ids)
        )
        fallback_result = run_clip_fallback_inference(args, fallback_issue_ids, issue_json_path)
        # Fallback counters are added on top of the primary run's summary.
        for status, count in fallback_result.get("summary", {}).items():
            summary[status] = summary.get(status, 0) + count
        successful_issue_ids.extend(fallback_result.get("issue_ids", []))

    ran = bool(runnable_issue_ids) or bool(fallback_result.get("ran"))
    if not ran:
        return {
            "ran": False,
            "issue_ids": [],
            "manifest_path": None,
            "summary": {},
            "detail": "no downloaded camera4.bin cases found and no clip fallback issues were runnable",
            "fallback": fallback_result,
        }
    return {
        "ran": ran,
        "issue_ids": sorted(set(successful_issue_ids)),
        "manifest_path": str(manifest_path),
        "summary": summary,
        "stdout": primary_stdout,
        "fallback": fallback_result,
    }
def run_tracking(
    args: argparse.Namespace,
    issue_ids: list[int],
) -> dict:
    """Invoke the issue tracking shell script for the given issue ids.

    Args:
        args: Parsed CLI options. Reads ``inference_root``, ``dry_run``,
            ``issue_tracking_script``, ``tracking_model_version``,
            ``exported_model`` and ``python_bin``.
        issue_ids: Issue ids to track; each is passed as ``--issue-id``.

    Returns:
        A result dict with ``ran``, ``issue_ids``, ``script`` and
        ``tracking_model_version``. It carries ``detail`` when tracking was
        skipped (dry-run or no ids) and ``stdout`` when the script ran.

    Raises:
        ValueError: If ``args.inference_root`` is empty.
    """
    if not args.inference_root:
        raise ValueError("--inference-root is required when --run-tracking is used")

    tracking_script = Path(args.issue_tracking_script).resolve()

    # Dry-run takes precedence over the "nothing to track" reason.
    if args.dry_run:
        skip_reason = "tracking skipped because dry-run is enabled"
    elif not issue_ids:
        skip_reason = "no inferred issue ids available for tracking"
    else:
        skip_reason = ""
    if skip_reason:
        return {
            "ran": False,
            "issue_ids": [],
            "script": str(tracking_script),
            "tracking_model_version": "",
            "detail": skip_reason,
        }

    tracking_model_version = resolve_tracking_model_version(
        args.tracking_model_version,
        args.exported_model,
    )

    command = ["bash", str(tracking_script), str(Path(args.inference_root).resolve())]
    for issue_id in issue_ids:
        command += ["--issue-id", str(issue_id)]

    # The script receives its interpreter and model version through the environment.
    env = {**os.environ, "PYTHON_BIN": str(Path(args.python_bin).resolve())}
    if tracking_model_version:
        env["TRACKING_MODEL_VERSION"] = tracking_model_version

    completed = run_command(command, cwd=ROOT, env=env, stream_output=True)
    return {
        "ran": True,
        "issue_ids": issue_ids,
        "script": str(tracking_script),
        "tracking_model_version": tracking_model_version,
        "stdout": completed.stdout.strip(),
    }
def build_manifest(
args: argparse.Namespace,
previous_payload: dict | None,
current_payload: dict,
diff: dict,
local_download_check: dict,
requested_issue_ids: list[int],
issue_name_keywords: list[str],
matched_issue_ids: list[int],
issue_id_min: int | None,
issue_id_max: int | None,
matched_issue_ids_by_range: list[int],
matched_issue_ids_without_downloadable_data: list[int],
planned_download_issue_ids: list[int],
planned_inference_issue_ids: list[int],
planned_refresh_issue_ids: list[int],
planned_manual_review_issue_ids: list[int],
refresh_cleanup: dict,
download_result: dict,
inference_result: dict,
tracking_result: dict,
) -> dict:
output_json = Path(args.output_json).resolve()
manifest_path = (
Path(args.sync_manifest_path).resolve()
if args.sync_manifest_path
else output_json.with_name(f"{output_json.name}.sync_manifest.json")
)
return {
"generated_at": datetime.now().astimezone().isoformat(timespec="seconds"),
"project_key": args.project_key,
"user_key": args.user_key,
"view_name": args.view_name,
"work_item_type": args.work_item_type,
"output_json": str(output_json),
"sync_manifest_path": str(manifest_path),
"snapshot_dir": str(Path(args.snapshot_dir).resolve()) if args.snapshot_dir else "",
"dry_run": args.dry_run,
"refresh_changed_issues": args.refresh_changed_issues,
"run_download": args.run_download,
"run_inference": args.run_inference,
"run_tracking": args.run_tracking,
"use_issue_frame_window": args.use_issue_frame_window,
"frame_before": args.frame_before,
"frame_after": args.frame_after,
"missing_issue_frame_policy": args.missing_issue_frame_policy,
"skip_existing_inference": args.skip_existing_inference,
"requested_issue_ids": requested_issue_ids,
"issue_name_keywords": issue_name_keywords,
"matched_issue_ids": matched_issue_ids,
"issue_id_min": issue_id_min,
"issue_id_max": issue_id_max,
"matched_issue_ids_by_range": matched_issue_ids_by_range,
"matched_issue_ids_without_downloadable_data": matched_issue_ids_without_downloadable_data,
"planned_download_issue_ids": planned_download_issue_ids,
"planned_inference_issue_ids": planned_inference_issue_ids,
"planned_refresh_issue_ids": planned_refresh_issue_ids,
"planned_manual_review_issue_ids": planned_manual_review_issue_ids,
"exported_model": args.exported_model,
"tracking_model_version": args.tracking_model_version,
"previous_exported_at": None if previous_payload is None else previous_payload.get("exported_at"),
"current_exported_at": current_payload.get("exported_at"),
"previous_total": diff["previous_total"],
"current_total": diff["current_total"],
"changes": diff,
"local_download_check": local_download_check,
"actionable_download_issue_ids": planned_download_issue_ids,
"refresh_cleanup": refresh_cleanup,
"download": download_result,
"inference": inference_result,
"tracking": tracking_result,
}
def print_summary(manifest: dict) -> None:
    """Print a line-per-field digest of a sync manifest to stdout.

    Always prints the view, output path, dry-run flag, totals, change counts
    and planned issue-id sets. Prints the model, frame window, tracking
    version, filters, local download check and per-stage results only when
    the corresponding manifest entries are set.

    Args:
        manifest: Sync manifest dict with the layout produced by
            ``build_manifest``.
    """
    changes = manifest["changes"]

    for field in ("view_name", "output_json", "dry_run"):
        print(f"{field}: {manifest[field]}")

    if manifest.get("exported_model"):
        print(f"exported_model: {manifest['exported_model']}")
    if manifest.get("use_issue_frame_window"):
        before = manifest["frame_before"]
        after = manifest["frame_after"]
        policy = manifest["missing_issue_frame_policy"]
        print(f"issue_frame_window: before={before} after={after} missing_policy={policy}")
    if manifest.get("run_tracking"):
        # Prefer the version reported by the tracking stage over the raw CLI value.
        tracking_version = (
            manifest["tracking"].get("tracking_model_version")
            or manifest.get("tracking_model_version")
        )
        if tracking_version:
            print(f"tracking_model_version: {tracking_version}")

    for field in ("previous_total", "current_total"):
        print(f"{field}: {manifest[field]}")

    # Change categories are reported as counts, not as full id lists.
    for category in (
        "new_issues",
        "status_changed_issues",
        "data_added_issues",
        "data_changed_requires_refresh",
        "data_removed_issues",
        "metadata_changed_issues",
        "updated_only_issues",
        "removed_issues",
    ):
        print(f"{category}: {len(changes[category])}")

    if manifest.get("requested_issue_ids"):
        print(f"requested_issue_ids: {manifest['requested_issue_ids']}")
    if manifest.get("issue_name_keywords"):
        for field in (
            "issue_name_keywords",
            "matched_issue_ids",
            "matched_issue_ids_without_downloadable_data",
        ):
            print(f"{field}: {manifest[field]}")
    if any(manifest.get(bound) is not None for bound in ("issue_id_min", "issue_id_max")):
        for field in ("issue_id_min", "issue_id_max", "matched_issue_ids_by_range"):
            print(f"{field}: {manifest[field]}")

    for field in ("download_issue_ids", "refresh_issue_ids", "manual_review_issue_ids"):
        print(f"{field}: {changes[field]}")

    local_download_check = manifest["local_download_check"]
    if local_download_check.get("enabled"):
        downloadable = local_download_check.get("issues_with_downloadable_data", [])
        print(f"local_downloadable_issues: {len(downloadable)}")
        for field in ("missing_issue_ids", "incomplete_issue_ids", "unparsed_issue_ids"):
            print(f"local_{field}: {local_download_check.get(field, [])}")

    for field in (
        "planned_download_issue_ids",
        "planned_inference_issue_ids",
        "planned_refresh_issue_ids",
        "planned_manual_review_issue_ids",
    ):
        print(f"{field}: {manifest[field]}")

    download = manifest["download"]
    if download.get("ran"):
        print(f"download_summary: {download.get('summary', {})}")

    inference = manifest["inference"]
    if inference.get("ran"):
        print(f"inference_summary: {inference.get('summary', {})}")
    elif inference.get("detail"):
        print(f"inference_detail: {inference['detail']}")

    tracking = manifest["tracking"]
    if tracking.get("ran"):
        print(f"tracking_issue_ids: {tracking.get('issue_ids', [])}")
    elif tracking.get("detail"):
        print(f"tracking_detail: {tracking['detail']}")
def main() -> int:
    """Run one sync pass: export the view, diff it, plan, and run the enabled stages.

    Steps, in order: validate the id range; load the previous export if the
    output JSON exists; export the latest view; build the diff and the local
    download check; narrow the planned issue sets by keyword, id range and
    explicit ids; optionally clean outputs of changed issues; run download,
    inference and tracking when their flags are set; then build the sync
    manifest, save it unless dry-run, and print the summary.

    Returns:
        ``0`` once the pass completes.

    Raises:
        ValueError: If ``--issue-id-min`` is greater than ``--issue-id-max``.
    """
    args = parse_args()
    # Reject an inverted id range before any export work is done.
    if (
        args.issue_id_min is not None
        and args.issue_id_max is not None
        and args.issue_id_min > args.issue_id_max
    ):
        raise ValueError(
            f"--issue-id-min must be <= --issue-id-max: {args.issue_id_min} > {args.issue_id_max}"
        )
    output_json = Path(args.output_json).resolve()
    requested_issue_ids = sorted(set(args.issue_id))
    issue_name_keywords = sanitize_issue_name_keywords(args.issue_name_keyword)
    # The previous export, when present, is the baseline for the diff below.
    previous_payload = load_json(output_json) if output_json.is_file() else None
    current_payload, live_export_path = export_latest_view(args)
    try:
        diff = build_diff(previous_payload, current_payload, refresh_changed_issues=args.refresh_changed_issues)
        issue_lookup = build_issue_lookup(current_payload)
        matched_issue_ids = collect_issue_ids_by_name_keywords(current_payload, issue_name_keywords)
        matched_issue_ids_without_downloadable_data = [
            issue_id
            for issue_id in matched_issue_ids
            if not has_downloadable_data(issue_lookup[issue_id])
        ]
        matched_issue_ids_by_range = collect_issue_ids_by_id_range(
            current_payload,
            args.issue_id_min,
            args.issue_id_max,
        )
        download_root = Path(args.download_root).resolve() if args.download_root else None
        local_download_check = build_local_download_check(current_payload, download_root)
        # Default plan: issues flagged by the diff plus those flagged by the local check.
        default_actionable_download_issue_ids = sorted(
            set(diff["download_issue_ids"]) | set(local_download_check["actionable_issue_ids"])
        )
        planned_download_issue_ids = list(default_actionable_download_issue_ids)
        planned_inference_issue_ids = list(default_actionable_download_issue_ids)
        planned_refresh_issue_ids = list(diff["refresh_issue_ids"])
        planned_manual_review_issue_ids = list(diff["manual_review_issue_ids"])
        # Filter 1: issue-name keywords. Download keeps only actionable matches, while
        # inference takes every matched issue that has downloadable data.
        if issue_name_keywords:
            planned_download_issue_ids = sorted(
                set(default_actionable_download_issue_ids) & set(matched_issue_ids)
            )
            planned_inference_issue_ids = [
                issue_id
                for issue_id in matched_issue_ids
                if has_downloadable_data(issue_lookup[issue_id])
            ]
            planned_refresh_issue_ids = [
                issue_id for issue_id in planned_refresh_issue_ids if issue_id in matched_issue_ids
            ]
            planned_manual_review_issue_ids = [
                issue_id
                for issue_id in planned_manual_review_issue_ids
                if issue_id in matched_issue_ids
            ]
            log_progress(
                "[stage=plan] issue-name keyword filter enabled: "
                f"{issue_name_keywords} -> {describe_issue_ids(current_payload, matched_issue_ids)}"
            )
            if matched_issue_ids_without_downloadable_data:
                log_progress(
                    "[stage=plan] matched issues without downloadable data: "
                    + describe_issue_ids(
                        current_payload,
                        matched_issue_ids_without_downloadable_data,
                    )
                )
        # Filter 2: id range, applied on top of whatever the keyword filter left.
        if args.issue_id_min is not None or args.issue_id_max is not None:
            range_filter = set(matched_issue_ids_by_range)
            planned_download_issue_ids = [
                issue_id for issue_id in planned_download_issue_ids if issue_id in range_filter
            ]
            planned_inference_issue_ids = [
                issue_id for issue_id in planned_inference_issue_ids if issue_id in range_filter
            ]
            planned_refresh_issue_ids = [
                issue_id for issue_id in planned_refresh_issue_ids if issue_id in range_filter
            ]
            planned_manual_review_issue_ids = [
                issue_id for issue_id in planned_manual_review_issue_ids if issue_id in range_filter
            ]
            log_progress(
                "[stage=plan] issue-id range filter enabled: "
                f"min={args.issue_id_min} max={args.issue_id_max} -> "
                + describe_issue_ids(current_payload, matched_issue_ids_by_range)
            )
        # Filter 3: explicit ids replace the download/inference plans outright; the
        # refresh and manual-review plans are only narrowed to the requested ids.
        if requested_issue_ids:
            planned_download_issue_ids = requested_issue_ids
            planned_inference_issue_ids = requested_issue_ids
            planned_refresh_issue_ids = [
                issue_id for issue_id in planned_refresh_issue_ids if issue_id in requested_issue_ids
            ]
            planned_manual_review_issue_ids = [
                issue_id
                for issue_id in planned_manual_review_issue_ids
                if issue_id in requested_issue_ids
            ]
            log_progress(
                "[stage=plan] explicit issue filter enabled: "
                + describe_issue_ids(current_payload, requested_issue_ids)
            )
            if issue_name_keywords:
                log_progress(
                    "[stage=plan] explicit issue filter overrides the issue-name keyword plan"
                )
        log_progress(
            "[stage=plan] planned download set: "
            + describe_issue_ids(current_payload, planned_download_issue_ids)
        )
        log_progress(
            "[stage=plan] planned inference set: "
            + describe_issue_ids(current_payload, planned_inference_issue_ids)
        )
        if planned_manual_review_issue_ids:
            log_progress(
                "[stage=plan] manual review required: "
                + describe_issue_ids(current_payload, planned_manual_review_issue_ids)
            )
        refresh_cleanup = {"download_cleanup": [], "inference_cleanup": []}
        # Cleanup of changed issues needs the flag, a non-empty plan and a download root.
        if args.refresh_changed_issues and planned_refresh_issue_ids and download_root is not None:
            inference_root = Path(args.inference_root).resolve() if args.inference_root else None
            log_progress(
                "[stage=refresh] cleaning outputs for changed issues: "
                + describe_issue_ids(current_payload, planned_refresh_issue_ids)
            )
            refresh_cleanup = prepare_refresh_outputs(
                download_root=download_root,
                inference_root=inference_root,
                refresh_issue_ids=planned_refresh_issue_ids,
                dry_run=args.dry_run,
            )
        # In dry-run the stages read the live export file instead of output_json.
        # NOTE(review): presumably because output_json is not updated in dry-run; confirm
        # against export_latest_view.
        effective_output_json = live_export_path if args.dry_run else output_json
        download_result = {"ran": False, "issue_ids": [], "manifest_path": None, "summary": {}}
        if args.run_download:
            if planned_download_issue_ids:
                log_progress(
                    "[stage=download] start: "
                    + describe_issue_ids(current_payload, planned_download_issue_ids)
                )
            else:
                log_progress("[stage=download] skipped: no actionable issues")
            download_result = run_download(args, planned_download_issue_ids, effective_output_json)
            if download_result.get("ran"):
                log_progress(
                    "[stage=download] done: "
                    + summarize_status_counts(download_result.get("summary", {}))
                )
        inference_result = {"ran": False, "issue_ids": [], "manifest_path": None, "summary": {}}
        if args.run_inference:
            log_progress(
                "[stage=inference] start: "
                + describe_issue_ids(current_payload, planned_inference_issue_ids)
            )
            inference_result = run_inference(args, planned_inference_issue_ids, effective_output_json)
            if inference_result.get("ran"):
                log_progress(
                    "[stage=inference] done: "
                    + summarize_status_counts(inference_result.get("summary", {}))
                )
            elif inference_result.get("detail"):
                log_progress(f"[stage=inference] skipped: {inference_result['detail']}")
        tracking_result = {
            "ran": False,
            "issue_ids": [],
            "script": str(Path(args.issue_tracking_script).resolve()),
            "tracking_model_version": "",
        }
        if args.run_tracking:
            # Tracking only covers issues reported by this run's inference stage; without
            # --run-inference the list is empty.
            tracking_issue_ids = inference_result.get("issue_ids", []) if args.run_inference else []
            log_progress(
                "[stage=tracking] start: "
                + describe_issue_ids(current_payload, tracking_issue_ids)
            )
            tracking_result = run_tracking(args, tracking_issue_ids)
            if tracking_result.get("ran"):
                log_progress(
                    "[stage=tracking] done: "
                    + describe_issue_ids(current_payload, tracking_result.get("issue_ids", []))
                )
            elif tracking_result.get("detail"):
                log_progress(f"[stage=tracking] skipped: {tracking_result['detail']}")
        manifest = build_manifest(
            args=args,
            previous_payload=previous_payload,
            current_payload=current_payload,
            diff=diff,
            local_download_check=local_download_check,
            requested_issue_ids=requested_issue_ids,
            issue_name_keywords=issue_name_keywords,
            matched_issue_ids=matched_issue_ids,
            issue_id_min=args.issue_id_min,
            issue_id_max=args.issue_id_max,
            matched_issue_ids_by_range=matched_issue_ids_by_range,
            matched_issue_ids_without_downloadable_data=matched_issue_ids_without_downloadable_data,
            planned_download_issue_ids=planned_download_issue_ids,
            planned_inference_issue_ids=planned_inference_issue_ids,
            planned_refresh_issue_ids=planned_refresh_issue_ids,
            planned_manual_review_issue_ids=planned_manual_review_issue_ids,
            refresh_cleanup=refresh_cleanup,
            download_result=download_result,
            inference_result=inference_result,
            tracking_result=tracking_result,
        )
        manifest_path = Path(manifest["sync_manifest_path"])
        # The manifest is always built and printed, but only persisted for real runs.
        if not args.dry_run:
            save_json(manifest_path, manifest)
        print_summary(manifest)
        if args.dry_run:
            print(f"sync_manifest (not written in dry-run): {manifest_path}")
        else:
            print(f"sync_manifest: {manifest_path}")
        return 0
    finally:
        # Remove the live export file on every exit path, including errors.
        live_export_path.unlink(missing_ok=True)
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())