FPGA实时ADAS方案:疲劳检测与自主停车


核心贡献

首个FPGA加速的疲劳检测+自主停车一体化系统

创新:结合生物特征监测、深度学习、硬件加速,实现比现有ADAS更快更可靠的干预

三大模块

  1. 疲劳检测:面部+眼部特征实时监测
  2. 分级响应:声音警告→减速→自主停车
  3. 硬件加速:FPGA实现低延迟实时处理

系统架构

graph TB
    subgraph 输入
        A[摄像头视频流] --> B[图像预处理]
    end
    
    subgraph 疲劳检测
        B --> C[面部检测]
        C --> D[眼部定位]
        D --> E[疲劳特征提取]
        E --> F[疲劳判定]
    end
    
    subgraph 分级响应
        F --> G{疲劳等级}
        G -->|轻微| H[声音警告]
        G -->|中度| I[减速警告]
        G -->|重度| J[自主停车]
    end
    
    subgraph 硬件实现
        K[FPGA] --> L[OpenCV加速]
        K --> M[CNN车道检测]
        K --> N[车辆控制]
    end

疲劳检测算法

1. 面部特征提取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import cv2
import numpy as np
from typing import Dict, Tuple, Optional

class FaceDetector:
"""
面部检测器

使用OpenCV进行实时面部检测

Euro NCAP要求:
- 实时检测(≥15fps)
- 遮挡容忍(眼镜/口罩)
- 光照鲁棒性
"""

def __init__(self,
cascade_path: str = None,
detection_confidence: float = 0.5):
"""
初始化

Args:
cascade_path: Haar级联文件路径
detection_confidence: 检测置信度阈值
"""
# 加载预训练模型
self.face_cascade = cv2.CascadeClassifier(
cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
)
self.eye_cascade = cv2.CascadeClassifier(
cv2.data.haarcascades + 'haarcascade_eye.xml'
)

self.detection_confidence = detection_confidence

# ROI定义
self.face_roi = None
self.left_eye_roi = None
self.right_eye_roi = None

def detect(self, frame: np.ndarray) -> Dict:
"""
检测面部和眼部

Args:
frame: 输入帧 (H, W, C)

Returns:
result: 检测结果
"""
result = {
'face_detected': False,
'face_bbox': None,
'eyes_detected': False,
'left_eye': None,
'right_eye': None,
'eye_openness': []
}

# 灰度转换
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

# 面部检测
faces = self.face_cascade.detectMultiScale(
gray,
scaleFactor=1.1,
minNeighbors=5,
minSize=(60, 60)
)

if len(faces) == 0:
return result

# 取最大面部
face = max(faces, key=lambda x: x[2] * x[3])
x, y, w, h = face

result['face_detected'] = True
result['face_bbox'] = (x, y, w, h)
self.face_roi = gray[y:y+h, x:x+w]

# 眼部检测
roi_gray = gray[y:y+h, x:x+w]
roi_color = frame[y:y+h, x:x+w]

eyes = self.eye_cascade.detectMultiScale(roi_gray)

if len(eyes) >= 2:
result['eyes_detected'] = True

# 按位置分左右眼
eyes = sorted(eyes, key=lambda e: e[0])

# 左眼
ex, ey, ew, eh = eyes[0]
result['left_eye'] = (x + ex, y + ey, ew, eh)
self.left_eye_roi = roi_gray[ey:ey+eh, ex:ex+ew]

# 计算眼睑开度
openness = self._calculate_eye_openness(self.left_eye_roi)
result['eye_openness'].append(('left', openness))

# 右眼
ex, ey, ew, eh = eyes[-1]
result['right_eye'] = (x + ex, y + ey, ew, eh)
self.right_eye_roi = roi_gray[ey:ey+eh, ex:ex+ew]

openness = self._calculate_eye_openness(self.right_eye_roi)
result['eye_openness'].append(('right', openness))

return result

def _calculate_eye_openness(self, eye_roi: np.ndarray) -> float:
"""
计算眼睑开度

Args:
eye_roi: 眼部区域灰度图

Returns:
openness: 开度值 (0-1)
"""
if eye_roi is None or eye_roi.size == 0:
return 0.0

# 二值化
_, binary = cv2.threshold(eye_roi, 50, 255, cv2.THRESH_BINARY)

