"""Export camera frames and 2D-3D association labels from PDCL groups.

For every group UUID listed in an input text file the script:

1. loads the group's frame-timestamp synchronisation table,
2. extracts the annotation JSON files and calibration files from the
   ``2d-3d-association`` product archive, and
3. decodes the matching ``camera4`` video frames and stores them as PNG files.

Groups are processed in parallel with a ``multiprocessing.Pool``.
"""
import glob
import io
import json
import multiprocessing
import os
import tarfile
import traceback
from datetime import datetime, timedelta, timezone
from enum import IntEnum
from multiprocessing import Pool
from typing import Any, Dict, List, Optional

import av
import cv2
import numpy as np
import pandas as pd
import requests
from dotenv import load_dotenv

# The original file calls load_dotenv() before importing the pdcl packages;
# that ordering is preserved in case they read the environment at import time.
load_dotenv()

from pdcl_dss import DSSClient, Raw, Group, Clip  # noqa: E402
from pdcl_pyclip.reader import ClipReader  # noqa: E402
from pdcl_pyclip.msg_camera import VideoMessage  # noqa: E402
from pdcl_pyclip.decoder_struct import StructDecoder  # noqa: E402
from pdcl_pyclip.decoder_protobuf import ProtobufDecoder  # noqa: E402

# SECURITY(review): a credential is hard-coded in source control below. It
# should be moved to the environment / .env file (load_dotenv() is already
# called above) and the exposed key rotated. Left in place here so existing
# deployments keep working.
os.environ['STS_UID'] = 'dis-uploader'
os.environ['STS_SECRET_KEY'] = '277310cc09724d315514a79701fecb0f'

client = DSSClient()

GET_JSON_BY_UKEY_API_URL = (
    "https://d.minieye.tech/dis-api/dashboard/calib-file-manage/json-files-by-ukey"
)

BEIJING_TZ = timezone(timedelta(hours=8))

# Detection class name -> integer id (several names share an id).
det_cls_map = {
    'kVehicle': 0,
    'kPed': 1,
    'kBike': 2,
    'kCyclist': 3,
    'kCone': 4,
    'kRoadBarrier': 4,
    'kPedHead': 5,
    'kSmallTrafficSign': 6,
    'kWarningTriangle': 6,
    'kBigTrafficSign': 7,
    'kVehiclePlate': 8,
    'kVehicleWheel': 9,
    'kTrafficLight': 10,
    'kTrafficLightBulb': 11,
    'kTrafficLightDigit': 12,
}

# Vehicle pose name -> integer id.
posecls = {
    'kInvalid': 0,        # background
    'kLeftTail': 1,       # same-direction vehicle on the left
    'kMidTail': 2,        # same-direction vehicle in the middle
    'kRightTail': 3,      # same-direction vehicle on the right
    'kLeftHead': 4,       # oncoming vehicle on the left
    'kMidHead': 5,        # oncoming vehicle in the middle
    'kRightHead': 6,      # oncoming vehicle on the right
    'kLeftSide': 7,       # crossing vehicle facing left
    'kRightSide': 8,      # crossing vehicle facing right
    'kLeftCutIn': 9,      # cutting in from the left
    'kRightCutIn': 10,    # cutting in from the right
    'kLeftCutOut': 11,    # cutting out on the left
    'kRightCutOut': 12,   # cutting out on the right
    'kOccluded': 13,      # occluded head / tail / side (deprecated)
    'kSide': 14,          # crossing vehicle, facing direction unknown
    'kOccludedTail': 15,  # occluded tail
    'kOccludedHead': 16,  # occluded head
    'kOccludedSide': 17,  # occluded side
    'kUnknownPose': 20,   # unknown pose
}

