疲劳检测PERCLOS算法深度解析:从理论到量产代码

疲劳检测PERCLOS算法深度解析:从理论到量产代码

PERCLOS原理

PERCLOS (Percentage of Eyelid Closure Over Time) 是疲劳检测领域最经典、最可靠的指标,被Euro NCAP正式采纳。


核心公式

1
2
3
4
5
6
7
PERCLOS = (闭眼时间 / 总时间) × 100%

Euro NCAP阈值:
- < 15%: 正常
- 15-30%: 轻度疲劳
- 30-40%: 中度疲劳(警告)
- > 40%: 重度疲劳(紧急警告)

完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
import numpy as np
from dataclasses import dataclass
from typing import List, Optional
from collections import deque
import time

@dataclass
class FatigueState:
"""疲劳状态"""
perclos: float # PERCLOS值
fatigue_level: str # 疲劳等级
confidence: float # 置信度
eye_openness_mean: float # 平均眼睑开度
blink_count: int # 眨眼次数
blink_intervals: List[float] # 眨眼间隔

class PERCLOSDetector:
"""
PERCLOS疲劳检测器

符合Euro NCAP标准的实现:
- 滑动窗口:60秒
- 采样率:≥25fps
- 闭眼阈值:眼睑开度 < 20%
"""

def __init__(
self,
fps: int = 30,
window_seconds: int = 60,
closed_threshold: float = 0.2,
perclos_levels: dict = None
):
self.fps = fps
self.window_frames = window_seconds * fps
self.closed_threshold = closed_threshold

# PERCLOS阈值(可自定义)
self.perclos_levels = perclos_levels or {
'normal': (0, 15),
'mild': (15, 30),
'moderate': (30, 40),
'severe': (40, 100)
}

# 历史缓冲区(环形)
self.eye_openness_buffer = deque(maxlen=self.window_frames)
self.timestamp_buffer = deque(maxlen=self.window_frames)

# 眨眼检测状态
self.last_blink_time = None
self.blink_intervals = deque(maxlen=100)
self.in_blink = False

def update(self, eye_openness: float, timestamp: float = None) -> FatigueState:
"""
更新检测器状态

Args:
eye_openness: 眼睑开度 [0, 1],1=完全睁开,0=完全闭合
timestamp: 时间戳(秒),默认当前时间

Returns:
FatigueState: 疲劳状态
"""
if timestamp is None:
timestamp = time.time()

# 更新缓冲区
self.eye_openness_buffer.append(eye_openness)
self.timestamp_buffer.append(timestamp)

# 检测眨眼
self._detect_blink(eye_openness, timestamp)

# 计算PERCLOS
perclos = self._compute_perclos()

# 判断疲劳等级
fatigue_level = self._classify_fatigue(perclos)

# 计算置信度
confidence = self._compute_confidence()

return FatigueState(
perclos=perclos,
fatigue_level=fatigue_level,
confidence=confidence,
eye_openness_mean=np.mean(self.eye_openness_buffer),
blink_count=len(self.blink_intervals),
blink_intervals=list(self.blink_intervals)
)

def _detect_blink(self, eye_openness: float, timestamp: float):
"""检测眨眼事件"""
is_closed = eye_openness < self.closed_threshold

if is_closed and not self.in_blink:
# 开始眨眼
self.in_blink = True
self.blink_start = timestamp

elif not is_closed and self.in_blink:
# 眨眼结束
self.in_blink = False
blink_duration = timestamp - self.blink_start

# 有效眨眼:100-400ms
if 0.1 < blink_duration < 0.4:
if self.last_blink_time is not None:
interval = timestamp - self.last_blink_time
if 0.5 < interval < 10: # 有效间隔
self.blink_intervals.append(interval)

self.last_blink_time = timestamp

def _compute_perclos(self) -> float:
"""计算PERCLOS值"""
if len(self.eye_openness_buffer) < self.fps * 10: # 至少10秒数据
return 0.0

# 统计闭眼帧数
closed_frames = sum(
1 for e in self.eye_openness_buffer
if e < self.closed_threshold
)

# 计算百分比
perclos = (closed_frames / len(self.eye_openness_buffer)) * 100

return perclos

def _classify_fatigue(self, perclos: float) -> str:
"""分类疲劳等级"""
for level, (low, high) in self.perclos_levels.items():
if low <= perclos < high:
return level
return 'severe'

