Files
yolov26_3d/tools/pdcl_inference/extract_data_from_pdcl.py
2026-06-24 09:35:46 +08:00

695 lines
28 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import requests
import json,glob
import os
import tarfile
import av
import io
import cv2
import numpy as np
import pandas as pd
from typing import Dict, List, Any
from enum import IntEnum
from datetime import datetime, timezone, timedelta
import multiprocessing
from multiprocessing import Pool
from dotenv import load_dotenv
load_dotenv()
from pdcl_dss import DSSClient, Raw, Group, Clip
from pdcl_pyclip.reader import ClipReader
from pdcl_pyclip.msg_camera import VideoMessage
from pdcl_pyclip.decoder_struct import StructDecoder
from pdcl_pyclip.decoder_protobuf import ProtobufDecoder
os.environ['STS_UID'] = 'dis-uploader'
os.environ['STS_SECRET_KEY'] = '277310cc09724d315514a79701fecb0f'
client = DSSClient()
GET_JSON_BY_UKEY_API_URL = (
"https://d.minieye.tech/dis-api/dashboard/calib-file-manage/json-files-by-ukey"
)
BEIJING_TZ = timezone(timedelta(hours=8))
det_cls_map = { 'kVehicle':0,
'kPed':1,
'kBike':2,
'kCyclist':3,
'kCone':4,
'kRoadBarrier':4,
'kPedHead':5,
'kSmallTrafficSign': 6,
'kWarningTriangle' :6,
'kBigTrafficSign':7,
'kVehiclePlate' :8,
'kVehicleWheel':9,
'kTrafficLight':10,
'kTrafficLightBulb' :11,
'kTrafficLightDigit':12
}
posecls = {
'kInvalid' : 0, # 背景
'kLeftTail' : 1 , # 左侧同向车
'kMidTail' : 2, # 中间同向车
'kRightTail' : 3, # 右侧同向车
'kLeftHead' : 4, # 左侧对向车
'kMidHead' : 5, # 中间对向车
'kRightHead' : 6, # 右侧对向车
'kLeftSide' : 7, # 朝左横向车
'kRightSide' : 8, # 朝右横向车
'kLeftCutIn' : 9, # 左侧切入
'kRightCutIn' : 10, # 右侧切入
'kLeftCutOut' : 11, # 左侧切出
'kRightCutOut' : 12, # 右侧切出
'kOccluded' : 13, # 车头遮挡/车尾遮挡/横向遮挡 (deprecated)
'kSide' : 14, # 横向车(不知道朝左,还是朝右)
'kOccludedTail' : 15, # 车尾遮挡
'kOccludedHead' : 16, # 车头遮挡
'kOccludedSide' : 17, # 横向遮挡
'kUnknownPose' : 20 # unknown pose
}
subcls = {
'kNegative' : 0, # 背景
'kBus' : 1, # 大巴
'kCar' : 2, # 小轿车,suv
'kMiniBus' : 3, # 面包车
'kBucketTruck' : 4, # 斗卡
'kContainerTruck' : 5, # 箱卡
'kTricycle' : 6, # 三轮车
'kTanker' : 7, # 油罐车,晒水车(车身带有圆形,椭圆形,半圆形的罐)
'kCementTankTruck' : 8, # 水泥罐车
'kPickup' : 9, # 皮卡
'kSedimentTruck' : 10, # 渣土车
'kIveco' : 11, # 依维柯
'kSpecialCar' : 12, # 异型车
'kCityAuto' : 13, # 市政车
'kVehicleUnknown' : 14, # 未知车辆
'kWrecker' : 15, # 小型拖车,清障车
'kFlatTransporter' : 16,# 大型平板拖车
'kTractorHead' : 17, # 卡车车头,牵引车车头
'kSpecialFlatTransporter' : 18, # 大型特种拖车(相对于大型平板拖车会多挡板)
'kCarCarrier' : 19, # 轿运车
# 盖布车(车身上的货物全部被布盖着,同时对车辆尾部进行了遮挡)
'kTruckWithTrap' : 20,
'kEngineeringVehicle' : 21, # 工程车
'kSweeper' : 22, # 扫地车
'kRoadServiceCar' : 23, # 道路维修车
'kSanitationTruck' : 24, # 环卫车
'kGarbageTruck' : 25, # 垃圾车
}
def timestamp_to_ymd(timestamp_ms: int) -> str:
"""
将毫秒时间戳转换为北京时间 YYYYMMDD 格式
例如1712345678900 -> '20250406'
"""
try:
timestamp_s = timestamp_ms / 1000
utc_time = datetime.fromtimestamp(timestamp_s, tz=timezone.utc)
beijing_time = utc_time.astimezone(BEIJING_TZ)
return beijing_time.strftime('%Y%m%d%H%M%S')
except Exception as e:
raise ValueError(f"无效毫秒时间戳 {timestamp_ms!r}: {e}") from e
def exist_product_in_group(group_meta, product_type):
'''
判断group是否存在某种真值产物
'''
if group_meta['failReason'] == '' and group_meta['products'] is not None:
with Group(group_meta['id']) as group:
if group.get_product(product_type) is not None:
return True
return False
def plane_to_ndarray(plane):
"""
将 VideoPlane 对象转换为 NumPy 数组
"""
# 获取平面的步长stride和行数
stride = plane.line_size
height = plane.height
width = plane.width
# 根据步长创建数组
# 如果步长等于宽度,可以直接重塑
if stride == width:
array = np.frombuffer(plane, dtype=np.uint8).reshape(height, width)
else:
# 如果步长不等于宽度,需要处理填充字节
array = np.frombuffer(plane, dtype=np.uint8)
# 取每行的前width个字节
array = array.reshape(height, stride)[:, :width]
return array
def yuvj420p_to_nv12(y_plane, u_plane, v_plane):
"""
将 YUVJ420P3个平面转换为 NV122个平面格式。
参数:
y_plane (np.ndarray): 亮度Y平面形状为 (height, width), dtype=uint8
u_plane (np.ndarray): 色度U平面形状为 (height\\\\2, width\\\\2), dtype=uint8
v_plane (np.ndarray): 色度V平面形状为 (height\\\\2, width\\\\2), dtype=uint8
返回:
tuple: (y_plane_nv12, uv_plane_nv12)
- y_plane_nv12: 亮度平面,与输入相同
- uv_plane_nv12: 交织的UV平面形状为 (height\\\\2, width)
"""
# 1. Y平面直接复制保持不变
y_plane_nv12 = y_plane.copy()
# 2. 获取尺寸信息
uv_height = u_plane.shape[0] # height \\\\ 2
uv_width = u_plane.shape[1] # width \\\\ 2
# 3. 创建目标UV平面宽度是原UV宽度的2倍因为要交错存放U和V
uv_plane_nv12 = np.zeros((uv_height, uv_width * 2), dtype=np.uint8)
# 4. 将U和V平面数据交错地填入目标UV平面
# UV平面的偶数索引位置放U奇数索引位置放V
uv_plane_nv12[:, 0::2] = u_plane # 所有行第0,2,4,...列 = U
uv_plane_nv12[:, 1::2] = v_plane # 所有行第1,3,5,...列 = V
return y_plane_nv12, uv_plane_nv12
def nv12_to_bgr_manual(y_plane, uv_plane):
"""
手动将 NV12 转换为 BGR
"""
yuv_image = np.concatenate((y_plane, uv_plane), axis=0)
bgr_image = cv2.cvtColor(yuv_image, cv2.COLOR_YUV2BGR_NV12) # 使用OpenCV进行YUV到BGR的转换
return bgr_image
def cam_img_kb(x,y,z,cam_intrinsic,distort_param):
# objp=np.array([x/z,y/z,1]).reshape(1,-1,3)
# rvec = np.array([[[0., 0., 0.]]])
# tvec = np.array([[[0., 0., 0.]]])
# image_coord, _ = cv2.fisheye.projectPoints(objp, rvec, tvec,cam_intrinsic, distort_param)
# image_coord = image_coord.reshape(-1)
# return image_coord
camxyz = np.array([x/z, y/z]).reshape(-1,2)
a = camxyz[...,0]
b = camxyz[...,1]
r = np.sqrt(a * a + b * b)
theta = np.arctan(r)
k1 = distort_param[0]
k2 = distort_param[1]
k3 = distort_param[2]
k4 = distort_param[3]
thetaD = theta * (1 + k1 * theta**2 + k2 * theta**4 + k3 * theta**6 + k4 * theta**8)
xp = thetaD / r * a
yp = thetaD / r * b
fx = cam_intrinsic[0,0]
fy = cam_intrinsic[1,1]
cx = cam_intrinsic[0,2]
cy = cam_intrinsic[1,2]
imgx = (fx * xp + cx)
imgy = (fy * yp + cy)
# mask_2d = (imgx>=0)&(imgx<3840)&(imgy>=0)&(imgy<2160)
pix_point = np.round(np.concatenate((imgx.reshape((-1,1)),imgy.reshape((-1,1))),axis=-1))
# pix_point = pix_point[mask_2d].astype(np.int32)
pix_point = pix_point.astype(np.int32)
return pix_point.reshape(-1)
def xyz_xy(x,y,distort_param,K):
r = np.sqrt(x * x + y * y)
r_2 = r * r
r_4 = r_2 * r_2
r_6 = r_4 * r_2
m_distort_coeffs = distort_param
coeffs = np.zeros(12,dtype=np.float32)
for i in range(min(m_distort_coeffs.shape[0],12)):
coeffs[i] = m_distort_coeffs[i]
radial_ratio = (1 + coeffs[0] * r_2 + coeffs[1] * r_4 + coeffs[4] * r_6) / (1 + coeffs[5] * r_2 + coeffs[6] * r_4 + coeffs[7] * r_6)
res_x = x * radial_ratio + 2 * coeffs[2] * x * y + coeffs[3] * (r_2 + 2 * pow(x, 2)) + coeffs[8] * r_2 + coeffs[9] * r_4
res_y = y * radial_ratio + 2 * coeffs[2] * (r_2 + 2 * pow(y, 2)) + coeffs[3] * x * y + coeffs[10] * r_2 + coeffs[11] * r_4
image_coord = np.dot(K,np.array([res_x,res_y,1])).T
return image_coord
def Lidar2OrigImg(pts3d, lidar_to_cam, distort_param, K, origW, origH):
pts3d_cam = np.dot(lidar_to_cam[:3, :3],[pts3d[0],pts3d[1],pts3d[2]]) + lidar_to_cam[:3, 3]
if len(distort_param)==5:
pts2d_cam = xyz_xy(pts3d_cam[0] /pts3d_cam[2], pts3d_cam[1] / pts3d_cam[2], distort_param, K)
else:
pts2d_cam = cam_img_kb(pts3d_cam[0],pts3d_cam[1],pts3d_cam[2],K,distort_param)
# pts2d_cam = cam_img_kb(pts3d_cam[0],pts3d_cam[1],pts3d_cam[2],K,distort_param)
# pts2d_cam = xyz_xy(pts3d_cam[0] / pts3d_cam[2],pts3d_cam[1] / pts3d_cam[2],distort_param,K)
pts2d_cam = pts2d_cam[0:2]
pts2d_cam[0] = np.clip(pts2d_cam[0],0,origW-1)
pts2d_cam[1] = np.clip(pts2d_cam[1],0,origH-1)
return pts2d_cam, pts3d_cam
def show_group_uuid(raw_id: str) -> None:
'''
通过 raw_id 列出所有通过清洗的 group_uuid
'''
with Raw(raw_id) as raw:
group_uuid_list = raw.list_group_ukeys()
for group_uuid in group_uuid_list:
print(group_uuid)
def call_api(body: Dict[str, str], url: str) -> Dict[str, Any]:
"""
调用标定文件管理API
Args:
body: 请求体参数包含plate_number
url: API接口地址
Returns:
API返回的完整响应数据
"""
headers = {
"User-Agent": "Apifox/1.0.0 (https://apifox.com)",
"Content-Type": "application/json",
"Accept": "*/*",
"Host": "d.minieye.tech",
"Connection": "keep-alive",
}
try:
response = requests.post(url, headers=headers, json=body)
response.raise_for_status() # 检查HTTP错误
return response.json()
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return {}
except json.JSONDecodeError as e:
print(f"JSON解析失败: {e}")
return {}
def wirite_calib_by_ukey(ukey: str, output_path: str) -> None:
"""
通过 ukey 获取标定信息并写入本地文件
"""
full_info = call_api(ukey, GET_JSON_BY_UKEY_API_URL)["json_files_content"]
for key, value in full_info.items():
result_dir = os.path.join(output_path, os.path.dirname(key))
os.makedirs(result_dir, exist_ok=True)
with open(os.path.join(output_path, key), 'w') as fw:
json.dump(value, fw, indent=4)
def get_calib_info(group_uuid: str, output_path: str) -> None:
'''
通过 group_uuid 获取标定信息
'''
with Group(group_uuid) as group:
calib_ukey = group.meta.get('calib_filepath', None)
if calib_ukey is None:
print(f"未找到标定信息 for group_uuid: {group_uuid}")
return
ukey = {"ukey": calib_ukey}
wirite_calib_by_ukey(ukey, output_path)
def getRoadCurve(textpath):
Curuuids = []
textlists = open(textpath,'r').readlines()
for text in textlists:
context = text.strip('\n').split(' ')
uuid = context[0]
frameid = context[1]
couvature = float(context[2])
if abs(couvature) < 150:
if uuid not in Curuuids:
Curuuids.append(uuid)
return Curuuids
def find_index_by_timestamp(df: pd.DataFrame, camera_id: str, timestamp_ns: int) -> int:
json_index = df[abs(df[f'{camera_id}'] - timestamp_ns) < 1e-3].index[0]
return json_index
def parse_video(car_name: str, date_name: str, groupuuid: str, json_list: List, clip_paths: List, camera_id: str, df: pd.DataFrame, output_path: str) -> Dict:
'''
解析视频流,保存为图片
'''
print(f" 开始解析视频并保存图片...")
def h265_to_image(frame):
container = av.open(io.BytesIO(frame))
for frame_yuvj420p in container.decode(video=0):
y_plane = plane_to_ndarray(frame_yuvj420p.planes[0])
u_plane = plane_to_ndarray(frame_yuvj420p.planes[1])
v_plane = plane_to_ndarray(frame_yuvj420p.planes[2])
# 转换为 NV12
y_nv12, uv_nv12 = yuvj420p_to_nv12(y_plane, u_plane, v_plane)
frame = nv12_to_bgr_manual(y_nv12, uv_nv12)
return frame
raise ValueError("解码失败")
image_path = os.path.join(output_path, 'images')
os.makedirs(image_path, exist_ok=True)
struct_decoder = StructDecoder()
saved_count = 0
for clip_path in clip_paths:
reader = ClipReader(clip_path)
for schema, channel, msg in reader.iter_messages(topics=[camera_id]):
if schema.encoding == "struct":
data = struct_decoder.decode(schema, channel, msg)
if not isinstance(data, VideoMessage):
continue
camera_ts_ns = msg.log_time
json_index = find_index_by_timestamp(df, camera_id, camera_ts_ns)
if json_list is not None:
if json_index not in json_list:
continue
frame = data.payload
img = h265_to_image(frame)
# filepath = os.path.join(image_path, f"{json_index:06d}.jpg")
filepath = image_path + '/' + '%s_%s_%s_%06d_%d.png'%(car_name, date_name, groupuuid, json_index,df['camera4_frame_id'][json_index])
# 使用OpenCV保存图片
cv2.imwrite(str(filepath), img)
saved_count += 1
print(f" 保存了 {saved_count} 张图片到 {image_path}")
def get_clip_paths(group_uuid: str) -> List[str]:
'''
获取 group 下的所有 clip 路径
'''
clip_path_list = []
with Group(group_uuid) as group:
for clip_ukey in group.list_clip_ukeys():
clip = Clip(clip_ukey)
files, raw_files = clip.list_files()
cache_file_path = clip.get_cache_path(files[0])
clip_path_list.append(cache_file_path)
return clip_path_list
class NumericType(IntEnum):
UNKNOWN = 0
UINT8 = 1
INT8 = 2
UINT16 = 3
INT16 = 4
UINT32 = 5
INT32 = 6
FLOAT32 = 7
FLOAT64 = 8
def get_frame_timestamps_data_from_tar(group_uuid: str) -> pd.DataFrame:
with Group(group_uuid) as group:
# 获取时间同步表
with group.open('clean_data_archive.tar', mode="rb") as f:
tar = tarfile.open(fileobj=f)
csv_obj = tar.extractfile(tar.getmember("frame_timestamps.csv"))
df = pd.read_csv(csv_obj)
return df
def load_point_cloud(json_index, df, clip_path_list):
# 类型映射表
type_mapping = {
NumericType.FLOAT32: 'f4', NumericType.FLOAT64: 'f8',
NumericType.UINT8: 'u1', NumericType.UINT16: 'u2', NumericType.UINT32: 'u4',
NumericType.INT8: 'i1', NumericType.INT16: 'i2', NumericType.INT32: 'i4'
}
# 根据时间戳搜索点云数据
protobuf_decoder = ProtobufDecoder()
lidar_ts_ns = df.loc[json_index, 'Pandar128']
start_ts_ns = lidar_ts_ns - 1e9
end_ts_ns = lidar_ts_ns + 1e9
for clip_path in clip_path_list:
reader = ClipReader(clip_path)
for schema, channel, msg in reader.iter_messages(topics=['Pandar128'], start_time=start_ts_ns, end_time=end_ts_ns):
if schema.encoding == "protobuf":
data = protobuf_decoder.decode(schema, channel, msg)
frame_ts = msg.log_time
if abs(frame_ts - lidar_ts_ns) < 1e3:
fields = data.fields
# 1. 计算每个点的长度和类型
stride = 0
dtype_list = []
for field in fields:
dtype = type_mapping[field.type]
field_size = np.dtype(dtype).itemsize
stride = max(stride, field.offset + field_size)
dtype_list.append((field.name, type_mapping[field.type]))
# 2. 解析二进制数据
num_points = len(data.data) // stride
data = np.frombuffer(data.data, dtype=np.dtype(dtype_list), count=num_points)
return np.vstack([data['x'], data['y'], data['z'], data['intensity']]).T
raise Exception("未搜索到点云")
def parse_truth_data(group_uuid: str, curvelists: list, output_path: str, car_name: str, date_name: str, df: pd.DataFrame) -> None:
'''
解析各类真值数据
'''
def untar_file(group_uuid: str, output_path: str, tar_filename: str, product_type: str, car_name: str, date_name: str, df: pd.DataFrame) -> None:
with Group(group_uuid) as group:
if group.get_product(product_type) is not None:
with group.open(tar_filename, mode="rb") as f:
tar = tarfile.open(fileobj=f)
# 只提取JSON文件和calib文件跳过depth_map_fg
for member in tar.getmembers():
# 跳过深度图文件夹
if 'depth_map_fg' in member.name:
continue
# 优先判断calib文件因为calib文件也可能是.json格式
if 'calib' in member.name:
# 修改提取路径将calib文件放到calib/目录
calib_filename = os.path.basename(member.name)
calib_dir = os.path.join(output_path, 'calib')
os.makedirs(calib_dir, exist_ok=True)
# 提取文件内容并保存
file_obj = tar.extractfile(member)
if file_obj:
with open(os.path.join(calib_dir, calib_filename), 'wb') as f_out:
f_out.write(file_obj.read())
# 提取annotations JSON文件到annotations文件夹
elif 'camera_radar_lidar_jsons' in member.name and member.name.endswith('.json'):
# 提取文件名中的json_index
json_filename = os.path.basename(member.name)
try:
# 从文件名中提取json_index (例如 "000001.json" -> 1)
json_index = int(json_filename[:-5]) # 去掉.json后缀
# 获取对应的frame_id
frame_id = df['camera4_frame_id'][json_index]
# 使用和图片相同的命名格式
new_filename = '%s_%s_%s_%06d_%d.json' % (car_name, date_name, group_uuid, json_index, frame_id)
annotations_dir = os.path.join(output_path, 'annotations')
os.makedirs(annotations_dir, exist_ok=True)
# 提取文件内容并保存
file_obj = tar.extractfile(member)
if file_obj:
with open(os.path.join(annotations_dir, new_filename), 'wb') as f_out:
f_out.write(file_obj.read())
except (ValueError, KeyError, IndexError) as e:
print(f"警告: 无法处理文件 {member.name}, 错误: {e}")
else:
print(f"Group {group_uuid} does not have product {product_type}.")
def gen_valid_file(group_uuid: str, curvelists: list, output_path: str, tar_filename: str, product_type: str) -> None:
valid_list = []
with Group(group_uuid) as group:
if group.get_product(product_type) is not None:
with group.open(tar_filename, mode="rb") as f:
with tarfile.open(fileobj=f, mode="r|*") as tar:
for member in tar:
# print("读取", member.name)
if 'calib/calib_pandar128_to_cam.json' in member.name:
fr = tar.extractfile(member)
calib_param = json.load(fr)
lidar_to_cam = np.array(calib_param['camera4']["Extrinsic"])
cam_intrinsic = np.array(calib_param['camera4']["Intrinsic"])
distortion_param = np.array(calib_param['camera4']["Distortion"])
if 'camera_radar_lidar_jsons' not in member.name or '.json' not in member.name:
continue
##########
valid_list.append(member.name)
# ##############curve####
# if len(curvelists) != 0:
# if group_uuid in curvelists:
# valid_list.append(member.name)
# continue
# #############
# fr = tar.extractfile(member)
# objs = json.load(fr)['asso_list']
# valid_obj = 0
# for obj in objs:
# label_2d = obj['camera_mea']
# label_det_cls = det_cls_map[label_2d['cls']]
# pose = posecls.get(label_2d['pose'])
# sublabel = subcls.get(label_2d['subcls'])
# if label_det_cls != 0:
# continue
# label_lidar = obj['lidar_mea']
# if label_lidar is None:
# continue
# obj_lidar = [float(label_lidar[1]),float(label_lidar[2]),float(label_lidar[3]),float(label_lidar[4]),float(label_lidar[5]),float(label_lidar[6]),float(label_lidar[7])]
# cam2d,cam3dpoint = Lidar2OrigImg(np.array([obj_lidar[0],obj_lidar[1],obj_lidar[2]]), lidar_to_cam, distortion_param, cam_intrinsic, 1920, 1080)
# if abs(cam3dpoint[0]) < 15 and abs(cam3dpoint[2]) < 80 and obj_lidar[3] > 5.5: ####大车
# valid_obj += 1
# valid_list.append(member.name)
# break ##
# # elif abs(cam3dpoint[0]) < 8 and abs(cam3dpoint[2]) < 8 : ####大车切车
# # valid_obj += 1
# # valid_list.append(member.name)
# # break ##
else:
print(f"Group {group_uuid} does not have product {product_type}.")
return valid_list
# product_type_list = [
# 'auto-2d-box-label',
# 'auto-3d-box-label',
# '2d-3d-association',
# 'static-map'
# ]
product_type_list = [
'2d-3d-association'
]
for product_type in product_type_list:
tar_filename = f"{product_type}_archive.tar"
valid_list = gen_valid_file(group_uuid, curvelists, output_path, tar_filename, product_type)
if len(valid_list) != 0:
untar_file(group_uuid, output_path, tar_filename, product_type, car_name, date_name, df)
return valid_list
def gettraindatagroupuuid(path):
used_uuids = []
imglists = glob.glob(path + '*/*/images/*.png')
imglists.sort()
for img in imglists:
name = os.path.basename(img)
co = name.split('_')
uuid = co[3]
if uuid not in used_uuids:
used_uuids.append(uuid)
return used_uuids
def process_single_uuid(args):
"""
封装单个UUID的完整处理流程作为多进程执行的任务函数
:param args: (group_uuid, uuid_date_dict, vehicle_number)
:return: 处理结果(成功/失败),便于后续统计
"""
group_uuid, uuid_date_dict, vehicle_number = args
# existpath = '/mnt/hfs/xdzhu/G1M3_PDCL/0105_1616/'
# used_uuids = os.listdir(existpath)
# existpath2 = '/mnt/hfs/xdzhu/G1M3_PDCL/0105_1616_2/'
# used_uuids2 = os.listdir(existpath2)
# if group_uuid not in used_uuids and group_uuid not in used_uuids2:
# 1. 初始化变量(原循环内的逻辑)
curvelists = [] # 每个进程独立初始化,避免多进程共享冲突
date = uuid_date_dict[group_uuid][0] # 取该UUID的第一个日期
output_path = os.path.join(output, group_uuid) # 更规范的路径拼接
# 2. 获取时间同步表和clip路径需要在parse_truth_data之前获取df
df = get_frame_timestamps_data_from_tar(group_uuid)
clip_path_list = get_clip_paths(group_uuid)
# 3. 解析真值数据传递vehicle_number, date, df参数以便重命名JSON文件
valid_list = parse_truth_data(group_uuid, curvelists, output_path, vehicle_number, date, df)
# 4. 无有效数据则跳过
if len(valid_list) == 0:
print(f"UUID {group_uuid} 无有效数据,跳过")
return (group_uuid, "success (no valid data)")
# 5. 处理有效列表修复原代码重复用i、追加元素的问题
valid_list.sort()
valid_ids = [] # 单独定义ID列表避免污染原valid_list
for file_path in valid_list:
name = os.path.basename(file_path)
# 提取ID假设文件名格式为 "xxx.id.json" 之类,-5 对应后缀长度)
try:
id = int(name[0:-5])
valid_ids.append(id)
except ValueError:
print(f"UUID {group_uuid} 文件名 {name} 解析ID失败跳过")
continue
# 6. 执行图像解码
parse_video(vehicle_number, date, group_uuid, valid_ids, clip_path_list, 'camera4', df, output_path)
print(f"UUID {group_uuid} 处理完成")
return (group_uuid, "success")
if __name__ == "__main__":
product_type = '2d-3d-association' # 真值产物类型
project_name = 'G1M3_0630' # 项目名称
CAR = 'G1M3_0630' #'G1M3_AFS1616' #'G1M3_0877' #'G1M3_AFS1616' #'G1M3_FDL2232'
output = '/data/zschen/dm3d/'
# output = '/home/xdzhu/Data/test0877/'
groupuuidpath = '/data/zschen/dm3d/0630_total_20260111.txt'
textlists = open(groupuuidpath,'r').readlines()
textlists.sort()
uuid_date = {}
group_uuidlists = []
for text in textlists:
context = text.strip('\n').split(' ')
date = context[1]
# if date[0:8] == '20251129':
# continue
if context[0] not in group_uuidlists:
group_uuidlists.append(context[0])
if context[0] not in uuid_date:
uuid_date[context[0]] = []
uuid_date[context[0]].append(date)
task_list = [(uuid, uuid_date, CAR) for uuid in group_uuidlists]
cpu_count = multiprocessing.cpu_count()
process_num = max(1, cpu_count - 2) # 至少1个进程
# process_num = 1
print(f"开始多进程处理,共 {len(group_uuidlists)} 个UUID进程数: {process_num}")
# 创建进程池并执行
with Pool(processes=process_num) as pool:
# 映射任务到进程池,返回结果列表
results = pool.map(process_single_uuid, task_list)
# 3. 统计处理结果
success_count = 0
fail_count = 0
try:
for uuid, res in results:
if res.startswith("success"):
success_count += 1
else:
fail_count += 1
except:
pass
print(f"\n处理完成!总计: {len(group_uuidlists)} | 成功: {success_count} | 失败: {fail_count}")
# 可选打印失败的UUID详情
try:
fail_details = [(uuid, res) for uuid, res in results if not res.startswith("success")]
if fail_details:
print("失败详情:")
for uuid, res in fail_details:
print(f" {uuid}: {res}")
except:
pass