# Vehicle sub-class name -> integer id.
subcls = {
    'kNegative': 0,          # background
    'kBus': 1,               # bus / coach
    'kCar': 2,               # sedan, SUV
    'kMiniBus': 3,           # van / minibus
    'kBucketTruck': 4,       # open-bed (bucket) truck
    'kContainerTruck': 5,    # box / container truck
    'kTricycle': 6,          # tricycle
    # tanker or water-sprinkler truck (body carries a round, oval or
    # semicircular tank)
    'kTanker': 7,
    'kCementTankTruck': 8,   # cement tank truck
    'kPickup': 9,            # pickup
    'kSedimentTruck': 10,    # muck / dirt truck
    'kIveco': 11,            # Iveco
    'kSpecialCar': 12,       # irregular-shaped vehicle
    'kCityAuto': 13,         # municipal vehicle
    'kVehicleUnknown': 14,   # unknown vehicle
    'kWrecker': 15,          # small tow truck, wrecker
    'kFlatTransporter': 16,  # large flatbed transporter
    'kTractorHead': 17,      # truck cab / tractor unit
    # large special transporter (has extra side panels compared with the
    # large flatbed transporter)
    'kSpecialFlatTransporter': 18,
    'kCarCarrier': 19,       # car carrier
    # tarp-covered truck (the cargo is fully covered by a tarp that also
    # occludes the rear of the vehicle)
    'kTruckWithTrap': 20,
    'kEngineeringVehicle': 21,  # construction vehicle
    'kSweeper': 22,          # street sweeper
    'kRoadServiceCar': 23,   # road maintenance vehicle
    'kSanitationTruck': 24,  # sanitation truck
    'kGarbageTruck': 25,     # garbage truck
}


def timestamp_to_ymd(timestamp_ms: int) -> str:
    """Format a millisecond Unix timestamp as Beijing time ``YYYYMMDDHHMMSS``.

    Despite the ``_ymd`` suffix the result also carries hours, minutes and
    seconds (14 digits in total).

    Example: ``1712345678900`` -> ``'20240406033438'``.

    Args:
        timestamp_ms: Unix timestamp in milliseconds.

    Returns:
        The timestamp rendered in UTC+8 with ``'%Y%m%d%H%M%S'``.

    Raises:
        ValueError: If the value cannot be converted to a datetime.
    """
    try:
        utc_time = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
        return utc_time.astimezone(BEIJING_TZ).strftime('%Y%m%d%H%M%S')
    except Exception as e:
        raise ValueError(f"无效毫秒时间戳 {timestamp_ms!r}: {e}") from e


def exist_product_in_group(group_meta, product_type):
    """Return True when the group described by ``group_meta`` has the product.

    The group is only queried when ``group_meta['failReason']`` is the empty
    string and ``group_meta['products']`` is not None.

    Args:
        group_meta: Mapping with at least ``failReason``, ``products``, ``id``.
        product_type: Product type passed to ``Group.get_product``.
    """
    if group_meta['failReason'] != '' or group_meta['products'] is None:
        return False
    with Group(group_meta['id']) as group:
        return group.get_product(product_type) is not None


def plane_to_ndarray(plane):
    """Convert a video plane into a ``(height, width)`` uint8 array.

    The plane buffer is laid out with ``plane.line_size`` bytes per row; any
    bytes beyond ``plane.width`` in a row are padding and are sliced away.
    """
    rows = np.frombuffer(plane, dtype=np.uint8).reshape(plane.height, plane.line_size)
    return rows[:, :plane.width]


def yuvj420p_to_nv12(y_plane, u_plane, v_plane):
    """Convert planar YUVJ420P (three planes) to NV12 (two planes).

    Args:
        y_plane: Luma plane, shape ``(height, width)``, dtype uint8.
        u_plane: Chroma U plane, shape ``(height // 2, width // 2)``.
        v_plane: Chroma V plane, same shape as ``u_plane``.

    Returns:
        ``(y_plane_nv12, uv_plane_nv12)`` where the first item is a copy of
        ``y_plane`` and the second has shape ``(height // 2, width)`` with U in
        the even columns and V in the odd columns.
    """
    uv_height, uv_width = u_plane.shape[0], u_plane.shape[1]
    # Every element is written below, so zero-initialisation is unnecessary.
    uv_plane_nv12 = np.empty((uv_height, uv_width * 2), dtype=np.uint8)
    uv_plane_nv12[:, 0::2] = u_plane
    uv_plane_nv12[:, 1::2] = v_plane
    return y_plane.copy(), uv_plane_nv12


def nv12_to_bgr_manual(y_plane, uv_plane):
    """Convert NV12 planes to a BGR image using OpenCV.

    The Y plane and the interleaved UV plane are stacked vertically, which is
    the layout ``cv2.COLOR_YUV2BGR_NV12`` expects.
    """
    yuv_image = np.concatenate((y_plane, uv_plane), axis=0)
    return cv2.cvtColor(yuv_image, cv2.COLOR_YUV2BGR_NV12)


