1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297
| import time from typing import Dict, Optional from dataclasses import dataclass from enum import Enum
class DriverState(Enum):
    """Driver state."""
    # No detection condition is currently met.
    NORMAL = "normal"
    # Early signs of inattention (eyes closed / head down for a short time).
    DROWSY = "drowsy"
    # Inattention or lack of control input has lasted long enough to warn.
    UNRESPONSIVE = "unresponsive"
    # The driver did not react to the warning in time.
    EMERGENCY = "emergency"
@dataclass
class DriverInput:
    """Driver input state: one sample of the driver's control inputs."""
    # Steering wheel angle (unit not specified in this file — TODO confirm).
    steering_angle: float
    # Steering rate; the detector treats abs(value) > 5 as active steering
    # (unit not specified in this file — TODO confirm).
    steering_velocity: float
    # Brake pedal position; the detector treats > 0.1 as "pressed"
    # (presumably normalised to 0..1 — verify against the producer).
    brake_pedal: float
    # Accelerator pedal position; the detector treats > 0.1 as "pressed"
    # (presumably normalised to 0..1 — verify against the producer).
    accelerator_pedal: float
    # Time of this sample; used as the start of a "no input" period.
    timestamp: float
class UnresponsiveDriverDetector:
    """Detect an unresponsive driver.

    Fuses the DMS vision cues (eyes closed, head down) with the driver's
    control inputs. The ``update_*`` methods record when each condition
    started; ``detect`` turns the elapsed durations into a ``DriverState``.

    All thresholds are durations expressed in the same unit as the
    timestamps passed in (the demo in this file uses seconds). They are
    plain instance attributes and may be tuned after construction.

    The state is re-evaluated from scratch on every ``detect`` call, so it
    falls back to ``DROWSY`` / ``NORMAL`` as soon as the driver recovers, and
    a pending warning is cancelled once the driver is responsive again.
    """

    def __init__(self):
        # Durations after which the driver is considered drowsy.
        self.drowsy_eyes_closed_threshold = 1.0
        self.drowsy_head_down_threshold = 2.0

        # Durations after which the driver is considered unresponsive.
        self.eyes_closed_threshold = 2.0
        self.head_down_threshold = 3.0
        self.no_input_threshold = 5.0

        # Time the driver has to react to a warning before escalation.
        self.warning_response_threshold = 3.0

        # Start timestamps of the ongoing conditions (None = not active).
        self.eyes_closed_start: Optional[float] = None
        self.head_down_start: Optional[float] = None
        self.no_input_start: Optional[float] = None
        self.warning_issued_at: Optional[float] = None

        self.current_state = DriverState.NORMAL

    @staticmethod
    def _elapsed(start: Optional[float], now: float) -> float:
        """Return how long a condition has lasted, or 0 if it is inactive."""
        return 0 if start is None else now - start

    def update_eye_state(self, is_closed: bool, timestamp: float) -> None:
        """Update the eye state.

        Args:
            is_closed: whether the eyes are closed in this frame.
            timestamp: time of this frame.
        """
        if not is_closed:
            self.eyes_closed_start = None
        elif self.eyes_closed_start is None:
            # Remember only the first frame of a closed-eyes episode.
            self.eyes_closed_start = timestamp

    def update_head_state(self, is_down: bool, timestamp: float) -> None:
        """Update the head state.

        Args:
            is_down: whether the head is lowered in this frame.
            timestamp: time of this frame.
        """
        if not is_down:
            self.head_down_start = None
        elif self.head_down_start is None:
            # Remember only the first frame of a head-down episode.
            self.head_down_start = timestamp

    def update_input_state(self, input_data: DriverInput) -> None:
        """Update the control-input state.

        Any of active steering, brake or accelerator counts as driver input
        and ends the current "no input" period.

        Args:
            input_data: latest driver input sample.
        """
        has_input = (
            abs(input_data.steering_velocity) > 5
            or input_data.brake_pedal > 0.1
            or input_data.accelerator_pedal > 0.1
        )
        if has_input:
            self.no_input_start = None
        elif self.no_input_start is None:
            self.no_input_start = input_data.timestamp

    def detect(self, current_time: float) -> DriverState:
        """Evaluate the driver state.

        Issues a warning (via ``issue_warning``) the first time the driver
        is found unresponsive, and escalates to ``EMERGENCY`` when the driver
        is still unresponsive ``warning_response_threshold`` after it.

        Args:
            current_time: current timestamp.

        Returns:
            The driver state, which is also stored in ``current_state``.
        """
        eyes_closed_duration = self._elapsed(self.eyes_closed_start, current_time)
        head_down_duration = self._elapsed(self.head_down_start, current_time)
        no_input_duration = self._elapsed(self.no_input_start, current_time)
        # Measured before a warning may be issued below, so a warning raised
        # during this very call counts as 0 elapsed time.
        warning_no_response_duration = self._elapsed(
            self.warning_issued_at, current_time
        )

        is_unresponsive = (
            eyes_closed_duration >= self.eyes_closed_threshold
            or head_down_duration >= self.head_down_threshold
            or no_input_duration >= self.no_input_threshold
        )
        is_drowsy = (
            eyes_closed_duration >= self.drowsy_eyes_closed_threshold
            or head_down_duration >= self.drowsy_head_down_threshold
        )

        if is_unresponsive:
            if self.warning_issued_at is None:
                self.issue_warning(current_time)
            if warning_no_response_duration >= self.warning_response_threshold:
                self.current_state = DriverState.EMERGENCY
            else:
                self.current_state = DriverState.UNRESPONSIVE
        else:
            # The driver reacted: cancel the pending warning so that a stale
            # timer cannot escalate a recovered driver to EMERGENCY, and so
            # that a later episode gets a fresh warning.
            self.warning_issued_at = None
            self.current_state = (
                DriverState.DROWSY if is_drowsy else DriverState.NORMAL
            )

        return self.current_state

    def issue_warning(self, timestamp: float) -> None:
        """Issue the take-over warning and record when it was raised.

        Args:
            timestamp: time at which the warning is issued.
        """
        self.warning_issued_at = timestamp
        print("⚠️ 警告:驾驶员无响应!请立即接管车辆!")

    def reset(self) -> None:
        """Reset all timers and return to the NORMAL state."""
        self.eyes_closed_start = None
        self.head_down_start = None
        self.no_input_start = None
        self.warning_issued_at = None
        self.current_state = DriverState.NORMAL
