边缘端 DMS 实时部署:低成本硬件上的 17 类驾驶员行为识别

发布时间: 2026-04-14
关键词: Edge DMS、Raspberry Pi 5、Coral Edge-TPU、实时推理、量化部署


论文核心发现

2025 年 12 月发布的研究展示了在低成本边缘硬件上部署实时 DMS 的完整方案:

硬件 帧率 延迟 成本
Raspberry Pi 5 (CPU) ~16 FPS < 60 ms $80
Google Coral Edge-TPU ~25 FPS ~40 ms $150

支持 17 类驾驶员行为,涵盖分心、疲劳、交互等场景。


17 类行为分类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
┌─────────────────────────────────────────────────────┐
│ 17 类驾驶员行为分类 │
├─────────────────────────────────────────────────────┤
│ │
│ 手机使用(6 类) │
│ ───────────────── │
│ • phone_talk_left 左手打电话 │
│ • phone_talk_right 右手打电话 │
│ • phone_text_left 左手发短信 │
│ • phone_text_right 右手发短信 │
│ │
│ 消耗类行为(3 类) │
│ ───────────────── │
│ • eating 吃东西 │
│ • drinking 喝水 │
│ • smoking 吸烟 │
│ │
│ 注意力转移(3 类) │
│ ───────────────── │
│ • look_left 向左看 │
│ • look_down 低头 │
│ • look_right 向右看/与乘客交谈 │
│ │
│ 舱内动作(2 类) │
│ ───────────────── │
│ • reaching_behind 向后取物 │
│ • grooming 整理仪容 │
│ │
│ 疲劳相关(2 类) │
│ ───────────────── │
│ • yawning 打哈欠 │
│ • eyes_closed_sleep 闭眼/睡眠 │
│ │
│ 控制面板交互(1 类) │
│ ───────────────── │
│ • control_panel 操作中控 │
│ │
│ 正常驾驶 │
│ ───────────────── │
│ • normal 正常驾驶 │
│ │
└─────────────────────────────────────────────────────┘

系统架构

端到端流水线

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
┌─────────────────────────────────────────────────────┐
│ DMS 边缘部署流水线 │
├─────────────────────────────────────────────────────┤
│ │
│ 摄像头采集 │
│ ────────── │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ 单帧推理 │ MobileNetV3 + 分类头 │
│ │ pt(c) 概率分布 │ INT8 量化 │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ 混淆感知后处理 │ 减少视觉相似误报 │
│ │ 类别映射 │ (如 grooming vs phone) │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ 时序决策头 │ 置信度阈值 + 持续性门控 │
│ │ 事件级警告 │ 消除帧级抖动 │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ 输出 │ 实时叠加 + 警告事件 │
│ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────┘

关键设计决策

设计点 方案 原因
单摄像头 前向 RGB 摄像头 成本低、安装简单
模型选择 MobileNetV3 轻量化、边缘友好
量化方式 INT8 减少延迟和内存
时序决策 持续性门控 消除帧级抖动
混淆处理 显式混淆类别 减少视觉相似误报

时序决策头详解

为什么需要时序决策?

帧级分类存在两个问题:

  1. 抖动:连续帧之间预测不一致
  2. 瞬态误报:短暂动作(如看后视镜)被误判为分心

解决方案:持续性门控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import numpy as np
from collections import deque
from dataclasses import dataclass
from typing import Optional, List, Tuple

@dataclass
class Alert:
"""事件级警告"""
behavior: str
start_frame: int
end_frame: int
confidence: float

class TemporalDecisionHead:
"""时序决策头:将帧级预测转换为事件级警告"""

def __init__(self,
confidence_threshold: float = 0.7,
persistence_frames: int = 15,
cooldown_frames: int = 30):
"""
Args:
confidence_threshold: 置信度阈值
persistence_frames: 持续帧数要求(15帧 @ 15fps = 1秒)
cooldown_frames: 冷却帧数(防止重复警告)
"""
self.confidence_threshold = confidence_threshold
self.persistence_frames = persistence_frames
self.cooldown_frames = cooldown_frames

# 历史缓冲区
self.history = deque(maxlen=persistence_frames)
self.current_alert = None
self.alert_start_frame = None
self.last_alert_end_frame = -cooldown_frames

def update(self,
frame_idx: int,
probabilities: np.ndarray,
class_names: List[str]) -> Optional[Alert]:
"""更新时序决策

Args:
frame_idx: 当前帧索引
probabilities: (17,) 概率分布
class_names: 类别名称列表

Returns:
Alert 或 None
"""
# 获取最大概率类别
max_idx = np.argmax(probabilities)
max_prob = probabilities[max_idx]
max_class = class_names[max_idx]

# 正常驾驶不产生警告
if max_class == 'normal':
self.history.clear()
self.current_alert = None
return None

# 置信度检查
if max_prob < self.confidence_threshold:
return None

# 记录历史
self.history.append((frame_idx, max_class, max_prob))