def cam_img_kb(x, y, z, cam_intrinsic, distort_param):
    """Project camera-frame point(s) to pixels with a 4-coefficient fisheye model.

    The distorted angle is ``theta * (1 + k1*theta^2 + k2*theta^4 + k3*theta^6
    + k4*theta^8)`` with ``theta = atan(r)`` and ``r`` the norm of
    ``(x / z, y / z)``.
    NOTE(review): this appears to be a hand-written counterpart of
    ``cv2.fisheye.projectPoints`` with zero rotation/translation (the original
    file carried that call commented out) - confirm if exact parity matters.

    Args:
        x, y, z: Point coordinates in the camera frame (scalars, or equally
            shaped arrays). ``z`` must be non-zero.
        cam_intrinsic: 3x3 matrix; ``fx, fy, cx, cy`` are read from it.
        distort_param: Sequence whose first four entries are ``k1..k4``.

    Returns:
        Flat int32 array ``[u0, v0, u1, v1, ...]`` of rounded pixel coordinates.
    """
    # atleast_1d keeps x and y paired per point; the previous
    # np.array([x/z, y/z]).reshape(-1, 2) mixed them up for array inputs.
    a = np.atleast_1d(x / z)
    b = np.atleast_1d(y / z)
    r = np.sqrt(a * a + b * b)
    theta = np.arctan(r)

    k1, k2, k3, k4 = (distort_param[i] for i in range(4))
    theta_d = theta * (1 + k1 * theta**2 + k2 * theta**4 + k3 * theta**6 + k4 * theta**8)

    # theta_d / r tends to 1 as r -> 0; without this guard a point on the
    # optical axis produced 0/0 = NaN and a garbage integer pixel.
    with np.errstate(divide='ignore', invalid='ignore'):
        scale = np.where(r > 1e-8, theta_d / r, 1.0)
    xp = scale * a
    yp = scale * b

    fx, fy = cam_intrinsic[0, 0], cam_intrinsic[1, 1]
    cx, cy = cam_intrinsic[0, 2], cam_intrinsic[1, 2]
    imgx = fx * xp + cx
    imgy = fy * yp + cy

    pix_point = np.round(np.stack((imgx, imgy), axis=-1)).astype(np.int32)
    return pix_point.reshape(-1)


def xyz_xy(x, y, distort_param, K):
    """Apply rational/tangential/thin-prism distortion and project with ``K``.

    Coefficients are read in the OpenCV order
    ``(k1, k2, p1, p2, k3, k4, k5, k6, s1, s2, s3, s4)``; missing trailing
    coefficients are treated as zero.

    Args:
        x, y: Normalised image coordinates (camera-frame ``X/Z`` and ``Y/Z``).
        distort_param: 1-D array of up to 12 distortion coefficients.
        K: 3x3 camera matrix.

    Returns:
        ``K @ [x_distorted, y_distorted, 1]`` as a length-3 array.
    """
    r = np.sqrt(x * x + y * y)
    r_2 = r * r
    r_4 = r_2 * r_2
    r_6 = r_4 * r_2

    coeffs = np.zeros(12, dtype=np.float32)
    count = min(distort_param.shape[0], 12)
    coeffs[:count] = distort_param[:count]

    radial_ratio = (1 + coeffs[0] * r_2 + coeffs[1] * r_4 + coeffs[4] * r_6) / (
        1 + coeffs[5] * r_2 + coeffs[6] * r_4 + coeffs[7] * r_6
    )
    res_x = (
        x * radial_ratio
        + 2 * coeffs[2] * x * y
        + coeffs[3] * (r_2 + 2 * x * x)
        + coeffs[8] * r_2
        + coeffs[9] * r_4
    )
    # Tangential terms for y are p1*(r^2 + 2*y^2) + 2*p2*x*y in the OpenCV
    # model; the previous code had the factor 2 on the wrong term.
    res_y = (
        y * radial_ratio
        + coeffs[2] * (r_2 + 2 * y * y)
        + 2 * coeffs[3] * x * y
        + coeffs[10] * r_2
        + coeffs[11] * r_4
    )
    return np.dot(K, np.array([res_x, res_y, 1])).T


