1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
| import numpy as np from enum import Enum from typing import Dict, Optional
class ESFState(Enum): """ESF 状态""" INACTIVE = "inactive" DECELERATING = "decelerating" STOPPING = "stopping" STOPPED = "stopped"
class EmergencyStopFunction: """ 紧急停车功能(ESF) Euro NCAP 2026 合规 """ def __init__(self): self.state = ESFState.INACTIVE self.deceleration_target = -3.0 self.max_decel = -4.0 self.min_decel = -2.0 self.target_stop_distance = 100.0 self.lane_keep_gain = 0.1 self.max_steering_rate = 10.0 def activate(self, vehicle_speed: float, lane_position: float) -> Dict: """ 激活 ESF Args: vehicle_speed: 当前车速 (m/s) lane_position: 车道位置 (-1 到 1) Returns: 控制指令 """ self.state = ESFState.DECELERATING return { 'deceleration': self._calculate_deceleration(vehicle_speed), 'steering_angle': self._calculate_steering(lane_position), 'hazard_lights': True, 'horn': True, 'ecall': True } def update(self, vehicle_speed: float, lane_position: float) -> Dict: """ 更新 ESF 控制 Returns: 控制指令 """ if self.state == ESFState.INACTIVE: return {'active': False} decel = self._calculate_deceleration(vehicle_speed) steering = self._calculate_steering(lane_position) if vehicle_speed < 0.5: self.state = ESFState.STOPPED return { 'deceleration': 0.0, 'steering_angle': 0.0, 'hazard_lights': True, 'parking_brake': True, 'ecall': True, 'stopped': True } return { 'deceleration': decel, 'steering_angle': steering, 'hazard_lights': True, 'horn': False } def _calculate_deceleration(self, vehicle_speed: float) -> float: """ 计算减速度指令 目标:在 target_stop_distance 内停车 """ if vehicle_speed < 0.5: return 0.0 required_decel = -vehicle_speed**2 / (2 * self.target_stop_distance) decel = np.clip(required_decel, self.max_decel, self.min_decel) return decel def _calculate_steering(self, lane_position: float) -> float: """ 计算转向角度 目标:保持车道中心 """ steering = -self.lane_keep_gain * lane_position steering = np.clip(steering, -self.max_steering_rate, self.max_steering_rate) return steering def deactivate(self): """取消 ESF""" self.state = ESFState.INACTIVE
class DMS_ADAS_Integration: """ DMS-ADAS 集成系统 """ def __init__(self): self.unresponsive_detector = UnresponsiveDriverDetector() self.esf = EmergencyStopFunction() def process(self, monitoring_data: DriverMonitoringData) -> Dict: """ 处理监控数据并返回控制指令 Returns: { 'driver_state': str, 'warning_level': int, 'esf_active': bool, 'control_commands': dict } """ result = { 'driver_state': 'normal', 'warning_level': 0, 'esf_active': False, 'control_commands': None } detection_result = self.unresponsive_detector.update(monitoring_data) result['driver_state'] = detection_result['state'] action = detection_result['action'] if action == 'INITIAL_WARNING': result['warning_level'] = 1 result['control_commands'] = { 'audio_alert': True, 'visual_alert': True, 'haptic_alert': False } elif action == 'ESCALATION_WARNING': result['warning_level'] = 2 result['control_commands'] = { 'audio_alert': True, 'visual_alert': True, 'haptic_alert': True, 'horn': True } elif action == 'ACTIVATE_ESF': result['esf_active'] = True result['control_commands'] = self.esf.activate( monitoring_data.vehicle_speed, monitoring_data.lane_position ) elif self.unresponsive_detector.current_state == DriverState.UNRESPONSIVE: result['esf_active'] = True result['control_commands'] = self.esf.update( monitoring_data.vehicle_speed, monitoring_data.lane_position ) return result
if __name__ == "__main__": system = DMS_ADAS_Integration() for i in range(30): data = DriverMonitoringData( timestamp=i, eyes_closed=True, eyes_closed_duration=i, head_drooping=True if i > 5 else False, last_steering_time=0, last_pedal_time=0, vehicle_speed=30.0 - i * 1.0, lane_position=0.1 ) result = system.process(data) print(f"[{i}s] State: {result['driver_state']}, " f"Warning: {result['warning_level']}, " f"ESF: {result['esf_active']}")
|