# 检查持续性
if self._check_persistence(max_class):
# 生成警告
if self.current_alert != max_class:
# 新警告
self.current_alert = max_class
self.alert_start_frame = frame_idx - self.persistence_frames

# 冷却检查
if frame_idx - self.last_alert_end_frame >= self.cooldown_frames:
return Alert(
behavior=max_class,
start_frame=self.alert_start_frame,
end_frame=frame_idx,
confidence=np.mean([p for _, _, p in self.history])
)

return None

def _check_persistence(self, target_class: str) -> bool:
"""检查持续性:连续 persistence_frames 都是同一类别"""
if len(self.history) < self.persistence_frames:
return False

recent = list(self.history)[-self.persistence_frames:]
classes = [c for _, c, _ in recent]

# 所有的都是同一类别
return all(c == target_class for c in classes)

def reset(self):
"""重置状态"""
self.history.clear()
self.current_alert = None
self.alert_start_frame = None

边缘部署代码示例

TensorFlow Lite INT8 量化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import tensorflow as tf
import numpy as np

class DMSModelQuantizer:
"""DMS 模型量化器"""

def __init__(self, model_path: str, calibration_data: np.ndarray):
"""
Args:
model_path: 原始模型路径
calibration_data: 校准数据 (N, H, W, 3)
"""
self.model = tf.keras.models.load_model(model_path)
self.calibration_data = calibration_data

def quantize_to_int8(self, output_path: str):
"""量化为 INT8 TFLite 模型"""

# 转换器
converter = tf.lite.TFLiteConverter.from_keras_model(self.model)

# 量化配置
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = self._representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.uint8

# 转换
quantized_model = converter.convert()

# 保存
with open(output_path, 'wb') as f:
f.write(quantized_model)

return quantized_model

def _representative_dataset(self):
"""校准数据生成器"""
for i in range(len(self.calibration_data)):
yield [self.calibration_data[i:i+1]]

# 使用示例
if __name__ == '__main__':
# 加载校准数据(真实驾驶场景)
calibration_images = np.load('calibration_data.npy') # (100, 224, 224, 3)

# 量化
quantizer = DMSModelQuantizer('dms_model.h5', calibration_images)
quantized_model = quantizer.quantize_to_int8('dms_model_int8.tflite')

print(f"量化后模型大小: {len(quantized_model) / 1024:.1f} KB")

Raspberry Pi 5 部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import tflite_runtime.interpreter as tflite
import numpy as np
import cv2
import time

class EdgeDMSInference:
"""边缘 DMS 推理引擎"""

def __init__(self, model_path: str, num_threads: int = 4):
"""
Args:
model_path: TFLite 模型路径
num_threads: CPU 线程数
"""
# 加载模型
self.interpreter = tflite.Interpreter(
model_path=model_path,
num_threads=num_threads
)
self.interpreter.allocate_tensors()

# 输入输出信息
self.input_details = self.interpreter.get_input_details()
self.output_details = self.interpreter.get_output_details()

# 输入尺寸
self.input_shape = self.input_details[0]['shape']
self.input_height = self.input_shape[1]
self.input_width = self.input_shape[2]

# 量化参数
self.input_scale = self.input_details[0]['quantization_parameters']['scales'][0]
self.input_zero_point = self.input_details[0]['quantization_parameters']['zero_points'][0]

# 类别名称
self.class_names = [
'normal', 'phone_talk_left', 'phone_talk_right',
'phone_text_left', 'phone_text_right', 'eating',
'drinking', 'smoking', 'look_left', 'look_down',
'look_right', 'reaching_behind', 'grooming',
'control_panel', 'yawning', 'eyes_closed_sleep'
]

# 时序决策头
self.temporal_head = TemporalDecisionHead()

# 性能统计
self.inference_times = []

def preprocess(self, frame: np.ndarray) -> np.ndarray:
"""预处理"""
# 缩放
resized = cv2.resize(frame, (self.input_width, self.input_height))

# 归一化
normalized = resized.astype(np.float32) / 255.0

# 量化为 INT8
quantized = (normalized / self.input_scale + self.input_zero_point).astype(np.uint8)

# 添加 batch 维度
return np.expand_dims(quantized, axis=0)

def inference(self, frame: np.ndarray, frame_idx: int) -> dict:
"""推理

Returns:
{
'behavior': str,
'confidence': float,
'alert': Optional[Alert],
'latency_ms': float
}
"""
start_time = time.time()

# 预处理
input_data = self.preprocess(frame)

# 推理
self.interpreter.set_tensor(self.input_details[0]['index'], input_data)
self.interpreter.invoke()

# 获取输出
output = self.interpreter.get_tensor(self.output_details[0]['index'])

# 反量化
output_scale = self.output_details[0]['quantization_parameters']['scales'][0]
output_zero_point = self.output_details[0]['quantization_parameters']['zero_points'][0]
probabilities = (output.astype(np.float32) - output_zero_point) * output_scale

# Softmax
probabilities = self._softmax(probabilities[0])