def Lidar2OrigImg(pts3d, lidar_to_cam, distort_param, K, origW, origH):
    """Project one lidar-frame point into the original image.

    A 5-element ``distort_param`` selects the pinhole model (``xyz_xy``); any
    other length selects the fisheye model (``cam_img_kb``). The pixel
    coordinates are clipped to ``[0, origW - 1]`` x ``[0, origH - 1]``.
    NOTE(review): points with non-positive camera-frame depth are not
    rejected here - callers should check the returned 3-D point.

    Args:
        pts3d: Indexable with at least three entries ``(x, y, z)``.
        lidar_to_cam: Matrix whose ``[:3, :3]`` is the rotation and
            ``[:3, 3]`` the translation from lidar to camera.
        distort_param: Distortion coefficients.
        K: 3x3 camera matrix.
        origW, origH: Image width and height in pixels.

    Returns:
        ``(pts2d_cam, pts3d_cam)``: clipped pixel coordinates (length 2) and
        the point expressed in the camera frame (length 3).
    """
    rotation = lidar_to_cam[:3, :3]
    translation = lidar_to_cam[:3, 3]
    pts3d_cam = np.dot(rotation, [pts3d[0], pts3d[1], pts3d[2]]) + translation

    if len(distort_param) == 5:
        pts2d_cam = xyz_xy(
            pts3d_cam[0] / pts3d_cam[2], pts3d_cam[1] / pts3d_cam[2], distort_param, K
        )
    else:
        pts2d_cam = cam_img_kb(pts3d_cam[0], pts3d_cam[1], pts3d_cam[2], K, distort_param)

    pts2d_cam = pts2d_cam[0:2]
    pts2d_cam[0] = np.clip(pts2d_cam[0], 0, origW - 1)
    pts2d_cam[1] = np.clip(pts2d_cam[1], 0, origH - 1)
    return pts2d_cam, pts3d_cam


def show_group_uuid(raw_id: str) -> None:
    """Print every group ukey returned by ``Raw(raw_id).list_group_ukeys()``."""
    with Raw(raw_id) as raw:
        for group_uuid in raw.list_group_ukeys():
            print(group_uuid)


def call_api(body: Dict[str, str], url: str, timeout: float = 30.0) -> Dict[str, Any]:
    """POST ``body`` as JSON to the calibration-file management API.

    Args:
        body: Request payload, sent as the JSON body.
        url: Endpoint URL.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block forever on a stalled connection.

    Returns:
        The decoded JSON response, or ``{}`` when the request fails or the
        response is not valid JSON (the error is printed).
    """
    headers = {
        "User-Agent": "Apifox/1.0.0 (https://apifox.com)",
        "Content-Type": "application/json",
        "Accept": "*/*",
        "Host": "d.minieye.tech",
        "Connection": "keep-alive",
    }
    try:
        response = requests.post(url, headers=headers, json=body, timeout=timeout)
        response.raise_for_status()  # surface HTTP error statuses
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"请求失败: {e}")
        return {}
    except json.JSONDecodeError as e:
        print(f"JSON解析失败: {e}")
        return {}


def wirite_calib_by_ukey(ukey: Dict[str, str], output_path: str) -> None:
    """Fetch calibration JSON files by ukey and write them under ``output_path``.

    Args:
        ukey: Request body for the API, e.g. ``{"ukey": "<calib ukey>"}``.
        output_path: Root directory; each returned key is used as a relative
            file path below it.
    """
    full_info = call_api(ukey, GET_JSON_BY_UKEY_API_URL).get("json_files_content")
    if not full_info:
        # call_api returns {} on failure; previously this raised KeyError.
        print(f"No calibration content returned for {ukey}")
        return

    for key, value in full_info.items():
        target = os.path.join(output_path, key)
        os.makedirs(os.path.join(output_path, os.path.dirname(key)), exist_ok=True)
        with open(target, 'w') as fw:
            json.dump(value, fw, indent=4)