def _compute_confidence(self) -> float:
"""计算检测置信度"""
# 数据量充足度
data_ratio = len(self.eye_openness_buffer) / self.window_frames

# 眨眼检测可靠性
blink_reliability = min(len(self.blink_intervals) / 10, 1.0)

# 综合置信度
confidence = 0.7 * data_ratio + 0.3 * blink_reliability

return min(confidence, 1.0)


class AdvancedFatigueDetector(PERCLOSDetector):
"""
增强版疲劳检测器

额外特征:
1. 眨眼频率变化
2. 眨眼间隔变异性
3. 眼睑开度标准差
4. 微睡眠检测
"""

def __init__(self, **kwargs):
super().__init__(**kwargs)

# 微睡眠检测
self.microsleep_threshold = 0.5 # 500ms
self.microsleep_count = 0
self.last_microsleep_time = None

def update(self, eye_openness: float, timestamp: float = None) -> FatigueState:
"""增强版更新"""
# 基础PERCLOS检测
state = super().update(eye_openness, timestamp)

# 额外分析
additional_metrics = self._analyze_additional_metrics()

# 综合判断
state = self._integrate_metrics(state, additional_metrics)

return state

def _analyze_additional_metrics(self) -> dict:
"""分析额外疲劳指标"""
if len(self.eye_openness_buffer) < self.fps * 30:
return {}

data = np.array(self.eye_openness_buffer)

