Files

487 lines
16 KiB
Python
Raw Permalink Normal View History

2026-06-24 09:35:46 +08:00
"""
从 AEB 场地计划表或 aeb_rawid.json 读取 rawid,查询每个 rawid 对应的
clip ukey 列表,并保存为可直接用于批量推理的单个 aeb_clips*.json。

默认输入:
    tools/pdcl_inference/G1Q3项目AEB场地计划表.xlsx
    默认读取 "实际测试进展" 和 "复测case" 两个 sheet。

兼容旧流程:
    也支持通过 --input 读取已生成的 aeb_rawid.json。

依赖:
    pip install pdcl_dss -i https://pypi.minieye.tech/

默认输出命名:
    未指定 --output 时,按 Excel "CVE数据" 列的最小/最大时间自动命名,
    例如 aeb_clips-20260322152509_to_20260410183944.json

输出格式:
    {
        "summary": {
            "total_scenarios": 123,
            "total_rawids": 456,
            "scenario_total_rawids": {
                "CPFA-25-6.5-20": 12
            }
        },
        "scenarios": {
            "CPFA-25-6.5-20": [
                {
                    "sheet": "实际测试进展",
                    "场景": "CPFA",
                    "偏置": "25",
                    "目标速度": "6.5",
                    "自车速度": "20",
                    "CVE数据": "20260323113612",
                    "rawid": "ADAS_...",
                    "clips": ["clip_ukey1", ...]
                }
            ]
        }
    }
"""
import argparse
import json
import os
import re
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import pandas as pd
# Absolute path of this script; ROOT is its third-level ancestor directory
# (presumably the repository root — confirm against the project layout).
FILE = Path(__file__).resolve()
ROOT = FILE.parents[2]
# Make ROOT importable when the script is executed directly.
if str(ROOT) not in sys.path:
    sys.path.append(str(ROOT))

# The default workbook is expected next to this script.
DEFAULT_XLSX = os.path.join(os.path.dirname(__file__), "G1Q3项目AEB场地计划表.xlsx")
DEFAULT_SHEET_NAME = "实际测试进展"
DEFAULT_RETEST_SHEET_NAME = "复测case"
DEFAULT_SHEET_NAMES = [DEFAULT_SHEET_NAME, DEFAULT_RETEST_SHEET_NAME]
DEFAULT_OUTPUT_PREFIX = "aeb_clips"

# Load .env BEFORE installing the fallback credentials: load_dotenv() does not
# override variables that already exist, so calling setdefault() first would
# make any STS_* values from .env silently ineffective.
# python-dotenv is optional; without it only the real environment is used.
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass

# Fallback credentials, applied only when neither the environment nor .env
# provides them.
# NOTE(review): a secret key is hard-coded in source control here; consider
# moving it to the environment / a secrets store and rotating this value.
os.environ.setdefault("STS_UID", "dis-uploader")
os.environ.setdefault("STS_SECRET_KEY", "277310cc09724d315514a79701fecb0f")
def _to_str(val):
"""将单元格值转换为字符串NaN 返回 None。"""
if pd.isna(val):
return None
if isinstance(val, float) and val == int(val):
return str(int(val))
return str(val).strip()
def _build_column_index(header: List[object]) -> Dict[str, int]:
    """Map each non-empty header name to its column position.

    When a name occurs more than once, the last position wins.

    Raises:
        ValueError: If any of the required columns is absent from the header.
    """
    named_cells = ((_to_str(cell), position) for position, cell in enumerate(header))
    header_map = {title: position for title, position in named_cells if title}

    absent = [
        column
        for column in ("场景", "偏置", "目标速度", "自车速度", "rawid", "CVE数据")
        if column not in header_map
    ]
    if absent:
        raise ValueError(f"Excel 缺少必需列: {absent};实际表头: {list(header_map.keys())}")
    return header_map