def get_calib_info(group_uuid: str, output_path: str) -> None:
    """Download the calibration files referenced by a group's metadata.

    Reads ``calib_filepath`` from ``group.meta``; prints a message and returns
    when it is absent.
    """
    with Group(group_uuid) as group:
        calib_ukey = group.meta.get('calib_filepath', None)
        if calib_ukey is None:
            print(f"未找到标定信息 for group_uuid: {group_uuid}")
            return
        wirite_calib_by_ukey({"ukey": calib_ukey}, output_path)


def getRoadCurve(textpath):
    """Collect the UUIDs that have at least one row with ``|value| < 150``.

    Each line of ``textpath`` is split on single spaces into
    ``uuid frame_id value``.

    Returns:
        UUIDs in first-seen order, without duplicates.
    """
    curve_uuids = []
    seen = set()
    with open(textpath, 'r') as fh:
        for text in fh:
            context = text.strip('\n').split(' ')
            uuid = context[0]
            curvature = float(context[2])
            if abs(curvature) < 150 and uuid not in seen:
                seen.add(uuid)
                curve_uuids.append(uuid)
    return curve_uuids


def find_index_by_timestamp(df: pd.DataFrame, camera_id: str, timestamp_ns: int) -> int:
    """Return the index of the first row whose ``camera_id`` column matches.

    A row matches when its value differs from ``timestamp_ns`` by less than
    ``1e-3``, i.e. effectively an exact match for integer timestamps.

    Raises:
        IndexError: If no row matches.
    """
    matches = df[(df[camera_id] - timestamp_ns).abs() < 1e-3]
    return matches.index[0]


def _h265_to_image(payload):
    """Decode the first video frame contained in ``payload`` into a BGR image.

    Raises:
        ValueError: If the container yields no frame.
    """
    # The container is closed on exit; previously it was left open per frame.
    with av.open(io.BytesIO(payload)) as container:
        for frame_yuvj420p in container.decode(video=0):
            y_plane = plane_to_ndarray(frame_yuvj420p.planes[0])
            u_plane = plane_to_ndarray(frame_yuvj420p.planes[1])
            v_plane = plane_to_ndarray(frame_yuvj420p.planes[2])
            y_nv12, uv_nv12 = yuvj420p_to_nv12(y_plane, u_plane, v_plane)
            return nv12_to_bgr_manual(y_nv12, uv_nv12)
    raise ValueError("解码失败")


def parse_video(car_name: str, date_name: str, groupuuid: str, json_list: List,
                clip_paths: List, camera_id: str, df: pd.DataFrame,
                output_path: str) -> None:
    """Decode camera frames from the clips and save them as PNG images.

    Images are written to ``<output_path>/images`` and named
    ``<car>_<date>_<group>_<json_index:06d>_<frame_id>.png``.
    NOTE(review): ``frame_id`` is always read from the ``camera4_frame_id``
    column, regardless of ``camera_id`` - confirm this is intended.

    Args:
        car_name: Vehicle identifier used in the file name.
        date_name: Date string used in the file name.
        groupuuid: Group UUID used in the file name.
        json_list: Indices to export; ``None`` exports every frame.
        clip_paths: Paths of the clip files to read.
        camera_id: Topic to read and timestamp column to match in ``df``.
        df: Frame-timestamp synchronisation table.
        output_path: Output root directory.

    Raises:
        IndexError: If a frame timestamp has no row in ``df``.
        ValueError: If a frame cannot be decoded.
    """
    print(f" 开始解析视频并保存图片...")

    image_path = os.path.join(output_path, 'images')
    os.makedirs(image_path, exist_ok=True)

    wanted = None if json_list is None else set(json_list)
    struct_decoder = StructDecoder()
    saved_count = 0

    for clip_path in clip_paths:
        reader = ClipReader(clip_path)
        for schema, channel, msg in reader.iter_messages(topics=[camera_id]):
            # Skip anything that is not a struct-encoded video message so a
            # value decoded for an earlier message is never reused.
            if schema.encoding != "struct":
                continue
            data = struct_decoder.decode(schema, channel, msg)
            if not isinstance(data, VideoMessage):
                continue

            json_index = find_index_by_timestamp(df, camera_id, msg.log_time)
            if wanted is not None and json_index not in wanted:
                continue

            img = _h265_to_image(data.payload)
            filename = '%s_%s_%s_%06d_%d.png' % (
                car_name, date_name, groupuuid, json_index,
                df['camera4_frame_id'][json_index],
            )
            filepath = os.path.join(image_path, filename)
            # cv2.imwrite reports failure through its return value.
            if cv2.imwrite(str(filepath), img):
                saved_count += 1
            else:
                print(f"Failed to write image {filepath}")

    print(f" 保存了 {saved_count} 张图片到 {image_path}")


