487 lines
16 KiB
Python
487 lines
16 KiB
Python
|
|
"""
|
|||
|
|
从 AEB 场地计划表或 aeb_rawid.json 读取 rawid,查询每个 rawid 对应的
|
|||
|
|
clip ukey 列表,并保存为可直接用于批量推理的单个 aeb_clips*.json。
|
|||
|
|
|
|||
|
|
默认输入:
|
|||
|
|
tools/pdcl_inference/G1Q3项目AEB场地计划表.xlsx
|
|||
|
|
默认读取 "实际测试进展" 和 "复测case" 两个 sheet。
|
|||
|
|
|
|||
|
|
兼容旧流程:
|
|||
|
|
也支持通过 --input 读取已生成的 aeb_rawid.json。
|
|||
|
|
|
|||
|
|
依赖:
|
|||
|
|
pip install pdcl_dss -i https://pypi.minieye.tech/
|
|||
|
|
|
|||
|
|
默认输出命名:
|
|||
|
|
未指定 --output 时,按 Excel 中 "CVE数据" 列的最小/最大时间自动命名,
|
|||
|
|
例如 aeb_clips-20260322152509_to_20260410183944.json
|
|||
|
|
|
|||
|
|
输出格式:
|
|||
|
|
{
|
|||
|
|
"summary": {
|
|||
|
|
"total_scenarios": 123,
|
|||
|
|
"total_rawids": 456,
|
|||
|
|
"scenario_total_rawids": {
|
|||
|
|
"CPFA-25-6.5-20": 12
|
|||
|
|
}
|
|||
|
|
},
|
|||
|
|
"scenarios": {
|
|||
|
|
"CPFA-25-6.5-20": [
|
|||
|
|
{
|
|||
|
|
"sheet": "实际测试进展",
|
|||
|
|
"场景": "CPFA",
|
|||
|
|
"偏置": "25",
|
|||
|
|
"目标速度": "6.5",
|
|||
|
|
"自车速度": "20",
|
|||
|
|
"CVE数据": "20260323113612",
|
|||
|
|
"rawid": "ADAS_...",
|
|||
|
|
"clips": ["clip_ukey1", ...]
|
|||
|
|
}
|
|||
|
|
]
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import argparse
|
|||
|
|
import json
|
|||
|
|
import os
|
|||
|
|
import re
|
|||
|
|
import sys
|
|||
|
|
from pathlib import Path
|
|||
|
|
from typing import Dict, List, Optional, Tuple
|
|||
|
|
|
|||
|
|
import pandas as pd
|
|||
|
|
|
|||
|
|
FILE = Path(__file__).resolve()
|
|||
|
|
ROOT = FILE.parents[2]
|
|||
|
|
if str(ROOT) not in sys.path:
|
|||
|
|
sys.path.append(str(ROOT))
|
|||
|
|
|
|||
|
|
DEFAULT_XLSX = os.path.join(os.path.dirname(__file__), "G1Q3项目AEB场地计划表.xlsx")
|
|||
|
|
DEFAULT_SHEET_NAME = "实际测试进展"
|
|||
|
|
DEFAULT_RETEST_SHEET_NAME = "复测case"
|
|||
|
|
DEFAULT_SHEET_NAMES = [DEFAULT_SHEET_NAME, DEFAULT_RETEST_SHEET_NAME]
|
|||
|
|
DEFAULT_OUTPUT_PREFIX = "aeb_clips"
|
|||
|
|
|
|||
|
|
os.environ.setdefault("STS_UID", "dis-uploader")
|
|||
|
|
os.environ.setdefault("STS_SECRET_KEY", "277310cc09724d315514a79701fecb0f")
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
from dotenv import load_dotenv
|
|||
|
|
|
|||
|
|
load_dotenv()
|
|||
|
|
except ImportError:
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _to_str(val):
|
|||
|
|
"""将单元格值转换为字符串,NaN 返回 None。"""
|
|||
|
|
if pd.isna(val):
|
|||
|
|
return None
|
|||
|
|
if isinstance(val, float) and val == int(val):
|
|||
|
|
return str(int(val))
|
|||
|
|
return str(val).strip()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _build_column_index(header: List[object]) -> Dict[str, int]:
|
|||
|
|
"""根据表头名称构建列索引映射。"""
|
|||
|
|
header_map = {}
|
|||
|
|
for idx, value in enumerate(header):
|
|||
|
|
name = _to_str(value)
|
|||
|
|
if name:
|
|||
|
|
header_map[name] = idx
|
|||
|
|
|
|||
|
|
required_columns = ["场景", "偏置", "目标速度", "自车速度", "rawid", "CVE数据"]
|
|||
|
|
missing = [name for name in required_columns if name not in header_map]
|
|||
|
|
if missing:
|
|||
|
|
raise ValueError(f"Excel 缺少必需列: {missing};实际表头: {list(header_map.keys())}")
|
|||
|
|
return header_map
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _normalize_cve_timestamp(val: Optional[str]) -> Optional[str]:
|
|||
|
|
"""将 CVE 数据列清洗为可用于命名的时间串。"""
|
|||
|
|
if not val:
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
digits = re.sub(r"\D", "", val)
|
|||
|
|
if len(digits) >= 14:
|
|||
|
|
return digits[:14]
|
|||
|
|
if len(digits) >= 8:
|
|||
|
|
return digits[:8]
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _forward_fill_series(series: pd.Series) -> pd.Series:
|
|||
|
|
"""仅对当前列做简单前向填充,避免 pandas 对 object 列的告警。"""
|
|||
|
|
filled = []
|
|||
|
|
last_valid = None
|
|||
|
|
for value in series.tolist():
|
|||
|
|
if not pd.isna(value):
|
|||
|
|
last_valid = value
|
|||
|
|
filled.append(value)
|
|||
|
|
else:
|
|||
|
|
filled.append(last_valid)
|
|||
|
|
return pd.Series(filled, index=series.index, dtype=object)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _build_scenario_key(
|
|||
|
|
scenario: Optional[str],
|
|||
|
|
offset: Optional[str],
|
|||
|
|
target_speed: Optional[str],
|
|||
|
|
ego_speed: Optional[str],
|
|||
|
|
) -> str:
|
|||
|
|
"""构造完整工况键:场景-偏置-目标速度-自车速度。"""
|
|||
|
|
parts = [
|
|||
|
|
scenario or "未知工况",
|
|||
|
|
offset or "未知偏置",
|
|||
|
|
target_speed or "未知目标速度",
|
|||
|
|
ego_speed or "未知自车速度",
|
|||
|
|
]
|
|||
|
|
return "-".join(parts)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _cluster_records(records: List[dict], fallback_scenario: Optional[str] = None) -> Dict[str, List[dict]]:
|
|||
|
|
"""按完整工况键聚类记录。"""
|
|||
|
|
clustered: Dict[str, List[dict]] = {}
|
|||
|
|
for record in records:
|
|||
|
|
scenario = _to_str(record.get("场景")) or fallback_scenario or "未知工况"
|
|||
|
|
offset = _to_str(record.get("偏置"))
|
|||
|
|
target_speed = _to_str(record.get("目标速度"))
|
|||
|
|
ego_speed = _to_str(record.get("自车速度"))
|
|||
|
|
scenario_key = _build_scenario_key(scenario, offset, target_speed, ego_speed)
|
|||
|
|
|
|||
|
|
normalized_record = dict(record)
|
|||
|
|
normalized_record["场景"] = scenario
|
|||
|
|
clustered.setdefault(scenario_key, []).append(normalized_record)
|
|||
|
|
return clustered
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _merge_clustered_records(grouped_records: List[Dict[str, List[dict]]]) -> Dict[str, List[dict]]:
|
|||
|
|
"""合并多个分组结果。"""
|
|||
|
|
merged: Dict[str, List[dict]] = {}
|
|||
|
|
for grouped in grouped_records:
|
|||
|
|
for scenario_key, records in grouped.items():
|
|||
|
|
merged.setdefault(scenario_key, []).extend(records)
|
|||
|
|
return merged
|
|||
|
|
|
|||
|
|
|
|||
|
|
def parse_excel_sheet(xlsx_source, sheet_name: str = DEFAULT_SHEET_NAME) -> Dict[str, List[dict]]:
|
|||
|
|
"""解析单个 Excel sheet,并按完整工况键聚类 rawid 记录。"""
|
|||
|
|
df = pd.read_excel(xlsx_source, sheet_name=sheet_name, header=None)
|
|||
|
|
|
|||
|
|
header = list(df.iloc[0])
|
|||
|
|
column_index = _build_column_index(header)
|
|||
|
|
|
|||
|
|
data = df.iloc[1:].copy().reset_index(drop=True)
|
|||
|
|
for name in ["场景", "偏置", "目标速度", "自车速度"]:
|
|||
|
|
data.iloc[:, column_index[name]] = _forward_fill_series(
|
|||
|
|
data.iloc[:, column_index[name]]
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
records: List[dict] = []
|
|||
|
|
for _, row in data.iterrows():
|
|||
|
|
rawid = _to_str(row.iloc[column_index["rawid"]])
|
|||
|
|
if rawid is None:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
record = {
|
|||
|
|
"sheet": sheet_name,
|
|||
|
|
"场景": _to_str(row.iloc[column_index["场景"]]) or "未知工况",
|
|||
|
|
"偏置": _to_str(row.iloc[column_index["偏置"]]),
|
|||
|
|
"目标速度": _to_str(row.iloc[column_index["目标速度"]]),
|
|||
|
|
"自车速度": _to_str(row.iloc[column_index["自车速度"]]),
|
|||
|
|
"CVE数据": _to_str(row.iloc[column_index["CVE数据"]]),
|
|||
|
|
"rawid": rawid,
|
|||
|
|
}
|
|||
|
|
records.append(record)
|
|||
|
|
|
|||
|
|
return _cluster_records(records)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def parse_excel(
|
|||
|
|
xlsx_path: str,
|
|||
|
|
sheet_names: Optional[List[str]] = None,
|
|||
|
|
allow_missing_sheets: bool = False,
|
|||
|
|
) -> Dict[str, List[dict]]:
|
|||
|
|
"""解析一个或多个 Excel sheet,并合并为按完整工况键聚类的 rawid 记录。"""
|
|||
|
|
requested_sheet_names = sheet_names or DEFAULT_SHEET_NAMES
|
|||
|
|
excel = pd.ExcelFile(xlsx_path)
|
|||
|
|
available_sheet_names = set(excel.sheet_names)
|
|||
|
|
|
|||
|
|
missing_sheet_names = [
|
|||
|
|
sheet_name
|
|||
|
|
for sheet_name in requested_sheet_names
|
|||
|
|
if sheet_name not in available_sheet_names
|
|||
|
|
]
|
|||
|
|
if missing_sheet_names and not allow_missing_sheets:
|
|||
|
|
raise ValueError(
|
|||
|
|
f"Excel 缺少 sheet: {missing_sheet_names};实际 sheet: {excel.sheet_names}"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
if missing_sheet_names:
|
|||
|
|
print(
|
|||
|
|
f"警告:Excel 缺少 sheet {missing_sheet_names},已跳过;"
|
|||
|
|
f"实际 sheet: {excel.sheet_names}"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
grouped_records = []
|
|||
|
|
for sheet_name in requested_sheet_names:
|
|||
|
|
if sheet_name not in available_sheet_names:
|
|||
|
|
continue
|
|||
|
|
grouped_records.append(parse_excel_sheet(excel, sheet_name=sheet_name))
|
|||
|
|
|
|||
|
|
if not grouped_records:
|
|||
|
|
raise ValueError(
|
|||
|
|
f"Excel 未找到任何可解析 sheet;期望 sheet: {requested_sheet_names};"
|
|||
|
|
f"实际 sheet: {excel.sheet_names}"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
return _merge_clustered_records(grouped_records)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def load_rawid_json(json_path: str) -> Dict[str, List[dict]]:
|
|||
|
|
"""读取旧格式 aeb_rawid.json。"""
|
|||
|
|
with open(json_path, "r", encoding="utf-8") as f:
|
|||
|
|
data = json.load(f)
|
|||
|
|
if not isinstance(data, dict):
|
|||
|
|
raise ValueError(f"输入 JSON 顶层必须是 dict,实际: {type(data).__name__}")
|
|||
|
|
|
|||
|
|
if "scenarios" in data:
|
|||
|
|
scenario_data = data["scenarios"]
|
|||
|
|
if not isinstance(scenario_data, dict):
|
|||
|
|
raise ValueError("输入 JSON 的 scenarios 字段必须是 dict")
|
|||
|
|
data = scenario_data
|
|||
|
|
|
|||
|
|
grouped_records = []
|
|||
|
|
for scenario_name, records in data.items():
|
|||
|
|
if not isinstance(records, list):
|
|||
|
|
raise ValueError(
|
|||
|
|
f"场景 {scenario_name} 对应的数据必须是 list,实际: {type(records).__name__}"
|
|||
|
|
)
|
|||
|
|
grouped_records.append(_cluster_records(records, fallback_scenario=_to_str(scenario_name)))
|
|||
|
|
return _merge_clustered_records(grouped_records)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_clip_ukeys_from_raw(raw_id: str) -> List[str]:
|
|||
|
|
"""通过 raw_id 获取关联的 clip ukey 列表。"""
|
|||
|
|
from pdcl_dss import Raw
|
|||
|
|
|
|||
|
|
with Raw(raw_id) as raw:
|
|||
|
|
return raw.list_clip_ukeys()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_cve_time_range(rawid_data: Dict[str, List[dict]]) -> Optional[Tuple[str, str]]:
|
|||
|
|
"""从记录中提取 CVE 数据时间范围。"""
|
|||
|
|
timestamps = []
|
|||
|
|
for records in rawid_data.values():
|
|||
|
|
for record in records:
|
|||
|
|
timestamp = _normalize_cve_timestamp(_to_str(record.get("CVE数据")))
|
|||
|
|
if timestamp:
|
|||
|
|
timestamps.append(timestamp)
|
|||
|
|
|
|||
|
|
if not timestamps:
|
|||
|
|
return None
|
|||
|
|
return min(timestamps), max(timestamps)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def resolve_output_path(
|
|||
|
|
output_arg: Optional[str],
|
|||
|
|
source_path: str,
|
|||
|
|
rawid_data: Dict[str, List[dict]],
|
|||
|
|
) -> str:
|
|||
|
|
"""解析最终输出路径;未显式指定时按 CVE 时间范围自动命名。"""
|
|||
|
|
if output_arg:
|
|||
|
|
return output_arg
|
|||
|
|
|
|||
|
|
cve_time_range = get_cve_time_range(rawid_data)
|
|||
|
|
if cve_time_range is None:
|
|||
|
|
filename = f"{DEFAULT_OUTPUT_PREFIX}.json"
|
|||
|
|
else:
|
|||
|
|
start, end = cve_time_range
|
|||
|
|
suffix = start if start == end else f"{start}_to_{end}"
|
|||
|
|
filename = f"{DEFAULT_OUTPUT_PREFIX}-{suffix}.json"
|
|||
|
|
|
|||
|
|
return os.path.join(os.path.dirname(source_path), filename)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def build_output_payload(scenario_records: Dict[str, List[dict]]) -> dict:
|
|||
|
|
"""构造包含 summary 和 scenarios 的最终输出。"""
|
|||
|
|
total_rawids = sum(len(records) for records in scenario_records.values())
|
|||
|
|
return {
|
|||
|
|
"summary": {
|
|||
|
|
"total_scenarios": len(scenario_records),
|
|||
|
|
"total_rawids": total_rawids,
|
|||
|
|
"scenario_total_rawids": {
|
|||
|
|
scenario: len(records)
|
|||
|
|
for scenario, records in scenario_records.items()
|
|||
|
|
},
|
|||
|
|
},
|
|||
|
|
"scenarios": scenario_records,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
def build_clips_manifest(rawid_data: Dict[str, List[dict]]) -> Dict[str, List[dict]]:
|
|||
|
|
"""为每条 rawid 记录补充 clips 字段。"""
|
|||
|
|
result: Dict[str, List[dict]] = {}
|
|||
|
|
clip_cache: Dict[str, List[str]] = {}
|
|||
|
|
|
|||
|
|
total_rawids = sum(
|
|||
|
|
1
|
|||
|
|
for records in rawid_data.values()
|
|||
|
|
for record in records
|
|||
|
|
if record.get("rawid")
|
|||
|
|
)
|
|||
|
|
processed = 0
|
|||
|
|
|
|||
|
|
for scenario, records in rawid_data.items():
|
|||
|
|
result[scenario] = []
|
|||
|
|
print(f"\n=== 工况: {scenario} ({len(records)} 条 rawid) ===")
|
|||
|
|
|
|||
|
|
for rec in records:
|
|||
|
|
raw_id = rec.get("rawid")
|
|||
|
|
if not raw_id:
|
|||
|
|
print(" 跳过缺少 rawid 的记录")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
processed += 1
|
|||
|
|
print(f" [{processed}/{total_rawids}] {raw_id} ...", end=" ", flush=True)
|
|||
|
|
|
|||
|
|
if raw_id in clip_cache:
|
|||
|
|
clips = clip_cache[raw_id]
|
|||
|
|
print(f"复用缓存,找到 {len(clips)} 个 clip")
|
|||
|
|
else:
|
|||
|
|
try:
|
|||
|
|
clips = get_clip_ukeys_from_raw(raw_id)
|
|||
|
|
clip_cache[raw_id] = clips
|
|||
|
|
print(f"找到 {len(clips)} 个 clip")
|
|||
|
|
except Exception as e:
|
|||
|
|
clips = []
|
|||
|
|
clip_cache[raw_id] = clips
|
|||
|
|
print(f"失败: {e}")
|
|||
|
|
|
|||
|
|
entry = dict(rec)
|
|||
|
|
entry["clips"] = clips
|
|||
|
|
result[scenario].append(entry)
|
|||
|
|
|
|||
|
|
return result
|
|||
|
|
|
|||
|
|
|
|||
|
|
def print_summary(
|
|||
|
|
result: Dict[str, List[dict]],
|
|||
|
|
output_path: str,
|
|||
|
|
cve_time_range: Optional[Tuple[str, str]] = None,
|
|||
|
|
) -> None:
|
|||
|
|
total_rawids = sum(len(records) for records in result.values())
|
|||
|
|
total_clips = sum(
|
|||
|
|
len(entry.get("clips", []))
|
|||
|
|
for records in result.values()
|
|||
|
|
for entry in records
|
|||
|
|
)
|
|||
|
|
print(f"\n已保存至:{output_path}")
|
|||
|
|
if cve_time_range:
|
|||
|
|
start, end = cve_time_range
|
|||
|
|
if start == end:
|
|||
|
|
print(f"CVE数据时间:{start}")
|
|||
|
|
else:
|
|||
|
|
print(f"CVE数据时间范围:{start} ~ {end}")
|
|||
|
|
print(f"共 {len(result)} 个工况,{total_rawids} 条 rawid,{total_clips} 个 clip")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def parse_args():
|
|||
|
|
parser = argparse.ArgumentParser(
|
|||
|
|
description="从 AEB 表格或 aeb_rawid.json 直接生成 aeb_clips.json。"
|
|||
|
|
)
|
|||
|
|
parser.add_argument(
|
|||
|
|
"--xlsx",
|
|||
|
|
default=None,
|
|||
|
|
help=(
|
|||
|
|
"Excel 文件路径;未指定且 --input 也未指定时,默认读取脚本同目录下的 "
|
|||
|
|
"G1Q3项目AEB场地计划表.xlsx"
|
|||
|
|
),
|
|||
|
|
)
|
|||
|
|
parser.add_argument(
|
|||
|
|
"--sheet-name",
|
|||
|
|
default=None,
|
|||
|
|
help=(
|
|||
|
|
"Excel 单个 sheet 名称;指定后只读取该 sheet。"
|
|||
|
|
f"未指定时默认读取:{', '.join(DEFAULT_SHEET_NAMES)}"
|
|||
|
|
),
|
|||
|
|
)
|
|||
|
|
parser.add_argument(
|
|||
|
|
"--sheet-names",
|
|||
|
|
default=None,
|
|||
|
|
help=(
|
|||
|
|
"Excel 多个 sheet 名称,用英文逗号分隔;"
|
|||
|
|
f"未指定时默认读取:{', '.join(DEFAULT_SHEET_NAMES)}"
|
|||
|
|
),
|
|||
|
|
)
|
|||
|
|
parser.add_argument(
|
|||
|
|
"--input",
|
|||
|
|
default=None,
|
|||
|
|
help="兼容旧流程:输入 aeb_rawid.json 文件路径",
|
|||
|
|
)
|
|||
|
|
parser.add_argument(
|
|||
|
|
"--output",
|
|||
|
|
default=None,
|
|||
|
|
help="输出 JSON 文件路径;未指定时自动按 CVE数据 时间范围命名",
|
|||
|
|
)
|
|||
|
|
return parser.parse_args()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def main():
|
|||
|
|
args = parse_args()
|
|||
|
|
|
|||
|
|
if args.xlsx and args.input:
|
|||
|
|
raise ValueError("--xlsx 和 --input 不能同时指定,请二选一。")
|
|||
|
|
if args.sheet_name and args.sheet_names:
|
|||
|
|
raise ValueError("--sheet-name 和 --sheet-names 不能同时指定,请二选一。")
|
|||
|
|
|
|||
|
|
if args.input:
|
|||
|
|
print(f"读取 rawid JSON:{args.input}")
|
|||
|
|
rawid_data = load_rawid_json(args.input)
|
|||
|
|
source_path = args.input
|
|||
|
|
else:
|
|||
|
|
xlsx_path = args.xlsx or DEFAULT_XLSX
|
|||
|
|
print(f"读取 Excel:{xlsx_path}")
|
|||
|
|
if args.sheet_name:
|
|||
|
|
sheet_names = [args.sheet_name]
|
|||
|
|
allow_missing_sheets = False
|
|||
|
|
elif args.sheet_names:
|
|||
|
|
sheet_names = [
|
|||
|
|
sheet_name.strip()
|
|||
|
|
for sheet_name in args.sheet_names.split(",")
|
|||
|
|
if sheet_name.strip()
|
|||
|
|
]
|
|||
|
|
if not sheet_names:
|
|||
|
|
raise ValueError("--sheet-names 至少需要指定一个有效 sheet 名称")
|
|||
|
|
allow_missing_sheets = False
|
|||
|
|
else:
|
|||
|
|
sheet_names = DEFAULT_SHEET_NAMES
|
|||
|
|
allow_missing_sheets = True
|
|||
|
|
|
|||
|
|
print(f"读取 sheet:{', '.join(sheet_names)}")
|
|||
|
|
rawid_data = parse_excel(
|
|||
|
|
xlsx_path,
|
|||
|
|
sheet_names=sheet_names,
|
|||
|
|
allow_missing_sheets=allow_missing_sheets,
|
|||
|
|
)
|
|||
|
|
source_path = xlsx_path
|
|||
|
|
|
|||
|
|
output_path = resolve_output_path(args.output, source_path, rawid_data)
|
|||
|
|
cve_time_range = get_cve_time_range(rawid_data)
|
|||
|
|
|
|||
|
|
total_rawids = sum(len(records) for records in rawid_data.values())
|
|||
|
|
print(f"待查询 {len(rawid_data)} 个工况,{total_rawids} 条 rawid 记录")
|
|||
|
|
|
|||
|
|
result = build_clips_manifest(rawid_data)
|
|||
|
|
output_payload = build_output_payload(result)
|
|||
|
|
|
|||
|
|
with open(output_path, "w", encoding="utf-8") as f:
|
|||
|
|
json.dump(output_payload, f, ensure_ascii=False, indent=2)
|
|||
|
|
|
|||
|
|
print_summary(result, output_path, cve_time_range=cve_time_range)
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
main()
|