1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
| """ EEG疲劳特征分析 基于脑电频段能量变化 """
import numpy as np from scipy import signal from scipy.fft import fft, fftfreq from typing import Dict, Tuple
class EEGFatigueAnalyzer: """ EEG疲劳分析器 分析脑电信号频段特征: - Theta波增加:疲劳主要指标 - Alpha波增加:疲劳次级指标 - (Alpha + Theta) / Beta:疲劳指数 """ def __init__(self, sampling_rate: int = 256): """ 初始化分析器 Args: sampling_rate: EEG采样率 (Hz) """ self.fs = sampling_rate self.bands = { 'delta': (0.5, 4), 'theta': (4, 8), 'alpha': (8, 13), 'beta': (13, 30), 'gamma': (30, 100) } self.fatigue_thresholds = { 'theta_alpha_ratio': 1.5, 'fatigue_index': 2.0, } def analyze(self, eeg_signal: np.ndarray) -> Dict[str, float]: """ 分析EEG信号疲劳特征 Args: eeg_signal: EEG信号 (samples,) Returns: 特征字典 """ eeg_filtered = self._preprocess(eeg_signal) freqs, psd = self._compute_psd(eeg_filtered) band_powers = {} for band_name, (f_low, f_high) in self.bands.items(): mask = (freqs >= f_low) & (freqs <= f_high) band_powers[band_name] = np.trapz(psd[mask], freqs[mask]) features = { 'delta_power': band_powers['delta'], 'theta_power': band_powers['theta'], 'alpha_power': band_powers['alpha'], 'beta_power': band_powers['beta'], 'gamma_power': band_powers['gamma'], 'theta_ratio': band_powers['theta'] / sum(band_powers.values()), 'alpha_ratio': band_powers['alpha'] / sum(band_powers.values()), 'theta_alpha_ratio': band_powers['theta'] / (band_powers['alpha'] + 1e-8), 'fatigue_index': (band_powers['theta'] + band_powers['alpha']) / (band_powers['beta'] + 1e-8), 'fatigue_score': self._calculate_fatigue_score(band_powers), } return features def _preprocess(self, eeg_signal: np.ndarray) -> np.ndarray: """EEG预处理""" sos = signal.butter(4, [0.5, 50], 'band', fs=self.fs, output='sos') eeg_filtered = signal.sosfilt(sos, eeg_signal) b, a = signal.iirnotch(50, 30, self.fs) eeg_filtered = signal.filtfilt(b, a, eeg_filtered) eeg_filtered = eeg_filtered - np.mean(eeg_filtered) return eeg_filtered def _compute_psd(self, eeg_signal: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """计算功率谱密度""" freqs, psd = signal.welch(eeg_signal, fs=self.fs, nperseg=1024) return freqs, psd def _calculate_fatigue_score(self, band_powers: Dict[str, float]) -> float: """ 计算综合疲劳评分 基于多个指标的加权组合 """ total_power = sum(band_powers.values()) + 1e-8 theta_ratio = band_powers['theta'] / total_power alpha_ratio = band_powers['alpha'] / total_power fatigue_index = (band_powers['theta'] + band_powers['alpha']) / (band_powers['beta'] + 1e-8) theta_score = min(100, theta_ratio * 500) alpha_score = min(100, alpha_ratio * 400) index_score = min(100, fatigue_index * 20) fatigue_score = theta_score * 0.4 + alpha_score * 0.3 + index_score * 0.3 return fatigue_score def detect_fatigue(self, eeg_signal: np.ndarray) -> Tuple[bool, Dict]: """ 检测疲劳状态 Args: eeg_signal: EEG信号 Returns: (is_fatigued, info) """ features = self.analyze(eeg_signal) is_fatigued = ( features['fatigue_index'] > self.fatigue_thresholds['fatigue_index'] or features['theta_alpha_ratio'] > self.fatigue_thresholds['theta_alpha_ratio'] ) info = { 'is_fatigued': is_fatigued, 'fatigue_score': features['fatigue_score'], 'fatigue_index': features['fatigue_index'], 'theta_alpha_ratio': features['theta_alpha_ratio'], } return is_fatigued, info
if __name__ == "__main__": analyzer = EEGFatigueAnalyzer(sampling_rate=256) t = np.linspace(0, 5, 256 * 5) eeg_awake = ( np.random.randn(len(t)) * 10 + 20 * np.sin(2 * np.pi * 15 * t) + 10 * np.sin(2 * np.pi * 10 * t) ) features_awake = analyzer.analyze(eeg_awake) is_fatigued, info = analyzer.detect_fatigue(eeg_awake) print("=== 清醒状态 ===") print(f"疲劳指数: {features_awake['fatigue_index']:.2f}") print(f"疲劳评分: {features_awake['fatigue_score']:.1f}") print(f"是否疲劳: {is_fatigued}") eeg_fatigued = ( np.random.randn(len(t)) * 10 + 40 * np.sin(2 * np.pi * 6 * t) + 30 * np.sin(2 * np.pi * 10 * t) + 5 * np.sin(2 * np.pi * 15 * t) ) features_fatigued = analyzer.analyze(eeg_fatigued) is_fatigued, info = analyzer.detect_fatigue(eeg_fatigued) print("\n=== 疲劳状态 ===") print(f"疲劳指数: {features_fatigued['fatigue_index']:.2f}") print(f"疲劳评分: {features_fatigued['fatigue_score']:.1f}") print(f"是否疲劳: {is_fatigued}")
|