1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
| import numpy as np from typing import Tuple, List
class MultiWindowFeatureExtractor:
    """Multi-time-window eye-movement feature extractor.

    Core idea of the referenced paper:
    - short window (1-3 s): capture momentary eye-movement anomalies
    - medium window (5-10 s): analyse the stability of the eye-movement pattern
    - long window (30-60 s): assess how the overall driving state evolves
    """

    # Number of features emitted for every window position.
    _N_FEATURES = 5

    def __init__(
        self,
        fps: int = 30,
        short_window: int = 3,
        medium_window: int = 10,
        long_window: int = 60
    ):
        """Store the frame rate and convert window lengths to frame counts.

        Args:
            fps: Sampling rate in frames per second.
            short_window: Short window length in seconds.
            medium_window: Medium window length in seconds.
            long_window: Long window length in seconds.
        """
        self.fps = fps
        # Window lengths are kept in frames, keyed by window name.
        self.windows = {
            'short': short_window * fps,
            'medium': medium_window * fps,
            'long': long_window * fps
        }

    def extract_features(
        self,
        gaze_data: np.ndarray,
        blink_data: np.ndarray,
        pupil_data: np.ndarray
    ) -> dict:
        """Extract sliding-window features for every configured window.

        Args:
            gaze_data: Sequence of gaze coordinates, one (x, y) point per frame.
            blink_data: Blink state per frame, 1 = eye closed, 0 = eye open.
            pupil_data: Pupil diameter per frame.

        Returns:
            dict: Maps each window name ('short', 'medium', 'long') to an
            array of shape (n_windows, 5); see ``_compute_window_features``.
        """
        return {
            name: self._compute_window_features(
                gaze_data, blink_data, pupil_data, window_size
            )
            for name, window_size in self.windows.items()
        }

    def _compute_window_features(
        self,
        gaze: np.ndarray,
        blink: np.ndarray,
        pupil: np.ndarray,
        window: int
    ) -> np.ndarray:
        """Compute the feature vectors of one window size, sliding by 1 frame.

        The three inputs are sliced with the same indices, so they are
        assumed to have the same length as ``gaze`` (not validated here).

        Args:
            gaze: Gaze coordinates, one (x, y) point per frame.
            blink: Blink state per frame.
            pupil: Pupil diameter per frame.
            window: Window length in frames.

        Returns:
            Array of shape (n_windows, 5) with columns
            [gaze entropy, blink rate, pupil std, saccade amplitude,
            fixation duration]. The shape is (0, 5) when the data is shorter
            than one window.
        """
        # A sequence of N frames holds N - window + 1 full windows; the last
        # one ends exactly on the final frame.
        n_samples = len(gaze) - window + 1
        if window <= 0 or n_samples <= 0:
            # Keep the result 2-D so callers can still reduce along axis 0.
            return np.empty((0, self._N_FEATURES))

        rows = []
        for start in range(n_samples):
            stop = start + window
            gaze_win = gaze[start:stop]
            blink_win = blink[start:stop]
            pupil_win = pupil[start:stop]

            rows.append([
                self._compute_gaze_entropy(gaze_win),
                # Fraction of closed-eye frames scaled by the frame rate.
                np.mean(blink_win) * self.fps,
                np.std(pupil_win),
                self._compute_saccade_amplitude(gaze_win),
                self._compute_fixation_duration(gaze_win),
            ])

        return np.array(rows)

    def _compute_gaze_entropy(self, gaze: np.ndarray) -> float:
        """Compute the Shannon entropy (bits) of the gaze distribution.

        Key finding of the referenced paper: under cognitive distraction the
        gaze entropy drops markedly (gaze becomes more "regular" and lacks
        normal scanning behaviour).

        Gaze points are binned on a 10x10 grid spanning [0, 1] on both axes;
        points outside that range do not fall into any bin.

        Returns:
            Entropy in bits, or 0.0 when no point lies inside the grid.
        """
        grid_size = 10
        x_bins = np.linspace(0, 1, grid_size + 1)
        y_bins = np.linspace(0, 1, grid_size + 1)

        hist, _, _ = np.histogram2d(
            gaze[:, 0], gaze[:, 1], bins=[x_bins, y_bins]
        )

        total = hist.sum()
        if total == 0:
            # No usable samples: avoid 0/0 and report zero entropy.
            return 0.0

        prob = hist.flatten() / total
        prob = prob[prob > 0]  # empty cells contribute nothing
        entropy = -np.sum(prob * np.log2(prob))
        # max() turns the -0.0 of a single occupied cell into a plain 0.0.
        return max(0.0, float(entropy))

    @staticmethod
    def _frame_distances(gaze: np.ndarray) -> np.ndarray:
        """Return the Euclidean distance between consecutive gaze points."""
        step = np.diff(gaze, axis=0)
        return np.sqrt(step[:, 0]**2 + step[:, 1]**2)

    def _compute_saccade_amplitude(self, gaze: np.ndarray) -> float:
        """Compute the mean saccade amplitude (gaze jump distance).

        A frame-to-frame displacement larger than 0.05 counts as a saccade.

        Returns:
            Mean displacement of the saccades, or 0.0 when there are none.
        """
        saccade_threshold = 0.05
        distances = self._frame_distances(gaze)
        saccades = distances[distances > saccade_threshold]
        return float(np.mean(saccades)) if len(saccades) > 0 else 0.0

    def _compute_fixation_duration(self, gaze: np.ndarray) -> float:
        """Compute the mean fixation duration in seconds.

        Consecutive frames whose displacement stays below 0.02 form a
        fixation run; only runs longer than 5 frames are counted.

        Returns:
            Mean duration of the counted runs, or 0.0 when there are none.
        """
        fixation_threshold = 0.02
        min_fixation_frames = 5

        is_fixation = self._frame_distances(gaze) < fixation_threshold

        fixation_durations = []
        # A run of k small displacements spans k + 1 frames, hence start at 1.
        current_duration = 1
        for fix in is_fixation:
            if fix:
                current_duration += 1
                continue
            if current_duration > min_fixation_frames:
                fixation_durations.append(current_duration / self.fps)
            current_duration = 1

        # A run still open when the window ends is a fixation too; without
        # this flush a window that is one long fixation would report 0.0.
        if current_duration > min_fixation_frames:
            fixation_durations.append(current_duration / self.fps)

        return float(np.mean(fixation_durations)) if fixation_durations else 0.0