def _normalize_cve_timestamp(val: Optional[str]) -> Optional[str]:
"""将 CVE 数据列清洗为可用于命名的时间串。"""
if not val:
return None
digits = re.sub(r"\D", "", val)
if len(digits) >= 14:
return digits[:14]
if len(digits) >= 8:
return digits[:8]
return None
def _forward_fill_series(series: pd.Series) -> pd.Series:
"""仅对当前列做简单前向填充,避免 pandas 对 object 列的告警。"""
filled = []
last_valid = None
for value in series.tolist():
if not pd.isna(value):
last_valid = value
filled.append(value)
else:
filled.append(last_valid)
return pd.Series(filled, index=series.index, dtype=object)
def _build_scenario_key(
scenario: Optional[str],
offset: Optional[str],
target_speed: Optional[str],
ego_speed: Optional[str],
) -> str:
"""构造完整工况键:场景-偏置-目标速度-自车速度。"""
parts = [
scenario or "未知工况",
offset or "未知偏置",
target_speed or "未知目标速度",
ego_speed or "未知自车速度",
]
return "-".join(parts)
def _cluster_records(records: List[dict], fallback_scenario: Optional[str] = None) -> Dict[str, List[dict]]:
    """Group records by their full scenario key.

    Each record is shallow-copied and its "场景" field is replaced by the
    resolved scenario name: the record's own value, else
    ``fallback_scenario``, else a placeholder.
    """
    buckets: Dict[str, List[dict]] = {}
    for item in records:
        scenario_name = _to_str(item.get("场景")) or fallback_scenario or "未知工况"
        key = _build_scenario_key(
            scenario_name,
            *(_to_str(item.get(field)) for field in ("偏置", "目标速度", "自车速度")),
        )
        entry = {**item, "场景": scenario_name}
        if key not in buckets:
            buckets[key] = []
        buckets[key].append(entry)
    return buckets
def _merge_clustered_records(grouped_records: List[Dict[str, List[dict]]]) -> Dict[str, List[dict]]:
"""合并多个分组结果。"""
merged: Dict[str, List[dict]] = {}
for grouped in grouped_records:
for scenario_key, records in grouped.items():
merged.setdefault(scenario_key, []).extend(records)
return merged
def parse_excel_sheet(xlsx_source, sheet_name: str = DEFAULT_SHEET_NAME) -> Dict[str, List[dict]]:
    """Parse one Excel sheet and cluster its rawid rows by full scenario key.

    The first row of the sheet is treated as the header. The scenario, offset,
    target-speed and ego-speed columns are forward-filled before rows are
    read; rows without a rawid are skipped.

    Args:
        xlsx_source: Anything accepted by ``pd.read_excel`` (path or
            ``pd.ExcelFile``).
        sheet_name: Name of the sheet to read.

    Returns:
        Mapping of scenario key to the list of record dicts for that key.

    Raises:
        ValueError: If the sheet is empty or lacks a required column.
    """
    df = pd.read_excel(xlsx_source, sheet_name=sheet_name, header=None)
    # An empty sheet has no header row; report it by name instead of letting
    # df.iloc[0] fail with an opaque IndexError.
    if df.empty:
        raise ValueError(f"Excel sheet {sheet_name!r} 为空,缺少表头行")

    header = list(df.iloc[0])
    try:
        column_index = _build_column_index(header)
    except ValueError as exc:
        # Several sheets may be parsed in one run; say which one is malformed.
        raise ValueError(f"sheet {sheet_name!r}: {exc}") from exc

    data = df.iloc[1:].copy().reset_index(drop=True)
    # Blank cells in these columns inherit the nearest value above them.
    for name in ["场景", "偏置", "目标速度", "自车速度"]:
        data.iloc[:, column_index[name]] = _forward_fill_series(
            data.iloc[:, column_index[name]]
        )

    records: List[dict] = []
    for _, row in data.iterrows():
        rawid = _to_str(row.iloc[column_index["rawid"]])
        if rawid is None:
            continue
        records.append(
            {
                "sheet": sheet_name,
                "场景": _to_str(row.iloc[column_index["场景"]]) or "未知工况",
                "偏置": _to_str(row.iloc[column_index["偏置"]]),
                "目标速度": _to_str(row.iloc[column_index["目标速度"]]),
                "自车速度": _to_str(row.iloc[column_index["自车速度"]]),
                "CVE数据": _to_str(row.iloc[column_index["CVE数据"]]),
                "rawid": rawid,
            }
        )
    return _cluster_records(records)
