1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
| """ 口罩场景鲁棒检测
基于上半脸特征推断整体状态 """
class MaskRobustDetector: """ 口罩场景鲁棒检测器 仅依赖上半脸特征 """ def __init__(self): self.available_features = [ 'eye_openness', 'eye_blink_rate', 'gaze_direction', 'head_pose', 'eyebrow_position', 'forehead_wrinkles' ] self.missing_features = [ 'mouth_openness', 'yawn_detection', 'jaw_position' ] self.adjusted_weights = { 'eye_openness': 0.35, 'eye_blink_rate': 0.25, 'gaze_direction': 0.20, 'head_pose': 0.15, 'eyebrow_position': 0.05 } def detect(self, face_landmarks, mask_detected: bool): """ 检测驾驶员状态 Args: face_landmarks: 面部关键点 mask_detected: 是否检测到口罩 Returns: state: { 'fatigue_level': int, 'distraction': bool, 'confidence': float } """ if mask_detected: return self._detect_with_mask(face_landmarks) else: return self._detect_normal(face_landmarks) def _detect_with_mask(self, landmarks): """ 口罩场景检测 仅使用上半脸特征 """ upper_face_features = self._extract_upper_face_features(landmarks) fatigue_score = self._calculate_fatigue_with_mask(upper_face_features) distraction_score = self._calculate_distraction_with_mask(upper_face_features) return { 'fatigue_level': self._score_to_level(fatigue_score), 'distraction': distraction_score > 0.6, 'confidence': 0.85, 'mask_mode': True } def _extract_upper_face_features(self, landmarks): """ 提取上半脸特征 """ left_eye = landmarks['left_eye'] right_eye = landmarks['right_eye'] left_eyebrow = landmarks['left_eyebrow'] right_eyebrow = landmarks['right_eyebrow'] features = { 'eye_openness': self._calculate_eye_openness(left_eye, right_eye), 'eye_blink_rate': self._calculate_blink_rate(), 'gaze_direction': self._estimate_gaze(left_eye, right_eye), 'head_pose': landmarks.get('head_pose', (0, 0, 0)), 'eyebrow_position': self._calculate_eyebrow_position(left_eyebrow, right_eyebrow) } return features def _calculate_fatigue_with_mask(self, features): """ 口罩场景疲劳检测 使用替代指标: 1. PERCLOS(眼睑闭合百分比) 2. 眨眼频率 3. 注视稳定性 4. 头部姿态稳定性 """ score = 0.0 if features['eye_openness'] < 0.2: score += 0.35 if features['eye_blink_rate'] > 25: score += 0.25 gaze_stability = np.std(features['gaze_direction']) if gaze_stability > 0.5: score += 0.20 head_stability = np.std(features['head_pose']) if head_stability > 10: score += 0.15 return score def _calculate_distraction_with_mask(self, features): """ 口罩场景分心检测 """ score = 0.0 gaze_deviation = np.linalg.norm(features['gaze_direction']) if gaze_deviation > 30: score += 0.50 head_yaw = abs(features['head_pose'][0]) if head_yaw > 20: score += 0.30 return score def _detect_normal(self, landmarks): """正常检测""" pass def _calculate_eye_openness(self, left_eye, right_eye): """计算眼睑开度""" pass def _calculate_blink_rate(self): """计算眨眼频率""" pass def _estimate_gaze(self, left_eye, right_eye): """估计视线方向""" pass def _calculate_eyebrow_position(self, left_eyebrow, right_eyebrow): """计算眉毛位置""" pass def _score_to_level(self, score): """分数转等级""" if score < 0.3: return 0 elif score < 0.6: return 1 else: return 2
if __name__ == "__main__": detector = MaskRobustDetector() landmarks = { 'left_eye': [(x, y) for x, y in zip(range(6), range(6))], 'right_eye': [(x, y) for x, y in zip(range(6), range(6))], 'left_eyebrow': [(x, y) for x, y in zip(range(5), range(5))], 'right_eyebrow': [(x, y) for x, y in zip(range(5), range(5))], 'head_pose': (5, 3, 0) } result = detector.detect(landmarks, mask_detected=True) print(f"检测结果: {result}")
|