""" 按 group_id 提取真值产物(标注 JSON + 标定文件,可选保存图片)。 用法: python extract_gt_by_group_id.py \ --group_ids ... \ --output /data/output_dir \ [--car_name G1M3_0630] \ [--product_type 2d-3d-association] \ [--camera_id camera4] \ [--image_suffix png] \ [--annotation_stride 2] \ [--skip_images] \ [--workers 4] 或者通过文本文件批量输入(每行: group_uuid [date] [car_name]): python extract_gt_by_group_id.py \ --uuid_file /data/group_ids.txt \ --output /data/output_dir """ import argparse import io import os import tarfile from datetime import datetime, timezone, timedelta from math import ceil from multiprocessing import Pool from typing import Dict, List, Optional, Tuple import numpy as np import pandas as pd from dotenv import load_dotenv load_dotenv() from pdcl_dss import Group os.environ['STS_UID'] = 'dis-uploader' os.environ['STS_SECRET_KEY'] = '277310cc09724d315514a79701fecb0f' BEIJING_TZ = timezone(timedelta(hours=8)) DEFAULT_CAR_NAME = 'unknown' DEFAULT_DATE_NAME = '00000000000000' DEFAULT_IMAGE_SUFFIX = 'png' DEFAULT_ANNOTATION_STRIDE = 1 SUPPORTED_IMAGE_SUFFIXES = {'png', 'jpg', 'jpeg'} # --------------------------------------------------------------------------- # 视频解码工具 # --------------------------------------------------------------------------- def _plane_to_ndarray(plane) -> np.ndarray: stride = plane.line_size height = plane.height width = plane.width arr = np.frombuffer(plane, dtype=np.uint8) if stride == width: return arr.reshape(height, width) return arr.reshape(height, stride)[:, :width] def _yuvj420p_to_bgr(frame) -> np.ndarray: """将 av YUVJ420P 帧转换为 BGR ndarray。""" import cv2 y = _plane_to_ndarray(frame.planes[0]) u = _plane_to_ndarray(frame.planes[1]) v = _plane_to_ndarray(frame.planes[2]) h, w = y.shape uv = np.zeros((h // 2, w), dtype=np.uint8) uv[:, 0::2] = u uv[:, 1::2] = v yuv = np.concatenate([y, uv], axis=0) return cv2.cvtColor(yuv, cv2.COLOR_YUV2BGR_NV12) def _h265_bytes_to_bgr(frame_bytes: bytes) -> np.ndarray: import av container = av.open(io.BytesIO(frame_bytes)) for av_frame in container.decode(video=0): return _yuvj420p_to_bgr(av_frame) raise ValueError("H.265 解码失败") # --------------------------------------------------------------------------- # PDCL 数据读取 # --------------------------------------------------------------------------- def get_frame_timestamps(group_uuid: str) -> pd.DataFrame: """从 clean_data_archive.tar 中读取 frame_timestamps.csv。""" with Group(group_uuid) as group: with group.open('clean_data_archive.tar', mode='rb') as f: tar = tarfile.open(fileobj=f) csv_obj = tar.extractfile(tar.getmember('frame_timestamps.csv')) return pd.read_csv(csv_obj) def get_group_meta(group_uuid: str) -> Dict: """读取 group 元数据。""" with Group(group_uuid) as group: return dict(group.meta) def get_clip_paths(group_uuid: str) -> List[str]: """获取 group 下所有 clip 的本地缓存路径。""" from pdcl_dss import Clip paths = [] with Group(group_uuid) as group: for clip_ukey in group.list_clip_ukeys(): clip = Clip(clip_ukey) files, _ = clip.list_files() paths.append(clip.get_cache_path(files[0])) return paths def _find_index_by_timestamp(df: pd.DataFrame, camera_id: str, ts_ns: int) -> int: return df[abs(df[camera_id] - ts_ns) < 1e-3].index[0] def _build_timestamp_index(df: pd.DataFrame, camera_id: str) -> Dict[int, int]: if camera_id not in df.columns: return {} timestamp_index: Dict[int, int] = {} for json_index, value in df[camera_id].items(): if pd.isna(value): continue timestamp_index[int(value)] = int(json_index) return timestamp_index def _find_index_by_timestamp_fast( timestamp_index: Dict[int, int], df: pd.DataFrame, camera_id: str, ts_ns: int, ) -> int: json_index = timestamp_index.get(int(ts_ns)) if json_index is not None: return json_index return _find_index_by_timestamp(df, camera_id, ts_ns) def _is_missing_car_name(car_name: str) -> bool: return not car_name or car_name == DEFAULT_CAR_NAME def _is_missing_date_name(date_name: str) -> bool: return not date_name or date_name == DEFAULT_DATE_NAME def _format_ms_to_date_name(timestamp_ms: int) -> str: dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc).astimezone(BEIJING_TZ) return dt.strftime('%Y%m%d%H%M%S') def _get_frame_id_column(camera_id: str) -> str: return f'{camera_id}_frame_id' def _is_target_camera_annotation_member(member_name: str, camera_id: str) -> bool: normalized_name = member_name.replace('\\', '/') target_fragment = f'/{camera_id}/camera_radar_lidar_jsons/' return normalized_name.endswith('.json') and target_fragment in f'/{normalized_name}' def normalize_image_suffix(image_suffix: str) -> str: normalized = image_suffix.strip().lower().lstrip('.') if normalized not in SUPPORTED_IMAGE_SUFFIXES: supported = ', '.join(sorted(SUPPORTED_IMAGE_SUFFIXES)) raise ValueError(f'不支持的 image_suffix: {image_suffix}. 支持: {supported}') return normalized def normalize_annotation_stride(annotation_stride: int) -> int: if annotation_stride < 1: raise ValueError(f'annotation_stride 必须 >= 1,当前为: {annotation_stride}') return annotation_stride def _get_frame_timestamp_ns(df: pd.DataFrame, camera_id: str, json_index: int) -> Optional[int]: try: value = df.at[json_index, camera_id] except (KeyError, ValueError): return None if pd.isna(value): return None return int(value) def _build_frame_basename( car_name: str, date_name: str, group_uuid: str, json_index: int, camera_id: str, df: pd.DataFrame, ) -> str: parts = [car_name, date_name, group_uuid, f'{json_index:06d}'] frame_id_col = _get_frame_id_column(camera_id) if frame_id_col in df.columns: frame_id = df.at[json_index, frame_id_col] if not pd.isna(frame_id): parts.append(str(int(frame_id))) frame_timestamp_ns = _get_frame_timestamp_ns(df, camera_id, json_index) if frame_timestamp_ns is not None: parts.append(str(frame_timestamp_ns)) return '_'.join(parts) def _infer_car_name(group_meta: Dict) -> Tuple[str, str]: for key in ('plate_number', 'vehicle_name', 'car_name', 'plateNumber', 'vehicleName', 'carName'): value = str(group_meta.get(key, '')).strip() if value: return value, key project_name = str(group_meta.get('project_name', '')).strip() if project_name: return project_name, 'project_name' return DEFAULT_CAR_NAME, 'default' def _infer_date_name(group_meta: Dict, df: pd.DataFrame, camera_id: str) -> Tuple[str, str]: collection_time = group_meta.get('collection_time') if isinstance(collection_time, (int, float)) and collection_time > 0: return _format_ms_to_date_name(int(collection_time)), 'collection_time' candidate_columns = [camera_id] if camera_id in df.columns else [] candidate_columns.extend(col for col in df.columns if col not in candidate_columns) for col in candidate_columns: col_name = str(col) if col_name.startswith('Unnamed:'): continue if any(token in col_name for token in ('frame_id', 'clip_uuid', 'point_count', 'speed')): continue values = pd.to_numeric(df[col], errors='coerce').dropna() if values.empty: continue timestamp_ns = int(values.iloc[0]) dt = datetime.fromtimestamp(timestamp_ns / 1e9, tz=timezone.utc).astimezone(BEIJING_TZ) return dt.strftime('%Y%m%d%H%M%S'), f'frame_timestamps:{col_name}' return DEFAULT_DATE_NAME, 'default' def resolve_group_identifiers( group_uuid: str, car_name: str, date_name: str, df: pd.DataFrame, camera_id: str, ) -> Tuple[str, str, List[str]]: """ 仅在 car_name/date 为缺省值时,尝试从 group 元数据和时间戳中自动补全。 """ resolved_car_name = car_name resolved_date_name = date_name messages: List[str] = [] need_car_name = _is_missing_car_name(car_name) need_date_name = _is_missing_date_name(date_name) if not need_car_name and not need_date_name: return resolved_car_name, resolved_date_name, messages group_meta = get_group_meta(group_uuid) if need_car_name: resolved_car_name, source = _infer_car_name(group_meta) messages.append(f'[{group_uuid}] 自动补全 car_name={resolved_car_name} (source={source})') if need_date_name: resolved_date_name, source = _infer_date_name(group_meta, df, camera_id) messages.append(f'[{group_uuid}] 自动补全 date={resolved_date_name} (source={source})') return resolved_car_name, resolved_date_name, messages # --------------------------------------------------------------------------- # 图片保存 # --------------------------------------------------------------------------- def save_images( group_uuid: str, car_name: str, date_name: str, valid_ids: Optional[List[int]], clip_paths: List[str], camera_id: str, df: pd.DataFrame, timestamp_index: Dict[int, int], output_path: str, image_suffix: str, ) -> int: """ 解析视频流并保存为指定后缀图片。 Args: valid_ids: 要保存的 json_index 列表;若为 None 则保存所有帧。 Returns: 保存的图片数量。 """ import cv2 from pdcl_pyclip.decoder_struct import StructDecoder from pdcl_pyclip.msg_camera import VideoMessage from pdcl_pyclip.reader import ClipReader image_dir = os.path.join(output_path, 'images') os.makedirs(image_dir, exist_ok=True) valid_set = set(valid_ids) if valid_ids is not None else None struct_decoder = StructDecoder() saved = 0 for clip_path in clip_paths: reader = ClipReader(clip_path) for schema, channel, msg in reader.iter_messages(topics=[camera_id]): if schema.encoding != 'struct': continue data = struct_decoder.decode(schema, channel, msg) if not isinstance(data, VideoMessage): continue json_index = _find_index_by_timestamp_fast(timestamp_index, df, camera_id, msg.log_time) if valid_set is not None and json_index not in valid_set: continue frame_timestamp_ns = _get_frame_timestamp_ns(df, camera_id, json_index) if frame_timestamp_ns is not None and msg.log_time != frame_timestamp_ns: print( f" [WARN] 帧 {json_index} 时间戳不一致: " f"msg.log_time={msg.log_time}, df[{camera_id}]={frame_timestamp_ns}" ) try: img = _h265_bytes_to_bgr(data.payload) except Exception as e: print(f" [WARN] 帧 {json_index} 解码失败: {e}") continue fname = ( f'{_build_frame_basename(car_name, date_name, group_uuid, json_index, camera_id, df)}' f'.{image_suffix}' ) if not cv2.imwrite(os.path.join(image_dir, fname), img): print(f" [WARN] 帧 {json_index} 写图失败: {fname}") continue saved += 1 return saved # --------------------------------------------------------------------------- # 标注 / 标定文件解包 # --------------------------------------------------------------------------- def extract_annotations_and_calib( group_uuid: str, car_name: str, date_name: str, product_type: str, camera_id: str, df: pd.DataFrame, output_path: str, annotation_stride: int = DEFAULT_ANNOTATION_STRIDE, ) -> List[int]: """ 从真值产物 tar 包中提取标注 JSON 和标定文件。 Returns: 实际保存的有效帧 json_index 列表(仅目标 camera_id 下的帧)。 """ tar_filename = f'{product_type}_archive.tar' valid_ids: List[int] = [] candidate_annotation_count = 0 saved_annotation_names = set() with Group(group_uuid) as group: if group.get_product(product_type) is None: print(f" [WARN] Group {group_uuid} 不含产物 {product_type},跳过") return valid_ids calib_dir = os.path.join(output_path, 'calib') annotations_dir = os.path.join(output_path, 'annotations') os.makedirs(calib_dir, exist_ok=True) os.makedirs(annotations_dir, exist_ok=True) with group.open(tar_filename, mode='rb') as f: with tarfile.open(fileobj=f, mode='r|*') as tar: for member in tar: if 'depth_map_fg' in member.name: continue file_obj = tar.extractfile(member) if file_obj is None: continue # ── 标定文件 ───────────────────────────────────────── if 'calib' in member.name: calib_fname = os.path.basename(member.name) with open(os.path.join(calib_dir, calib_fname), 'wb') as fw: fw.write(file_obj.read()) continue # ── 标注 JSON ───────────────────────────────────────── if _is_target_camera_annotation_member(member.name, camera_id): json_fname = os.path.basename(member.name) try: json_index = int(json_fname[:-5]) except ValueError: print(f" [WARN] 无法解析帧号: {member.name}") continue candidate_annotation_count += 1 if (candidate_annotation_count - 1) % annotation_stride != 0: continue try: new_fname = ( f'{_build_frame_basename(car_name, date_name, group_uuid, json_index, camera_id, df)}.json' ) except (KeyError, IndexError, ValueError): new_fname = f'{car_name}_{date_name}_{group_uuid}_{json_index:06d}.json' if new_fname in saved_annotation_names: print(f" [WARN] 重复标注输出名,跳过覆盖: {new_fname} <- {member.name}") continue with open(os.path.join(annotations_dir, new_fname), 'wb') as fw: fw.write(file_obj.read()) saved_annotation_names.add(new_fname) valid_ids.append(json_index) print( f" [INFO] annotations({camera_id}) 保存 {len(valid_ids)} / " f"{candidate_annotation_count} (annotation_stride={annotation_stride})" ) return valid_ids # --------------------------------------------------------------------------- # 单个 group 的完整处理流程 # --------------------------------------------------------------------------- def process_group( group_uuid: str, car_name: str, date_name: str, output_root: str, product_type: str = '2d-3d-association', camera_id: str = 'camera4', image_suffix: str = DEFAULT_IMAGE_SUFFIX, annotation_stride: int = DEFAULT_ANNOTATION_STRIDE, skip_images: bool = False, ) -> Tuple[str, str]: """ 处理单个 group:提取标注、标定文件,并按需保存对应图片。 Returns: (group_uuid, status_message) """ output_path = os.path.join(output_root, group_uuid) try: df = get_frame_timestamps(group_uuid) car_name, date_name, messages = resolve_group_identifiers( group_uuid, car_name, date_name, df, camera_id ) print( f"[{group_uuid}] 开始处理 " f"(car={car_name}, date={date_name}, " f"annotation_stride={annotation_stride}, skip_images={skip_images})" ) for message in messages: print(message) # 1. 提取标注 + 标定,获取有效帧列表 valid_ids = extract_annotations_and_calib( group_uuid, car_name, date_name, product_type, camera_id, df, output_path, annotation_stride, ) if not valid_ids: print(f"[{group_uuid}] 无有效标注帧,跳过后续处理") return group_uuid, 'success (no valid frames)' valid_ids.sort() if skip_images: print(f"[{group_uuid}] 共 {len(valid_ids)} 个有效帧,按要求跳过图片保存") return group_uuid, f'success ({len(valid_ids)} annotations only)' print(f"[{group_uuid}] 共 {len(valid_ids)} 个有效帧,开始保存图片") # 2. 保存对应帧图片 timestamp_index = _build_timestamp_index(df, camera_id) clip_paths = get_clip_paths(group_uuid) n_saved = save_images( group_uuid, car_name, date_name, valid_ids, clip_paths, camera_id, df, timestamp_index, output_path, image_suffix ) print(f"[{group_uuid}] 完成,保存图片 {n_saved} 张") return group_uuid, f'success ({n_saved} images)' except Exception as e: import traceback print(f"[{group_uuid}] 处理失败: {e}\n{traceback.format_exc()}") return group_uuid, f'failed: {e}' # --------------------------------------------------------------------------- # 多进程入口 # --------------------------------------------------------------------------- def _worker(args): return process_group(*args) def run( tasks: List[Tuple[str, str, str]], # [(group_uuid, car_name, date_name), ...] output_root: str, product_type: str, camera_id: str, image_suffix: str, annotation_stride: int, skip_images: bool, workers: int, ): if not tasks: print('没有需要处理的 group') return full_tasks = [ ( uuid, car, date, output_root, product_type, camera_id, image_suffix, annotation_stride, skip_images, ) for uuid, car, date in tasks ] actual_workers = max(1, min(workers, len(full_tasks))) if actual_workers == 1: results = [_worker(t) for t in full_tasks] else: chunksize = max(1, ceil(len(full_tasks) / (actual_workers * 4))) with Pool(processes=actual_workers) as pool: results = list(pool.imap_unordered(_worker, full_tasks, chunksize=chunksize)) success = sum(1 for _, r in results if r.startswith('success')) failed = sum(1 for _, r in results if r.startswith('failed')) print(f"\n完成!总计 {len(results)} 成功 {success} 失败 {failed} workers {actual_workers}") for uuid, res in results: if res.startswith('failed'): print(f" FAILED {uuid}: {res}") # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def parse_uuid_file(filepath: str) -> List[Tuple[str, str, str]]: """ 解析 UUID 文本文件,每行格式: group_uuid [date] [car_name] 缺省 date/car_name 时会在处理时自动尝试补全。 """ tasks = [] seen = set() duplicate_count = 0 with open(filepath) as f: for line in f: parts = line.strip().split() if not parts: continue uuid = parts[0] date = parts[1] if len(parts) > 1 else DEFAULT_DATE_NAME car_name = parts[2] if len(parts) > 2 else DEFAULT_CAR_NAME task = (uuid, car_name, date) if task in seen: duplicate_count += 1 continue seen.add(task) tasks.append(task) if duplicate_count: print(f"[uuid_file] 跳过重复 group_uuid {duplicate_count} 条") return tasks def main(): parser = argparse.ArgumentParser(description='按 group_id 提取 PDCL 真值产物') src = parser.add_mutually_exclusive_group(required=True) src.add_argument('--group_ids', nargs='+', metavar='UUID', help='直接指定一个或多个 group UUID') src.add_argument('--uuid_file', metavar='FILE', help='UUID 列表文件(每行: uuid [date] [car_name])') parser.add_argument('--output', required=True, help='输出根目录') parser.add_argument('--car_name', default=DEFAULT_CAR_NAME, help='车辆标识;默认会尝试从 group 元数据自动推断') parser.add_argument('--date', default=DEFAULT_DATE_NAME, help='日期字符串;默认会尝试从 collection_time 或时间戳自动推断') parser.add_argument('--product_type', default='2d-3d-association', help='真值产物类型(默认: 2d-3d-association)') parser.add_argument('--camera_id', default='camera4', help='相机 topic 名称') parser.add_argument('--image_suffix', default=DEFAULT_IMAGE_SUFFIX, help='输出图像后缀,支持 png/jpg/jpeg(默认: png)') parser.add_argument('--annotation_stride', type=int, default=DEFAULT_ANNOTATION_STRIDE, help='标注保存抽样步长;1 为全保存,2 为 2 抽 1,同时图片也会与保留标注对齐') parser.add_argument('--skip_images', action='store_true', help='只提取 annotations/calib,不解码或保存图片帧') parser.add_argument('--workers', type=int, default=4, help='并行进程数') args = parser.parse_args() image_suffix = normalize_image_suffix(args.image_suffix) annotation_stride = normalize_annotation_stride(args.annotation_stride) if args.uuid_file: tasks = parse_uuid_file(args.uuid_file) else: tasks = [(uuid, args.car_name, args.date) for uuid in args.group_ids] os.makedirs(args.output, exist_ok=True) run( tasks, args.output, args.product_type, args.camera_id, image_suffix, annotation_stride, args.skip_images, args.workers, ) if __name__ == '__main__': main()