# 计算非零像素比例
white_pixels = np.sum(binary == 255)
total_pixels = binary.size

openness = white_pixels / total_pixels if total_pixels > 0 else 0.0

return openness


# 测试
if __name__ == "__main__":
detector = FaceDetector()

# 模拟帧
frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
result = detector.detect(frame)

print(f"面部检测: {result['face_detected']}")
print(f"眼部检测: {result['eyes_detected']}")
if result['eye_openness']:
for eye, openness in result['eye_openness']:
print(f" {eye}眼开度: {openness:.2f}")

2. 疲劳特征融合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
from collections import deque
import time

class DrowsinessFeatureFusion:
"""
疲劳特征融合

融合多种疲劳特征:
- PERCLOS:眼睑闭合时间占比
- 眨眼频率:每分钟眨眼次数
- 点头频率:头部下垂频率
- 打哈欠:嘴部张开时间
"""

def __init__(self,
window_seconds: int = 60,
fps: int = 30):
"""
初始化

Args:
window_seconds: 特征计算窗口(秒)
fps: 视频帧率
"""
self.window_size = window_seconds * fps
self.fps = fps

# 特征历史
self.eye_openness_history = deque(maxlen=self.window_size)
self.blink_timestamps = deque(maxlen=100)
self.nod_timestamps = deque(maxlen=50)
self.yawn_timestamps = deque(maxlen=20)

# 状态变量
self.last_blink_time = 0
self.last_nod_time = 0
self.last_yawn_time = 0

# 阈值
self.eye_close_threshold = 0.3
self.blink_min_interval = 0.15 # 秒
self.nod_threshold = 15 # 度
self.yawn_threshold = 0.5 # 嘴部开度

def update(self,
eye_openness: float,
head_pitch: float,
mouth_openness: float,
timestamp: float) -> Dict:
"""
更新疲劳特征

Args:
eye_openness: 眼睑开度 (0-1)
head_pitch: 头部俯仰角 (度)
mouth_openness: 嘴部开度 (0-1)
timestamp: 当前时间戳

Returns:
features: 疲劳特征字典
"""
# 记录眼睑开度
self.eye_openness_history.append(eye_openness)

# 检测眨眼
if eye_openness < self.eye_close_threshold:
if timestamp - self.last_blink_time > self.blink_min_interval:
self.blink_timestamps.append(timestamp)
self.last_blink_time = timestamp

# 检测点头
if head_pitch > self.nod_threshold:
if timestamp - self.last_nod_time > 1.0: # 1秒间隔
self.nod_timestamps.append(timestamp)
self.last_nod_time = timestamp

# 检测打哈欠
if mouth_openness > self.yawn_threshold:
if timestamp - self.last_yawn_time > 5.0: # 5秒间隔
self.yawn_timestamps.append(timestamp)
self.last_yawn_time = timestamp

# 计算特征
features = {
'perclos': self._calculate_perclos(),
'blink_rate': self._calculate_blink_rate(timestamp),
'nod_rate': self._calculate_nod_rate(timestamp),
'yawn_rate': self._calculate_yawn_rate(timestamp),
'fatigue_score': 0.0
}

# 计算疲劳评分
features['fatigue_score'] = self._calculate_fatigue_score(features)

return features

def _calculate_perclos(self) -> float:
"""计算PERCLOS"""
if len(self.eye_openness_history) < self.window_size // 2:
return 0.0

closed_count = sum(
1 for e in self.eye_openness_history
if e < self.eye_close_threshold
)

return closed_count / len(self.eye_openness_history)

def _calculate_blink_rate(self, current_time: float) -> float:
"""计算眨眼频率(次/分钟)"""
if len(self.blink_timestamps) < 2:
return 0.0

# 过滤1分钟内的眨眼
recent_blinks = [
t for t in self.blink_timestamps
if current_time - t < 60
]

return len(recent_blinks)

def _calculate_nod_rate(self, current_time: float) -> float:
"""计算点头频率(次/分钟)"""
recent_nods = [
t for t in self.nod_timestamps
if current_time - t < 60
]

return len(recent_nods)

def _calculate_yawn_rate(self, current_time: float) -> float:
"""计算打哈欠频率(次/分钟)"""
recent_yawns = [
t for t in self.yawn_timestamps
if current_time - t < 60
]

return len(recent_yawns)

