1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
| import numpy as np from typing import Dict, List, Tuple
class EyeFeatureExtractor:
    """Extract eye features from per-frame measurements over rolling windows.

    The extractor is stateful: every call to :meth:`extract` appends the
    frame's measurements to internal histories and computes the features
    from those histories, so one instance should be fed one continuous
    stream of frames.

    Attributes:
        history_size: Maximum number of samples kept for eyelid openness
            and pupil diameter.
        blink_window: Maximum number of per-frame blink flags kept for the
            blink-rate estimate.
        fps: Number of frames per second the caller feeds to
            :meth:`extract`; the blink window length is divided by this
            value to express the blink rate per second.
        eye_openness_history: Recent ``eye_openness`` values (list).
        pupil_response_history: Recent ``pupil_diameter`` values (list).
        blink_rate_history: Recent blink flags, 1 or 0 per frame (list).
    """

    # The pupil response speed is only computed once the pupil history holds
    # MORE than this many samples; until then it is reported as 0.
    MIN_PUPIL_SAMPLES = 10

    def __init__(self, history_size: int = 60, blink_window: int = 300,
                 fps: float = 30):
        """Create an extractor with empty histories.

        Args:
            history_size: Window length (in frames) for openness and pupil
                diameter. Defaults to 60.
            blink_window: Window length (in frames) for blink flags.
                Defaults to 300.
            fps: Frame rate used to convert the blink window from frames
                to seconds. Defaults to 30.

        Raises:
            ValueError: If any argument is not strictly positive.
        """
        if history_size <= 0 or blink_window <= 0:
            raise ValueError("history_size and blink_window must be positive")
        if fps <= 0:
            # A zero frame rate would otherwise surface later as a
            # ZeroDivisionError inside extract().
            raise ValueError("fps must be positive")

        self.history_size = history_size
        self.blink_window = blink_window
        self.fps = fps
        self.eye_openness_history = []
        self.pupil_response_history = []
        self.blink_rate_history = []

    @staticmethod
    def _trim(history: List, limit: int) -> None:
        """Drop the oldest entries of *history* so at most *limit* remain."""
        excess = len(history) - limit
        if excess > 0:
            del history[:excess]

    def extract(self, frame_data: Dict) -> Dict:
        """Update the histories with one frame and return the eye features.

        Args:
            frame_data: Measurements of a single frame. Keys read here:

                - ``eye_openness`` (required): eyelid openness.
                - ``pupil_diameter`` (required): pupil diameter.
                - ``blink_detected`` (required): truthy if a blink was
                  detected in this frame.
                - ``gaze_history`` (optional): gaze samples handed to the
                  nystagmus check; defaults to an empty list.

        Returns:
            A dict with the keys ``avg_eye_openness``,
            ``eye_openness_variance``, ``blink_rate``,
            ``pupil_response_speed`` and ``nystagmus_detected``.
            ``pupil_response_speed`` is 0 until more than
            ``MIN_PUPIL_SAMPLES`` pupil samples have been collected.

        Raises:
            KeyError: If one of the required keys is missing.
        """
        self.eye_openness_history.append(frame_data['eye_openness'])
        self.pupil_response_history.append(frame_data['pupil_diameter'])
        self._trim(self.eye_openness_history, self.history_size)
        self._trim(self.pupil_response_history, self.history_size)

        features = {}
        features['avg_eye_openness'] = np.mean(self.eye_openness_history)
        features['eye_openness_variance'] = np.var(self.eye_openness_history)

        self.blink_rate_history.append(1 if frame_data['blink_detected'] else 0)
        self._trim(self.blink_rate_history, self.blink_window)
        # Blinks in the window divided by the window duration in seconds.
        window_seconds = len(self.blink_rate_history) / self.fps
        features['blink_rate'] = sum(self.blink_rate_history) / window_seconds

        if len(self.pupil_response_history) > self.MIN_PUPIL_SAMPLES:
            # Mean absolute frame-to-frame change of the pupil diameter.
            pupil_diff = np.diff(self.pupil_response_history)
            features['pupil_response_speed'] = np.mean(np.abs(pupil_diff))
        else:
            features['pupil_response_speed'] = 0

        # Pass the samples themselves, not a list wrapping the sample list.
        gaze_history = frame_data.get('gaze_history', [])
        features['nystagmus_detected'] = self._detect_nystagmus(gaze_history)

        return features

    def _detect_nystagmus(self, gaze_history: List) -> bool:
        """Report whether nystagmus is present in *gaze_history*.

        Nystagmus is a rapid, involuntary back-and-forth movement of the
        eyes. This is currently a placeholder: the argument is not
        inspected and the method always returns ``False``.
        """
        return False
