1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
| import numpy as np from collections import deque from scipy import stats
class BlinkPatternAnalyzer:
    """Blink-pattern analyzer that turns a stream of EAR values into a fatigue score.

    Each call to ``update`` appends one EAR sample to a sliding window and runs
    a small open/closed state machine that records completed blinks. The
    remaining methods derive PERCLOS, blink statistics and a 0-100 fatigue
    score from the samples and blinks that fall inside the sliding window.

    NOTE(review): the windowing logic assumes ``update`` is called once per
    frame with consecutive ``frame_idx`` values -- confirm against callers.
    """

    def __init__(self, fps=30, window_size=900, closure_threshold=0.2):
        """Initialize the analyzer.

        Args:
            fps: Frame rate used to convert frame counts into time.
            window_size: Number of most recent EAR samples kept for analysis.
            closure_threshold: EAR values strictly below this count as closed.

        Raises:
            ValueError: If ``fps`` or ``window_size`` is not positive.
        """
        # Fail early: a non-positive fps would otherwise surface later as a
        # ZeroDivisionError (or negative durations) inside update().
        if fps <= 0:
            raise ValueError("fps must be positive")
        if window_size <= 0:
            raise ValueError("window_size must be positive")

        self.fps = fps
        self.window_size = window_size
        self.closure_threshold = closure_threshold

        self.ear_history = deque(maxlen=window_size)
        self.blink_events = deque(maxlen=100)

        self.is_closed = False
        self.closure_start = None
        # 0 doubles as "no blink recorded yet" (see the interval logic in update).
        self.last_blink_time = 0
        # Frame index of the most recent update() call; None until the first call.
        self._latest_frame = None

    def update(self, ear, frame_idx):
        """Record one EAR sample and detect whether a blink just completed.

        Args:
            ear: Eye aspect ratio (EAR) of the current frame.
            frame_idx: Index of the current frame.

        Returns:
            True if a blink ended on this frame and was recorded, else False.
        """
        self.ear_history.append(ear)
        self._latest_frame = frame_idx
        blink_detected = False

        if ear < self.closure_threshold and not self.is_closed:
            # Open -> closed transition: remember where the closure began.
            self.is_closed = True
            self.closure_start = frame_idx
        elif ear >= self.closure_threshold and self.is_closed:
            # Closed -> open transition: the closure is complete.
            self.is_closed = False
            duration = (frame_idx - self.closure_start) / self.fps * 1000  # ms

            # Only closures strictly between 50 ms and 1000 ms are recorded as
            # blinks; shorter or longer closures are dropped here.
            if 50 < duration < 1000:
                interval = (frame_idx - self.last_blink_time) / self.fps  # s
                self.blink_events.append({
                    'frame': frame_idx,
                    'duration': duration,
                    # The first blink has no predecessor, so its interval is 0.
                    'interval': interval if self.last_blink_time > 0 else 0,
                })
                self.last_blink_time = frame_idx
                blink_detected = True

        return blink_detected

    def calculate_perclos(self):
        """Compute PERCLOS over the sliding window.

        Returns:
            Percentage (0-100) of samples in ``ear_history`` whose EAR is below
            ``closure_threshold``; 0 when no samples have been recorded.
        """
        if not self.ear_history:
            return 0

        closed_frames = sum(
            1 for ear in self.ear_history if ear < self.closure_threshold
        )
        return closed_frames / len(self.ear_history) * 100

    def _events_in_window(self):
        """Return the recorded blinks whose frame lies inside the sliding window."""
        if self._latest_frame is None:
            return []
        # ear_history holds the most recent len(ear_history) samples, so with one
        # sample per frame anything at or before this cutoff is outside the window.
        cutoff = self._latest_frame - len(self.ear_history)
        return [event for event in self.blink_events if event['frame'] > cutoff]

    def extract_blink_features(self):
        """Extract blink statistics for the current sliding window.

        Only blinks inside the window are used, and the blink rate is
        normalized by the time actually observed (``len(ear_history) / fps``)
        rather than the nominal window length, so the rate is neither
        under-reported while the window is still filling nor inflated by stale
        events on long streams.

        Returns:
            A dict with keys ``blink_rate`` (blinks per minute),
            ``avg_duration`` / ``std_duration`` / ``max_duration`` (ms),
            ``long_blink_ratio`` (share of blinks longer than 300 ms),
            ``avg_interval`` / ``std_interval`` (s), ``cv_interval``
            (std / mean of the intervals) and ``perclos`` (percent), or None
            when fewer than 3 blinks fall inside the window.
        """
        events = self._events_in_window()
        if len(events) < 3:
            return None

        durations = [e['duration'] for e in events if e['duration'] > 0]
        # Interval 0 marks "no previous blink" and is excluded from the statistics.
        intervals = [e['interval'] for e in events if e['interval'] > 0]

        # Non-zero here: every recorded blink implies at least one stored sample.
        observed_seconds = len(self.ear_history) / self.fps
        mean_interval = np.mean(intervals) if intervals else 0

        return {
            'blink_rate': len(events) / observed_seconds * 60,
            'avg_duration': np.mean(durations) if durations else 0,
            'std_duration': np.std(durations) if len(durations) > 1 else 0,
            'max_duration': max(durations) if durations else 0,
            'long_blink_ratio': (
                sum(1 for d in durations if d > 300) / len(durations)
                if durations else 0
            ),
            'avg_interval': mean_interval,
            'std_interval': np.std(intervals) if len(intervals) > 1 else 0,
            'cv_interval': (
                np.std(intervals) / mean_interval
                if intervals and mean_interval > 0 else 0
            ),
            'perclos': self.calculate_perclos(),
        }

    @staticmethod
    def _perclos_indicator(perclos):
        """Build the PERCLOS indicator entry (score is capped at 30 points)."""
        if perclos >= 25:
            status = 'danger'
        elif perclos >= 15:
            status = 'warning'
        else:
            status = 'normal'
        return {
            'value': perclos,
            'score': min(perclos / 25 * 30, 30),
            'status': status,
        }

    def assess_fatigue(self):
        """Combine the blink features into an overall fatigue assessment.

        The score is the sum of five capped components: PERCLOS (30), mean
        blink duration (25), long-blink ratio (20), interval variability (15)
        and blink-rate deviation (10).

        When fewer than 3 blinks are available the blink-based components
        cannot be computed, but PERCLOS still can. In that case only the
        PERCLOS component is returned, so continuously closed eyes (which never
        register as a blink because the closure never ends or exceeds 1000 ms)
        are not reported as a zero score.

        Returns:
            A ``(fatigue_score, indicators)`` tuple. ``indicators`` maps an
            indicator name to a dict with ``value``, ``score`` and ``status``.
            Returns ``(0, {})`` when no samples have been recorded at all.
        """
        features = self.extract_blink_features()

        if features is None:
            if not self.ear_history:
                return 0, {}
            perclos_only = self._perclos_indicator(self.calculate_perclos())
            return perclos_only['score'], {'perclos': perclos_only}

        indicators = {}

        perclos_indicator = self._perclos_indicator(features['perclos'])
        indicators['perclos'] = perclos_indicator
        score = perclos_indicator['score']

        avg_duration = features['avg_duration']
        duration_score = min(avg_duration / 400 * 25, 25)
        indicators['duration'] = {
            'value': avg_duration,
            'score': duration_score,
            'status': (
                'danger' if avg_duration > 350
                else 'warning' if avg_duration > 250
                else 'normal'
            ),
        }
        score += duration_score

        long_blink_ratio = features['long_blink_ratio']
        # The ratio is at most 1, so this component is at most 20 points.
        long_blink_score = long_blink_ratio * 20
        indicators['long_blink'] = {
            'value': long_blink_ratio,
            'score': long_blink_score,
            'status': (
                'danger' if long_blink_ratio > 0.3
                else 'warning' if long_blink_ratio > 0.15
                else 'normal'
            ),
        }
        score += long_blink_score

        cv_interval = features['cv_interval']
        cv_score = min(cv_interval * 15, 15)
        indicators['variability'] = {
            'value': cv_interval,
            'score': cv_score,
            'status': 'warning' if cv_interval > 0.5 else 'normal',
        }
        score += cv_score

        # Relative deviation from a reference rate of 17 blinks per minute
        # (presumably a typical resting rate -- TODO confirm the source).
        freq_deviation = abs(features['blink_rate'] - 17) / 17
        freq_score = min(freq_deviation * 10, 10)
        indicators['frequency'] = {
            'value': features['blink_rate'],
            'score': freq_score,
            'status': 'warning' if freq_deviation > 0.5 else 'normal',
        }
        score += freq_score

        return score, indicators