class InterventionController:
    """Intervention controller.

    Looks up the intervention strategy configured for a driver state and
    performs the matching sequence of calls on the ADAS interface.
    """

    def __init__(self):
        # Driver state -> name of the strategy carried out by ``execute``.
        self.intervention_levels = {
            DriverState.NORMAL: "none",
            DriverState.DROWSY: "warning",
            DriverState.UNRESPONSIVE: "warning_intense",
            DriverState.EMERGENCY: "safe_stop"
        }

    def execute(self, state: DriverState, adas_interface):
        """Carry out the intervention configured for ``state``.

        Args:
            state: driver state.
            adas_interface: ADAS interface the actions are performed on.

        Raises:
            KeyError: if ``state`` has no entry in ``intervention_levels``.
        """
        strategy = self.intervention_levels[state]

        if strategy == "safe_stop":
            self._safe_stop(adas_interface)
            return
        if strategy == "warning_intense":
            self._warn_intensely(adas_interface)
            return
        if strategy == "warning":
            self._warn(adas_interface)
            return
        # "none" (or any unrecognised strategy name): nothing to do.

    @staticmethod
    def _warn(adas):
        """Default-intensity acoustic and visual warning."""
        adas.play_warning_sound()
        adas.flash_warning_light()

    @staticmethod
    def _warn_intensely(adas):
        """High-intensity warning followed by a short brake pulse."""
        adas.play_warning_sound(intensity="high")
        adas.flash_warning_light(intensity="high")
        adas.apply_short_brake()

    @staticmethod
    def _safe_stop(adas):
        """Bring the vehicle to a stop and call for help."""
        adas.activate_hazard_lights()
        adas.enable_lane_keeping()
        adas.gradual_brake_to_stop()
        adas.call_emergency_services()
        print("🚨 紧急停车!已呼叫救援!")