# 1. 眨眼频率变化(前30秒 vs 后30秒)
window1 = data[:len(data)//2]
window2 = data[len(data)//2:]

blinks1 = np.sum(window1 < self.closed_threshold)
blinks2 = np.sum(window2 < self.closed_threshold)

blink_freq_change = (blinks2 - blinks1) / max(blinks1, 1)

# 2. 眨眼间隔变异性
if len(self.blink_intervals) > 5:
interval_var = np.std(list(self.blink_intervals))
else:
interval_var = 0

# 3. 眼睑开度变异性
openness_std = np.std(data)

# 4. 长闭眼检测(微睡眠)
microsleep_events = self._detect_microsleeps(data)

return {
'blink_freq_change': blink_freq_change,
'interval_variability': interval_var,
'openness_std': openness_std,
'microsleep_count': len(microsleep_events)
}

def _detect_microsleeps(self, data: np.ndarray) -> List[dict]:
"""检测微睡眠事件"""
microsleeps = []

# 找连续闭眼段
closed = data < self.closed_threshold

in_microsleep = False
start_idx = 0

for i, is_closed in enumerate(closed):
if is_closed and not in_microsleep:
in_microsleep = True
start_idx = i
elif not is_closed and in_microsleep:
in_microsleep = False
duration = (i - start_idx) / self.fps

# 微睡眠:500ms - 3s
if self.microsleep_threshold < duration < 3.0:
microsleeps.append({
'start_frame': start_idx,
'duration': duration
})
self.microsleep_count += 1

return microsleeps

def _integrate_metrics(self, state: FatigueState, metrics: dict) -> FatigueState:
"""综合多指标判断"""
if not metrics:
return state

# 疲劳加权分数
fatigue_score = state.perclos / 100 # 基础PERCLOS

# 眨眼频率变化权重
if abs(metrics['blink_freq_change']) > 0.3:
fatigue_score += 0.1 * (1 if metrics['blink_freq_change'] > 0 else -0.5)

# 间隔变异性权重
if metrics['interval_variability'] > 2.0:
fatigue_score += 0.1

# 微睡眠权重(严重指标)
if metrics['microsleep_count'] > 0:
fatigue_score += 0.2 * metrics['microsleep_count']

# 重新分类
fatigue_score = min(fatigue_score, 1.0)

if fatigue_score < 0.15:
state.fatigue_level = 'normal'
elif fatigue_score < 0.30:
state.fatigue_level = 'mild'
elif fatigue_score < 0.40:
state.fatigue_level = 'moderate'
else:
state.fatigue_level = 'severe'

return state


# 测试代码
if __name__ == "__main__":
detector = AdvancedFatigueDetector(fps=30, window_seconds=60)

print("=" * 60)
print("疲劳检测测试")
print("=" * 60)

# 模拟正常驾驶
print("\n场景1: 正常驾驶")
np.random.seed(42)
for i in range(1800): # 60秒
# 正常眼睑开度
eye_openness = 0.8 + 0.1 * np.random.randn()
eye_openness = np.clip(eye_openness, 0, 1)

# 偶尔眨眼
if i % 180 < 6: # 每6秒眨眼
eye_openness = 0.1

state = detector.update(eye_openness)

if i % 900 == 899: # 每30秒输出
print(f" {i//30}s: PERCLOS={state.perclos:.1f}%, "
f"等级={state.fatigue_level}, "
f"眨眼={state.blink_count}次")

# 模拟疲劳驾驶
print("\n场景2: 疲劳驾驶")
detector = AdvancedFatigueDetector(fps=30, window_seconds=60)

for i in range(1800):
# 疲劳:眼睑开度下降,眨眼增多
base_openness = 0.6 + 0.1 * np.sin(i * 0.01) # 波动
eye_openness = base_openness + 0.05 * np.random.randn()
eye_openness = np.clip(eye_openness, 0, 1)

# 频繁眨眼
if i % 60 < 8: # 每2秒眨眼
eye_openness = 0.1

# 模拟微睡眠
if 900 < i < 920: # 30-31秒:微睡眠
eye_openness = 0.05

state = detector.update(eye_openness)

if i % 900 == 899:
print(f" {i//30}s: PERCLOS={state.perclos:.1f}%, "
f"等级={state.fatigue_level}, "
f"眨眼={state.blink_count}次, "
f"微睡眠={detector.microsleep_count}次")

Euro NCAP测试场景

疲劳检测测试用例

场景编号 描述 触发条件 警告等级
FT-01 PERCLOS警告 PERCLOS ≥ 30% 二级警告
FT-02 微睡眠检测 闭眼 > 500ms 一级警告
FT-03 延迟警告 PERCLOS持续5秒 警告
FT-04 频繁眨眼 眨眼率 > 30次/分 一级警告
FT-05 眼睑下垂 平均开度 < 50% 提示

生产级优化

嵌入式优化代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// C语言嵌入式实现
#include <stdint.h>
#include <stdbool.h>

#define FPS 30
#define WINDOW_FRAMES (60 * FPS)
#define CLOSED_THRESHOLD 0.2f

typedef struct {
float eye_openness_buffer[WINDOW_FRAMES];
uint16_t write_idx;
uint16_t count;

float perclos;
uint8_t fatigue_level; // 0=normal, 1=mild, 2=moderate, 3=severe
uint16_t blink_count;
} PERCLOS_Detector;

void perclos_init(PERCLOS_Detector* det) {
det->write_idx = 0;
det->count = 0;
det->perclos = 0.0f;
det->fatigue_level = 0;
det->blink_count = 0;
}

void perclos_update(PERCLOS_Detector* det, float eye_openness) {
// 写入环形缓冲区
det->eye_openness_buffer[det->write_idx] = eye_openness;
det->write_idx = (det->write_idx + 1) % WINDOW_FRAMES;

if (det->count < WINDOW_FRAMES) {
det->count++;
}

// 计算PERCLOS(每秒更新一次)
static uint16_t frame_counter = 0;
frame_counter++;

if (frame_counter >= FPS) {
frame_counter = 0;
perclos_compute(det);
}
}

void perclos_compute(PERCLOS_Detector* det) {
if (det->count < 10 * FPS) {
return; // 数据不足
}

// 统计闭眼帧数
uint16_t closed_frames = 0;

for (uint16_t i = 0; i < det->count; i++) {
if (det->eye_openness_buffer[i] < CLOSED_THRESHOLD) {
closed_frames++;
}
}

// 计算百分比
det->perclos = (float)closed_frames / det->count * 100.0f;

// 分类疲劳等级
if (det->perclos < 15.0f) {
det->fatigue_level = 0; // normal
} else if (det->perclos < 30.0f) {
det->fatigue_level = 1; // mild
} else if (det->perclos < 40.0f) {
det->fatigue_level = 2; // moderate
} else {
det->fatigue_level = 3; // severe
}
}

参考资源

  1. Euro NCAP DSM Protocol: https://www.euroncap.com/en/for-engineers/protocols/
  2. PERCLOS原始论文: Dinges & Grace, 1998
  3. NHTSA疲劳检测研究: https://www.nhtsa.gov/

本文详细解析PERCLOS疲劳检测算法,提供可量产代码。