222 lines
7.1 KiB
Python
222 lines
7.1 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""Build a synthetic issue JSON from a plain CNCAP case list."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
import json
|
||
|
|
import re
|
||
|
|
import sys
|
||
|
|
from collections import Counter
|
||
|
|
from datetime import datetime
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
|
||
|
|
# Case list used when --input is not given: "cncap_case.txt" next to this script.
DEFAULT_INPUT = Path(__file__).parent / "cncap_case.txt"

# Default offset added to a case's 1-based position to form its synthetic issue id.
DEFAULT_ID_BASE = 9_000_000_000

# Matches an "ADAS_<token>::<token>" reference. The first token may not contain
# ':', '/', '\\' or whitespace; the second may be empty and may not contain
# '/', '\\' or whitespace.
PDCL_REF_RE = re.compile(r"ADAS_[^:/\\\s]+::[^/\\\s]*")
|
||
|
|
|
||
|
|
|
||
|
|
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the CNCAP synthetic issue builder.

    Args:
        argv: Argument strings to parse. ``None`` (the default) lets argparse
            read ``sys.argv[1:]``, so the plain ``parse_args()`` call behaves
            exactly as before; passing a list allows programmatic use.

    Returns:
        Namespace with the attributes ``input``, ``output``,
        ``case_index_output`` and ``id_base``.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--input",
        type=Path,
        default=DEFAULT_INPUT,
        help="Path to the CNCAP case list text file.",
    )
    parser.add_argument(
        "--output",
        type=Path,
        required=True,
        help="Path to the synthetic issue JSON output.",
    )
    parser.add_argument(
        "--case-index-output",
        type=Path,
        default=None,
        help="Optional companion index JSON output path.",
    )
    parser.add_argument(
        "--id-base",
        type=int,
        default=DEFAULT_ID_BASE,
        help="Synthetic issue id base. The first generated issue id is id_base + 1.",
    )
    return parser.parse_args(argv)
|
||
|
|
|
||
|
|
|
||
|
|
def parse_case_line(raw_line: str, line_no: int) -> tuple[str, str] | None:
    """Split one line of the case list into ``(case_name, case_ref)``.

    Blank lines and lines whose first non-blank character is ``#`` yield
    ``None``. A line containing ``|`` is split at the first ``|``; any other
    line is split at its last run of whitespace.

    Args:
        raw_line: The line as read from the case list.
        line_no: 1-based line number, used only in error messages.

    Returns:
        The stripped ``(case_name, case_ref)`` pair, or ``None`` for a line
        that carries no case.

    Raises:
        ValueError: If the line has no separator, or if either side of the
            separator is empty.
    """
    text = raw_line.strip()
    if text == "" or text[0] == "#":
        return None

    head, bar, tail = text.partition("|")
    if bar:
        # Explicit separator: everything after the first '|' is the reference.
        case_name = head.strip()
        source = tail.strip()
    else:
        try:
            # Whitespace form: the reference is the last whitespace-free token.
            case_name, source = text.rsplit(maxsplit=1)
        except ValueError as exc:
            message = (
                f"Line {line_no}: expected '<case_name> <case_ref>' or '<case_name> | <case_ref>', "
                f"got: {raw_line.rstrip()!r}"
            )
            raise ValueError(message) from exc

    if not (case_name and source):
        raise ValueError(f"Line {line_no}: empty case name or case reference")
    return case_name, source
|
||
|
|
|
||
|
|
|
||
|
|
def classify_source(source: str, line_no: int) -> tuple[str, str, str]:
    """Decide which kind of case reference ``source`` is.

    Args:
        source: The case reference text.
        line_no: 1-based line number, used only in the error message.

    Returns:
        ``(standard_path, pdcl_ref, source_kind)``. Exactly one of the first
        two holds ``source`` and the other is ``""``. ``source_kind`` is
        ``"pdcl_mdi_download"`` or ``"standard_path"``.

    Raises:
        ValueError: If the reference matches neither form.
    """
    # The PDCL check runs first, so a reference that matches it is never
    # treated as a plain path even when it also contains a slash.
    looks_like_pdcl = "mdi raw" in source.lower() or PDCL_REF_RE.search(source) is not None
    if looks_like_pdcl:
        return "", source, "pdcl_mdi_download"

    has_path_separator = any(sep in source for sep in ("/", "\\"))
    if not has_path_separator:
        raise ValueError(f"Line {line_no}: unsupported case reference format: {source!r}")
    return source, "", "standard_path"