class UnresponsiveDriverSystem:
    """Unresponsive-driver intervention system.

    Wires the detector, the intervention controller and the ADAS interface
    together into one per-frame processing step.
    """

    def __init__(self):
        self.detector = UnresponsiveDriverDetector()
        self.controller = InterventionController()
        self.adas_interface = ADASInterface()

    def process_frame(
        self,
        eye_state: Dict,
        head_state: Dict,
        driver_input: DriverInput,
        current_time: float
    ):
        """Process one frame of data.

        Args:
            eye_state: eye state; the 'is_closed' key is read.
            head_state: head state; the 'is_down' key is read.
            driver_input: driver control inputs for this frame.
            current_time: current time.

        Returns:
            The driver state produced by the detector for this frame.
        """
        detector = self.detector

        # Feed the three observation channels, in this order, then evaluate.
        detector.update_eye_state(eye_state['is_closed'], current_time)
        detector.update_head_state(head_state['is_down'], current_time)
        detector.update_input_state(driver_input)
        driver_state = detector.detect(current_time)

        self.controller.execute(driver_state, self.adas_interface)
        return driver_state
class ADASInterface:
    """Simulated ADAS interface: every action only prints a message."""

    def play_warning_sound(self, intensity="normal"):
        """Print the warning-sound message with the given intensity."""
        message = f"🔊 播放警告声音(强度:{intensity})"
        print(message)

    def flash_warning_light(self, intensity="normal"):
        """Print the warning-light message with the given intensity."""
        message = f"💡 闪烁警告灯(强度:{intensity})"
        print(message)

    def apply_short_brake(self):
        """Print the short brake-pulse message."""
        message = "🛑 短促刹车提醒"
        print(message)

    def activate_hazard_lights(self):
        """Print the hazard-lights message."""
        message = "⚠️ 激活危险警示灯"
        print(message)

    def enable_lane_keeping(self):
        """Print the lane-keeping message."""
        message = "🚗 启用车道保持"
        print(message)

    def gradual_brake_to_stop(self):
        """Print the gradual-stop message."""
        message = "🛑 逐渐减速停车"
        print(message)

    def call_emergency_services(self):
        """Print the emergency-call message."""
        message = "📞 呼叫紧急服务(eCall)"
        print(message)
if __name__ == "__main__":
    # Demo: five frames of normal driving, then eight frames during which
    # the driver is asleep (eyes closed, head down, no control input).
    system = UnresponsiveDriverSystem()
    current_time = 0.0

    # (banner, frame count, eye state, head state, control inputs)
    phases = (
        (
            "\n=== 阶段 1:正常驾驶 ===",
            5,
            {'is_closed': False, 'perclos': 0.1},
            {'is_down': False, 'pitch': 0},
            dict(steering_angle=0.1, steering_velocity=10,
                 brake_pedal=0, accelerator_pedal=0.3),
        ),
        (
            "\n=== 阶段 2:驾驶员睡着 ===",
            8,
            {'is_closed': True, 'perclos': 0.8},
            {'is_down': True, 'pitch': -20},
            dict(steering_angle=0, steering_velocity=0,
                 brake_pedal=0, accelerator_pedal=0),
        ),
    )

    for banner, frame_count, eye_state, head_state, controls in phases:
        print(banner)
        for _ in range(frame_count):
            driver_input = DriverInput(timestamp=current_time, **controls)
            state = system.process_frame(
                eye_state, head_state, driver_input, current_time
            )
            print(f"时间 {current_time:.1f}s: 状态={state.value}")
            # Frames are spaced one time unit apart.
            current_time += 1.0
|