latency_ms = (time.time() - start_time) * 1000
self.inference_times.append(latency_ms)

# 时序决策
alert = self.temporal_head.update(frame_idx, probabilities, self.class_names)

# 获取预测类别
max_idx = np.argmax(probabilities)

return {
'behavior': self.class_names[max_idx],
'confidence': probabilities[max_idx],
'probabilities': probabilities,
'alert': alert,
'latency_ms': latency_ms
}

def _softmax(self, x: np.ndarray) -> np.ndarray:
"""Softmax"""
exp_x = np.exp(x - np.max(x))
return exp_x / exp_x.sum()

def get_stats(self) -> dict:
"""获取性能统计"""
return {
'avg_latency_ms': np.mean(self.inference_times),
'max_latency_ms': np.max(self.inference_times),
'min_latency_ms': np.min(self.inference_times),
'fps': 1000 / np.mean(self.inference_times)
}

实时推理循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import cv2
import time

class RealTimeDMS:
"""实时 DMS 系统"""

def __init__(self, model_path: str, camera_id: int = 0):
self.inference_engine = EdgeDMSInference(model_path)
self.cap = cv2.VideoCapture(camera_id)
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
self.cap.set(cv2.CAP_PROP_FPS, 30)

self.frame_idx = 0

def run(self):
"""运行实时推理"""
while True:
ret, frame = self.cap.read()
if not ret:
break

# 推理
result = self.inference_engine.inference(frame, self.frame_idx)

# 显示结果
self._display(frame, result)

# 警告处理
if result['alert']:
self._handle_alert(result['alert'])

self.frame_idx += 1

# 按 q 退出
if cv2.waitKey(1) & 0xFF == ord('q'):
break

# 打印统计
stats = self.inference_engine.get_stats()
print(f"\n性能统计:")
print(f" 平均延迟: {stats['avg_latency_ms']:.1f} ms")
print(f" FPS: {stats['fps']:.1f}")

self.cap.release()
cv2.destroyAllWindows()

def _display(self, frame: np.ndarray, result: dict):
"""显示结果"""
# 绘制行为和置信度
text = f"{result['behavior']}: {result['confidence']:.2f}"
cv2.putText(frame, text, (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

# 如果有警告,绘制警告
if result['alert']:
cv2.putText(frame, f"ALERT: {result['alert'].behavior}",
(10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)

# 显示延迟
cv2.putText(frame, f"{result['latency_ms']:.1f} ms",
(10, frame.shape[0] - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

cv2.imshow('DMS', frame)

def _handle_alert(self, alert):
"""处理警告"""
print(f"[Frame {alert.end_frame}] {alert.behavior} "
f"(confidence: {alert.confidence:.2f})")

if __name__ == '__main__':
dms = RealTimeDMS('dms_model_int8.tflite')
dms.run()

混淆感知设计

常见混淆场景

真实行为 易混淆行为 原因
grooming phone_talk 手靠近面部
control_panel phone_text 手在胸前操作
yawning eating 嘴巴张开
look_down phone_text 头部低垂

解决方案

  1. 显式混淆类别:将易混淆行为作为独立类别训练
  2. 后处理映射:根据上下文映射到最终行为
  3. 时序过滤:持续性检查消除短暂混淆
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 混淆感知后处理
CONFUSION_MAP = {
'grooming': 'grooming', # 保持独立
'control_panel': 'control_panel', # 保持独立
# 易混淆映射
'phone_talk_if_grooming': 'grooming', # 如果前后是 grooming
}

def postprocess_with_confusion(predictions: List[str]) -> str:
"""混淆感知后处理"""
# 简单示例:多数投票
from collections import Counter
counts = Counter(predictions)
return counts.most_common(1)[0][0]

性能对比

不同硬件表现

硬件 模型 帧率 延迟 功耗
Raspberry Pi 5 (CPU) MobileNetV3 INT8 16 FPS 60 ms ~5W
Google Coral Edge-TPU MobileNetV3 INT8 25 FPS 40 ms ~2W
Jetson Nano MobileNetV3 FP16 30 FPS 33 ms ~10W
Intel NCS2 MobileNetV3 FP16 20 FPS 50 ms ~3W

准确率评估

数据集 准确率 说明
自有数据集 92.3% 多样化场景
DMD (Distracted Driver) 89.7% 公开基准
实车测试 88.5% 真实驾驶

对 IMS 开发的启示

1. 边缘优先设计

原则 说明
轻量模型 MobileNetV3 / EfficientNet-Lite
INT8 量化 减少延迟和内存
单摄像头 简化部署
时序决策 消除抖动

2. 部署平台选择

场景 推荐平台
原型开发 Raspberry Pi 5 (成本低、开发快)
量产部署 Qualcomm 8255 / Renesas V4H
后装市场 Coral Edge-TPU (功耗低)

3. Euro NCAP 合规

  • 边缘部署必须满足 < 100 ms 延迟要求
  • 时序决策头满足”持续性检测”要求
  • 混淆感知设计减少误报

参考资源