1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
| """ 功能损伤检测器
检测驾驶员的驾驶能力损伤,而非醉酒状态 """
import numpy as np
class FunctionalImpairmentDetector: """ 功能损伤检测器 通过行为指标判断驾驶能力损伤 """ def __init__(self): self.indicators = { 'eye_movement': { 'weight': 0.3, 'metrics': ['saccade_velocity', 'fixation_stability', 'pupil_response'] }, 'steering_behavior': { 'weight': 0.3, 'metrics': ['correction_frequency', 'jerk', 'micro_corrections'] }, 'lane_keeping': { 'weight': 0.2, 'metrics': ['deviation_std', 'correction_latency'] }, 'response_time': { 'weight': 0.2, 'metrics': ['brake_reaction', 'steering_reaction'] } } self.impairment_threshold = 0.7 def detect( self, eye_data: dict, steering_data: dict, lane_data: dict, response_data: dict ) -> dict: """ 检测功能损伤 Args: eye_data: 眼动数据 steering_data: 转向数据 lane_data: 车道保持数据 response_data: 响应时间数据 Returns: { 'is_impaired': bool, 'impairment_score': float, 'cause_hints': list, 'recommendation': str } """ eye_score = self._evaluate_eye_movement(eye_data) steering_score = self._evaluate_steering(steering_data) lane_score = self._evaluate_lane_keeping(lane_data) response_score = self._evaluate_response(response_data) total_score = ( eye_score * self.indicators['eye_movement']['weight'] + steering_score * self.indicators['steering_behavior']['weight'] + lane_score * self.indicators['lane_keeping']['weight'] + response_score * self.indicators['response_time']['weight'] ) is_impaired = total_score < self.impairment_threshold cause_hints = self._infer_cause(eye_score, steering_score, response_score) if is_impaired: if total_score < 0.3: recommendation = '严重损伤,建议停车休息' elif total_score < 0.5: recommendation = '明显损伤,需要警告' else: recommendation = '轻度损伤,持续监控' else: recommendation = '驾驶能力正常' return { 'is_impaired': is_impaired, 'impairment_score': 1 - total_score, 'cause_hints': cause_hints, 'recommendation': recommendation, 'breakdown': { 'eye_movement': eye_score, 'steering': steering_score, 'lane_keeping': lane_score, 'response': response_score } } def _evaluate_eye_movement(self, data: dict) -> float: """评估眼动指标""" score = 1.0 saccade_velocity = data.get('saccade_velocity', 100) if saccade_velocity < 50: score -= 0.3 elif saccade_velocity > 200: score -= 0.2 fixation_stability = data.get('fixation_stability', 1.0) score *= fixation_stability pupil_response = data.get('pupil_response', 1.0) if pupil_response < 0.3: score -= 0.3 return max(0, score) def _evaluate_steering(self, data: dict) -> float: """评估转向指标""" score = 1.0 micro_corrections = data.get('micro_corrections', 10) if micro_corrections > 30: score -= 0.3 elif micro_corrections < 5: score -= 0.4 jerk = data.get('jerk', 0.1) if jerk > 0.5: score -= 0.3 return max(0, score) def _evaluate_lane_keeping(self, data: dict) -> float: """评估车道保持""" deviation_std = data.get('deviation_std', 0.1) score = max(0, 1 - deviation_std * 5) return score def _evaluate_response(self, data: dict) -> float: """评估响应时间""" score = 1.0 brake_reaction = data.get('brake_reaction', 0.5) if brake_reaction > 1.5: score -= 0.4 elif brake_reaction > 1.0: score -= 0.2 return max(0, score) def _infer_cause( self, eye_score: float, steering_score: float, response_score: float ) -> list: """推断可能原因""" causes = [] if eye_score < 0.5: causes.append('可能:疲劳 / 药物 / 酒精') if steering_score < 0.5 and response_score < 0.5: causes.append('可能:疲劳 / 睡眠不足') if steering_score < 0.5 and response_score > 0.7: causes.append('可能:酒精 / 药物') return causes
|