class ImpairmentDetector:
    """Rule-based impairment detector.

    Combines eye features (from an ``EyeFeatureExtractor``), steering
    behaviour and vehicle state into additive scores for three impairment
    types: ``"alcohol"``, ``"drugs"`` and ``"fatigue"``.

    Attributes:
        eye_extractor: Stateful extractor that turns frame data into eye
            features.
        eye_openness_low: Average openness below this counts as drooping
            eyelids.
        blink_rate_low: Blink rate below this counts as a low blink rate.
        pupil_response_low: Pupil response speed below this counts as a
            sluggish pupil response.
    """

    # Steering jitter above this value counts as steering-wheel jitter.
    STEERING_JITTER_HIGH = 0.5
    # Lane deviation above this value counts as lane drift.
    LANE_DEVIATION_HIGH = 0.3
    # The best-scoring impairment type must exceed this to be reported.
    IMPAIRMENT_THRESHOLD = 0.5
    # The extractor reports a pupil response speed of 0 as a placeholder
    # until it holds more than this many pupil samples; this must mirror
    # the warm-up length used by the extractor.
    MIN_PUPIL_SAMPLES = 10

    def __init__(self):
        """Create a detector with a fresh eye extractor and default thresholds."""
        self.eye_extractor = EyeFeatureExtractor()
        self.eye_openness_low = 0.3
        self.blink_rate_low = 0.1
        self.pupil_response_low = 0.05

    def detect(
        self,
        frame_data: Dict,
        steering_data: Dict,
        vehicle_data: Dict
    ) -> Dict:
        """Score the current frame and report the most likely impairment.

        Args:
            frame_data: Visual frame data, forwarded to the eye extractor.
            steering_data: Steering-wheel data; the ``jitter`` key is read.
            vehicle_data: Vehicle state; the ``lane_deviation`` and
                ``speed_history`` keys are read.

        Returns:
            A dict with:

            - ``is_impaired`` (bool): whether the highest score exceeds
              ``IMPAIRMENT_THRESHOLD``.
            - ``impairment_type`` (str): ``"alcohol"``, ``"drugs"`` or
              ``"fatigue"`` when impaired, otherwise ``"normal"``.
            - ``confidence`` (float): the highest score, capped at 1.0.
            - ``key_indicators`` (list of str): labels of the rules that
              fired, in evaluation order.
            - ``scores`` (dict): the raw score per impairment type.
        """
        eye_features = self.eye_extractor.extract(frame_data)
        behavior_features = self._analyze_behavior(steering_data, vehicle_data)

        key_indicators = []
        impairment_scores = {
            "alcohol": 0.0,
            "drugs": 0.0,
            "fatigue": 0.0
        }

        def flag(label: str, **weights: float) -> None:
            """Record a fired rule: add its weights and remember its label."""
            for impairment, weight in weights.items():
                impairment_scores[impairment] += weight
            key_indicators.append(label)

        if eye_features['avg_eye_openness'] < self.eye_openness_low:
            flag("眼睑下垂", alcohol=0.3, fatigue=0.4)

        if eye_features['nystagmus_detected']:
            flag("眼震", alcohol=0.4)

        # During warm-up the extractor reports speed 0 because it has too
        # few samples, not because the pupil is sluggish; skip the rule
        # until a real measurement exists to avoid a false positive on
        # every one of the first frames.
        pupil_samples = len(self.eye_extractor.pupil_response_history)
        pupil_ready = pupil_samples > self.MIN_PUPIL_SAMPLES
        if pupil_ready and eye_features['pupil_response_speed'] < self.pupil_response_low:
            flag("瞳孔反应迟钝", alcohol=0.2, drugs=0.3)

        if eye_features['blink_rate'] < self.blink_rate_low:
            flag("眨眼频率低", alcohol=0.1)

        if behavior_features['steering_jitter'] > self.STEERING_JITTER_HIGH:
            flag("方向盘抖动", alcohol=0.2)

        if behavior_features['lane_deviation'] > self.LANE_DEVIATION_HIGH:
            flag("车道偏移", alcohol=0.2, fatigue=0.3)

        # On ties max() keeps the first key in insertion order.
        max_type = max(impairment_scores, key=impairment_scores.get)
        max_score = impairment_scores[max_type]
        is_impaired = max_score > self.IMPAIRMENT_THRESHOLD

        return {
            "is_impaired": is_impaired,
            "impairment_type": max_type if is_impaired else "normal",
            "confidence": min(max_score, 1.0),
            "key_indicators": key_indicators,
            "scores": impairment_scores
        }

    def _analyze_behavior(self, steering_data: Dict, vehicle_data: Dict) -> Dict:
        """Collect driving-behaviour features from steering and vehicle data.

        Args:
            steering_data: Dict whose optional ``jitter`` entry is used
                (default 0).
            vehicle_data: Dict whose optional ``lane_deviation`` (default 0)
                and ``speed_history`` entries are used.

        Returns:
            A dict with ``steering_jitter``, ``lane_deviation`` and
            ``speed_variance``. ``speed_variance`` is 0.0 when the speed
            history is missing or empty.
        """
        speed_history = vehicle_data.get('speed_history')
        # np.var of an empty sequence is NaN (with a RuntimeWarning), so an
        # empty or missing history is reported as zero variance instead.
        if speed_history is None or len(speed_history) == 0:
            speed_variance = 0.0
        else:
            speed_variance = np.var(speed_history)

        return {
            "steering_jitter": steering_data.get('jitter', 0),
            "lane_deviation": vehicle_data.get('lane_deviation', 0),
            "speed_variance": speed_variance
        }
|