""" 统计 aeb_clips*.json 中各场景的 rawid 数量和 clip 数量。 默认按每条 rawid 记录里的 "场景" 字段聚合,输出同名 *_stats.md。 示例: python tools/pdcl_inference/stat_aeb_clips_by_scene.py \ --input tools/pdcl_inference/aeb_clips-20260322152509_to_20260430005758.json python tools/pdcl_inference/stat_aeb_clips_by_scene.py \ --input tools/pdcl_inference/aeb_clips-20260322152509_to_20260430005758.json \ --json-output /tmp/aeb_scene_stats.json """ import argparse import json from collections import OrderedDict from pathlib import Path from typing import Any, Iterable FILE = Path(__file__).resolve() DEFAULT_INPUT = FILE.parent / "aeb_clips-20260322152509_to_20260430005758.json" def _to_str(value: Any) -> str: if value is None: return "" return str(value).strip() def _default_output_path(input_path: Path) -> Path: return input_path.with_name(f"{input_path.stem}_stats.md") def normalize_scene_name(scene_name: str) -> str: """归一化历史表格中大小写不一致的场景名。""" aliases = { "CBLA-cn21": "CBLA-CN21", "CBLA-cn24": "CBLA-CN24", } return aliases.get(scene_name, scene_name) def load_scenario_records(input_path: Path) -> OrderedDict[str, list[dict[str, Any]]]: with input_path.open("r", encoding="utf-8") as file: payload = json.load(file) if not isinstance(payload, dict): raise ValueError(f"输入 JSON 顶层必须是 dict,实际: {type(payload).__name__}") scenarios = payload.get("scenarios", payload) if not isinstance(scenarios, dict): raise ValueError("输入 JSON 的 scenarios 字段必须是 dict") records_by_key: OrderedDict[str, list[dict[str, Any]]] = OrderedDict() for scenario_key, records in scenarios.items(): if not isinstance(records, list): raise ValueError( f"场景 {scenario_key} 对应的数据必须是 list,实际: {type(records).__name__}" ) records_by_key[str(scenario_key)] = records return records_by_key def iter_records(records_by_key: dict[str, list[dict[str, Any]]]) -> Iterable[tuple[str, dict[str, Any]]]: for scenario_key, records in records_by_key.items(): for record in records: if not isinstance(record, dict): raise ValueError( f"场景 {scenario_key} 中的记录必须是 dict,实际: {type(record).__name__}" ) yield scenario_key, record def get_record_scene_name(scenario_key: str, record: dict[str, Any], normalize_scene: bool) -> str: scene_name = _to_str(record.get("场景")) if not scene_name: scene_name = scenario_key.split("-", 1)[0] if scenario_key else "未知场景" return normalize_scene_name(scene_name) if normalize_scene else scene_name def get_record_rawid(record: dict[str, Any]) -> str: return _to_str(record.get("rawid") or record.get("raw_id")) def get_record_clips(record: dict[str, Any], raw_id: str) -> list[str]: raw_clips = record.get("clips", []) if raw_clips is None: return [] if not isinstance(raw_clips, list): raise ValueError(f"rawid={raw_id or 'UNKNOWN'} 的 clips 字段必须是 list") return [clip for clip in (_to_str(item) for item in raw_clips) if clip] def build_scene_stats( records_by_key: dict[str, list[dict[str, Any]]], normalize_scene: bool = True, dedupe_rawids: bool = False, dedupe_clips: bool = False, ) -> tuple[list[dict[str, Any]], dict[str, int]]: grouped: OrderedDict[str, dict[str, Any]] = OrderedDict() for scenario_key, record in iter_records(records_by_key): raw_id = get_record_rawid(record) if not raw_id: continue scene_name = get_record_scene_name(scenario_key, record, normalize_scene=normalize_scene) clips = get_record_clips(record, raw_id) item = grouped.setdefault( scene_name, { "scene": scene_name, "rawid_count": 0, "clip_count": 0, "_rawids": OrderedDict(), "_clips": OrderedDict(), }, ) if dedupe_rawids: if raw_id not in item["_rawids"]: item["_rawids"][raw_id] = None item["rawid_count"] += 1 else: item["_rawids"][raw_id] = None item["rawid_count"] += 1 if dedupe_clips: for clip in clips: if clip not in item["_clips"]: item["_clips"][clip] = None item["clip_count"] += 1 else: for clip in clips: item["_clips"][clip] = None item["clip_count"] += len(clips) stats = [ { "scene": item["scene"], "rawid_count": item["rawid_count"], "unique_rawid_count": len(item["_rawids"]), "clip_count": item["clip_count"], "unique_clip_count": len(item["_clips"]), } for item in grouped.values() ] stats.sort(key=lambda item: item["scene"]) unique_rawids = { raw_id for item in grouped.values() for raw_id in item["_rawids"].keys() } unique_clips = { clip for item in grouped.values() for clip in item["_clips"].keys() } totals = { "scene_count": len(stats), "rawid_count": sum(item["rawid_count"] for item in stats), "unique_rawid_count": len(unique_rawids), "clip_count": sum(item["clip_count"] for item in stats), "unique_clip_count": len(unique_clips), } return stats, totals def render_markdown( input_path: Path, stats: list[dict[str, Any]], totals: dict[str, int], normalize_scene: bool, dedupe_rawids: bool, dedupe_clips: bool, ) -> str: lines = [ "# AEB Clips 场景统计", "", f"源文件:`{input_path}`", "", "## 统计口径", "", "- 按每条记录中的 `场景` 字段聚合统计。", ] if dedupe_rawids: lines.append("- `rawid数量` 为当前场景下去重后的 rawid 数。") else: lines.append("- `rawid数量` 为 rawid 记录数。") if dedupe_clips: lines.append("- `clip数量` 为当前场景下去重后的 clip 数。") else: lines.append("- `clip数量` 为各 rawid 记录中 `clips` 列表长度之和。") if normalize_scene: lines.extend( [ "- 场景名大小写归并:", " - `CBLA-cn21` 归并到 `CBLA-CN21`", " - `CBLA-cn24` 归并到 `CBLA-CN24`", ] ) else: lines.append("- 未做场景名归并。") lines.extend( [ "", "## 汇总", "", "| 指标 | 数量 |", "|---|---:|", f"| 场景数 | {totals['scene_count']} |", f"| rawid数量 | {totals['rawid_count']} |", f"| clip数量 | {totals['clip_count']} |", "", "## 场景统计", "", "| 场景 | rawid数量 | clip数量 |", "|---|---:|---:|", ] ) for item in stats: lines.append(f"| {item['scene']} | {item['rawid_count']} | {item['clip_count']} |") lines.append(f"| **TOTAL** | **{totals['rawid_count']}** | **{totals['clip_count']}** |") lines.append("") return "\n".join(lines) def save_json_stats(output_path: Path, stats: list[dict[str, Any]], totals: dict[str, int]) -> None: output_path.parent.mkdir(parents=True, exist_ok=True) with output_path.open("w", encoding="utf-8") as file: json.dump({"summary": totals, "scenes": stats}, file, ensure_ascii=False, indent=2) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="统计 aeb_clips*.json 中各场景的 rawid 数量和 clip 数量。" ) parser.add_argument( "--input", default=str(DEFAULT_INPUT), help=f"输入 aeb_clips*.json,默认: {DEFAULT_INPUT}", ) parser.add_argument( "--output", default=None, help="输出 Markdown 路径;未指定时输出到输入文件同目录的 *_stats.md", ) parser.add_argument( "--json-output", default=None, help="可选:同时输出机器可读 JSON 统计结果", ) parser.add_argument( "--no-normalize-scene", action="store_true", help="关闭历史场景名大小写归并", ) parser.add_argument( "--dedupe-rawids", action="store_true", help="按场景统计去重 rawid 数,而不是 rawid 记录数", ) parser.add_argument( "--dedupe-clips", action="store_true", help="按场景统计去重 clip 数,而不是 clips 列表长度之和", ) return parser.parse_args() def main() -> None: args = parse_args() input_path = Path(args.input) output_path = Path(args.output) if args.output else _default_output_path(input_path) records_by_key = load_scenario_records(input_path) stats, totals = build_scene_stats( records_by_key, normalize_scene=not args.no_normalize_scene, dedupe_rawids=args.dedupe_rawids, dedupe_clips=args.dedupe_clips, ) output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text( render_markdown( input_path=input_path, stats=stats, totals=totals, normalize_scene=not args.no_normalize_scene, dedupe_rawids=args.dedupe_rawids, dedupe_clips=args.dedupe_clips, ), encoding="utf-8", ) if args.json_output: save_json_stats(Path(args.json_output), stats, totals) print(f"已保存 Markdown 统计: {output_path}") if args.json_output: print(f"已保存 JSON 统计: {args.json_output}") print( f"场景数={totals['scene_count']} rawid数量={totals['rawid_count']} " f"clip数量={totals['clip_count']}" ) if __name__ == "__main__": main()