def get_clip_paths(group_uuid: str) -> List[str]:
    """Return the cache path of the first file of every clip in the group."""
    clip_path_list = []
    with Group(group_uuid) as group:
        for clip_ukey in group.list_clip_ukeys():
            clip = Clip(clip_ukey)
            files, _raw_files = clip.list_files()
            clip_path_list.append(clip.get_cache_path(files[0]))
    return clip_path_list


class NumericType(IntEnum):
    """Numeric type codes used by point-cloud field descriptors."""

    UNKNOWN = 0
    UINT8 = 1
    INT8 = 2
    UINT16 = 3
    INT16 = 4
    UINT32 = 5
    INT32 = 6
    FLOAT32 = 7
    FLOAT64 = 8


def get_frame_timestamps_data_from_tar(group_uuid: str) -> pd.DataFrame:
    """Load ``frame_timestamps.csv`` from the group's ``clean_data_archive.tar``."""
    with Group(group_uuid) as group:
        with group.open('clean_data_archive.tar', mode="rb") as f:
            with tarfile.open(fileobj=f) as tar:
                csv_obj = tar.extractfile(tar.getmember("frame_timestamps.csv"))
                return pd.read_csv(csv_obj)


def load_point_cloud(json_index, df, clip_path_list):
    """Load the ``Pandar128`` point cloud that belongs to one synchronised frame.

    The lidar timestamp is taken from ``df.loc[json_index, 'Pandar128']``;
    messages within +/- 1e9 of it are scanned and the one whose ``log_time``
    differs by less than 1e3 is decoded.

    Returns:
        Array of shape ``(num_points, 4)`` with columns x, y, z, intensity.

    Raises:
        Exception: If no matching point cloud message is found.
    """
    type_mapping = {
        NumericType.FLOAT32: 'f4',
        NumericType.FLOAT64: 'f8',
        NumericType.UINT8: 'u1',
        NumericType.UINT16: 'u2',
        NumericType.UINT32: 'u4',
        NumericType.INT8: 'i1',
        NumericType.INT16: 'i2',
        NumericType.INT32: 'i4',
    }

    protobuf_decoder = ProtobufDecoder()
    lidar_ts_ns = df.loc[json_index, 'Pandar128']
    # NOTE(review): 1e9 makes these bounds floats; kept as in the original.
    start_ts_ns = lidar_ts_ns - 1e9
    end_ts_ns = lidar_ts_ns + 1e9

    for clip_path in clip_path_list:
        reader = ClipReader(clip_path)
        for schema, channel, msg in reader.iter_messages(
                topics=['Pandar128'], start_time=start_ts_ns, end_time=end_ts_ns):
            if schema.encoding != "protobuf":
                continue
            cloud = protobuf_decoder.decode(schema, channel, msg)
            if abs(msg.log_time - lidar_ts_ns) >= 1e3:
                continue

            # Describe one point record using the declared field offsets, so
            # padded or reordered layouts are read correctly (a packed dtype
            # built from the names alone would ignore the offsets).
            names, formats, offsets = [], [], []
            stride = 0
            for field in cloud.fields:
                fmt = type_mapping[field.type]
                stride = max(stride, field.offset + np.dtype(fmt).itemsize)
                names.append(field.name)
                formats.append(fmt)
                offsets.append(field.offset)
            point_dtype = np.dtype({
                'names': names,
                'formats': formats,
                'offsets': offsets,
                'itemsize': stride,
            })

            num_points = len(cloud.data) // stride
            points = np.frombuffer(cloud.data, dtype=point_dtype, count=num_points)
            return np.vstack(
                [points['x'], points['y'], points['z'], points['intensity']]
            ).T

    raise Exception("未搜索到点云")


