1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
| from typing import List, Tuple, Optional from dataclasses import dataclass import numpy as np
@dataclass
class Detection:
    """A single detection result.

    Only ``position`` is required; every other field has a default so a
    detection can be created from partial sensor information and filled
    in later.
    """

    # 3-component position of the detected target.
    position: Tuple[float, float, float]

    # Image-space box as four integers, or None when no box is available.
    # NOTE(review): consumers in this file read it as (x, y, width, height)
    # -- confirm against whatever produces camera detections.
    bounding_box: Optional[Tuple[int, int, int, int]] = None

    # Detection confidence score; defaults to 0.0.
    confidence: float = 0.0

    # Class name of the detected target; "unknown" until classified.
    class_label: str = "unknown"

    # Measured breathing rate, or None when it is not available.
    # NOTE(review): presumably breaths per minute -- confirm with the
    # radar processing stage.
    breathing_rate: Optional[float] = None
class SensorFusionCPD:
    """Multi-sensor fusion child presence detection (CPD) system.

    Combines radar targets with camera detections and reports the targets
    that are classified as children.

    The radar supplies range, confidence and breathing rate for each target.
    The camera, when a detection can be associated with a radar target,
    contributes a bounding box and a second confidence value.
    """

    def __init__(self):
        # CPDRadarProcessor and RadarConfig are defined outside this class.
        self.radar_processor = CPDRadarProcessor(RadarConfig())

        # 3x3 pinhole intrinsic matrix: focal lengths of 800 on both axes,
        # principal point at pixel (640, 360).
        self.camera_intrinsics = np.array([
            [800, 0, 640],
            [0, 800, 360],
            [0, 0, 1],
        ])

        # 4x4 homogeneous radar-to-camera transform. Identity here, i.e. no
        # extrinsic calibration has been applied.
        # NOTE(review): with identity, a radar point (range, 0, 0) has zero
        # camera depth and can never be projected -- confirm the intended
        # calibration / axis convention.
        self.radar_to_camera = np.eye(4)

        # Inclusive breathing-rate window used by _is_child.
        # NOTE(review): presumably breaths per minute -- confirm.
        self.breathing_rate_child_max = 30
        self.breathing_rate_child_min = 18

        # Not read by any method of this class.
        # NOTE(review): presumably a height limit in metres -- confirm.
        self.height_child_max = 1.2

    def detect(self, radar_data: np.ndarray,
               camera_frame: np.ndarray) -> List["Detection"]:
        """Run fused detection on one radar frame and one camera frame.

        Args:
            radar_data: Radar ADC data passed to the radar processor.
            camera_frame: Camera image frame.

        Returns:
            The detected children; empty when the radar processor reports
            no child.
        """
        radar_result = self.radar_processor.process_frame(radar_data)

        # The radar is the gate: skip the camera when it sees no child.
        if not radar_result['child_detected']:
            return []

        camera_detections = self._process_camera(camera_frame)
        return self._associate_and_fuse(
            radar_result['children'],
            camera_detections,
        )

    def _process_camera(self, frame: np.ndarray) -> List["Detection"]:
        """Detect children in a camera frame.

        Placeholder: no model is wired in, so this always returns an empty
        list.
        """
        return []

    def _associate_and_fuse(self, radar_targets: List[dict],
                            camera_detections: List["Detection"]
                            ) -> List["Detection"]:
        """Associate radar targets with camera detections and fuse them.

        Args:
            radar_targets: Dicts with the keys ``'range'`` and
                ``'confidence'`` and an optional ``'breathing_rate'``.
            camera_detections: Camera detections to match against.

        Returns:
            One ``Detection`` per radar target classified as a child. When
            a camera detection is associated, its bounding box is attached
            and the confidence is the mean of the two confidences.
        """
        fused = []
        for radar_target in radar_targets:
            # Only children are reported, so filter before doing any
            # geometry for a target that would be discarded anyway.
            if not self._is_child(radar_target):
                continue

            # Only range is used; the target is placed on the radar x axis.
            radar_pos = np.array(
                [radar_target['range'], 0.0, 0.0], dtype=float
            )
            camera_pos = self._transform_radar_to_camera(radar_pos)
            image_point = self._project_to_image(camera_pos)
            associated_camera = self._find_associated_detection(
                image_point, camera_detections
            )

            detection = Detection(
                # Plain Python floats rather than NumPy scalars.
                position=tuple(radar_pos.tolist()),
                confidence=radar_target['confidence'],
                class_label="child",
                breathing_rate=radar_target.get('breathing_rate'),
            )

            if associated_camera is not None:
                detection.bounding_box = associated_camera.bounding_box
                detection.confidence = (
                    detection.confidence + associated_camera.confidence
                ) / 2

            fused.append(detection)

        return fused

    def _transform_radar_to_camera(self, radar_pos: np.ndarray) -> np.ndarray:
        """Transform a 3D point from radar to camera coordinates."""
        homogeneous = np.append(radar_pos, 1)
        camera_pos = self.radar_to_camera @ homogeneous
        return camera_pos[:3]

    def _project_to_image(self,
                          point_3d: np.ndarray) -> Optional[Tuple[int, int]]:
        """Project a 3D camera-frame point onto the image plane.

        Returns:
            Integer pixel coordinates ``(u, v)``, truncated toward zero, or
            ``None`` when the point cannot be projected because its
            homogeneous depth is zero, negative or non-finite.
        """
        projected = self.camera_intrinsics @ point_3d
        depth = projected[2]

        # Dividing by a zero depth yields inf/nan, which int() rejects with
        # an exception. A negative depth means the point is behind the
        # camera and has no meaningful pixel.
        if not np.isfinite(depth) or depth <= 0:
            return None

        u, v = projected[:2] / depth
        if not (np.isfinite(u) and np.isfinite(v)):
            return None
        return int(u), int(v)

    def _find_associated_detection(self,
                                   image_point: Optional[Tuple[int, int]],
                                   detections: List["Detection"],
                                   max_distance: float = 100.0
                                   ) -> Optional["Detection"]:
        """Find the camera detection closest to a projected radar point.

        Args:
            image_point: Projected pixel ``(u, v)``, or ``None`` when the
                radar point could not be projected.
            detections: Candidate camera detections. Entries without a
                bounding box are ignored.
            max_distance: Exclusive upper bound, in pixels, on the distance
                between the point and a bounding-box centre.

        Returns:
            The nearest detection within ``max_distance``, or ``None``.
        """
        if image_point is None or not detections:
            return None

        associated = None
        min_distance = max_distance
        for det in detections:
            if not det.bounding_box:
                continue

            # The box is read as (x, y, width, height).
            x, y, width, height = det.bounding_box
            cx = x + width // 2
            cy = y + height // 2
            distance = np.sqrt(
                (cx - image_point[0]) ** 2 + (cy - image_point[1]) ** 2
            )

            # Strict '<' keeps the first of several equidistant candidates.
            if distance < min_distance:
                min_distance = distance
                associated = det

        return associated

    def _is_child(self, radar_target: dict) -> bool:
        """Return True when the target's breathing rate is in the child window.

        A target without a ``'breathing_rate'`` entry (or with ``None``) is
        not classified as a child.
        """
        breathing_rate = radar_target.get('breathing_rate')
        if breathing_rate is None:
            return False
        return (
            self.breathing_rate_child_min
            <= breathing_rate
            <= self.breathing_rate_child_max
        )
|