def calculate_ear(eye_landmarks):
    """Compute the eye aspect ratio (EAR) from six eye landmarks.

    Args:
        eye_landmarks: Six eye keypoints [(x1, y1), (x2, y2), ..., (x6, y6)].

    Returns:
        The sum of the two vertical distances (points 1-5 and 2-4) divided by
        twice the horizontal distance (points 0-3), or 0 when the horizontal
        distance is not positive.
    """

    def _distance(first, second):
        # Euclidean distance between two landmarks, addressed by index.
        return np.linalg.norm(
            np.array(eye_landmarks[first]) - np.array(eye_landmarks[second])
        )

    vertical_sum = _distance(1, 5) + _distance(2, 4)
    horizontal = _distance(0, 3)

    # Degenerate eye (zero width): avoid dividing by zero.
    if not horizontal > 0:
        return 0
    return vertical_sum / (2 * horizontal)
if __name__ == "__main__":
    # Demo: feed 900 synthetic frames into the analyzer. Every 60 frames the
    # eye is "closed" (EAR 0.1) for a run that grows from 3 to 7 frames as the
    # simulated fatigue factor rises; otherwise the EAR is noisy around 0.3.
    analyzer = BlinkPatternAnalyzer()
    base_ear = 0.3

    for frame_idx in range(900):
        fatigue_factor = min(frame_idx / 900, 1.0)
        closed_frames = 3 + int(fatigue_factor * 5)

        # Noise is drawn only for open-eye frames.
        ear = (
            0.1
            if frame_idx % 60 < closed_frames
            else base_ear + np.random.normal(0, 0.02)
        )
        analyzer.update(ear, frame_idx)

    score, indicators = analyzer.assess_fatigue()
    print(f"疲劳分数: {score:.1f}/100")
    for name, ind in indicators.items():
        print(f" {name}: {ind['value']:.2f} ({ind['status']})")
|