def parse_excel(
    xlsx_path: str,
    sheet_names: Optional[List[str]] = None,
    allow_missing_sheets: bool = False,
) -> Dict[str, List[dict]]:
    """Parse one or more Excel sheets and merge their clustered rawid records.

    Args:
        xlsx_path: Path of the workbook.
        sheet_names: Sheets to read; falls back to ``DEFAULT_SHEET_NAMES``
            when empty or ``None``.
        allow_missing_sheets: When true, requested sheets that do not exist
            are skipped with a printed warning instead of raising.

    Returns:
        Mapping of scenario key to the records collected from all sheets.

    Raises:
        ValueError: If a requested sheet is missing (and missing sheets are
            not allowed), or if no requested sheet exists at all.
    """
    requested_sheet_names = sheet_names or DEFAULT_SHEET_NAMES
    # ExcelFile keeps the workbook open; the context manager closes it on
    # every exit path, including the error raises below.
    with pd.ExcelFile(xlsx_path) as excel:
        actual_sheet_names = excel.sheet_names
        available_sheet_names = set(actual_sheet_names)
        missing_sheet_names = [
            sheet_name
            for sheet_name in requested_sheet_names
            if sheet_name not in available_sheet_names
        ]
        if missing_sheet_names and not allow_missing_sheets:
            raise ValueError(
                f"Excel 缺少 sheet: {missing_sheet_names};实际 sheet: {actual_sheet_names}"
            )
        if missing_sheet_names:
            print(
                f"警告Excel 缺少 sheet {missing_sheet_names},已跳过;"
                f"实际 sheet: {actual_sheet_names}"
            )
        grouped_records = [
            parse_excel_sheet(excel, sheet_name=sheet_name)
            for sheet_name in requested_sheet_names
            if sheet_name in available_sheet_names
        ]
    if not grouped_records:
        raise ValueError(
            f"Excel 未找到任何可解析 sheet期望 sheet: {requested_sheet_names}"
            f"实际 sheet: {actual_sheet_names}"
        )
    return _merge_clustered_records(grouped_records)
def load_rawid_json(json_path: str) -> Dict[str, List[dict]]:
    """Load a legacy aeb_rawid.json file and re-cluster its records.

    The top level must be a dict. If it has a "scenarios" key, that nested
    dict is used as the scenario mapping; otherwise the top level itself is.
    Each scenario name serves as the fallback scenario for its records.

    Raises:
        ValueError: If the top level or "scenarios" is not a dict, or a
            scenario's value is not a list.
    """
    with open(json_path, "r", encoding="utf-8") as handle:
        payload = json.load(handle)
    if not isinstance(payload, dict):
        raise ValueError(f"输入 JSON 顶层必须是 dict实际: {type(payload).__name__}")

    scenarios = payload
    if "scenarios" in payload:
        scenarios = payload["scenarios"]
        if not isinstance(scenarios, dict):
            raise ValueError("输入 JSON 的 scenarios 字段必须是 dict")

    clustered = []
    for name, entries in scenarios.items():
        if not isinstance(entries, list):
            raise ValueError(
                f"场景 {name} 对应的数据必须是 list实际: {type(entries).__name__}"
            )
        clustered.append(_cluster_records(entries, fallback_scenario=_to_str(name)))
    return _merge_clustered_records(clustered)
def get_clip_ukeys_from_raw(raw_id: str) -> List[str]:
    """Return the clip ukeys that ``pdcl_dss`` reports for ``raw_id``.

    ``pdcl_dss`` is imported lazily, so it is only required when a lookup is
    actually performed.
    """
    from pdcl_dss import Raw

    with Raw(raw_id) as raw_handle:
        return raw_handle.list_clip_ukeys()