def _extract_truth_archive(group_uuid: str, output_path: str, tar_filename: str,
                           product_type: str, car_name: str, date_name: str,
                           df: pd.DataFrame) -> None:
    """Extract calibration files and annotation JSON files from a product tar.

    * members whose name contains ``depth_map_fg`` are skipped;
    * members whose name contains ``calib`` go to ``<output_path>/calib``
      under their base name (checked first because calib files may be JSON);
    * ``camera_radar_lidar_jsons/*.json`` members go to
      ``<output_path>/annotations``, renamed with the same pattern as the
      exported images.
    """
    with Group(group_uuid) as group:
        if group.get_product(product_type) is None:
            print(f"Group {group_uuid} does not have product {product_type}.")
            return

        with group.open(tar_filename, mode="rb") as f, tarfile.open(fileobj=f) as tar:
            for member in tar.getmembers():
                if 'depth_map_fg' in member.name:
                    continue

                if 'calib' in member.name:
                    calib_dir = os.path.join(output_path, 'calib')
                    os.makedirs(calib_dir, exist_ok=True)
                    file_obj = tar.extractfile(member)  # None for non-files
                    if file_obj:
                        target = os.path.join(calib_dir, os.path.basename(member.name))
                        with open(target, 'wb') as f_out:
                            f_out.write(file_obj.read())
                elif 'camera_radar_lidar_jsons' in member.name and member.name.endswith('.json'):
                    json_filename = os.path.basename(member.name)
                    try:
                        # "000001.json" -> 1
                        json_index = int(json_filename[:-5])
                        frame_id = df['camera4_frame_id'][json_index]
                        new_filename = '%s_%s_%s_%06d_%d.json' % (
                            car_name, date_name, group_uuid, json_index, frame_id)
                        annotations_dir = os.path.join(output_path, 'annotations')
                        os.makedirs(annotations_dir, exist_ok=True)
                        file_obj = tar.extractfile(member)
                        if file_obj:
                            with open(os.path.join(annotations_dir, new_filename), 'wb') as f_out:
                                f_out.write(file_obj.read())
                    except (ValueError, KeyError, IndexError) as e:
                        print(f"警告: 无法处理文件 {member.name}, 错误: {e}")


def _list_annotation_members(group_uuid: str, tar_filename: str,
                             product_type: str) -> List[str]:
    """List the annotation JSON member names inside a product tar.

    A member is listed when its name contains both
    ``camera_radar_lidar_jsons`` and ``.json``.
    NOTE(review): an object-level filter (vehicle class / lidar distance,
    optionally bypassed for curve groups) used to live here as commented-out
    code; every annotation file is currently accepted.
    """
    valid_list = []
    with Group(group_uuid) as group:
        if group.get_product(product_type) is None:
            print(f"Group {group_uuid} does not have product {product_type}.")
            return valid_list

        with group.open(tar_filename, mode="rb") as f:
            # Stream mode: members are visited once, in archive order.
            with tarfile.open(fileobj=f, mode="r|*") as tar:
                for member in tar:
                    if 'camera_radar_lidar_jsons' in member.name and '.json' in member.name:
                        valid_list.append(member.name)
    return valid_list


def parse_truth_data(group_uuid: str, curvelists: list, output_path: str,
                     car_name: str, date_name: str, df: pd.DataFrame) -> List[str]:
    """Extract the ground-truth products of a group to ``output_path``.

    Only the ``2d-3d-association`` product is processed. Its archive is
    extracted only when it contains at least one annotation file.

    Args:
        group_uuid: Group to process.
        curvelists: Currently unused; kept for interface compatibility.
        output_path: Output root directory for this group.
        car_name: Vehicle identifier used in output file names.
        date_name: Date string used in output file names.
        df: Frame-timestamp table, used to look up ``camera4_frame_id``.

    Returns:
        Archive member names of the annotation files (of the last product
        type processed); empty when none were found.
    """
    product_type_list = ['2d-3d-association']

    valid_list: List[str] = []
    for product_type in product_type_list:
        tar_filename = f"{product_type}_archive.tar"
        valid_list = _list_annotation_members(group_uuid, tar_filename, product_type)
        if valid_list:
            _extract_truth_archive(group_uuid, output_path, tar_filename,
                                   product_type, car_name, date_name, df)
    return valid_list


