Files
yolov26_3d/tools/feishu_project/build_cncap_case_issue_json.py
2026-06-24 09:35:46 +08:00

222 lines
7.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""Build a synthetic issue JSON from a plain CNCAP case list."""
from __future__ import annotations
import argparse
import json
import re
import sys
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Any
# Case list that ships next to this script; used when --input is not given.
DEFAULT_INPUT = Path(__file__).parent / "cncap_case.txt"

# Synthetic issue ids are allocated as DEFAULT_ID_BASE + 1, + 2, ...
DEFAULT_ID_BASE = 9_000_000_000

# Matches "ADAS_" + one or more characters that are not a colon, slash,
# backslash or whitespace, then "::", then any run of characters that are
# not a slash, backslash or whitespace.
PDCL_REF_RE = re.compile(r"ADAS_[^:/\\\s]+::[^/\\\s]*")
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of this script.

    Returns:
        The namespace produced by ``argparse`` with the attributes
        ``input``, ``output``, ``case_index_output`` and ``id_base``.
    """
    cli = argparse.ArgumentParser(description=__doc__)

    # Declared as data so every option is registered through one code path;
    # the order here is the order shown in ``--help``.
    option_table = (
        (
            "--input",
            {
                "type": Path,
                "default": DEFAULT_INPUT,
                "help": "Path to the CNCAP case list text file.",
            },
        ),
        (
            "--output",
            {
                "type": Path,
                "required": True,
                "help": "Path to the synthetic issue JSON output.",
            },
        ),
        (
            "--case-index-output",
            {
                "type": Path,
                "default": None,
                "help": "Optional companion index JSON output path.",
            },
        ),
        (
            "--id-base",
            {
                "type": int,
                "default": DEFAULT_ID_BASE,
                "help": "Synthetic issue id base. The first generated issue id is id_base + 1.",
            },
        ),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)

    return cli.parse_args()
def parse_case_line(raw_line: str, line_no: int) -> tuple[str, str] | None:
    """Split one line of the case list into ``(case_name, case_ref)``.

    Two layouts are accepted: ``<case_name> | <case_ref>`` (split on the
    first ``|``) and ``<case_name> <case_ref>`` (split on the last run of
    whitespace).

    Args:
        raw_line: The line exactly as read from the file.
        line_no: 1-based line number, used only in error messages.

    Returns:
        The ``(case_name, case_ref)`` pair, or ``None`` when the line is
        blank or starts with ``#``.

    Raises:
        ValueError: If the line has no separator or either part is empty.
    """
    text = raw_line.strip()
    if text == "" or text.startswith("#"):
        return None

    before_pipe, pipe, after_pipe = text.partition("|")
    if pipe:
        case_name = before_pipe.strip()
        source = after_pipe.strip()
    else:
        # No pipe: the reference is the last whitespace-delimited token.
        tokens = text.rsplit(None, 1)
        try:
            case_name, source = tokens
        except ValueError as exc:
            raise ValueError(
                f"Line {line_no}: expected '<case_name> <case_ref>' or '<case_name> | <case_ref>', "
                f"got: {raw_line.rstrip()!r}"
            ) from exc

    if not (case_name and source):
        raise ValueError(f"Line {line_no}: empty case name or case reference")
    return case_name, source
def classify_source(source: str, line_no: int) -> tuple[str, str, str]:
    """Decide which kind of reference ``source`` is.

    Args:
        source: The case reference taken from the case list.
        line_no: 1-based line number, used only in the error message.

    Returns:
        ``(standard_path, pdcl_ref, source_kind)``. Exactly one of the first
        two entries carries ``source``; the other is the empty string.
        ``source_kind`` is ``"pdcl_mdi_download"`` or ``"standard_path"``.

    Raises:
        ValueError: If ``source`` matches neither form.
    """
    # The PDCL check comes first, so a reference that matches it is never
    # treated as a path even when it contains a slash.
    mentions_mdi_raw = "mdi raw" in source.lower()
    if mentions_mdi_raw or PDCL_REF_RE.search(source):
        return "", source, "pdcl_mdi_download"

    has_path_separator = any(separator in source for separator in ("/", "\\"))
    if not has_path_separator:
        raise ValueError(f"Line {line_no}: unsupported case reference format: {source!r}")
    return source, "", "standard_path"