|
||
|
|
|
||
|
|
|
||
|
|
def build_payload(
    *,
    input_path: Path,
    case_rows: list[dict[str, Any]],
    id_base: int,
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Assemble the synthetic issue export and its companion case index.

    Args:
        input_path: Case list the rows came from; recorded in resolved form.
        case_rows: Dicts with the keys ``case_name``, ``source``, ``line_no``
            and ``raw_line``, in file order.
        id_base: Offset for issue ids; the row at 1-based position ``k`` gets
            the id ``id_base + k``.

    Returns:
        ``(payload, case_index)``. ``payload`` holds the export metadata, a
        summary and one issue item per row. ``case_index`` lists the same
        cases together with their source kind and raw input line.

    Raises:
        ValueError: Propagated from ``classify_source`` for a reference it
            does not recognise.
    """
    timestamp = datetime.now().astimezone().isoformat(timespec="seconds")
    seen_names: Counter[str] = Counter()
    kind_tally: Counter[str] = Counter()
    issues: list[dict[str, Any]] = []
    index_entries: list[dict[str, Any]] = []

    for position, row in enumerate(case_rows, start=1):
        name = row["case_name"]
        ref = row["source"]
        origin_line = row["line_no"]
        original_text = row["raw_line"]

        # Repeated names are kept; each occurrence records its running count.
        seen_names[name] += 1
        nth_occurrence = seen_names[name]
        synthetic_id = id_base + position

        path_value, pdcl_value, kind = classify_source(ref, origin_line)
        kind_tally[kind] += 1

        issues.append(
            {
                "id": synthetic_id,
                "name": name,
                "status": "CNCAP_CASE",
                "created_at": None,
                "updated_at": timestamp,
                "问题数据地址": path_value,
                "问题数据地址_PDCL": pdcl_value,
                "问题发生frameid": None,
                "source_line_no": origin_line,
                "case_ref": ref,
                "case_occurrence_index": nth_occurrence,
                "synthetic_issue_key": f"cncap_case_{position:03d}",
            }
        )
        index_entries.append(
            {
                "id": synthetic_id,
                "name": name,
                "source_kind": kind,
                "source_line_no": origin_line,
                "case_ref": ref,
                "case_occurrence_index": nth_occurrence,
                "raw_line": original_text,
            }
        )

    repeated_names = [name for name, hits in seen_names.items() if hits > 1]
    repeated_names.sort()
    resolved_input = str(input_path.resolve())

    summary = {
        "success": True,
        "total": len(issues),
        "duplicate_name_count": len(repeated_names),
        # A Counter yields 0 for a kind that never occurred.
        "pdcl_case_count": kind_tally["pdcl_mdi_download"],
        "standard_path_case_count": kind_tally["standard_path"],
    }
    source_info = {
        "type": "cncap_case_txt",
        "input_path": resolved_input,
        "id_base": id_base,
        "work_item_type": "synthetic_issue",
        "included_detail_fields": [
            "问题数据地址",
            "问题数据地址_PDCL",
            "问题发生frameid",
        ],
    }
    payload = {
        "exported_at": timestamp,
        "source": source_info,
        "summary": summary,
        "items": issues,
    }
    case_index = {
        "generated_at": timestamp,
        "input_path": resolved_input,
        "id_base": id_base,
        "total_cases": len(index_entries),
        "duplicate_names": repeated_names,
        "cases": index_entries,
    }
    return payload, case_index
|
||
|
|
|
||
|
|
|
||
|
|
def write_json(path: Path, payload: dict[str, Any]) -> None:
    """Write ``payload`` to ``path`` as indented UTF-8 JSON with a trailing newline.

    Missing parent directories are created. Non-ASCII characters are written
    as-is rather than as ``\\u`` escapes.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    rendered = json.dumps(payload, ensure_ascii=False, indent=2)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(rendered + "\n")
|
||
|
|
|
||
|
|
|
||
|
|
def main() -> int:
    """Generate the synthetic issue JSON and its companion case index.

    Reads the case list, turns every line that carries a case into a row,
    builds both payloads, writes them to disk and prints a short summary.
    When ``--case-index-output`` is not given, the index is written next to
    the main output as ``<output stem>.case_index.json``.

    Returns:
        0 on success.

    Raises:
        ValueError: If ``--id-base`` is negative, if both outputs resolve to
            the same file, if a line cannot be parsed or classified, or if
            the input yields no cases.
        FileNotFoundError: If the input case list is not an existing file.
    """
    args = parse_args()
    input_path = args.input.resolve()
    output_path = args.output.resolve()
    if args.case_index_output is not None:
        case_index_output = args.case_index_output.resolve()
    else:
        case_index_output = output_path.with_name(f"{output_path.stem}.case_index.json")

    if args.id_base < 0:
        raise ValueError("--id-base must be greater than or equal to 0")
    if case_index_output == output_path:
        # The index is written second and would silently replace the issue JSON.
        raise ValueError(
            f"--case-index-output must differ from --output, both resolve to: {output_path}"
        )
    if not input_path.is_file():
        raise FileNotFoundError(f"Input case list not found: {input_path}")

    # "utf-8-sig" drops a leading byte-order mark if present. With plain
    # "utf-8" the BOM would stay glued to the first line, corrupting the first
    # case name or hiding a leading '#' comment.
    lines = input_path.read_text(encoding="utf-8-sig").splitlines()

    case_rows: list[dict[str, Any]] = []
    for line_no, raw_line in enumerate(lines, start=1):
        parsed = parse_case_line(raw_line, line_no)
        if parsed is None:
            continue
        case_name, source = parsed
        case_rows.append(
            {
                "line_no": line_no,
                "raw_line": raw_line,
                "case_name": case_name,
                "source": source,
            }
        )

    if not case_rows:
        raise ValueError(f"No runnable cases were found in {input_path}")

    payload, case_index = build_payload(
        input_path=input_path,
        case_rows=case_rows,
        id_base=args.id_base,
    )
    write_json(output_path, payload)
    write_json(case_index_output, case_index)

    summary = payload["summary"]
    print(f"input: {input_path}")
    print(f"output: {output_path}")
    print(f"case_index_output: {case_index_output}")
    print(f"total_cases: {summary['total']}")
    print(f"duplicate_name_count: {summary['duplicate_name_count']}")
    print(f"pdcl_case_count: {summary['pdcl_case_count']}")
    print(f"standard_path_case_count: {summary['standard_path_case_count']}")
    return 0
|
||
|
|
|
||
|
|
|
||
|
|
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|