def gettraindatagroupuuid(path):
    """Return the group UUIDs found in exported image names below ``path``.

    Files matching ``path + '*/*/images/*.png'`` are scanned in sorted order;
    the UUID is the fourth ``_``-separated token of the file name.
    NOTE(review): this relies on the vehicle name containing exactly one
    underscore - confirm for other vehicle ids.
    """
    image_paths = sorted(glob.glob(path + '*/*/images/*.png'))
    uuids = (os.path.basename(img).split('_')[3] for img in image_paths)
    # dict.fromkeys keeps first-seen order while removing duplicates.
    return list(dict.fromkeys(uuids))


def process_single_uuid(args):
    """Run the full export pipeline for one group UUID (pool worker task).

    Args:
        args: ``(group_uuid, uuid_date_dict, vehicle_number[, output_root])``.
            When ``output_root`` is omitted, the module-level ``output``
            global is used, as older callers relied on.

    Returns:
        ``(group_uuid, status)`` where ``status`` starts with ``"success"``
        on success and with ``"failed"`` when an exception was raised, so
        that one bad group does not abort the whole pool.
    """
    group_uuid, uuid_date_dict, vehicle_number, *extra = args
    try:
        # Passing the root explicitly keeps workers independent of globals
        # that only exist in the parent when the start method is "spawn".
        output_root: Optional[str] = extra[0] if extra else globals().get('output')
        if output_root is None:
            raise ValueError("output root directory was not provided")

        curvelists = []
        date = uuid_date_dict[group_uuid][0]  # first date recorded for the UUID
        output_path = os.path.join(output_root, group_uuid)

        # The sync table is needed before parse_truth_data (file renaming).
        df = get_frame_timestamps_data_from_tar(group_uuid)
        clip_path_list = get_clip_paths(group_uuid)

        valid_list = parse_truth_data(
            group_uuid, curvelists, output_path, vehicle_number, date, df)
        if not valid_list:
            print(f"UUID {group_uuid} 无有效数据,跳过")
            return (group_uuid, "success (no valid data)")

        valid_list.sort()
        valid_ids = []
        for file_path in valid_list:
            name = os.path.basename(file_path)
            try:
                # "000001.json" -> 1
                valid_ids.append(int(name[0:-5]))
            except ValueError:
                print(f"UUID {group_uuid} 文件名 {name} 解析ID失败,跳过")

        parse_video(vehicle_number, date, group_uuid, valid_ids,
                    clip_path_list, 'camera4', df, output_path)
    except Exception as exc:  # worker boundary: report instead of crashing the pool
        traceback.print_exc()
        return (group_uuid, f"failed: {exc!r}")

    print(f"UUID {group_uuid} 处理完成")
    return (group_uuid, "success")


def main() -> None:
    """Read the group UUID list and export every group with a process pool."""
    # Other vehicle ids used previously: 'G1M3_AFS1616', 'G1M3_0877',
    # 'G1M3_FDL2232'.
    car = 'G1M3_0630'
    output = '/data/zschen/dm3d/'
    groupuuidpath = '/data/zschen/dm3d/0630_total_20260111.txt'

    with open(groupuuidpath, 'r') as fh:
        textlists = fh.readlines()
    textlists.sort()

    # uuid -> list of dates, in sorted line order (dicts keep insertion order).
    uuid_date = {}
    for text in textlists:
        context = text.strip('\n').split(' ')
        uuid_date.setdefault(context[0], []).append(context[1])
    group_uuidlists = list(uuid_date)

    task_list = [(uuid, uuid_date, car, output) for uuid in group_uuidlists]

    cpu_count = multiprocessing.cpu_count()
    # Leave two cores free, never start more workers than tasks, at least one.
    process_num = max(1, min(cpu_count - 2, len(task_list)))
    print(f"开始多进程处理,共 {len(group_uuidlists)} 个UUID,进程数: {process_num}")

    with Pool(processes=process_num) as pool:
        results = pool.map(process_single_uuid, task_list)

    fail_details = [(uuid, res) for uuid, res in results if not res.startswith("success")]
    success_count = len(results) - len(fail_details)
    fail_count = len(fail_details)
    print(f"\n处理完成!总计: {len(group_uuidlists)} | 成功: {success_count} | 失败: {fail_count}")

    if fail_details:
        print("失败详情:")
        for uuid, res in fail_details:
            print(f" {uuid}: {res}")


if __name__ == "__main__":
    main()