import json
from pathlib import Path
from threading import Lock
from typing import Dict, Optional


class StatusStore:
    """Persistent status tracker for resumable batch inference.

    Each task id maps to a record ``{"state": ..., "detail": ...}`` where
    ``state`` is one of ``"running"``, ``"done"`` or ``"failed"``. Every
    ``mark_*`` call rewrites the whole mapping to ``status_file`` as
    UTF-8 JSON.
    """

    def __init__(self, status_file: Path):
        """Create the store, loading any previously saved status.

        The parent directory of ``status_file`` is created if missing. An
        unreadable or malformed status file is treated as "no saved state"
        rather than an error, so the store starts out empty in that case.

        Args:
            status_file: Path of the JSON file backing this store.
        """
        self.status_file = status_file
        self.lock = Lock()
        self._status: Dict[str, Dict[str, str]] = {}
        self.status_file.parent.mkdir(parents=True, exist_ok=True)
        if self.status_file.exists():
            self._status = self._load()

    def is_done(self, task_id: str) -> bool:
        """Return True if ``task_id`` is recorded with state ``"done"``."""
        entry = self._status.get(task_id)
        return entry is not None and entry.get("state") == "done"

    def get(self, task_id: str) -> Optional[Dict[str, str]]:
        """Return a copy of the record for ``task_id``, or None if unknown.

        A copy is returned so that callers mutating the result cannot
        change the state that will be persisted on the next flush.
        """
        entry = self._status.get(task_id)
        return None if entry is None else dict(entry)

    def mark_running(self, task_id: str, detail: str) -> None:
        """Record ``task_id`` as running and persist the change."""
        self._set(task_id, "running", detail)

    def mark_done(self, task_id: str, detail: str) -> None:
        """Record ``task_id`` as done and persist the change."""
        self._set(task_id, "done", detail)

    def mark_failed(self, task_id: str, detail: str) -> None:
        """Record ``task_id`` as failed and persist the change."""
        self._set(task_id, "failed", detail)

    def summary(self) -> Dict[str, int]:
        """Return counts of tasks per state plus the total number tracked.

        Returns:
            A dict with keys ``"done"``, ``"running"``, ``"failed"`` and
            ``"total"``. Records whose state is none of the three known
            values are counted only in ``"total"``.
        """
        # Snapshot under the lock: iterating the live dict while another
        # thread inserts a task raises "dictionary changed size during
        # iteration".
        with self.lock:
            states = [entry.get("state") for entry in self._status.values()]

        counts = {"done": 0, "running": 0, "failed": 0}
        for state in states:
            # isinstance guard: a hand-edited file may hold an unhashable
            # state value, which must not break the membership test.
            if isinstance(state, str) and state in counts:
                counts[state] += 1
        counts["total"] = len(states)
        return counts

    def _set(self, task_id: str, state: str, detail: str) -> None:
        """Replace the record for ``task_id`` and flush, under the lock."""
        with self.lock:
            self._status[task_id] = {"state": state, "detail": detail}
            self._flush()

    def _load(self) -> Dict[str, Dict[str, str]]:
        """Read the status file, returning ``{}`` when it is unusable.

        Only a JSON object is accepted at the top level, and only entries
        whose value is itself an object are kept; anything else would make
        later ``.get("state")`` lookups fail.
        """
        try:
            data = json.loads(self.status_file.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError; OSError covers read failures.
            return {}
        if not isinstance(data, dict):
            return {}
        return {key: value for key, value in data.items() if isinstance(value, dict)}

    def _flush(self) -> None:
        """Write the full status mapping to ``status_file``.

        The JSON is written to a sibling ``<name>.tmp`` file first and then
        moved over the real file, so an interrupted write cannot leave a
        truncated status file behind. Callers are expected to hold
        ``self.lock``. If the write fails, the temp file may be left on
        disk; the next flush overwrites it.
        """
        payload = json.dumps(self._status, indent=2, ensure_ascii=False)
        tmp_file = self.status_file.with_name(self.status_file.name + ".tmp")
        # Explicit UTF-8: ensure_ascii=False emits raw non-ASCII characters,
        # which the platform default encoding may be unable to represent.
        tmp_file.write_text(payload, encoding="utf-8")
        tmp_file.replace(self.status_file)