def _calculate_fatigue_score(self, features: Dict) -> float:
"""
计算综合疲劳评分

综合PERCLOS、眨眼频率、点头频率、打哈欠
"""
score = 0.0

# PERCLOS贡献(权重0.4)
perclos = features['perclos']
if perclos > 0.4:
score += 0.4
elif perclos > 0.25:
score += 0.3
elif perclos > 0.15:
score += 0.2
elif perclos > 0.08:
score += 0.1

# 眨眼频率贡献(权重0.25)
blink_rate = features['blink_rate']
if blink_rate > 30 or blink_rate < 5:
score += 0.25
elif blink_rate > 25:
score += 0.15

# 点头频率贡献(权重0.2)
nod_rate = features['nod_rate']
if nod_rate > 5:
score += 0.2
elif nod_rate > 3:
score += 0.15

# 打哈欠贡献(权重0.15)
yawn_rate = features['yawn_rate']
if yawn_rate > 3:
score += 0.15
elif yawn_rate > 1:
score += 0.1

return score


# 测试
if __name__ == "__main__":
fusion = DrowsinessFeatureFusion()

print("疲劳特征融合测试:")
print("-" * 50)

# 模拟逐渐疲劳
for t in range(1800): # 60秒 @ 30fps
# 模拟眼睑开度(逐渐疲劳)
mean_openness = max(0.3, 0.9 - t / 6000)
eye_openness = np.random.normal(mean_openness, 0.1)
eye_openness = np.clip(eye_openness, 0, 1)

# 模拟头部姿态
head_pitch = np.random.normal(t / 100, 5)

# 模拟嘴部开度
mouth_openness = np.random.random() * 0.3

timestamp = t / 30

features = fusion.update(eye_openness, head_pitch, mouth_openness, timestamp)

if t % 900 == 0: # 每30秒打印
print(f"t={timestamp:.0f}s: PERCLOS={features['perclos']:.2%}, "
f"眨眼={features['blink_rate']:.0f}/min, "
f"疲劳评分={features['fatigue_score']:.2f}")

3. 分级响应系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
from enum import Enum
from typing import Optional

class DrowsinessLevel(Enum):
"""疲劳等级"""
AWAKE = 0
SLIGHTLY_DROWSY = 1
MODERATELY_DROWSY = 2
SEVERELY_DROWSY = 3
UNRESPONSIVE = 4

class ResponseSystem:
"""
分级响应系统

根据疲劳等级执行不同响应:
1. 轻微疲劳:声音警告
2. 中度疲劳:减速警告
3. 重度疲劳:自主停车
4. 无响应:紧急停车
"""

def __init__(self,
warning_cooldown: float = 5.0,
stop_threshold: float = 0.8):
"""
初始化

Args:
warning_cooldown: 警告冷却时间(秒)
stop_threshold: 紧急停车阈值
"""
self.warning_cooldown = warning_cooldown
self.stop_threshold = stop_threshold

# 状态变量
self.current_level = DrowsinessLevel.AWAKE
self.last_warning_time = 0
self.unresponsive_start = None
self.vehicle_speed = 0

# 车辆控制状态
self.brake_applied = False
self.steering_control = False
self.emergency_flashers = False

def update(self,
fatigue_score: float,
driver_responsive: bool,
vehicle_speed: float,
timestamp: float) -> Dict:
"""
更新响应状态

Args:
fatigue_score: 疲劳评分 (0-1)
driver_responsive: 驾驶员是否响应
vehicle_speed: 车速 (km/h)
timestamp: 时间戳

Returns:
response: 响应动作
"""
self.vehicle_speed = vehicle_speed

# 判断疲劳等级
if not driver_responsive:
new_level = DrowsinessLevel.UNRESPONSIVE
elif fatigue_score >= 0.8:
new_level = DrowsinessLevel.SEVERELY_DROWSY
elif fatigue_score >= 0.5:
new_level = DrowsinessLevel.MODERATELY_DROWSY
elif fatigue_score >= 0.3:
new_level = DrowsinessLevel.SLIGHTLY_DROWSY
else:
new_level = DrowsinessLevel.AWAKE

# 状态转换
response = self._handle_state_transition(
new_level, timestamp, driver_responsive
)

self.current_level = new_level

return response