def build_payload(
    *,
    input_path: Path,
    case_rows: list[dict[str, Any]],
    id_base: int,
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Assemble the synthetic issue document and its companion case index.

    Args:
        input_path: Case list the rows came from; recorded (resolved) in
            both outputs.
        case_rows: One dict per case with the keys ``case_name``,
            ``source``, ``line_no`` and ``raw_line``.
        id_base: Offset for issue ids; the n-th row (1-based) receives
            ``id_base + n``.

    Returns:
        ``(payload, case_index)``: the issue JSON document and the index
        JSON document, both as plain dicts.

    Raises:
        ValueError: Propagated from ``classify_source`` for a reference it
            does not recognise.
    """
    timestamp = datetime.now().astimezone().isoformat(timespec="seconds")
    seen_names: Counter[str] = Counter()
    kind_tally: Counter[str] = Counter()
    issue_items: list[dict[str, Any]] = []
    index_entries: list[dict[str, Any]] = []

    for position, entry in enumerate(case_rows, start=1):
        case_name, source, line_no = entry["case_name"], entry["source"], entry["line_no"]
        raw_line = entry["raw_line"]

        seen_names[case_name] += 1
        nth_occurrence = seen_names[case_name]

        standard_path, pdcl_ref, source_kind = classify_source(source, line_no)
        kind_tally[source_kind] += 1

        # Fragments shared by the issue item and the index entry; splicing
        # them in keeps the key order of both records stable.
        identity = {"id": id_base + position, "name": case_name}
        provenance = {
            "source_line_no": line_no,
            "case_ref": source,
            "case_occurrence_index": nth_occurrence,
        }

        issue_items.append(
            {
                **identity,
                "status": "CNCAP_CASE",
                "created_at": None,
                "updated_at": timestamp,
                "问题数据地址": standard_path,
                "问题数据地址_PDCL": pdcl_ref,
                "问题发生frameid": None,
                **provenance,
                "synthetic_issue_key": f"cncap_case_{position:03d}",
            }
        )
        index_entries.append(
            {
                **identity,
                "source_kind": source_kind,
                **provenance,
                "raw_line": raw_line,
            }
        )

    repeated_names = [name for name, hits in seen_names.items() if hits > 1]
    repeated_names.sort()
    resolved_input = str(input_path.resolve())

    payload = {
        "exported_at": timestamp,
        "source": {
            "type": "cncap_case_txt",
            "input_path": resolved_input,
            "id_base": id_base,
            "work_item_type": "synthetic_issue",
            "included_detail_fields": [
                "问题数据地址",
                "问题数据地址_PDCL",
                "问题发生frameid",
            ],
        },
        "summary": {
            "success": True,
            "total": len(issue_items),
            "duplicate_name_count": len(repeated_names),
            # A Counter yields 0 for a kind that never occurred.
            "pdcl_case_count": kind_tally["pdcl_mdi_download"],
            "standard_path_case_count": kind_tally["standard_path"],
        },
        "items": issue_items,
    }
    case_index = {
        "generated_at": timestamp,
        "input_path": resolved_input,
        "id_base": id_base,
        "total_cases": len(index_entries),
        "duplicate_names": repeated_names,
        "cases": index_entries,
    }
    return payload, case_index
def write_json(path: Path, payload: dict[str, Any]) -> None:
    """Write ``payload`` to ``path`` as indented UTF-8 JSON.

    Missing parent directories are created. Non-ASCII characters are written
    as-is rather than escaped, and the file ends with a single newline.

    Args:
        path: Destination file; overwritten if it already exists.
        payload: JSON-serialisable document to write.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(f"{serialized}\n")
def main() -> int:
    """Convert the case list into the issue JSON and the case index JSON.

    Reads the case list, skips blank and ``#`` lines, builds both documents
    and writes them, then prints a short summary to stdout. When
    ``--case-index-output`` is omitted the index is written next to the
    issue JSON as ``<output stem>.case_index.json``.

    Returns:
        ``0`` on success.

    Raises:
        ValueError: If ``--id-base`` is negative, both outputs resolve to
            the same file, the list contains no cases, or a line cannot be
            parsed or classified.
        FileNotFoundError: If the input case list does not exist.
    """
    args = parse_args()
    input_path = args.input.resolve()
    output_path = args.output.resolve()
    case_index_output = (
        args.case_index_output.resolve()
        if args.case_index_output is not None
        else output_path.with_name(f"{output_path.stem}.case_index.json")
    )

    if args.id_base < 0:
        raise ValueError("--id-base must be greater than or equal to 0")
    if not input_path.is_file():
        raise FileNotFoundError(f"Input case list not found: {input_path}")
    # Both documents are written unconditionally; if they share a path the
    # index would silently overwrite the issue JSON.
    if case_index_output == output_path:
        raise ValueError(
            f"--case-index-output must differ from --output: {output_path}"
        )

    # "utf-8-sig" drops a leading byte-order mark if present. With plain
    # "utf-8" the BOM would stick to the first line, so a leading "#" comment
    # would not be recognised and the first case name would carry U+FEFF.
    case_list_text = input_path.read_text(encoding="utf-8-sig")

    case_rows: list[dict[str, Any]] = []
    for line_no, raw_line in enumerate(case_list_text.splitlines(), start=1):
        parsed = parse_case_line(raw_line, line_no)
        if parsed is None:
            continue
        case_name, source = parsed
        case_rows.append(
            {
                "line_no": line_no,
                "raw_line": raw_line,
                "case_name": case_name,
                "source": source,
            }
        )
    if not case_rows:
        raise ValueError(f"No runnable cases were found in {input_path}")

    payload, case_index = build_payload(
        input_path=input_path,
        case_rows=case_rows,
        id_base=args.id_base,
    )
    write_json(output_path, payload)
    write_json(case_index_output, case_index)

    summary = payload["summary"]
    print(f"input: {input_path}")
    print(f"output: {output_path}")
    print(f"case_index_output: {case_index_output}")
    print(f"total_cases: {summary['total']}")
    print(f"duplicate_name_count: {summary['duplicate_name_count']}")
    print(f"pdcl_case_count: {summary['pdcl_case_count']}")
    print(f"standard_path_case_count: {summary['standard_path_case_count']}")
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())