if __name__ == "__main__":
    # Demo: compare features of synthetic "normal" and "distracted" gaze data.
    np.random.seed(42)

    n_frames = 1800  # 60 s at the 30 fps used below

    # Normal driving: gaze scattered at random around the centre of the view.
    normal_gaze = np.random.rand(n_frames, 2) * 0.3 + 0.35

    # Distracted driving: gaze follows a smooth, regular sinusoidal path.
    distracted_gaze = np.zeros((n_frames, 2))
    distracted_gaze[:, 0] = 0.5 + 0.1 * np.sin(np.linspace(0, 10*np.pi, n_frames))
    distracted_gaze[:, 1] = 0.5 + 0.05 * np.cos(np.linspace(0, 5*np.pi, n_frames))

    # Per-frame closed-eye flags with different probabilities.
    normal_blink = (np.random.rand(n_frames) < 0.003).astype(float)
    distracted_blink = (np.random.rand(n_frames) < 0.008).astype(float)

    normal_pupil = np.random.normal(4.0, 0.3, n_frames)
    distracted_pupil = np.random.normal(4.5, 0.5, n_frames)

    extractor = MultiWindowFeatureExtractor(fps=30)

    normal_features = extractor.extract_features(normal_gaze, normal_blink, normal_pupil)
    distracted_features = extractor.extract_features(distracted_gaze, distracted_blink, distracted_pupil)

    print("=" * 60)
    print("认知分心检测特征对比")
    print("=" * 60)

    for window_name in ['short', 'medium', 'long']:
        normal_win = normal_features[window_name]
        distracted_win = distracted_features[window_name]

        # A window that produced no samples cannot be averaged column-wise:
        # the mean of an empty array is not indexable per feature.
        if normal_win.size == 0 or distracted_win.size == 0:
            print(f"\n{window_name.upper()}窗口: 数据不足, 已跳过")
            continue

        normal_mean = np.mean(normal_win, axis=0)
        distracted_mean = np.mean(distracted_win, axis=0)

        print(f"\n{window_name.upper()}窗口特征对比:")
        print(f" 眼动熵值:正常={normal_mean[0]:.3f}, 分心={distracted_mean[0]:.3f}")
        print(f" 眨眼频率:正常={normal_mean[1]:.3f}Hz, 分心={distracted_mean[1]:.3f}Hz")
        print(f" 瞳孔变异:正常={normal_mean[2]:.3f}, 分心={distracted_mean[2]:.3f}")
        print(f" 扫视幅度:正常={normal_mean[3]:.4f}, 分心={distracted_mean[3]:.4f}")
        print(f" 注视时长:正常={normal_mean[4]:.3f}s, 分心={distracted_mean[4]:.3f}s")
|