def get_cve_time_range(rawid_data: Dict[str, List[dict]]) -> Optional[Tuple[str, str]]:
    """Return the (min, max) normalized "CVE数据" timestamps over all records.

    Records whose value cannot be normalized are ignored. Returns ``None``
    when no record yields a usable timestamp. Timestamps are compared as
    strings.
    """
    candidates = (
        _normalize_cve_timestamp(_to_str(item.get("CVE数据")))
        for group in rawid_data.values()
        for item in group
    )
    stamps = [stamp for stamp in candidates if stamp]
    if not stamps:
        return None
    return min(stamps), max(stamps)
def resolve_output_path(
    output_arg: Optional[str],
    source_path: str,
    rawid_data: Dict[str, List[dict]],
) -> str:
    """Choose the output file path.

    An explicit ``output_arg`` is returned unchanged. Otherwise the file is
    placed in the directory of ``source_path`` and named after the CVE time
    range of ``rawid_data`` (or just the default prefix when no timestamp is
    available).
    """
    if output_arg:
        return output_arg

    time_range = get_cve_time_range(rawid_data)
    if time_range is not None:
        first, last = time_range
        suffix = f"{first}_to_{last}" if first != last else first
        filename = f"{DEFAULT_OUTPUT_PREFIX}-{suffix}.json"
    else:
        filename = f"{DEFAULT_OUTPUT_PREFIX}.json"
    return os.path.join(os.path.dirname(source_path), filename)
def build_output_payload(scenario_records: Dict[str, List[dict]]) -> dict:
    """Wrap the scenario records with a summary section.

    The summary holds the scenario count, the total record count and the
    record count per scenario; "scenarios" references the input mapping
    itself (no copy).
    """
    per_scenario = {name: len(items) for name, items in scenario_records.items()}
    summary = {
        "total_scenarios": len(scenario_records),
        "total_rawids": sum(per_scenario.values()),
        "scenario_total_rawids": per_scenario,
    }
    return {"summary": summary, "scenarios": scenario_records}
def build_clips_manifest(rawid_data: Dict[str, List[dict]]) -> Dict[str, List[dict]]:
    """Attach a "clips" list to every record that has a rawid.

    Records without a rawid are reported and dropped. Each rawid is looked up
    at most once; repeated rawids reuse the cached result. A failed lookup is
    printed and recorded as an empty clip list rather than aborting the run.
    """
    manifest: Dict[str, List[dict]] = {}
    cache: Dict[str, List[str]] = {}

    # Count the records that will actually be queried, for progress output.
    total = 0
    for group in rawid_data.values():
        for item in group:
            if item.get("rawid"):
                total += 1

    done = 0
    for scenario, group in rawid_data.items():
        collected: List[dict] = []
        manifest[scenario] = collected
        print(f"\n=== 工况: {scenario} ({len(group)} 条 rawid) ===")
        for item in group:
            raw_id = item.get("rawid")
            if not raw_id:
                print(" 跳过缺少 rawid 的记录")
                continue
            done += 1
            print(f" [{done}/{total}] {raw_id} ...", end=" ", flush=True)
            if raw_id not in cache:
                try:
                    clips = get_clip_ukeys_from_raw(raw_id)
                    cache[raw_id] = clips
                    print(f"找到 {len(clips)} 个 clip")
                except Exception as exc:
                    # Best effort: remember the failure as "no clips" so the
                    # same rawid is not queried again.
                    clips = []
                    cache[raw_id] = clips
                    print(f"失败: {exc}")
            else:
                clips = cache[raw_id]
                print(f"复用缓存,找到 {len(clips)} 个 clip")
            collected.append({**item, "clips": clips})
    return manifest