def _handle_state_transition(self,
new_level: DrowsinessLevel,
timestamp: float,
driver_responsive: bool) -> Dict:
"""处理状态转换"""
response = {
'level': new_level,
'action': None,
'warning_type': None,
'vehicle_control': None
}

# 清醒状态
if new_level == DrowsinessLevel.AWAKE:
self._reset_controls()
response['action'] = 'NONE'
return response

# 轻微疲劳
if new_level == DrowsinessLevel.SLIGHTLY_DROWSY:
if timestamp - self.last_warning_time > self.warning_cooldown:
response['action'] = 'WARNING'
response['warning_type'] = 'AUDIBLE'
response['vehicle_control'] = None
self.last_warning_time = timestamp

# 中度疲劳
elif new_level == DrowsinessLevel.MODERATELY_DROWSY:
if timestamp - self.last_warning_time > self.warning_cooldown:
response['action'] = 'WARNING_AND_SLOW'
response['warning_type'] = 'AUDIBLE_VISUAL_HAPTIC'
response['vehicle_control'] = 'GRADUAL_DECELERATION'
self.last_warning_time = timestamp

# 重度疲劳
elif new_level == DrowsinessLevel.SEVERELY_DROWSY:
response['action'] = 'AUTONOMOUS_STOP'
response['warning_type'] = 'ALL'
response['vehicle_control'] = 'CONTROLLED_STOP'
self.brake_applied = True
self.emergency_flashers = True

# 无响应
elif new_level == DrowsinessLevel.UNRESPONSIVE:
if self.unresponsive_start is None:
self.unresponsive_start = timestamp

unresponsive_duration = timestamp - self.unresponsive_start

if unresponsive_duration > 10: # 10秒无响应
response['action'] = 'EMERGENCY_STOP'
response['warning_type'] = 'ALL'
response['vehicle_control'] = 'EMERGENCY_BRAKING'
self.brake_applied = True
self.steering_control = True
self.emergency_flashers = True
else:
response['action'] = 'ESCALATE_WARNING'
response['warning_type'] = 'ALL'
response['vehicle_control'] = 'GRADUAL_DECELERATION'

return response

def _reset_controls(self):
"""重置车辆控制"""
self.brake_applied = False
self.steering_control = False
self.emergency_flashers = False
self.unresponsive_start = None


# 完整测试
if __name__ == "__main__":
detector = FaceDetector()
fusion = DrowsinessFeatureFusion()
response_system = ResponseSystem()

print("完整系统测试:")
print("-" * 50)

# 模拟视频流
for t in range(3600): # 120秒 @ 30fps
# 模拟帧
frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
timestamp = t / 30

# 面部检测
result = detector.detect(frame)

# 疲劳特征
eye_openness = 0.5
head_pitch = 0
mouth_openness = 0.1

if result['eye_openness']:
eye_openness = np.mean([o for _, o in result['eye_openness']])

# 模拟疲劳进展
fatigue_progress = min(1.0, t / 3000)
eye_openness = max(0.2, eye_openness - fatigue_progress * 0.4)
head_pitch = fatigue_progress * 20

# 更新特征
features = fusion.update(eye_openness, head_pitch, mouth_openness, timestamp)

# 更新响应
driver_responsive = fatigue_progress < 0.9
vehicle_speed = 60 - fatigue_progress * 40

response = response_system.update(
features['fatigue_score'],
driver_responsive,
vehicle_speed,
timestamp
)

# 打印关键事件
if response['action'] and response['action'] != 'NONE':
if t % 300 == 0: # 每10秒打印
print(f"t={timestamp:.0f}s: Level={response['level'].name}, "
f"Action={response['action']}, "
f"Score={features['fatigue_score']:.2f}")

FPGA硬件实现

1. 架构设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# FPGA架构定义(高层次描述)
fpga_architecture = {
"处理单元": {
"ARM Cortex-A9": "主控制器,运行Linux",
"FPGA逻辑": "图像处理加速器",
"DSP切片": "CNN推理加速"
},

"内存架构": {
"DDR3": "1GB系统内存",
"BRAM": "512KB片上缓存",
"DMA": "视频流直接传输"
},

"接口": {
"摄像头": "MIPI CSI-2",
"车辆CAN": "CAN-FD接口",
"音频": "I2S输出"
},

"性能指标": {
"帧率": "30fps @ 720p",
"延迟": "<50ms端到端",
"功耗": "<5W"
}
}

