Low-Latency Embedded DMS: Real-Time Deployment of a Multi-Task Neural Network (Measured on Jetson Nano/Xavier NX)

Paper information

  • Title: Low-Latency Embedded Driver Monitoring System with a Multi-Task Neural Network
  • Authors: Carmelo Scribano, Giovanni Cappelletti, Elia Giacobazzi, Giorgia Franchini, Paolo Burgio, Marko Bertogna
  • Affiliation: University of Modena and Reggio Emilia, Italy
  • Link: arXiv:2605.02563
  • Code: GitHub - cscribano/MtDMS

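Core innovation

A single forward pass outputs six categories of DMS indicators at once, with an end-to-end latency of ≤100 ms on Jetson Nano.

Conventional DMS pipelines chain several models in series (face detection → feature extraction → classification), which adds latency and repeats computation. The paper proposes a multi-task CNN that completes every task in one inference pass, which makes it suitable for edge deployment.

Method details

1. System architecture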

┌────────────────────────────────────────────────────────────────┐
│                    Multi-Task DMS Pipeline                     │
├────────────────────────────────────────────────────────────────┤
│  [Camera input] → [SSD face detection] → [Face ROI crop]       │
│                              ↓                                 │
│                       [Multi-Task CNN]                         │
│                              ↓                                 │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │ 209-dimensional output vector:                           │  │
│  │  - 98 facial landmarks (regression)                      │  │
│  │  - Left/right eye openness (regression)                  │  │
│  │  - Left/right eye visibility (binary classification)     │  │
│  │  - Mouth openness (3-class)                              │  │
│  │  - Head pose yaw/pitch/roll (regression)                 │  │
│  │  - Distraction action (3-class)                          │  │
│  └──────────────────────────────────────────────────────────┘  │
│                              ↓                                 │
│                   [Post-processing module]                     │
│                              ↓                                 │
│           [PERCLOS / fatigue / distraction state]              │
└────────────────────────────────────────────────────────────────┘

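2. Multi-task CNN design

2.1 Input and output definition

Input:

  • RGB face image $I \in \mathbb{R}^{3 \times w \times h}$ (the cropped face region)

Output (209-dimensional vector):

| Output | Type | Dimensions | Description |
| --- | --- | --- | --- |
| Facial landmarks | Regression | 98×2 = 196 | Normalized coordinates (0-1) |
| Left-eye openness | Regression | 1 | Continuous value (0 = closed, 1 = fully open) |
| Right-eye openness | Regression | 1 | Continuous value (0 = closed, 1 = fully open) |
| Left-eye visibility | Binary classification | 1 | 0 = occluded, 1 = visible |
| Right-eye visibility | Binary classification | 1 | 0 = occluded, 1 = visible |
| Mouth openness | 3-class | 3 | Closed / half open / fully open (softmax) |
| Head pose | Regression | 3 | yaw, pitch, roll (Euler angles) |
| Distraction action | 3-class | 3 | Normal / phone use / smoking (softmax) |
| Total | - | 209 | - |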
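The table can be turned directly into index ranges for decoding the flat vector. The sketch below assumes the fields are concatenated in the table's row order; the order used by the paper's released code may differ, and `OUTPUT_LAYOUT`, `build_slices`, and `split_output` are illustrative names rather than anything from the paper.

```python
# Field sizes in the order listed in the output table (assumed concatenation order).
OUTPUT_LAYOUT = [
    ("landmarks", 98 * 2),
    ("left_eye_openness", 1),
    ("right_eye_openness", 1),
    ("left_eye_visibility", 1),
    ("right_eye_visibility", 1),
    ("mouth_state", 3),
    ("head_pose", 3),
    ("action", 3),
]


def build_slices(layout):
    """Map each field name to the slice it occupies in the flat vector."""
    slices, start = {}, 0
    for name, size in layout:
        slices[name] = slice(start, start + size)
        start += size
    return slices, start


def split_output(vector, layout=OUTPUT_LAYOUT):
    """Split one flat output vector (any sequence of 209 values) into named fields."""
    slices, total = build_slices(layout)
    if len(vector) != total:
        raise ValueError(f"expected {total} values, got {len(vector)}")
    return {name: vector[s] for name, s in slices.items()}


if __name__ == "__main__":
    slices, total = build_slices(OUTPUT_LAYOUT)
    print(total)                # 209
    print(slices["head_pose"])  # slice(203, 206, None)
```

Keeping the layout in one list means the total of 209 is derived rather than hard-coded, so a change to any head size shows up immediately as a length mismatch.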

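2.2 Network structure

Backbone: MobileNet-v2 (balances efficiency and accuracy)

Key design elements:

  • Depthwise separable convolutions (fewer parameters)
  • Inverted residual blocks
  • Multi-scale feature fusion ($\mathcal{F}_1, \mathcal{F}_2, \mathcal{F}_3$)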
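To see what a multi-scale tap would deliver, the sketch below derives the channel count and cumulative stride after each MobileNet-v2 block from the standard stage configuration (the one used by the MobileNet-v2 paper and by torchvision's default model). The post does not say which layers the paper taps for $\mathcal{F}_1, \mathcal{F}_2, \mathcal{F}_3$; the tap indices 4, 7, and 17 are an assumption taken from the comments in the model code, and fusing by global average pooling plus concatenation is just one simple option.

```python
# (expansion t, output channels c, repeats n, first stride s) per stage:
# the standard MobileNet-v2 configuration.
STAGES = [
    (1, 16, 1, 1),
    (6, 24, 2, 2),
    (6, 32, 3, 2),
    (6, 64, 4, 2),
    (6, 96, 3, 1),
    (6, 160, 3, 2),
    (6, 320, 1, 1),
]


def feature_table(stages=STAGES, stem_channels=32, stem_stride=2):
    """Return [(index, channels, cumulative stride)] for the stem and every block."""
    table = [(0, stem_channels, stem_stride)]
    stride = stem_stride
    for _t, channels, repeats, first_stride in stages:
        for i in range(repeats):
            if i == 0:
                stride *= first_stride  # only the first block of a stage downsamples
            table.append((len(table), channels, stride))
    return table


def fused_dim(taps, table):
    """Channel count after pooling each tapped feature map and concatenating."""
    return sum(table[i][1] for i in taps)


if __name__ == "__main__":
    table = feature_table()
    for index in (4, 7, 17):  # hypothetical tap points
        print(table[index])
    print(fused_dim((4, 7, 17), table))
```

With these taps the pooled features would concatenate to 32 + 64 + 320 = 416 channels, compared with 320 when only the last block is pooled.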
"""
Multi-Task DMS model architecture

Paper: Low-Latency Embedded Driver Monitoring System with a Multi-Task Neural Network
"""

import torch
import torch.nn as nn
from torchvision.models import MobileNet_V2_Weights, mobilenet_v2


class MultiTaskDMS(nn.Module):
    """
    Multi-task DMS model

    Built on a MobileNet-v2 backbone; one forward pass produces every DMS output.
    """

    def __init__(self, num_landmarks=98, num_actions=3, pretrained=True):
        super().__init__()

        # MobileNet-v2 backbone. The MobileNetV2 class takes no `pretrained`
        # argument, so use the factory function (torchvision >= 0.13 `weights` API).
        weights = MobileNet_V2_Weights.DEFAULT if pretrained else None
        mobilenet = mobilenet_v2(weights=weights)

        # torchvision's `features` holds 19 modules; features[18] is a 1x1 conv
        # that expands 320 -> 1280 channels. Keep features[0:18] so the backbone
        # ends at features[17] with 320 channels.
        self.backbone = mobilenet.features[:18]

        # Candidate multi-scale tap points (torchvision indexing):
        #   B1: output of features[4]  (stride 8,  32 channels)
        #   B2: output of features[7]  (stride 16, 64 channels)
        #   B3: output of features[17] (stride 32, 320 channels)
        # This simplified version pools only B3.

        # Feature pooling layer
        self.fusion = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten()
        )

        # Feature dimension after pooling: features[17] outputs 320 channels
        fusion_dim = 320

        # Multi-task output heads
        self.landmark_head = nn.Linear(fusion_dim, num_landmarks * 2)
        self.eye_openness_head = nn.Linear(fusion_dim, 2)      # left/right eye openness
        self.eye_visibility_head = nn.Linear(fusion_dim, 2)    # left/right eye visibility
        self.mouth_head = nn.Linear(fusion_dim, 3)             # mouth state
        self.head_pose_head = nn.Linear(fusion_dim, 3)         # yaw, pitch, roll
        self.action_head = nn.Linear(fusion_dim, num_actions)  # distraction action

    def forward(self, x):
        """
        Args:
            x: input face image (B, 3, 224, 224)

        Returns:
            outputs: dict with one entry per task
        """
        # Backbone feature extraction
        features = self.backbone(x)

        # Global pooling
        fused = self.fusion(features)  # (B, 320)

        # Multi-task outputs
        outputs = {
            'landmarks': self.landmark_head(fused),                            # (B, 196)
            'eye_openness': torch.sigmoid(self.eye_openness_head(fused)),      # (B, 2)
            'eye_visibility': torch.sigmoid(self.eye_visibility_head(fused)),  # (B, 2)
            'mouth_state': torch.softmax(self.mouth_head(fused), dim=-1),      # (B, 3)
            'head_pose': self.head_pose_head(fused),                           # (B, 3)
            'action': torch.softmax(self.action_head(fused), dim=-1)           # (B, 3)
        }

        return outputs

    def get_output_vector(self, x):
        """
        Return the 209-dimensional output vector (as defined in the paper).
        """
        outputs = self.forward(x)

        # Concatenate all outputs
        vector = torch.cat([
            outputs['landmarks'],       # 196
            outputs['eye_openness'],    # 2
            outputs['eye_visibility'],  # 2
            outputs['mouth_state'],     # 3
            outputs['head_pose'],       # 3
            outputs['action']           # 3
        ], dim=-1)

        return vector  # (B, 209)


# ============ Test code ============

if __name__ == "__main__":
    # Create the model
    model = MultiTaskDMS(num_landmarks=98, num_actions=3, pretrained=False)

    # Dummy input
    batch_size = 4
    x = torch.randn(batch_size, 3, 224, 224)

    # Forward pass
    model.eval()
    with torch.no_grad():
        outputs = model(x)
        vector = model.get_output_vector(x)

    # Print output shapes
    print("=== Multi-task outputs ===")
    for name, output in outputs.items():
        print(f"{name}: {output.shape}")

    print(f"\nFull output vector: {vector.shape}")

    # Count parameters
    total_params = sum(p.numel() for p in model.parameters())
    print(f"\nModel parameters: {total_params / 1e6:.2f}M")

3. Fatigue detection algorithm

3.1 PERCLOS computation

import numpy as np


def calculate_perclos(eye_openness_sequence, threshold=0.2, window_sec=60, fps=30):
    """
    Compute PERCLOS (percentage of eyelid closure over time).

    Args:
        eye_openness_sequence: eye-openness sequence (N,), values in [0, 1]
        threshold: closed-eye threshold; openness < threshold counts as closed
        window_sec: sliding-window length in seconds
        fps: frame rate

    Returns:
        perclos: PERCLOS value in percent (0.0 until a full window is available)
    """
    window_frames = int(window_sec * fps)

    if len(eye_openness_sequence) < window_frames:
        return 0.0

    # Take the most recent full window; asarray lets plain lists work too
    window = np.asarray(eye_openness_sequence[-window_frames:], dtype=float)
    closed_frames = np.sum(window < threshold)
    perclos = (closed_frames / window_frames) * 100

    return float(perclos)


def calculate_ear(landmarks, eye_indices):
    """
    Compute EAR (Eye Aspect Ratio).

    EAR = (|p2-p6| + |p3-p5|) / (2 * |p1-p4|)

    Args:
        landmarks: facial landmarks, array of shape (68, 2) or (98, 2)
        eye_indices: indices of the 6 eye landmarks, ordered p1..p6

    Returns:
        ear: eye aspect ratio
    """
    # Extract the 6 eye landmarks as float arrays
    p1, p2, p3, p4, p5, p6 = [np.asarray(landmarks[i], dtype=float) for i in eye_indices]

    # Vertical distances
    vertical_1 = np.linalg.norm(p2 - p6)
    vertical_2 = np.linalg.norm(p3 - p5)

    # Horizontal distance
    horizontal = np.linalg.norm(p1 - p4)

    # EAR
    ear = (vertical_1 + vertical_2) / (2.0 * horizontal)

    return ear
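On an embedded target, recomputing the window sum on every frame is avoidable. The following is a minimal sketch of an incremental sliding-window PERCLOS, assuming one eye-openness value per frame; the class name `StreamingPerclos` and the choice to return `None` before the window fills are illustrative decisions, not part of the paper.

```python
from collections import deque


class StreamingPerclos:
    """Sliding-window PERCLOS that updates in O(1) per frame."""

    def __init__(self, threshold=0.2, window_sec=60, fps=30):
        self.threshold = threshold
        self.window = deque(maxlen=int(window_sec * fps))
        self.closed = 0  # closed-eye frames currently inside the window

    def update(self, eye_openness):
        """Add one frame; return PERCLOS in percent, or None until the window is full."""
        is_closed = eye_openness < self.threshold
        if len(self.window) == self.window.maxlen:
            # The oldest frame is about to be evicted; remove its contribution.
            self.closed -= self.window[0]
        self.window.append(is_closed)
        self.closed += is_closed
        if len(self.window) < self.window.maxlen:
            return None
        return 100.0 * self.closed / len(self.window)


if __name__ == "__main__":
    perclos = StreamingPerclos(window_sec=1, fps=4)  # tiny window for illustration
    for value in [0.1, 0.9, 0.9, 0.1, 0.9]:
        print(perclos.update(value))
```

Returning `None` instead of 0.0 while the window is still filling lets the caller tell "not enough data yet" apart from "eyes open the whole time".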

3.2 Safety score computation

The paper proposes a weighted safety score model:

$$
\text{Safeness Score} = \lambda_1 S_{\text{perclos}} - \lambda_2 S_{\text{mouth}} - \lambda_3 (1 - S_{\text{head}}) - \lambda_4 (1 - S_{\text{action}})
$$

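where:

  • $S_{\text{perclos}}$: PERCLOS risk score (0, 1, or 2)
  • $S_{\text{mouth}}$: mouth-state risk score (yawn detection)
  • $S_{\text{head}}$: head-pose risk score (gaze diverted from the road)
  • $S_{\text{action}}$: distraction-action risk score (phone use / smoking)

Threshold settings:

| Indicator | Low-risk threshold $\tau_{\text{low}}$ | High-risk threshold $\tau_{\text{high}}$ |
| --- | --- | --- |
| PERCLOS | 15% | 30% |
| Mouth-opening frequency | 3 per minute | 5 per minute |
| Head deflection angle | 20° | 40° |
| Distraction-action duration | 3 s | 5 s |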
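Each indicator is mapped to a risk score of 0, 1, or 2 by counting how many of its two thresholds it exceeds. A small sketch of that mapping using the thresholds in the table; the dictionary keys are illustrative names, and the strict `>` comparison follows the rule $S = [v > \tau_{\text{low}}] + [v > \tau_{\text{high}}]$ used in this post's state-machine code.

```python
# Thresholds copied from the table: (tau_low, tau_high) per indicator.
THRESHOLDS = {
    "perclos_percent": (15, 30),
    "yawns_per_minute": (3, 5),
    "head_deviation_deg": (20, 40),
    "distraction_seconds": (3, 5),
}


def risk_score(value, tau_low, tau_high):
    """Return 0, 1 or 2: the number of thresholds that `value` exceeds."""
    return int(value > tau_low) + int(value > tau_high)


def score_all(measurements, thresholds=THRESHOLDS):
    """Score every indicator in `thresholds` from a dict of measurements."""
    return {
        name: risk_score(measurements[name], low, high)
        for name, (low, high) in thresholds.items()
    }


if __name__ == "__main__":
    sample = {
        "perclos_percent": 20,     # between the two thresholds -> 1
        "yawns_per_minute": 5,     # equal to tau_high, not above it -> 1
        "head_deviation_deg": 45,  # above both thresholds -> 2
        "distraction_seconds": 0,  # below both thresholds -> 0
    }
    print(score_all(sample))
```

Because the comparison is strict, a value that sits exactly on a threshold stays in the lower risk band.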

4. Finite state machine (FSM)

class DMSStateMachine:
    """
    DMS state machine

    State transitions:
    - Calibration → Safe (initialization complete)
    - Safe → Low Awareness (moderate risk detected)
    - Low Awareness → Safe (risk cleared)
    - Low Awareness → Danger (risk escalates)
    - Danger → Low Awareness (risk partly cleared)
    - Danger → Safe (risk cleared)
    - Any state → Calibration (recalibration)
    """

    STATES = ['CALIBRATION', 'SAFE', 'LOW_AWARENESS', 'DANGER']

    def __init__(self, thresholds):
        self.state = 'CALIBRATION'
        self.thresholds = thresholds

        # Baseline parameters (set during calibration)
        self.baseline_ear = None
        self.baseline_head_pose = None

    def update(self, metrics):
        """
        Update the state from the current metrics.

        Args:
            metrics: {
                'perclos': float,              # percent
                'mouth_state': float,          # mouth-opening events per minute
                'head_pose_deviation': float,  # degrees
                'action': float                # distraction duration in seconds
            }

        Returns:
            (state, safeness)
        """
        # Risk score of each indicator
        S_perclos = self._compute_risk_score(
            metrics['perclos'],
            self.thresholds['perclos_low'],
            self.thresholds['perclos_high']
        )

        S_mouth = self._compute_risk_score(
            metrics['mouth_state'],
            self.thresholds['mouth_low'],
            self.thresholds['mouth_high']
        )

        S_head = self._compute_risk_score(
            metrics['head_pose_deviation'],
            self.thresholds['head_low'],
            self.thresholds['head_high']
        )

        S_action = self._compute_risk_score(
            metrics['action'],
            self.thresholds['action_low'],
            self.thresholds['action_high']
        )

        # Global safety score, with example weights λ1..λ4 = 0.4, 0.2, 0.2, 0.2:
        # Safeness Score = λ1*S_perclos - λ2*S_mouth - λ3*(1-S_head) - λ4*(1-S_action)
        # Note: with risk scores in {0, 1, 2}, this expression rises as the
        # PERCLOS risk rises. Check the signs and normalization against the
        # paper before relying on the thresholds below.
        safeness = (
            0.4 * S_perclos -
            0.2 * S_mouth -
            0.2 * (1 - S_head) -
            0.2 * (1 - S_action)
        )

        # State transitions
        if self.state == 'CALIBRATION':
            # Enter Safe once calibration has completed
            if self._calibration_complete():
                self.state = 'SAFE'

        elif self.state == 'SAFE':
            if safeness < 0.3:
                self.state = 'LOW_AWARENESS'

        elif self.state == 'LOW_AWARENESS':
            if safeness < 0:
                self.state = 'DANGER'
            elif safeness > 0.5:
                self.state = 'SAFE'

        elif self.state == 'DANGER':
            # Test the stricter condition first
            if safeness > 0.6:
                self.state = 'SAFE'
            elif safeness > 0.3:
                self.state = 'LOW_AWARENESS'

        return self.state, safeness

    def _compute_risk_score(self, value, tau_low, tau_high):
        """
        Compute the risk score (0, 1, or 2).

        S = [value > τ_low] + [value > τ_high]
        """
        score = 0
        if value > tau_low:
            score += 1
        if value > tau_high:
            score += 1
        return score

    def _calibration_complete(self):
        """Check whether calibration has completed."""
        return self.baseline_ear is not None and self.baseline_head_pose is not None

    def calibrate(self, ear, head_pose):
        """Set the baseline parameters."""
        self.baseline_ear = ear
        self.baseline_head_pose = head_pose

    def recalibrate(self):
        """Return to Calibration from any state and clear the baselines."""
        self.state = 'CALIBRATION'
        self.baseline_ear = None
        self.baseline_head_pose = None


# ============ State machine test ============

if __name__ == "__main__":
    # Initialize the state machine
    thresholds = {
        'perclos_low': 15,
        'perclos_high': 30,
        'mouth_low': 3,
        'mouth_high': 5,
        'head_low': 20,
        'head_high': 40,
        'action_low': 3,
        'action_high': 5
    }

    fsm = DMSStateMachine(thresholds)

    # Simulated calibration; the state only advances on the next update()
    fsm.calibrate(ear=0.25, head_pose=(0, 0, 0))
    print(f"State after calibration: {fsm.state}")

    # Simulated driving scenarios
    test_scenarios = [
        {'perclos': 10, 'mouth_state': 0, 'head_pose_deviation': 5, 'action': 0},
        {'perclos': 20, 'mouth_state': 2, 'head_pose_deviation': 15, 'action': 0},
        {'perclos': 35, 'mouth_state': 4, 'head_pose_deviation': 30, 'action': 1},
        {'perclos': 45, 'mouth_state': 5, 'head_pose_deviation': 50, 'action': 2},
    ]

    for i, metrics in enumerate(test_scenarios):
        state, safeness = fsm.update(metrics)
        print(f"Scenario {i+1}: state={state}, safeness={safeness:.2f}")
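When tuning thresholds it helps to check logged state traces against the set of transitions the state machine is meant to allow. The sketch below encodes the transitions described by the state machine's docstring and `update()` method as a lookup table; `ALLOWED`, `is_allowed`, and `first_illegal_transition` are hypothetical helper names, not part of the paper.

```python
# Allowed transitions, keyed by source state. Staying in the same state and
# returning to CALIBRATION (recalibration) are handled in is_allowed().
ALLOWED = {
    "CALIBRATION": {"SAFE"},
    "SAFE": {"LOW_AWARENESS"},
    "LOW_AWARENESS": {"SAFE", "DANGER"},
    "DANGER": {"LOW_AWARENESS", "SAFE"},
}


def is_allowed(src, dst):
    """True if the state machine may move from `src` to `dst` in one step."""
    if dst == src or dst == "CALIBRATION":
        return True
    return dst in ALLOWED.get(src, set())


def first_illegal_transition(trace):
    """Return the first illegal (src, dst) pair in a state trace, or None."""
    for src, dst in zip(trace, trace[1:]):
        if not is_allowed(src, dst):
            return (src, dst)
    return None


if __name__ == "__main__":
    good = ["CALIBRATION", "SAFE", "LOW_AWARENESS", "DANGER", "SAFE"]
    bad = ["CALIBRATION", "SAFE", "DANGER"]
    print(first_illegal_transition(good))
    print(first_illegal_transition(bad))
```

In this table Safe cannot jump straight to Danger, so a trace containing that step points to either a logging gap or a change in the transition logic.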

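Experimental results

1. Latency comparison

| Platform | Model | Resolution | Latency (FP32) | Latency (FP16) | Latency (INT8) |
| --- | --- | --- | --- | --- | --- |
| Jetson Nano | Multi-Task | 224×224 | 87 ms | 52 ms | 38 ms |
| Jetson Nano | Serial pipeline | 224×224 | 156 ms | 98 ms | 72 ms |
| Jetson Xavier NX | Multi-Task | 224×224 | 23 ms | 14 ms | 11 ms |
| Jetson Xavier NX | Serial pipeline | 224×224 | 45 ms | 28 ms | 22 ms |
| RTX 3080 | Multi-Task | 224×224 | 8 ms | 5 ms | - |

Key findings:

  • The multi-task model cuts latency by roughly 44-50% relative to the serial pipeline at every precision
  • With INT8 quantization, Jetson Nano reaches 38 ms (about 26 frames per second), which is fast enough for real-time use
  • Xavier NX stays below 30 ms at all three precisions, the real-time target (≤30 ms) that this post attributes to Euro NCAP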
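The relative gain can be recomputed from the latency table. The short script below does that arithmetic for the two Jetson platforms; the numbers are copied from the table, and `latency_reduction` is an illustrative helper name.

```python
# Latencies in ms copied from the latency table: (multi-task, serial pipeline).
LATENCY_MS = {
    ("Jetson Nano", "FP32"): (87, 156),
    ("Jetson Nano", "FP16"): (52, 98),
    ("Jetson Nano", "INT8"): (38, 72),
    ("Jetson Xavier NX", "FP32"): (23, 45),
    ("Jetson Xavier NX", "FP16"): (14, 28),
    ("Jetson Xavier NX", "INT8"): (11, 22),
}


def latency_reduction(multi_ms, serial_ms):
    """Percentage by which the multi-task latency undercuts the serial pipeline."""
    return 100.0 * (serial_ms - multi_ms) / serial_ms


if __name__ == "__main__":
    for (platform, precision), (multi, serial) in LATENCY_MS.items():
        reduction = latency_reduction(multi, serial)
        print(f"{platform:17s} {precision}: {reduction:.1f}% lower, "
              f"{serial / multi:.2f}x speedup")
```

The reductions fall between about 44% (Nano, FP32) and 50% (Xavier NX, FP16 and INT8), which corresponds to a speedup of roughly 1.8x to 2x.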

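2. Accuracy comparison

| Task | Single-task model accuracy | Multi-task model accuracy | Difference |
| --- | --- | --- | --- |
| Facial landmarks | 98.2% | 97.8% | -0.4% |
| Eye openness | 96.5% | 95.9% | -0.6% |
| Head pose | 94.3% | 93.7% | -0.6% |
| Distraction action | 92.1% | 91.4% | -0.7% |

Conclusion: the accuracy cost of multi-task learning is acceptable (under 1 percentage point per task) in exchange for a substantial speedup.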

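3. Power and thermals

| Platform | Power mode | Power draw | Temperature rise |
| --- | --- | --- | --- |
| Jetson Nano | 5W | 4.2 W | +12°C |
| Jetson Nano | 10W | 6.8 W | +18°C |
| Jetson Xavier NX | 15W | 12.1 W | +15°C |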

Jetson deployment practice

1. TensorRT optimization

"""
TensorRT optimization script

Converts the PyTorch model into a TensorRT engine.
Written against the TensorRT 8.x Python API; later releases deprecate or
remove several of the calls used here (for example build_engine and
execute_async_v2).
"""

import torch
import torch.onnx
import tensorrt as trt


def export_onnx(model, input_shape=(1, 3, 224, 224), onnx_path="mtdms.onnx"):
    """
    Export the model to ONNX.

    The batch size is fixed by `input_shape`: TensorRT needs an optimization
    profile for dynamic axes, and this script does not define one.
    """
    model.eval()
    dummy_input = torch.randn(*input_shape)

    torch.onnx.export(
        model,
        dummy_input,
        onnx_path,
        opset_version=11,
        input_names=['input'],
        output_names=['landmarks', 'eye_openness', 'eye_visibility',
                      'mouth_state', 'head_pose', 'action']
    )

    print(f"ONNX model saved: {onnx_path}")
    return onnx_path


def build_tensorrt_engine(onnx_path, engine_path, precision='fp16'):
    """
    Build a TensorRT engine.

    Args:
        onnx_path: path to the ONNX model
        engine_path: where to save the TensorRT engine
        precision: 'fp32', 'fp16', or 'int8'
    """
    logger = trt.Logger(trt.Logger.WARNING)
    builder = trt.Builder(logger)
    network = builder.create_network(1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
    parser = trt.OnnxParser(network, logger)

    # Parse the ONNX model
    with open(onnx_path, 'rb') as f:
        if not parser.parse(f.read()):
            for error in range(parser.num_errors):
                print(parser.get_error(error))
            return None

    # Configure the builder
    config = builder.create_builder_config()

    # Set the precision
    if precision == 'fp16':
        config.set_flag(trt.BuilderFlag.FP16)
    elif precision == 'int8':
        config.set_flag(trt.BuilderFlag.INT8)
        # INT8 needs a calibration dataset:
        # config.int8_calibrator = MyCalibrator()

    # Maximum workspace size
    config.max_workspace_size = 1 << 30  # 1 GB

    # Build the engine; build_engine returns None on failure
    engine = builder.build_engine(network, config)
    if engine is None:
        print("TensorRT engine build failed")
        return None

    # Save the engine
    with open(engine_path, 'wb') as f:
        f.write(engine.serialize())

    print(f"TensorRT engine saved: {engine_path}")
    return engine_path


def benchmark_inference(engine_path, input_shape=(1, 3, 224, 224), num_iterations=100):
    """
    Benchmark inference latency.
    """
    import pycuda.driver as cuda
    import pycuda.autoinit  # noqa: F401 (creates the CUDA context on import)
    import numpy as np
    import time

    # Load the engine
    logger = trt.Logger(trt.Logger.WARNING)
    with open(engine_path, 'rb') as f:
        engine = trt.Runtime(logger).deserialize_cuda_engine(f.read())

    context = engine.create_execution_context()

    # Allocate memory
    input_name = 'input'
    output_names = ['landmarks', 'eye_openness', 'eye_visibility',
                    'mouth_state', 'head_pose', 'action']

    # Device pointers, placed at the binding index the engine assigned to each tensor
    bindings = [0] * engine.num_bindings

    # Input buffer
    input_size = trt.volume(input_shape) * np.dtype(np.float32).itemsize
    input_buffer = cuda.mem_alloc(input_size)
    bindings[engine.get_binding_index(input_name)] = int(input_buffer)

    # Output buffers (kept in a list so the allocations stay alive)
    output_buffers = []
    for name in output_names:
        index = engine.get_binding_index(name)
        output_shape = engine.get_binding_shape(index)
        output_size = trt.volume(output_shape) * np.dtype(np.float32).itemsize
        output_buffers.append(cuda.mem_alloc(output_size))
        bindings[index] = int(output_buffers[-1])

    # CUDA stream
    stream = cuda.Stream()

    # Warm-up
    dummy_input = np.random.randn(*input_shape).astype(np.float32)
    cuda.memcpy_htod_async(input_buffer, dummy_input, stream)
    context.execute_async_v2(bindings, stream.handle)
    stream.synchronize()

    # Benchmark: host-to-device copy plus inference, synchronized per iteration
    latencies = []
    for _ in range(num_iterations):
        start = time.perf_counter()

        cuda.memcpy_htod_async(input_buffer, dummy_input, stream)
        context.execute_async_v2(bindings, stream.handle)
        stream.synchronize()

        latencies.append((time.perf_counter() - start) * 1000)

    print(f"Mean latency: {np.mean(latencies):.2f} ms")
    print(f"P99 latency: {np.percentile(latencies, 99):.2f} ms")
    print(f"Throughput: {1000 / np.mean(latencies):.1f} FPS")


if __name__ == "__main__":
    # Example usage
    from mtdms_model import MultiTaskDMS

    # 1. Create the model and load trained weights
    model = MultiTaskDMS(pretrained=False)
    model.load_state_dict(torch.load('mtdms_weights.pth', map_location='cpu'))

    # 2. Export to ONNX
    onnx_path = export_onnx(model)

    # 3. Build the TensorRT engine
    engine_path = build_tensorrt_engine(onnx_path, 'mtdms_fp16.engine', precision='fp16')

    # 4. Benchmark
    if engine_path:
        benchmark_inference(engine_path)
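The same measurement pattern (warm-up runs first, then timed iterations) applies to any backend. Below is a backend-agnostic sketch using only the standard library; `benchmark` and `summarize` are illustrative names. The callable is assumed to block until the work is finished, so for GPU inference it should include the stream synchronization. The P99 here uses the nearest-rank definition, which can differ slightly from NumPy's default interpolated percentile.

```python
import math
import time


def summarize(samples_ms):
    """Return (mean, nearest-rank 99th percentile) of latencies in milliseconds."""
    ordered = sorted(samples_ms)
    rank = max(1, math.ceil(0.99 * len(ordered)))
    return sum(ordered) / len(ordered), ordered[rank - 1]


def benchmark(fn, warmup=10, iterations=100):
    """Time `fn()` over `iterations` runs after `warmup` untimed runs."""
    for _ in range(warmup):
        fn()
    samples = []
    for _ in range(iterations):
        start = time.perf_counter()
        fn()
        samples.append((time.perf_counter() - start) * 1000.0)
    return summarize(samples)


if __name__ == "__main__":
    # Stand-in workload; replace with a call that runs one synchronized inference.
    mean_ms, p99_ms = benchmark(lambda: sum(range(10_000)))
    print(f"mean {mean_ms:.3f} ms, p99 {p99_ms:.3f} ms")
```

Reporting P99 next to the mean matters on Jetson boards, where thermal throttling and clock scaling can produce occasional slow frames that the mean hides.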

2. Complete inference pipeline

"""
Real-time DMS inference pipeline for Jetson

Combines face detection, the multi-task model, and post-processing.
Simplified implementation: face detection runs through OpenCV DNN and the
multi-task model through PyTorch. _load_trt_engine shows how a serialized
TensorRT engine would be loaded instead.
"""

from collections import deque

import cv2
import numpy as np
import torch


class RealTimeDMS:
    """
    Real-time DMS system

    Optimized for the Jetson platform
    """

    def __init__(self, face_detector, mtdms_model):
        # Face detector (SSD), passed in as a cv2.dnn.Net
        self.face_detector = face_detector

        # Multi-task model, passed in as a torch.nn.Module
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.mtdms = mtdms_model.to(self.device).eval()

        # History (for PERCLOS); a deque drops the oldest frame automatically
        self.max_history = 1800  # 60 s at 30 fps
        self.eye_openness_history = deque(maxlen=self.max_history)
        self.mouth_state_history = deque(maxlen=self.max_history)

        # Calibration parameter
        self.baseline_ear = None

    def process_frame(self, frame):
        """
        Process a single frame.

        Args:
            frame: BGR image (H, W, 3)

        Returns:
            result: {
                'fatigue_level': int,
                'distraction': bool,
                'eye_openness': float,
                'head_pose': tuple,
                'action': str,
                'landmarks': np.ndarray,
                'perclos': float
            }, or None if no face is found
        """
        # 1. Face detection
        face_bbox = self._detect_face(frame)

        if face_bbox is None:
            return None

        # 2. Crop the face, clipping the box to the frame
        h, w = frame.shape[:2]
        x1, y1, x2, y2 = face_bbox
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(w, x2), min(h, y2)
        if x2 <= x1 or y2 <= y1:
            return None

        face = frame[y1:y2, x1:x2]
        face = cv2.resize(face, (224, 224))
        face = cv2.cvtColor(face, cv2.COLOR_BGR2RGB)
        face = face.transpose(2, 0, 1).astype(np.float32) / 255.0
        face = np.expand_dims(face, 0)

        # 3. Multi-task inference
        outputs = self._infer_mtdms(face)

        # 4. Update history (mean of left and right eye openness)
        eye_openness = float(outputs['eye_openness'][0].mean())
        self.eye_openness_history.append(eye_openness)
        self.mouth_state_history.append(int(outputs['mouth_state'][0].argmax()))

        # 5. Compute PERCLOS
        perclos = self._calculate_perclos()

        # 6. Fatigue level
        fatigue_level = 0
        if perclos > 15:
            fatigue_level = 1
        if perclos > 30:
            fatigue_level = 2

        # 7. Distraction
        action_idx = int(outputs['action'][0].argmax())
        action_map = {0: 'normal', 1: 'phone', 2: 'smoking'}
        distraction = action_idx > 0

        return {
            'fatigue_level': fatigue_level,
            'distraction': distraction,
            'eye_openness': eye_openness,
            'head_pose': tuple(outputs['head_pose'][0]),
            'action': action_map[action_idx],
            'landmarks': outputs['landmarks'][0].reshape(-1, 2),
            'perclos': perclos
        }

    def _detect_face(self, frame):
        """Face detection (simplified: OpenCV DNN instead of a TensorRT engine)."""
        # Input size and scaling depend on the detector that is loaded
        blob = cv2.dnn.blobFromImage(frame, 1.0, (320, 240))
        self.face_detector.setInput(blob)
        detections = self.face_detector.forward()

        # Return the first box above the confidence threshold
        h, w = frame.shape[:2]
        for i in range(detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            if confidence > 0.5:
                x1 = int(detections[0, 0, i, 3] * w)
                y1 = int(detections[0, 0, i, 4] * h)
                x2 = int(detections[0, 0, i, 5] * w)
                y2 = int(detections[0, 0, i, 6] * h)
                return (x1, y1, x2, y2)

        return None

    def _infer_mtdms(self, face_tensor):
        """Multi-task inference (simplified: PyTorch instead of TensorRT)."""
        with torch.no_grad():
            outputs = self.mtdms(torch.from_numpy(face_tensor).to(self.device))
        # Move results to NumPy so the post-processing is backend independent
        return {name: value.cpu().numpy() for name, value in outputs.items()}

    def _calculate_perclos(self):
        """Compute PERCLOS over the stored history (at most 60 s)."""
        if len(self.eye_openness_history) < 900:  # need at least 30 s of data
            return 0.0

        threshold = 0.2
        closed_frames = sum(1 for e in self.eye_openness_history if e < threshold)
        return (closed_frames / len(self.eye_openness_history)) * 100

    @staticmethod
    def _load_trt_engine(path):
        """Load a serialized TensorRT engine (for the non-simplified path)."""
        import tensorrt as trt

        logger = trt.Logger(trt.Logger.WARNING)
        with open(path, 'rb') as f:
            return trt.Runtime(logger).deserialize_cuda_engine(f.read())


# ============ Main program ============

if __name__ == "__main__":
    import time

    from mtdms_model import MultiTaskDMS

    # Initialize the DMS. The file names are placeholders for a Caffe SSD
    # face detector and the trained multi-task weights.
    face_net = cv2.dnn.readNetFromCaffe('face_detector.prototxt', 'face_detector.caffemodel')
    model = MultiTaskDMS(pretrained=False)
    model.load_state_dict(torch.load('mtdms_weights.pth', map_location='cpu'))
    dms = RealTimeDMS(face_net, model)

    # Open the camera
    cap = cv2.VideoCapture(0)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    cap.set(cv2.CAP_PROP_FPS, 30)

    latencies = []  # per-frame processing latency in ms

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        start = time.perf_counter()

        # Process the frame
        result = dms.process_frame(frame)

        latencies.append((time.perf_counter() - start) * 1000)

        if result:
            # Draw the results
            cv2.putText(frame, f"Fatigue: {result['fatigue_level']}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            cv2.putText(frame, f"PERCLOS: {result['perclos']:.1f}%", (10, 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            cv2.putText(frame, f"Action: {result['action']}", (10, 90),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        # FPS from the mean latency of the last 30 frames
        cv2.putText(frame, f"FPS: {1000/np.mean(latencies[-30:]):.1f}", (550, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        cv2.imshow('DMS', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

    if latencies:
        print(f"Mean latency: {np.mean(latencies):.2f} ms")
        print(f"P99 latency: {np.percentile(latencies, 99):.2f} ms")
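The pipeline declares `mouth_state_history` without deriving a yawn rate from it, while the threshold table expresses the mouth indicator as events per minute. The sketch below shows one way to count such events from per-frame mouth classes. It assumes class index 2 means "fully open" (following the closed / half open / fully open order of the output table) and that a yawn is a run of at least `min_frames` consecutive fully-open frames; both are assumptions, and `YawnCounter` is an illustrative name.

```python
from collections import deque

MOUTH_FULLY_OPEN = 2  # assumed class index: 0 = closed, 1 = half open, 2 = fully open


class YawnCounter:
    """Count yawns (runs of fully-open frames) inside a sliding time window."""

    def __init__(self, window_sec=60, fps=30, min_frames=15):
        self.window_frames = int(window_sec * fps)
        self.min_frames = min_frames  # assumed minimum yawn length in frames
        self.frame = 0                # frames seen so far
        self.run = 0                  # length of the current fully-open run
        self.events = deque()         # frame numbers at which a yawn was counted

    def update(self, mouth_class):
        """Feed one frame's mouth class; return the yawn count within the window."""
        self.frame += 1
        if mouth_class == MOUTH_FULLY_OPEN:
            self.run += 1
            if self.run == self.min_frames:  # count each run exactly once
                self.events.append(self.frame)
        else:
            self.run = 0
        # Drop events that have left the window
        while self.events and self.events[0] <= self.frame - self.window_frames:
            self.events.popleft()
        return len(self.events)


if __name__ == "__main__":
    counter = YawnCounter(window_sec=2, fps=5, min_frames=3)  # tiny window for illustration
    for mouth_class in [2, 2, 2, 2, 0, 2, 2, 2]:
        print(counter.update(mouth_class))
```

With the default 60-second window the returned count is already in events per minute, so it can be compared directly with the 3 and 5 per minute thresholds.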

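Takeaways for IMS development

1. Deployment priorities

| Platform | Use case | Recommended configuration |
| --- | --- | --- |
| Jetson Nano | Aftermarket, low-cost solutions | INT8 quantization + 224×224 resolution |
| Jetson Xavier NX | Factory-fit mass production, high-end models | FP16 + 320×320 resolution |
| QCS8255 | Qualcomm-based solutions | SNPE + INT8 |

2. Alignment with Euro NCAP 2026

Real-time requirement:

  • Euro NCAP requires a DMS response time of ≤3 seconds
  • This solution's per-frame latency is 38 ms (Jetson Nano, INT8), well within that requirement

Scenario coverage:

| Euro NCAP scenario | Supported by this solution | Notes |
| --- | --- | --- |
| Fatigue detection (PERCLOS) | Yes | Computed over a 60-second window |
| Eye-closure detection | Yes | Eye-openness regression |
| Gaze-off-road detection | Yes | Head-pose estimation |
| Phone-use detection | Yes | Distraction-action classification |
| Smoking detection | Yes | Distraction-action classification |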
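A quick calculation shows how many frames fit inside the 3-second response budget, which is what leaves room for multi-frame confirmation before raising an alert. The sketch assumes frames are processed one after another and that throughput is limited by the slower of the camera and the model; capture and face-detection time are ignored, and `frames_within_budget` is an illustrative name.

```python
def frames_within_budget(budget_s, frame_latency_ms, camera_fps):
    """Frames that can be processed within the response budget.

    Effective throughput is capped by the slower of the camera and the model.
    """
    model_fps = 1000.0 / frame_latency_ms
    effective_fps = min(camera_fps, model_fps)
    return int(budget_s * effective_fps)


if __name__ == "__main__":
    # Latencies from the Jetson rows of the latency table, with a 30 fps camera
    for label, latency_ms in [("Nano FP32", 87), ("Nano INT8", 38), ("Xavier NX INT8", 11)]:
        print(label, frames_within_budget(3, latency_ms, 30))
```

At 38 ms per frame the model runs at about 26 fps, so roughly 78 frames are available within 3 seconds; on Xavier NX the 30 fps camera becomes the limit at 90 frames.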

3. Integration with Qualcomm platforms

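# SNPE deployment interface (illustrative)
# Note: the `snpe` module and its Model / Runtime / Precision names are
# placeholders for a Python wrapper around the SNPE SDK, not a documented API.
# Consult the SDK documentation for the actual bindings.
import cv2
import numpy as np


class SnapdragonDMS:
    """
    Qualcomm SNPE deployment

    Targets the QCS8255 / QCS8295 platforms
    """

    def __init__(self, dlc_path):
        import snpe  # placeholder wrapper module (see note above)

        # Load the DLC model
        self.model = snpe.Model(dlc_path)

        # Configure the runtime
        self.runtime = snpe.Runtime(
            runtime=snpe.Runtime.GPU,  # or DSP
            precision=snpe.Precision.INT8
        )

    def _preprocess(self, image):
        """Resize to 224×224, BGR → RGB, scale to [0, 1], add a batch axis (NHWC assumed)."""
        resized = cv2.resize(image, (224, 224))
        rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
        return np.expand_dims(rgb.astype(np.float32) / 255.0, 0)

    def infer(self, image):
        """
        Inference interface

        Args:
            image: numpy array (H, W, C), assumed BGR as delivered by OpenCV

        Returns:
            outputs: dict
        """
        # Preprocessing
        input_tensor = self._preprocess(image)

        # SNPE inference
        outputs = self.model.execute(input_tensor, self.runtime)

        return outputs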

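4. Performance optimization suggestions

| Optimization | Method | Expected gain |
| --- | --- | --- |
| Model quantization | FP32 → INT8 | 2.3x speedup |
| Input resolution | 320×320 → 224×224 | 1.5x speedup |
| Batching | Single frame → batches of 4 frames | 1.3x speedup |
| Operator fusion | TensorRT automatic fusion | 1.2x speedup |
| Knowledge distillation | Large model → small model | 1.5x speedup (accuracy loss <1%) |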
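If the gains in the table were fully independent, they would multiply. The snippet below computes that naive product as an upper bound; real gains overlap, so treat the result as a ceiling rather than a forecast. Batching also improves throughput rather than single-frame latency, so the script reports the product with and without it. The dictionary keys are illustrative labels.

```python
import math

# Expected gains copied from the optimization table.
SPEEDUPS = {
    "int8_quantization": 2.3,
    "lower_resolution": 1.5,
    "batching": 1.3,
    "operator_fusion": 1.2,
    "distillation": 1.5,
}


def combined_speedup(factors):
    """Naive upper bound: the product of the individual speedup factors."""
    return math.prod(factors)


if __name__ == "__main__":
    everything = combined_speedup(SPEEDUPS.values())
    latency_only = combined_speedup(
        factor for name, factor in SPEEDUPS.items() if name != "batching"
    )
    print(f"all optimizations: {everything:.2f}x")
    print(f"without batching:  {latency_only:.2f}x")
```

The product is about 8.1x with every row included and about 6.2x once batching is left out.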


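Summary: the Multi-Task DMS proposed in the paper outputs six categories of DMS indicators in a single forward pass and reaches 38 ms latency on Jetson Nano with INT8, which meets the Euro NCAP 2026 real-time requirement. For IMS development, the recommendation is to adopt a multi-task architecture first and combine it with TensorRT and INT8 quantization for edge deployment.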


Original post: https://dapalm.com/2026/06/05/2026-06-05-MultiTask-DMS-Low-Latency-Deployment/
Author: Mars
Published: June 5, 2026