def print_summary(
    result: Dict[str, List[dict]],
    output_path: str,
    cve_time_range: Optional[Tuple[str, str]] = None,
) -> None:
    """Print where the output was saved, the CVE time (range) and the totals.

    The totals are the number of scenarios, records and clips in ``result``;
    a record without a "clips" field counts as having zero clips.
    """
    rawid_count = 0
    clip_count = 0
    for group in result.values():
        rawid_count += len(group)
        for item in group:
            clip_count += len(item.get("clips", []))

    print(f"\n已保存至:{output_path}")
    if cve_time_range:
        first, last = cve_time_range
        if first != last:
            print(f"CVE数据时间范围{first} ~ {last}")
        else:
            print(f"CVE数据时间{first}")
    print(f"{len(result)} 个工况,{rawid_count} 条 rawid{clip_count} 个 clip")
def parse_args():
    """Parse the command line.

    Defines --xlsx, --sheet-name, --sheet-names, --input and --output; every
    option is an optional string defaulting to ``None``.
    """
    parser = argparse.ArgumentParser(
        description="从 AEB 表格或 aeb_rawid.json 直接生成 aeb_clips.json。"
    )
    # Shared tail of the two sheet-selection help texts.
    default_sheets_hint = f"未指定时默认读取:{', '.join(DEFAULT_SHEET_NAMES)}"
    option_help = {
        "--xlsx": (
            "Excel 文件路径;未指定且 --input 也未指定时,默认读取脚本同目录下的 "
            "G1Q3项目AEB场地计划表.xlsx"
        ),
        "--sheet-name": "Excel 单个 sheet 名称;指定后只读取该 sheet。" + default_sheets_hint,
        "--sheet-names": "Excel 多个 sheet 名称,用英文逗号分隔;" + default_sheets_hint,
        "--input": "兼容旧流程:输入 aeb_rawid.json 文件路径",
        "--output": "输出 JSON 文件路径;未指定时自动按 CVE数据 时间范围命名",
    }
    for flag, help_text in option_help.items():
        parser.add_argument(flag, default=None, help=help_text)
    return parser.parse_args()
def main():
    """Run the CLI: load rawids, look up their clips and write the JSON file.

    Input comes either from a legacy rawid JSON (--input) or from an Excel
    workbook (--xlsx, or the default workbook). When no sheet is named, the
    default sheets are read and missing ones are tolerated.

    Raises:
        ValueError: For conflicting options or an empty --sheet-names list.
    """
    args = parse_args()
    if args.xlsx and args.input:
        raise ValueError("--xlsx 和 --input 不能同时指定,请二选一。")
    if args.sheet_name and args.sheet_names:
        raise ValueError("--sheet-name 和 --sheet-names 不能同时指定,请二选一。")

    if not args.input:
        source_path = args.xlsx or DEFAULT_XLSX
        print(f"读取 Excel{source_path}")
        # Only the implicit default sheet list tolerates missing sheets.
        allow_missing_sheets = False
        if args.sheet_name:
            sheet_names = [args.sheet_name]
        elif args.sheet_names:
            sheet_names = list(filter(None, map(str.strip, args.sheet_names.split(","))))
            if not sheet_names:
                raise ValueError("--sheet-names 至少需要指定一个有效 sheet 名称")
        else:
            sheet_names = DEFAULT_SHEET_NAMES
            allow_missing_sheets = True
        print(f"读取 sheet{', '.join(sheet_names)}")
        rawid_data = parse_excel(
            source_path,
            sheet_names=sheet_names,
            allow_missing_sheets=allow_missing_sheets,
        )
    else:
        print(f"读取 rawid JSON{args.input}")
        source_path = args.input
        rawid_data = load_rawid_json(args.input)

    output_path = resolve_output_path(args.output, source_path, rawid_data)
    cve_time_range = get_cve_time_range(rawid_data)
    record_count = sum(map(len, rawid_data.values()))
    print(f"待查询 {len(rawid_data)} 个工况,{record_count} 条 rawid 记录")

    result = build_clips_manifest(rawid_data)
    with open(output_path, "w", encoding="utf-8") as out_file:
        json.dump(build_output_payload(result), out_file, ensure_ascii=False, indent=2)
    print_summary(result, output_path, cve_time_range=cve_time_range)


if __name__ == "__main__":
    main()