EEG疲劳检测技术详解:脑电信号实现98.5%精度的实时监测

EEG疲劳检测技术详解:脑电信号实现98.5%精度的实时监测

来源: Bitbrain Blog + Nature Scientific Reports
发布时间: 2026年4月
核心价值: EEG直接测量大脑活动,精度高达98.5%


核心洞察

EEG疲劳检测优势:

特性 视觉DMS EEG方案
检测原理 间接行为推断 直接测量大脑
检测精度 90-95% 98.5%
早期预警 症状出现后 提前预警
微睡眠检测 难检测 精确检测
认知负荷 无法检测 可检测

神经科学基础:

  • 疲劳时θ波(4-8Hz)增加
  • α波(8-13Hz)增加作为次级指标
  • β波(13-30Hz)变化较小

一、神经科学原理

1.1 脑电波频段

频段 频率 正常状态 疲劳状态
Delta (δ) 0.5-4 Hz 深睡眠 无变化
Theta (θ) 4-8 Hz 浅睡/冥想 显著增加
Alpha (α) 8-13 Hz 放松清醒 增加
Beta (β) 13-30 Hz 警觉活跃 轻微变化
Gamma (γ) 30-100 Hz 高度专注 无变化

1.2 疲劳脑电特征

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
"""
EEG疲劳特征分析
基于脑电频段能量变化
"""

import numpy as np
from scipy import signal
from scipy.fft import fft, fftfreq
from typing import Dict, Tuple

class EEGFatigueAnalyzer:
"""
EEG疲劳分析器

分析脑电信号频段特征:
- Theta波增加:疲劳主要指标
- Alpha波增加:疲劳次级指标
- (Alpha + Theta) / Beta:疲劳指数
"""

def __init__(self, sampling_rate: int = 256):
"""
初始化分析器

Args:
sampling_rate: EEG采样率 (Hz)
"""
self.fs = sampling_rate

# 频段定义 (Hz)
self.bands = {
'delta': (0.5, 4),
'theta': (4, 8),
'alpha': (8, 13),
'beta': (13, 30),
'gamma': (30, 100)
}

# 疲劳阈值
self.fatigue_thresholds = {
'theta_alpha_ratio': 1.5, # θ/α比值
'fatigue_index': 2.0, # (θ+α)/β
}

def analyze(self, eeg_signal: np.ndarray) -> Dict[str, float]:
"""
分析EEG信号疲劳特征

Args:
eeg_signal: EEG信号 (samples,)

Returns:
特征字典
"""
# 1. 预处理
eeg_filtered = self._preprocess(eeg_signal)

# 2. 计算频谱
freqs, psd = self._compute_psd(eeg_filtered)

# 3. 计算各频段能量
band_powers = {}
for band_name, (f_low, f_high) in self.bands.items():
mask = (freqs >= f_low) & (freqs <= f_high)
band_powers[band_name] = np.trapz(psd[mask], freqs[mask])

# 4. 计算疲劳指标
features = {
# 各频段功率
'delta_power': band_powers['delta'],
'theta_power': band_powers['theta'],
'alpha_power': band_powers['alpha'],
'beta_power': band_powers['beta'],
'gamma_power': band_powers['gamma'],

# 归一化功率(总功率占比)
'theta_ratio': band_powers['theta'] / sum(band_powers.values()),
'alpha_ratio': band_powers['alpha'] / sum(band_powers.values()),

# 疲劳指数
'theta_alpha_ratio': band_powers['theta'] / (band_powers['alpha'] + 1e-8),
'fatigue_index': (band_powers['theta'] + band_powers['alpha']) / (band_powers['beta'] + 1e-8),

# 综合疲劳评分 (0-100)
'fatigue_score': self._calculate_fatigue_score(band_powers),
}

return features

def _preprocess(self, eeg_signal: np.ndarray) -> np.ndarray:
"""EEG预处理"""
# 带通滤波 0.5-50 Hz
sos = signal.butter(4, [0.5, 50], 'band', fs=self.fs, output='sos')
eeg_filtered = signal.sosfilt(sos, eeg_signal)