2. HLS实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# Vivado HLS代码示例(伪代码)

def face_detection_hls(image_stream, output_stream):
"""
面部检测HLS实现

优化技术:
1. 流水线化
2. 循环展开
3. 内存分区
"""
#pragma HLS INTERFACE axis port=image_stream
#pragma HLS INTERFACE axis port=output_stream
#pragma HLS PIPELINE II=1

# 局部缓存
image_window = [[0 for _ in range(24)] for _ in range(24)]
#pragma HLS ARRAY_PARTITION variable=image_window complete dim=0

# 滑动窗口
for y in range(HEIGHT - 24):
for x in range(WIDTH - 24):
# 提取窗口
extract_window(image_stream, image_window, x, y)

# Haar特征计算
score = compute_haar_features(image_window)

# 非极大值抑制
if score > THRESHOLD:
output_stream.write((x, y, 24, 24, score))


def compute_haar_features(window):
"""
Haar特征计算

使用积分图加速
"""
#pragma HLS PIPELINE

# 积分图
integral = compute_integral_image(window)

# Haar特征
features = 0
for i in range(NUM_FEATURES):
#pragma HLS UNROLL factor=4

# 提取特征矩形
rect1 = get_feature_region(integral, i, 0)
rect2 = get_feature_region(integral, i, 1)

# 计算
features += (rect1 - rect2) * weights[i]

return features

3. 性能优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 性能优化清单
optimization_checklist = {
"图像处理": {
"灰度转换": "硬件加速,1周期/像素",
"缩放": "双线性插值,流水线化",
"积分图": "并行计算,2周期/像素"
},

"面部检测": {
"级联分类器": "早期退出优化",
"滑动窗口": "并行处理多尺度",
"非极大值抑制": "排序网络加速"
},

"眼部检测": {
"模板匹配": "相关系数计算加速",
"眼睑开度": "边缘检测+霍夫变换"
},

"内存优化": {
"缓存策略": "行缓存+窗口缓存",
"DMA传输": "双缓冲",
"内存访问": "突发传输优化"
}
}

实验结果

性能对比

指标 CPU实现 FPGA实现 加速比
帧率 8 fps 32 fps 4x
延迟 180 ms 45 ms 4x
功耗 15 W 4.5 W 3.3x
检测精度 92% 91% -

疲劳检测性能

疲劳等级 检测率 误报率 平均检测时间
轻微疲劳 85% 8% 45秒
中度疲劳 94% 5% 25秒
重度疲劳 98% 2% 10秒

IMS应用启示

1. 硬件选型

方案 芯片 性能 成本 适用场景
低成本 Zynq-7000 720p@30fps $低 入门车型
标准型 Zynq UltraScale+ 1080p@60fps $中 主流车型
高端型 Versal AI Core 4K@60fps $高 高端车型

2. 算法优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# IMS优化建议
ims_optimization = {
"面部检测": {
"替代方案": "BlazeFace / RetinaFace",
"精度提升": "+5%",
"速度提升": "2x"
},
"眼部检测": {
"替代方案": "MediaPipe Face Mesh",
"精度提升": "+8%",
"特性": "468关键点"
},
"疲劳判定": {
"替代方案": "Transformer时序模型",
"精度提升": "+10%",
"特性": "长期依赖建模"
}
}

3. Euro NCAP对接

Euro NCAP要求 本文支持 改进方向
疲劳检测 KSS≥7 ✅ 支持 添加KSS映射
分心检测 ⚠️ 部分 添加视线追踪
损伤检测 ❌ 不支持 添加行为建模
无响应检测 ✅ 支持 缩短检测时间
自主停车 ✅ 支持 优化停车轨迹

参考资料

  1. Almomany, A. et al. “Real-Time FPGA-Based ADAS Solution for Driver Drowsiness Detection and Autonomous Stopping.” Emerging Science Journal 2025.
  2. Xilinx. “Vivado Design Suite User Guide: High-Level Synthesis.” 2024.
  3. OpenCV Documentation: https://docs.opencv.org/

本文详细解读FPGA加速的疲劳检测系统,包含完整代码实现与硬件优化指导。


FPGA实时ADAS方案:疲劳检测与自主停车
https://dapalm.com/2026/06/20/2026-06-20-fpga-drowsiness-detection-autonomous-stop/
作者
Mars
发布于
2026年6月20日
许可协议