# 陷波滤波 (50Hz电源干扰)
b, a = signal.iirnotch(50, 30, self.fs)
eeg_filtered = signal.filtfilt(b, a, eeg_filtered)

# 去除基线漂移
eeg_filtered = eeg_filtered - np.mean(eeg_filtered)

return eeg_filtered

def _compute_psd(self, eeg_signal: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""计算功率谱密度"""
freqs, psd = signal.welch(eeg_signal, fs=self.fs, nperseg=1024)
return freqs, psd

def _calculate_fatigue_score(self, band_powers: Dict[str, float]) -> float:
"""
计算综合疲劳评分

基于多个指标的加权组合
"""
total_power = sum(band_powers.values()) + 1e-8

# 各指标
theta_ratio = band_powers['theta'] / total_power
alpha_ratio = band_powers['alpha'] / total_power
fatigue_index = (band_powers['theta'] + band_powers['alpha']) / (band_powers['beta'] + 1e-8)

# 归一化评分 (0-100)
# 假设正常值范围
theta_score = min(100, theta_ratio * 500) # 正常~0.1
alpha_score = min(100, alpha_ratio * 400) # 正常~0.15
index_score = min(100, fatigue_index * 20) # 正常~2

# 加权平均
fatigue_score = theta_score * 0.4 + alpha_score * 0.3 + index_score * 0.3

return fatigue_score

def detect_fatigue(self, eeg_signal: np.ndarray) -> Tuple[bool, Dict]:
"""
检测疲劳状态

Args:
eeg_signal: EEG信号

Returns:
(is_fatigued, info)
"""
features = self.analyze(eeg_signal)

# 判断逻辑
is_fatigued = (
features['fatigue_index'] > self.fatigue_thresholds['fatigue_index'] or
features['theta_alpha_ratio'] > self.fatigue_thresholds['theta_alpha_ratio']
)

info = {
'is_fatigued': is_fatigued,
'fatigue_score': features['fatigue_score'],
'fatigue_index': features['fatigue_index'],
'theta_alpha_ratio': features['theta_alpha_ratio'],
}

return is_fatigued, info


# 实际测试
if __name__ == "__main__":
analyzer = EEGFatigueAnalyzer(sampling_rate=256)

# 模拟EEG信号
t = np.linspace(0, 5, 256 * 5) # 5秒

# 场景1:清醒状态
eeg_awake = (
np.random.randn(len(t)) * 10 + # 背景噪声
20 * np.sin(2 * np.pi * 15 * t) + # Beta波 (警觉)
10 * np.sin(2 * np.pi * 10 * t) # Alpha波
)

features_awake = analyzer.analyze(eeg_awake)
is_fatigued, info = analyzer.detect_fatigue(eeg_awake)

print("=== 清醒状态 ===")
print(f"疲劳指数: {features_awake['fatigue_index']:.2f}")
print(f"疲劳评分: {features_awake['fatigue_score']:.1f}")
print(f"是否疲劳: {is_fatigued}")

# 场景2:疲劳状态
eeg_fatigued = (
np.random.randn(len(t)) * 10 +
40 * np.sin(2 * np.pi * 6 * t) + # Theta波增加
30 * np.sin(2 * np.pi * 10 * t) + # Alpha波增加
5 * np.sin(2 * np.pi * 15 * t) # Beta波减少
)

features_fatigued = analyzer.analyze(eeg_fatigued)
is_fatigued, info = analyzer.detect_fatigue(eeg_fatigued)

print("\n=== 疲劳状态 ===")
print(f"疲劳指数: {features_fatigued['fatigue_index']:.2f}")
print(f"疲劳评分: {features_fatigued['fatigue_score']:.1f}")
print(f"是否疲劳: {is_fatigued}")

1.3 大脑区域与疲劳

脑区 功能 疲劳关联
额叶 决策、注意力 θ波增加明显
中央区 运动控制 α波增加
顶叶 感觉整合 α波增加
枕叶 视觉处理 α波增加

推荐电极位置:

  • Fz(额中)、Cz(中央中)、Pz(顶中)
  • 单通道方案:Fp1/Fp2(前额)

二、EEG硬件方案

2.1 可穿戴EEG设备

设备 电极数 类型 采样率 应用
Bitbrain Diadem 16 干电极 500Hz 研究/高端
NeuroSky MindWave 1 干电极 512Hz 消费级
Emotiv EPOC 14 湿电极 128Hz 研究
Muse S 4 干电极 256Hz 消费级

2.2 车载部署方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""
车载EEG疲劳检测系统
"""

import numpy as np
from dataclasses import dataclass
from typing import List, Optional, Tuple
from enum import Enum
from collections import deque

class FatigueLevel(Enum):
"""疲劳等级"""
ALERT = 0 # 清醒
MILD = 1 # 轻度疲劳
MODERATE = 2 # 中度疲劳
SEVERE = 3 # 严重疲劳
MICRO_SLEEP = 4 # 微睡眠

@dataclass
class EEGConfig:
"""EEG配置"""
channels: List[str] = None # 电极位置
sampling_rate: int = 256
window_size: float = 5.0 # 分析窗口(秒)
update_interval: float = 0.5 # 更新间隔(秒)

class InVehicleEEGMonitor:
"""
车载EEG疲劳监控系统

功能:
1. 实时EEG信号采集
2. 疲劳特征提取
3. 疲劳等级判定
4. 微睡眠检测
5. 警告输出
"""

def __init__(self, config: EEGConfig = None):
self.config = config or EEGConfig()

# 信号缓冲
self.buffer_size = int(self.config.sampling_rate * self.config.window_size)
self.buffer = deque(maxlen=self.buffer_size)

# 疲劳分析器
self.analyzer = EEGFatigueAnalyzer(self.config.sampling_rate)

# 状态
self.current_level = FatigueLevel.ALERT
self.fatigue_history = deque(maxlen=60) # 30秒历史

# 微睡眠检测
self.micro_sleep_threshold = 2.0 # 秒
self.low_amplitude_start = None

def update(self, eeg_sample: float) -> Tuple[FatigueLevel, Optional[str]]:
"""
更新监控状态

Args:
eeg_sample: 单个EEG采样点

Returns:
(fatigue_level, warning)
"""
# 添加到缓冲
self.buffer.append(eeg_sample)

# 检查是否有足够数据
if len(self.buffer) < self.buffer_size:
return self.current_level, None

# 分析疲劳
eeg_signal = np.array(self.buffer)
is_fatigued, info = self.analyzer.detect_fatigue(eeg_signal)

# 记录历史
self.fatigue_history.append(info['fatigue_score'])

# 判断疲劳等级
new_level = self._classify_fatigue_level(info['fatigue_score'])

# 检测微睡眠
micro_sleep_warning = self._detect_micro_sleep(eeg_signal)

# 更新状态
level_changed = new_level != self.current_level
self.current_level = new_level

# 生成警告
warning = None
if level_changed and new_level.value >= FatigueLevel.MODERATE.value:
warning = f"FATIGUE_LEVEL_{new_level.name}"
elif micro_sleep_warning:
warning = "MICRO_SLEEP_DETECTED"
self.current_level = FatigueLevel.MICRO_SLEEP

return self.current_level, warning

def _classify_fatigue_level(self, score: float) -> FatigueLevel:
"""分类疲劳等级"""
if score < 30:
return FatigueLevel.ALERT
elif score < 50:
return FatigueLevel.MILD
elif score < 70:
return FatigueLevel.MODERATE
else:
return FatigueLevel.SEVERE

def _detect_micro_sleep(self, eeg_signal: np.ndarray) -> bool:
"""
检测微睡眠

特征:
- 低幅慢波(θ波主导)
- 持续时间 > 1.5秒
"""
# 计算幅度
amplitude = np.std(eeg_signal)

# 检测低幅度状态(微睡眠特征)
if amplitude < 5: # 低阈值
if self.low_amplitude_start is None:
self.low_amplitude_start = 0
else:
self.low_amplitude_start += len(eeg_signal) / self.config.sampling_rate

# 检查持续时间
if self.low_amplitude_start >= self.micro_sleep_threshold:
return True
else:
self.low_amplitude_start = None

return False

def get_statistics(self) -> dict:
"""获取统计信息"""
if len(self.fatigue_history) == 0:
return {}

return {
'mean_fatigue_score': np.mean(self.fatigue_history),
'max_fatigue_score': np.max(self.fatigue_history),
'trend': 'increasing' if self.fatigue_history[-1] > self.fatigue_history[0] else 'decreasing',
}


# 实际测试
if __name__ == "__main__":
config = EEGConfig(
channels=['Fz', 'Cz', 'Pz'],
sampling_rate=256,
window_size=5.0
)
monitor = InVehicleEEGMonitor(config)

print("=== 车载EEG疲劳监控测试 ===")

# 模拟清醒驾驶30秒
print("\n[清醒驾驶]")
for i in range(256 * 30): # 30秒
sample = np.random.randn() * 10 + 20 * np.sin(2 * np.pi * 15 * i / 256)
level, warning = monitor.update(sample)

if i % (256 * 5) == 0: # 每5秒打印
print(f" {i//256}s: 疲劳等级={level.name}, 评分={monitor.analyzer.analyze(np.array(list(monitor.buffer)[-1280:]))['fatigue_score']:.1f}")

# 模拟疲劳驾驶
print("\n[疲劳驾驶]")
for i in range(256 * 30): # 30秒
sample = np.random.randn() * 10 + 40 * np.sin(2 * np.pi * 6 * i / 256)
level, warning = monitor.update(sample)

if warning:
print(f" {i//256}s: ⚠️ {warning}")

if i % (256 * 5) == 0:
stats = monitor.get_statistics()
print(f" {i//256}s: 疲劳等级={level.name}, 平均评分={stats.get('mean_fatigue_score', 0):.1f}")

三、深度学习方法

3.1 EEG疲劳检测网络

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""
EEG疲劳检测深度学习模型
基于CNN-LSTM架构
"""

import torch
import torch.nn as nn
import numpy as np
from typing import Tuple

class EEGFatigueNet(nn.Module):
"""
EEG疲劳检测网络

架构:
1. CNN提取空间特征(跨通道)
2. LSTM建模时序依赖
3. 注意力机制聚焦关键时刻
4. 分类头输出疲劳等级
"""

def __init__(self,
num_channels: int = 3,
seq_length: int = 256, # 1秒 @ 256Hz
num_classes: int = 4):
super().__init__()

# 1. 空间卷积(跨通道)
self.spatial_conv = nn.Sequential(
nn.Conv2d(1, 16, kernel_size=(num_channels, 1)), # 跨通道
nn.BatchNorm2d(16),
nn.ELU(),
nn.Dropout2d(0.3),
)

# 2. 时间卷积
self.temporal_conv = nn.Sequential(
nn.Conv1d(16, 32, kernel_size=8, stride=2),
nn.BatchNorm1d(32),
nn.ELU(),
nn.MaxPool1d(4),
nn.Dropout1d(0.3),

nn.Conv1d(32, 64, kernel_size=8, stride=2),
nn.BatchNorm1d(64),
nn.ELU(),
nn.MaxPool1d(4),
nn.Dropout1d(0.3),
)

# 3. LSTM层
self.lstm = nn.LSTM(
input_size=64,
hidden_size=128,
num_layers=2,
batch_first=True,
bidirectional=True,
dropout=0.3
)

# 4. 注意力层
self.attention = nn.Sequential(
nn.Linear(256, 64),
nn.Tanh(),
nn.Linear(64, 1),
nn.Softmax(dim=1)
)

# 5. 分类头
self.classifier = nn.Sequential(
nn.Linear(256, 64),
nn.ELU(),
nn.Dropout(0.5),
nn.Linear(64, num_classes)
)

def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
前向传播

Args:
x: EEG信号 (B, C, T) 或 (B, 1, C, T)

Returns:
logits: (B, num_classes)
"""
# 确保输入形状
if x.dim() == 3:
x = x.unsqueeze(1) # (B, 1, C, T)

batch_size = x.size(0)

# 空间卷积
x = self.spatial_conv(x) # (B, 16, 1, T)
x = x.squeeze(2) # (B, 16, T)

# 时间卷积
x = self.temporal_conv(x) # (B, 64, T')

# 转换为序列
x = x.permute(0, 2, 1) # (B, T', 64)

# LSTM编码
lstm_out, _ = self.lstm(x) # (B, T', 256)

# 注意力加权
attn_weights = self.attention(lstm_out) # (B, T', 1)
context = torch.sum(lstm_out * attn_weights, dim=1) # (B, 256)

# 分类
logits = self.classifier(context)

return logits


# 实际测试
if __name__ == "__main__":
# 创建模型
model = EEGFatigueNet(num_channels=3, seq_length=256, num_classes=4)

# 模拟输入
x = torch.randn(8, 3, 256) # (B, C, T)

# 前向传播
logits = model(x)

print(f"输入形状: {x.shape}")
print(f"输出形状: {logits.shape}")

# 参数统计
total_params = sum(p.numel() for p in model.parameters())
print(f"总参数量: {total_params/1e6:.2f}M")

3.2 跨被试泛化

主要挑战:

  • 不同个体脑电差异大
  • 电极位置偏差
  • 设备差异

解决方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""
跨被试EEG疲劳检测
领域自适应方法
"""

import torch
import torch.nn as nn

class DomainAdaptiveEEGNet(nn.Module):
"""
领域自适应EEG疲劳检测

策略:
1. 共享特征提取器
2. 域判别器(对抗训练)
3. 个体适配层
"""

def __init__(self,
num_channels: int = 3,
num_classes: int = 4,
num_domains: int = 10): # 最多支持10个个体
super().__init__()

# 共享特征提取器
self.feature_extractor = nn.Sequential(
nn.Conv1d(num_channels, 32, kernel_size=8, stride=2),
nn.BatchNorm1d(32),
nn.ELU(),
nn.MaxPool1d(4),

nn.Conv1d(32, 64, kernel_size=8, stride=2),
nn.BatchNorm1d(64),
nn.ELU(),
nn.MaxPool1d(4),

nn.Conv1d(64, 128, kernel_size=4, stride=1),
nn.BatchNorm1d(128),
nn.ELU(),
nn.AdaptiveAvgPool1d(1),
)

# 共享分类器
self.classifier = nn.Sequential(
nn.Linear(128, 64),
nn.ELU(),
nn.Dropout(0.5),
nn.Linear(64, num_classes)
)

# 域判别器(对抗训练)
self.domain_discriminator = nn.Sequential(
nn.Linear(128, 64),
nn.ELU(),
nn.Linear(64, num_domains)
)

# 个体适配层
self.personalized_layers = nn.ModuleList([
nn.Linear(128, 128) for _ in range(num_domains)
])

def forward(self,
x: torch.Tensor,
domain_id: int = None,
return_domain_pred: bool = False) -> torch.Tensor:
"""
前向传播

Args:
x: EEG信号 (B, C, T)
domain_id: 个体ID(用于个性化适配)
return_domain_pred: 是否返回域预测

Returns:
logits: (B, num_classes)
domain_pred: (B, num_domains) 可选
"""
# 特征提取
features = self.feature_extractor(x)
features = features.squeeze(-1) # (B, 128)

# 个体适配
if domain_id is not None:
features = self.personalized_layers[domain_id](features)

# 分类
logits = self.classifier(features)

if return_domain_pred:
domain_pred = self.domain_discriminator(features)
return logits, domain_pred

return logits


# 实际测试
if __name__ == "__main__":
model = DomainAdaptiveEEGNet(num_channels=3, num_classes=4)

# 模拟输入
x = torch.randn(8, 3, 256)

# 前向传播(带域预测)
logits, domain_pred = model(x, domain_id=0, return_domain_pred=True)

print(f"分类输出: {logits.shape}")
print(f"域预测: {domain_pred.shape}")

四、实时部署方案

4.1 边缘计算架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
EEG边缘计算架构

├── 可穿戴设备
│ ├── 干电极EEG头环
│ ├── 蓝牙/WiFi传输
│ └── 采样率 256Hz

├── 边缘处理器
│ ├── ARM Cortex-M7 / RISC-V
│ ├── DSP加速器
│ └── 模型量化 (INT8)

├── 车辆CAN总线
│ ├── 疲劳等级信号
│ ├── 警告触发信号
│ └── 与DMS联动

└── 输出
├── HMI警告显示
├── 座椅震动
└── ADAS联动

4.2 实时推理优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
"""
EEG实时推理优化
模型量化 + 知识蒸馏
"""

import torch
import torch.nn as nn
import torch.quantization as quant

class QuantizedEEGNet(nn.Module):
"""
量化EEG网络

优化策略:
1. 动态量化(权重INT8)
2. 静态量化(权重+激活INT8)
3. 模型剪枝
"""

def __init__(self, model: nn.Module):
super().__init__()
self.quant = quant.QuantStub()
self.model = model
self.dequant = quant.DeQuantStub()

def forward(self, x):
x = self.quant(x)
x = self.model(x)
x = self.dequant(x)
return x


def optimize_for_edge(model: nn.Module, calibration_data: torch.Tensor) -> nn.Module:
"""
边缘设备优化

Args:
model: 原始模型
calibration_data: 校准数据

Returns:
量化后的模型
"""
# 设置量化配置
model.qconfig = quant.get_default_qconfig('fbgemm')

# 准备量化
quant.prepare(model, inplace=True)

# 校准
with torch.no_grad():
model(calibration_data)

# 转换为量化模型
quant.convert(model, inplace=True)

return model


# 实际测试
if __name__ == "__main__":
# 原始模型
original_model = EEGFatigueNet(num_channels=3)

# 校准数据
calibration_data = torch.randn(100, 3, 256)

# 量化优化
quantized_model = optimize_for_edge(original_model, calibration_data)

# 模型大小对比
original_size = sum(p.numel() * 4 for p in original_model.parameters()) / 1024 # KB
quantized_size = sum(p.numel() * 1 for p in quantized_model.parameters()) / 1024 # KB

print(f"原始模型大小: {original_size:.2f} KB")
print(f"量化模型大小: {quantized_size:.2f} KB")
print(f"压缩比: {original_size / quantized_size:.2f}x")

五、与DMS融合方案

5.1 多模态融合架构

模态 检测内容 精度 延迟
EEG 大脑状态 98.5% 50ms
视觉DMS 行为特征 92% 30ms
融合 综合判断 99%+ 80ms

5.2 决策融合策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
"""
EEG + 视觉DMS 融合决策
"""

from dataclasses import dataclass
from typing import Tuple
import numpy as np

@dataclass
class FatigueScores:
"""各模态疲劳评分"""
eeg_score: float # EEG评分 (0-100)
visual_score: float # 视觉DMS评分 (0-100)
confidence: float # 综合置信度

class MultiModalFusion:
"""
多模态疲劳检测融合

策略:
1. 加权融合(根据置信度)
2. 冲突处理(EEG优先)
3. 早期预警(EEG提前)
"""

def __init__(self):
# 权重配置
self.eeg_weight = 0.6
self.visual_weight = 0.4

# 置信度阈值
self.eeg_confidence_threshold = 0.8
self.visual_confidence_threshold = 0.7

def fuse(self,
eeg_score: float,
visual_score: float,
eeg_confidence: float,
visual_confidence: float) -> Tuple[float, str]:
"""
融合决策

Args:
eeg_score: EEG疲劳评分
visual_score: 视觉疲劳评分
eeg_confidence: EEG置信度
visual_confidence: 视觉置信度

Returns:
(综合评分, 警告类型)
"""
# 根据置信度调整权重
if eeg_confidence < self.eeg_confidence_threshold:
# EEG置信度低,降低权重
adjusted_eeg_weight = self.eeg_weight * eeg_confidence
adjusted_visual_weight = self.visual_weight + (self.eeg_weight - adjusted_eeg_weight)
else:
adjusted_eeg_weight = self.eeg_weight
adjusted_visual_weight = self.visual_weight

# 归一化权重
total_weight = adjusted_eeg_weight + adjusted_visual_weight
adjusted_eeg_weight /= total_weight
adjusted_visual_weight /= total_weight

# 加权融合
fused_score = (
eeg_score * adjusted_eeg_weight +
visual_score * adjusted_visual_weight
)

# 冲突处理
if abs(eeg_score - visual_score) > 30:
# 严重冲突:EEG优先
if eeg_score > visual_score:
warning_type = "EEG_EARLY_WARNING"
fused_score = max(eeg_score, fused_score)
else:
warning_type = "VISUAL_CONFIRMED"
else:
warning_type = "NORMAL"

return fused_score, warning_type


# 实际测试
if __name__ == "__main__":
fusion = MultiModalFusion()

# 场景1:一致检测
score, wtype = fusion.fuse(
eeg_score=70, visual_score=65,
eeg_confidence=0.9, visual_confidence=0.85
)
print(f"场景1 - 融合评分: {score:.1f}, 类型: {wtype}")

# 场景2:EEG早期预警
score, wtype = fusion.fuse(
eeg_score=75, visual_score=40,
eeg_confidence=0.85, visual_confidence=0.9
)
print(f"场景2 - 融合评分: {score:.1f}, 类型: {wtype}")

# 场景3:EEG置信度低
score, wtype = fusion.fuse(
eeg_score=80, visual_score=60,
eeg_confidence=0.5, visual_confidence=0.9
)
print(f"场景3 - 融合评分: {score:.1f}, 类型: {wtype}")

六、实际应用案例

6.1 Nissan Brain-to-Vehicle

系统特点:

  • EEG头环检测驾驶员意图
  • 提前200ms预判动作
  • 与车辆控制联动

应用效果:

  • 反应时间缩短20%
  • 疲劳预警提前30秒
  • 微睡眠检测准确率95%

6.2 长途卡车司机监控

部署方案:

  • 干电极头戴设备
  • 每隔4小时强制休息提醒
  • 与车队管理系统联动

效果数据:

  • 事故率下降35%
  • 疲劳违规减少50%

七、挑战与展望

7.1 当前挑战

挑战 描述 解决方向
佩戴舒适度 长时间佩戴不适 干电极、轻量化
信号稳定性 运动伪影干扰 自适应滤波
个体差异 跨被试泛化差 迁移学习
成本 高精度设备昂贵 低成本方案

7.2 未来方向

  1. 非接触EEG:脑机接口无电极方案
  2. 融合增强:EEG+视觉+生理多模态
  3. 个性化模型:自适应学习个体特征
  4. 隐私保护:边缘计算、数据不上云

八、总结

8.1 核心优势

  1. 直接测量大脑:最真实的疲劳指标
  2. 精度最高:98.5%检测准确率
  3. 早期预警:症状出现前检测
  4. 微睡眠检测:精确捕获瞬态事件

8.2 适用场景

场景 推荐度 说明
高端商用车 ⭐⭐⭐⭐⭐ 成本可控、安全要求高
长途运输 ⭐⭐⭐⭐⭐ 疲劳风险高
民用轿车 ⭐⭐⭐ 成本敏感、需简化方案
自动驾驶 ⭐⭐⭐⭐ 监控乘客状态

参考链接:


EEG疲劳检测技术详解:脑电信号实现98.5%精度的实时监测
https://dapalm.com/2026/04/24/2026-04-24-eeg-fatigue-detection-realtime/
作者
Mars
发布于
2026年4月24日
许可协议